Discussion:
KafkaProducer from kafka.clients hangs when some partitions are not available
Xiaoyu Wang
2015-02-20 18:48:48 UTC
Hello,

I am experimenting with sending data to Kafka using KafkaProducer and found
that when a partition is completely offline, e.g. a topic with replication
factor = 1 and some broker down, KafkaProducer seems to hang forever. It
does not even exit with the timeout setting. Can you take a look?

I checked the code and found that the partitioner picks a partition based on
the total partition count, including offline partitions. Is it possible
to change ProducerClient to ignore offline partitions?


Thanks,

-Xiaoyu
Xiaoyu Wang
2015-02-20 19:35:14 UTC
Update:

I am using kafka.clients 0.8.2-beta. Below are the test steps:

1. Set up a local Kafka cluster with 2 brokers, 0 and 1.
2. Create topic X with replication factor 1 and 4 partitions.
3. Verify that each broker has two partitions.
4. Shut down broker 1.
5. Start a producer sending data to topic X using KafkaProducer with
required acks = 1.
6. The producer hangs and does not exit.

Offline partitions appear to be handled by checking for a null leader (code
attached below). However, the timeout setting does not seem to work. I am
not sure what causes KafkaProducer to hang.

    // choose the next available node in a round-robin fashion
    for (int i = 0; i < numPartitions; i++) {
        int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
        if (partitions.get(partition).leader() != null)
            return partition;
    }
    // no partitions are available, give a non-available partition
    return Utils.abs(counter.getAndIncrement()) % numPartitions;
Xiaoyu Wang
2015-02-20 20:45:38 UTC
Found the problem: it is a bug in the partitioner of the Kafka client. Can
you confirm and patch it in kafka clients? The loop should return the
partition number of the selected entry, as below:

    for (int i = 0; i < numPartitions; i++) {
        int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
        if (partitions.get(partition).leader() != null) {
            return partitions.get(partition).partition();
        }
    }
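
To make the difference between `return partition` and `return partitions.get(partition).partition()` concrete, here is a minimal, self-contained sketch. It rests on an assumption the thread does not spell out: that the partition list handed to the partitioner is not necessarily ordered by partition id, so a list index and a partition id can differ. `PartitionMeta`, `RoundRobinSketch`, `chooseIndex` and `choosePartitionId` are hypothetical stand-ins for illustration, not Kafka classes or methods.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for partition metadata: a partition id plus a
// leader name that is null when the partition is offline.
final class PartitionMeta {
    final int id;
    final String leader;

    PartitionMeta(int id, String leader) {
        this.id = id;
        this.leader = leader;
    }
}

final class RoundRobinSketch {
    // Mirrors the original loop: returns the LIST INDEX of the first entry with a leader.
    static int chooseIndex(List<PartitionMeta> partitions, AtomicInteger counter) {
        int n = partitions.size();
        for (int i = 0; i < n; i++) {
            int index = Math.abs(counter.getAndIncrement() % n);
            if (partitions.get(index).leader != null)
                return index;
        }
        // no entry has a leader; fall back to an arbitrary index
        return Math.abs(counter.getAndIncrement() % n);
    }

    // Mirrors the proposed change: returns the PARTITION ID stored in that entry.
    static int choosePartitionId(List<PartitionMeta> partitions, AtomicInteger counter) {
        int n = partitions.size();
        for (int i = 0; i < n; i++) {
            int index = Math.abs(counter.getAndIncrement() % n);
            if (partitions.get(index).leader != null)
                return partitions.get(index).id;
        }
        // no entry has a leader; fall back to an arbitrary partition id
        return partitions.get(Math.abs(counter.getAndIncrement() % n)).id;
    }

    public static void main(String[] args) {
        // Assumed ordering: online partitions 0 and 2 are listed before offline 1 and 3.
        List<PartitionMeta> partitions = Arrays.asList(
            new PartitionMeta(0, "broker-0"),
            new PartitionMeta(2, "broker-0"),
            new PartitionMeta(1, null),
            new PartitionMeta(3, null));

        // Index 1 holds online partition 2, but partition id 1 is offline.
        System.out.println(chooseIndex(partitions, new AtomicInteger(1)));       // prints 1
        System.out.println(choosePartitionId(partitions, new AtomicInteger(1))); // prints 2
    }
}
```

With this ordering, the index-returning version hands back 1, which is the id of an offline partition, while the id-returning version hands back 2, which has a leader.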
Jun Rao
2015-02-24 00:01:13 UTC
The logic in that code is to cycle through all partitions and return as
soon as we see a partition with a leader. I do see an issue that if there
are multiple threads sending messages to the same producer concurrently, we
may not cycle through all partitions and therefore we could return an
unavailable partition even when available partitions are present.

Do you see this issue with just a single thread producing messages? The
current logic seems to work correctly in that case.

Thanks,

Jun
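
The multi-threaded issue described above can be sketched with a deterministic simulation rather than real threads. This is only an illustration under stated assumptions: two callers share one counter, the second caller happens to draw exactly one value between each probe of the first, and partitions 1 and 3 are offline. `SharedCounterSketch`, `nextPartition` and `choose` are hypothetical names, not part of the Kafka client.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class SharedCounterSketch {
    // true = the partition has a leader. Partitions 1 and 3 are offline here.
    static final boolean[] HAS_LEADER = {true, false, true, false};

    // One probe by caller A. If another caller is active, it consumes one
    // value from the shared counter first, so A skips that value.
    static int nextPartition(AtomicInteger counter, boolean otherCallerActive) {
        if (otherCallerActive)
            counter.getAndIncrement(); // caller B's draw
        return Math.abs(counter.getAndIncrement() % HAS_LEADER.length);
    }

    // Caller A runs the round-robin loop and returns the partition it ends up with.
    static int choose(AtomicInteger counter, boolean otherCallerActive) {
        for (int i = 0; i < HAS_LEADER.length; i++) {
            int partition = nextPartition(counter, otherCallerActive);
            if (HAS_LEADER[partition])
                return partition;
        }
        // nothing available was seen; fall back to whatever comes next
        return nextPartition(counter, otherCallerActive);
    }

    public static void main(String[] args) {
        System.out.println(choose(new AtomicInteger(0), false)); // prints 0
        System.out.println(choose(new AtomicInteger(0), true));  // prints 1
    }
}
```

With a single caller the loop finds partition 0 immediately. With the interleaving assumed here, caller A only ever sees counter values 1, 3, 5, 7, which map to the offline partitions 1 and 3, so it falls through and returns an unavailable partition even though partitions 0 and 2 have leaders.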
Xiaoyu Wang
2015-02-24 15:56:46 UTC
Jun,

I am trying to test how KafkaProducer behaves with topic replication factor
= 1.

1. One broker is offline BEFORE KafkaProducer starts sending messages.
Because of the bug I mentioned, KafkaProducer sends to the offline
partition and hangs forever.
2. One broker goes offline WHILE KafkaProducer is sending messages.
KafkaProducer seems to hang forever in this case as well. I am still looking.

Would you mind taking a look?

Thanks
Jun Rao
2015-02-24 16:30:50 UTC
Hi, Xiaoyu,

1. Could you explain a bit more what the bug is? The code does try to avoid
picking an unavailable partition. There does seem to be an issue when
more than one thread produces data to the same producer instance. This
is being tracked in KAFKA-1984. How many producing threads do you have in
your test?

Thanks,

Jun
Xiaoyu Wang
2015-02-24 16:37:41 UTC
Hi Jun,

If I understand it correctly, the marked line is meant to avoid
offline partitions, isn't it?

    for (int i = 0; i < numPartitions; i++) {
        int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
        if (partitions.get(partition).leader() != null) {
            return partition; // --> should be changed to return the actual partition number?
        }
    }
Jun Rao
2015-02-24 17:03:43 UTC
Ah, yes. You are right. That's a more obvious bug. Will fix that in
KAFKA-1984.

Thanks,

Jun
Xiaoyu Wang
2015-02-24 18:14:04 UTC
Jun,

Can you also take a look at the second problem I am having?
> 2. One broker goes offline WHILE KafkaProducer is sending messages.
> KafkaProducer seems to be hanging forever in this case. I am still looking.
Jun Rao
2015-02-24 20:52:03 UTC
Xiaoyu,

For 1, I have a patch for 0.8.2 in
https://issues.apache.org/jira/browse/KAFKA-1984. Could you test it out and
see if it fixes your issue?

For 2, I did some local testing. The only issue I saw is that the producer can
block on close since there are still unsent messages in the bufferpool.
This is a known issue and is being tracked in
https://issues.apache.org/jira/browse/KAFKA-1788. Could you confirm whether
your producer blocks during send or during close (you can figure it out by
taking a thread dump)?

Thanks,

Jun
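
For the thread dump suggested above, running `jstack <pid>` against the producer process is the usual route. The same information can also be collected from inside the JVM with the standard `Thread.getAllStackTraces()` call. The sketch below is only one way to do it: the helper `threadsInMethod` is hypothetical, and matching frames by class-name suffix and method name is an assumption about what to look for, not something the thread prescribes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ThreadDumpSketch {
    // Returns the names of live threads whose current stack contains a frame
    // for methodName in a class whose name ends with classSuffix.
    static List<String> threadsInMethod(String classSuffix, String methodName) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            for (StackTraceElement frame : entry.getValue()) {
                if (frame.getClassName().endsWith(classSuffix)
                        && frame.getMethodName().equals(methodName)) {
                    names.add(entry.getKey().getName());
                    break; // one match per thread is enough
                }
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // In a real test this would run in the same JVM as the hanging producer.
        System.out.println("inside send:  " + threadsInMethod("KafkaProducer", "send"));
        System.out.println("inside close: " + threadsInMethod("KafkaProducer", "close"));
    }
}
```

A thread listed under `send` but not `close` would point at the send path, and the reverse would be consistent with the close-time blocking tracked in KAFKA-1788. A full dump is still the more reliable source, since it also shows which lock or condition the thread is waiting on.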
Post by Xiaoyu Wang
Jun,
Can you also take a look at the second problem I am having?
Post by Xiaoyu Wang
Post by Xiaoyu Wang
Post by Xiaoyu Wang
I am trying to test how KafkaProducer behaves with topic replication
factor
Post by Xiaoyu Wang
= 1
1. One broker is offline BEFORE KafkaProducer starts sending
messages.
Post by Xiaoyu Wang
Post by Xiaoyu Wang
Because of the bug I mentioned, KafkaProducer sends to the
offline
Post by Xiaoyu Wang
Post by Xiaoyu Wang
Post by Xiaoyu Wang
partition and hangs forever.
*> > > 2. One broker goes offline WHILE KafkaProducer is sending
messages. > > > KafkaProducer seems to be hanging forever in this case.
I am still > > > looking.*
Post by Xiaoyu Wang
Ah, yes. You are right. That's a more obvious bug. Will fix that in
KAFKA-1984.
Thanks,
Jun
Post by Xiaoyu Wang
Hi Jun,
If I understand it correctly. the highlighted line is for avoiding
offline partitions, is it?
for (int i = 0; i < numPartitions; i++) {
int partition = Utils.abs(counter.getAndIncrement()) %
numPartitions;
Post by Xiaoyu Wang
Post by Xiaoyu Wang
if (partitions.get(partition).leader() != null) {
return partition; --> should be changed to return the actual
partition number?
}
}
Post by Xiaoyu Wang
Hi, Xiaoyu,
1. Could you explain a bit more what the bug is? The code does try to
avoid
Post by Xiaoyu Wang
picking an unavailable partition. There does seem to be an issue when
there
Post by Xiaoyu Wang
are more than one thread producing data to the same producer
instance.
Post by Xiaoyu Wang
Post by Xiaoyu Wang
This
Post by Xiaoyu Wang
is being tracked in KAFKA-1984. How many producing threads do you
have
Post by Xiaoyu Wang
in
Post by Xiaoyu Wang
Post by Xiaoyu Wang
your test?
Thanks,
Jun
Post by Xiaoyu Wang
Jun,
I am trying to test how KafkaProducer behaves with topic
replication
Post by Xiaoyu Wang
Post by Xiaoyu Wang
Post by Xiaoyu Wang
factor
Post by Xiaoyu Wang
= 1
1. One broker is offline BEFORE KafkaProducer starts sending
messages.
Post by Xiaoyu Wang
Post by Xiaoyu Wang
Because of the bug I mentioned, KafkaProducer sends to the
offline
Post by Xiaoyu Wang
Post by Xiaoyu Wang
Post by Xiaoyu Wang
partition and hangs forever.
2. One broker goes offline WHILE KafkaProducer is sending
messages.
Post by Xiaoyu Wang
Post by Xiaoyu Wang
Post by Xiaoyu Wang
KafkaProducer seems to be hanging forever in this case. I am
still
Post by Xiaoyu Wang
Post by Xiaoyu Wang
Post by Xiaoyu Wang
Post by Xiaoyu Wang
looking.
Do you mind take a look?
Thanks
Post by Jun Rao
The logic in that code is to cycle through all partitions and return as
soon as we see a partition with the leader. I do see an issue that if there
are multiple threads sending messages to the same producer concurrently, we
may not cycle through all partitions and therefore we could return an
unavailable partition even when available partitions are present.
Do you see this issue with just a single thread producing messages? The
current logic seems to work correctly in that case.
Thanks,
Jun
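The multi-threaded case described above can be replayed deterministically. The sketch below is only an illustration under stated assumptions: the class SharedCounterRace, the leaderPresent list and the otherThread hook are hypothetical names, Math.abs stands in for Utils.abs, and the second thread is simulated by a callback that bumps the shared counter between two probes rather than by a real thread.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Deterministic replay of one unlucky interleaving of two producing threads
// that share a single round-robin counter. All names here are hypothetical.
class SharedCounterRace {

    // leaderPresent.get(p) is true when partition p currently has a leader.
    // otherThread runs between two probes and stands in for a concurrent caller.
    static int pick(List<Boolean> leaderPresent, AtomicInteger counter, Runnable otherThread) {
        int numPartitions = leaderPresent.size();
        for (int i = 0; i < numPartitions; i++) {
            int partition = Math.abs(counter.getAndIncrement()) % numPartitions;
            if (leaderPresent.get(partition)) {
                return partition;
            }
            otherThread.run();
        }
        // No available partition was seen: fall back to whatever comes next.
        return Math.abs(counter.getAndIncrement()) % numPartitions;
    }

    public static void main(String[] args) {
        // Partition 0 has a leader, partition 1 is offline.
        List<Boolean> leaderPresent = List.of(true, false);

        // Single caller, counter starts at 1: probes partition 1, then partition 0.
        AtomicInteger solo = new AtomicInteger(1);
        System.out.println("single thread picks " + pick(leaderPresent, solo, () -> { }));

        // A second caller increments the shared counter once between our probes,
        // so every probe of this caller lands on partition 1.
        AtomicInteger shared = new AtomicInteger(1);
        System.out.println("with interleaving picks "
                + pick(leaderPresent, shared, shared::getAndIncrement));
    }
}
```

With a single caller the loop reaches partition 0. With the simulated second caller, every probe lands on the offline partition and the fallback returns it too, even though partition 0 was available the whole time.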
On Fri, Feb 20, 2015 at 12:45 PM, Xiaoyu Wang <
Post by Xiaoyu Wang
Found the problem - it is a bug in the Partitioner of the Kafka client. Can
you guys confirm and patch it in kafka clients?
for (int i = 0; i < numPartitions; i++) {
    int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
    if (partitions.get(partition).leader() != null) {
        return partitions.get(partition).partition();
    }
}
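To see why returning the list index and returning the entry's partition number can differ, here is a small sketch. It assumes the partition list is not guaranteed to be ordered by partition number; that ordering is an assumption made for illustration, not something stated in this thread. The class IndexVersusId and its Part type are hypothetical stand-ins, not the Kafka classes.

```java
import java.util.List;

// Hypothetical stand-ins; Part is NOT Kafka's partition metadata class.
class IndexVersusId {

    static final class Part {
        final int id;         // partition number
        final String leader;  // null means no leader, i.e. the partition is offline
        Part(int id, String leader) { this.id = id; this.leader = leader; }
    }

    // Mirrors the quoted loop: returns the list index that passed the leader check.
    static int pickIndex(List<Part> parts, int start) {
        int n = parts.size();
        for (int i = 0; i < n; i++) {
            int index = (start + i) % n;
            if (parts.get(index).leader != null) {
                return index;
            }
        }
        return start % n;
    }

    // Mirrors the proposed change: returns the partition number of that entry.
    static int pickId(List<Part> parts, int start) {
        int n = parts.size();
        for (int i = 0; i < n; i++) {
            int index = (start + i) % n;
            if (parts.get(index).leader != null) {
                return parts.get(index).id;
            }
        }
        return parts.get(start % n).id;
    }

    public static void main(String[] args) {
        // Assumed ordering: the list is not sorted by partition number.
        List<Part> parts = List.of(new Part(1, null), new Part(0, "broker-0"));
        // The entry at index 1 has a leader, but partition number 1 is offline.
        System.out.println("index-based pick: " + pickIndex(parts, 0));
        System.out.println("id-based pick:    " + pickId(parts, 0));
    }
}
```

In this setup the index-based version returns 1, which as a partition number is the offline one, while the id-based version returns 0, the partition whose leader was actually checked.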
On Fri, Feb 20, 2015 at 2:35 PM, Xiaoyu Wang <
Post by Xiaoyu Wang
I am using kafka.clients 0.8.2-beta. Below are the test steps:
1. Set up a local Kafka cluster with 2 brokers, 0 and 1
2. Create topic X with replication factor 1 and 4 partitions
3. Verify that each broker has two partitions
4. Shut down broker 1
5. Start a producer sending data to topic X using KafkaProducer with
required ack = 1
6. The producer hangs and does not exit.
Offline partitions are taken care of when the partition's leader is null
(code attached below). However, the timeout setting does not seem to work.
Not sure what caused KafkaProducer to hang.
// choose the next available node in a round-robin fashion
for (int i = 0; i < numPartitions; i++) {
    int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
    if (partitions.get(partition).leader() != null)
        return partition;
}
// no partitions are available, give a non-available partition
return Utils.abs(counter.getAndIncrement()) % numPartitions;
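One possible way to sidestep both concerns raised in this thread (index versus partition number, and a counter shared across probes) is to filter the available partitions first and advance the counter only once per call. This is a sketch under assumptions, not the patch tracked in KAFKA-1984: AvailableOnlyPartitioner and Part are hypothetical names, and the code assumes the topic has at least one partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; not the Kafka partitioner and not the KAFKA-1984 patch.
class AvailableOnlyPartitioner {

    static final class Part {
        final int id;         // partition number
        final String leader;  // null means no leader, i.e. the partition is offline
        Part(int id, String leader) { this.id = id; this.leader = leader; }
    }

    private final AtomicInteger counter = new AtomicInteger(0);

    // Assumes parts is non-empty.
    int partition(List<Part> parts) {
        // Collect only the partitions that currently have a leader.
        List<Part> available = new ArrayList<>();
        for (Part p : parts) {
            if (p.leader != null) {
                available.add(p);
            }
        }
        // One increment per call, so concurrent callers cannot make each other
        // skip candidates. The mask keeps the value non-negative after overflow.
        int next = counter.getAndIncrement() & 0x7fffffff;
        if (!available.isEmpty()) {
            return available.get(next % available.size()).id;
        }
        // Nothing is available: hand back some partition and let the send fail.
        return parts.get(next % parts.size()).id;
    }

    public static void main(String[] args) {
        // Same shape as the test setup: 4 partitions, the broker owning 1 and 3 is down.
        List<Part> parts = List.of(
                new Part(0, "broker-0"), new Part(1, null),
                new Part(2, "broker-0"), new Part(3, null));
        AvailableOnlyPartitioner partitioner = new AvailableOnlyPartitioner();
        for (int i = 0; i < 4; i++) {
            System.out.println("record " + i + " -> partition " + partitioner.partition(parts));
        }
    }
}
```

The trade-off is an extra list allocation per call; a real implementation would more likely reuse a precomputed list of available partitions.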
On Fri, Feb 20, 2015 at 1:48 PM, Xiaoyu Wang <
Post by Xiaoyu Wang
Hello,
I am experimenting with sending data to Kafka using KafkaProducer and found
that when a partition is completely offline, e.g. a topic with replication
factor = 1 and some broker is down, KafkaProducer seems to hang forever. It
does not even exit with the timeout setting. Can you take a look?
I checked the code and found that the partitioner picks a partition based on
the total partition number - including the offline partitions. Is it
possible to change ProducerClient to ignore offline partitions?
Thanks,
-Xiaoyu