Discussion:
trouble upgrading from 0.8.2.1 to 0.9.0.0: invalid message
Dave Peterson
2016-01-15 02:04:14 UTC
I was trying to upgrade an 0.8.2.1 broker cluster to 0.9.0.0 by following
the instructions here:

http://kafka.apache.org/documentation.html#upgrade

After upgrading one broker, with inter.broker.protocol.version=0.8.2.X
set, I get ACK error 2 (InvalidMessage) when I try to send produce
requests. When I revert the broker to 0.8.2.1 the errors go away. Has
anyone run into this problem, or have any suggestions?

Thanks,
Dave
Ismael Juma
2016-01-15 09:06:27 UTC
Hi Dave,
Post by Dave Peterson
I was trying to upgrade an 0.8.2.1 broker cluster to 0.9.0.0 by following
http://kafka.apache.org/documentation.html#upgrade
After upgrading one broker, with inter.broker.protocol.version=0.8.2.X
set, I get ACK error 2 (InvalidMessage) when I try to send produce
requests.
I haven't seen other reports of this issue yet. Also, we have a system test
that covers this scenario:

https://github.com/apache/kafka/blob/trunk/tests/kafkatest/tests/upgrade_test.py

Just to double-check, what is the version of the producer that you are
using to send produce requests to the 0.9.0.0 broker when you get the error?

Ismael
Dave Peterson
2016-01-15 19:52:36 UTC
Hi Ismael,

I'm using bruce (https://github.com/ifwe/bruce) to send the produce
requests, with a RequiredAcks value of 1. Everything works fine when
all brokers are running 0.8.2.1. Also if I set up a new 0.9.0.0
cluster from scratch rather than trying to upgrade, everything works
fine. The problem only occurs after upgrading one broker in the
3-broker cluster.

The topic I am sending to has 8 partitions numbered 0-7. Doing
further experimentation I see that the ACK error 2 occurs only when
I send to partition 7. No problems occur when sending to partitions
0-6. If it helps I can send output from "kafka-topics.sh --describe"
as well as tcpdump output showing the produce requests and responses.

For comparison I tried using the 0.9.0.0 version of
kafka-console-producer.sh to send messages. With the default
RequiredAcks value of 0, it worked although I don't know which
partition it sent to. With a RequiredAcks value of 1 I get the
output shown below.

Thanks,
Dave



[2016-01-15 10:33:28,843] ERROR Uncaught error in kafka producer I/O thread: (org.apache.kafka.clients.producer.internals.Sender)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'throttle_time_ms': java.nio.BufferUnderflowException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:464)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
        at java.lang.Thread.run(Thread.java:745)
Ismael Juma
2016-01-19 10:50:51 UTC
Hi Dave,

Do you get any errors logged on the broker when you get ACK error 2
(InvalidMessage) while sending produce requests to the mixed-version
cluster? It would be helpful to see them.

With regards to the kafka-console-producer.sh error, did you use the
0.9.0.0 console producer with a mixed-version cluster (i.e. some brokers
were on 0.8.2.1 while others were on 0.9.0.0)? If so, it is expected that
it won't work correctly. All the brokers should be upgraded before the
clients are upgraded; otherwise an 0.8.2.1 broker will send a response
that the newer clients cannot handle.
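
To illustrate what goes wrong, here is a simplified sketch (not the
actual client code): the v1 produce response that the 0.9.0.0 Java client
expects ends with a 4-byte throttle_time_ms field, which the v0 response
sent by an 0.8.2.1 broker does not have, so the client's parser runs off
the end of the buffer:

import java.nio.ByteBuffer;

public class ThrottleFieldDemo {
    public static void main(String[] args) {
        // Stand-in for the tail of a v0 produce response from an 0.8.2.1
        // broker: a per-partition error code and offset, then nothing else.
        ByteBuffer v0Tail = ByteBuffer.allocate(10);
        v0Tail.putShort((short) 0); // error_code
        v0Tail.putLong(12345L);     // base offset
        v0Tail.flip();

        v0Tail.getShort();          // error_code parses fine
        v0Tail.getLong();           // offset parses fine
        v0Tail.getInt();            // a v1 parser now expects 4 more bytes
                                    // for throttle_time_ms ->
                                    // java.nio.BufferUnderflowException
    }
}

That matches the SchemaException "Error reading field 'throttle_time_ms'"
in the output you pasted.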

Ismael
Dave Peterson
2016-01-19 16:08:57 UTC
I'll try again and look for logged errors. The problem I saw with
kafka-console-producer.sh did occur with a mixed-version cluster,
so that is likely the cause. I'll try again with the 0.8.2.1 version.

Thanks,
Dave
Dave Peterson
2016-01-21 03:57:02 UTC
Hi Ismael,

I looked again at the problem where I get ACK error 2 (InvalidMessage).
When the error occurs, I see the error message with stack trace shown
below. For the 8-partition topic "shown_news_stories" which I am sending
messages to, only partition 7 has its lead replica on the broker running
Kafka 0.9.0.0. For each of partitions 0-6, the lead replica is on one of
the other two brokers (which run Kafka 0.8.2.1).

Interestingly, none of the messages currently going to the topic use
message compaction (i.e. they all have empty keys), although at some time
in the past I may have sent a few messages with keys. Message compaction
is being used for other topics. So the 0.9.0.0 broker seems to think the
topic is compacted, while the 0.8.2.1 brokers apparently don't. Does this
shed any light on things?

Also, I notice the error message says "Compacted topic", which suggests
that compaction is a property of the topic, and not of individual messages
as determined by key or lack thereof. I thought it was OK to send messages
both with and without a key to the same topic, thus having compaction
enabled for only a subset of the messages. Is this incorrect?

Thanks,
Dave


[2016-01-20 19:21:44,923] ERROR [Replica Manager on Broker 172341926]: Error processing append operation on partition [shown_news_stories,7] (kafka.server.ReplicaManager)
kafka.message.InvalidMessageException: Compacted topic cannot accept message without key.
        at kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:250)
        at kafka.log.Log.liftedTree1$1(Log.scala:327)
        at kafka.log.Log.append(Log.scala:326)
        at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:442)
        at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:428)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
        at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:268)
        at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:428)
        at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:401)
        at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:386)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:386)
        at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:322)
        at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:366)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
        at java.lang.Thread.run(Thread.java:745)
Joel Koshy
2016-01-21 07:43:46 UTC
Hi Dave,

This change was introduced in
https://issues.apache.org/jira/browse/KAFKA-1755 for compacted topics.
Post by Dave Peterson
Interestingly, none of the messages currently going to the topic use
message compaction (i.e. they all have empty keys), although at some time
in the past I may have sent a few messages with keys. Message compaction
is being used for other topics. So the 0.9.0.0 broker seems to think the
topic is compacted, while the 0.8.2.1 brokers apparently don't. Does this
shed any light on things?
Also, I notice the error message says "Compacted topic", which suggests
that compaction is a property of the topic, and not of individual messages
as determined by key or lack thereof.
Yes - compaction is a topic-level property. You can use --describe to
verify whether the topic is compacted, and if you didn't intend it to be
compacted you can alter the configuration.
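
For example, something along these lines (0.9.0.0 tool syntax from memory,
so double-check against the docs for your version; the ZooKeeper address
is a placeholder):

bin/kafka-topics.sh --zookeeper localhost:2181 --describe \
    --topic shown_news_stories

A compacted topic shows cleanup.policy=compact on its Configs line. Note
that --describe only lists per-topic overrides, so compaction inherited
from a broker-wide log.cleanup.policy default won't show up there. To
switch the topic back to normal retention:

bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
    --entity-type topics --entity-name shown_news_stories \
    --add-config cleanup.policy=delete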

Post by Dave Peterson
I thought it was OK to send messages both with and without a key to the
same topic, thus having compaction enabled for only a subset of the
messages. Is this incorrect?
In 0.9 you cannot send unkeyed messages to compacted topics. In 0.8.x this
would actually cause the log compaction thread to subsequently complain and
quit (and stop compacting all compacted topics). We did consider allowing
producers to send both keyed and unkeyed messages, but after discussion we
felt it would be better to fail fast and prevent unkeyed messages from
getting in. This was on the premise that supporting mixed messages and only
compacting the subset that has keys may not work very well, since the
unkeyed messages would stick around indefinitely; however, let me know if
you think differently on this and we can revisit.
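
To make the distinction concrete, here is a minimal sketch with the
0.9.0.0 Java producer (broker address and topic are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedVsUnkeyed {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("acks", "1");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Unkeyed: a 0.9 broker rejects this on a compacted topic;
            // error 2 (InvalidMessage) surfaces as the callback's exception.
            producer.send(new ProducerRecord<>("shown_news_stories", "payload"),
                (metadata, exception) -> {
                    if (exception != null)
                        System.err.println("send failed: " + exception);
                });

            // Keyed: accepted; compaction later retains the latest value
            // per key.
            producer.send(new ProducerRecord<>("shown_news_stories",
                "story-123", "payload"));
        }
    }
}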

Joel
Ismael Juma
2016-01-21 14:05:01 UTC
Thanks Dave and Joel. I created a PR to add this note to the Upgrade Notes:

https://github.com/apache/kafka/pull/798

Please take a look.

Ismael
Dave Peterson
2016-01-25 04:41:46 UTC
Post by Joel Koshy
Yes - compaction is a topic-level property. You can use --describe to
verify whether the topic is compacted, and if you didn't intend it to be
compacted you can alter the configuration.
I tried using the --describe option with kafka-topics.sh and didn't
see any information on which topics are compacted. Do all brokers
need to be upgraded to 0.9.0.0 for this information to appear?
I was trying it with only one broker upgraded, while the others
were still running 0.8.2.1. Also, the 0.9.0.0 broker had
inter.broker.protocol.version=0.8.2.X set in its server.properties
file. And what is the command syntax for altering the config?

Post by Joel Koshy
In 0.9 you cannot send unkeyed messages to compacted topics. In 0.8.x this
would actually cause the log compaction thread to subsequently complain and
quit (and stop compacting all compacted topics). We did consider allowing
producers to send both keyed and unkeyed messages, but after discussion we
felt it would be better to fail fast and prevent unkeyed messages from
getting in. This was on the premise that supporting mixed messages and only
compacting the subset that has keys may not work very well, since the
unkeyed messages would stick around indefinitely; however, let me know if
you think differently on this and we can revisit.
Ok, thanks for the information. Letting compaction be a property
of the topic totally makes sense to me. I can imagine it would
simplify the design.

The protocol documentation describes error 2 as indicating that
the message content does not match its CRC, which is a bit
confusing. In the case of an invalid CRC, a natural response would
be to resend. For instance, Bruce increments a failed delivery
count on the message, discards it if the count exceeds a
configurable threshold, and otherwise resends it. If error 2 is
received because an unkeyed message was sent to a
compacted topic, then the preferred action would be to
discard immediately. It would be helpful to avoid the ambiguity
by using a distinct error code for the latter case.
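
As a sketch of the dilemma (hypothetical names, not Bruce's actual
implementation, which is in C++):

public class AckPolicy {
    static final short INVALID_MESSAGE = 2;
    static final int MAX_FAILED_DELIVERY_ATTEMPTS = 5; // configurable

    static class PendingMessage {
        int failedDeliveryCount;
    }

    /** Returns true to resend the message, false to discard it. */
    static boolean shouldResend(PendingMessage msg, short errorCode) {
        if (errorCode != INVALID_MESSAGE) {
            return false; // other error codes are handled elsewhere
        }
        // Ambiguous: a genuine CRC mismatch is transient, so resending
        // makes sense; "unkeyed message to a compacted topic" is
        // permanent, so every resend is a wasted round trip. A distinct
        // error code would let the producer discard immediately.
        return ++msg.failedDeliveryCount <= MAX_FAILED_DELIVERY_ATTEMPTS;
    }
}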

To further assist producers, it might also make sense to include
compaction info in metadata responses. This would allow the
producer to drop a message before even attempting to send it
in the case where its compaction status is wrong for the topic.
However I understand that this would involve substantially more
work, including incrementing the API version for metadata
requests and responses, and therefore may not be worth the
effort. If one were to make such a protocol change, it could be
done by adding a "flags" bitfield to the following production,

TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]

choosing a bit to indicate compaction status, and reserving the
remaining bits for future use.
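
For instance, the extended production might look something like this
(field name and width chosen only for illustration):

TopicMetadata => TopicErrorCode TopicName TopicFlags [PartitionMetadata]
  TopicFlags => int16   ; bit 0 set if the topic is compacted,
                        ; remaining bits reserved for future use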

Out of curiosity, how does a broker react on receiving a keyed
message for a non-compacted topic? I would expect the behavior to
be the same as in the opposite case.

Thanks,
Dave
