Discussion:
Offsets/Lags for global state stores not shown
Patrik Kleindl
2018-11-06 10:03:15 UTC
Hello

Am I doing something wrong, or is it by design that global state stores and
their consumers do not show up under the consumer groups?
With the consumer group command (and in Control Center as well) I don't get
any output for the group:
./kafka-consumer-groups --bootstrap-server broker:9092 --group somegroup --describe
Note: This will not show information about old Zookeeper-based consumers.

If I query for the state I get a response that members are present:
./kafka-consumer-groups --bootstrap-server broker:9092 --group somegroup --describe --state
Note: This will not show information about old Zookeeper-based consumers.

COORDINATOR (ID)   ASSIGNMENT-STRATEGY  STATE   #MEMBERS
broker:9092 (1)    stream               Stable  2

This is quite irritating, as we cannot see whether a global state store has
caught up with a backlog of messages.

Code to reproduce:
builder.globalTable(TOPIC_NAME, Materialized
.<String, String, KeyValueStore<Bytes, byte[]>>as(STORENAME)
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));

Nothing fancy.

Logs:
2018-11-05 21:25:56 INFO AbstractCoordinator:442 - (Re-)joining group
2018-11-05 21:25:56 INFO StreamPartitionAssignor:481 - Assigned tasks to clients as {e0250aa5-e1c6-4d33-a746-bc9357c66965=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.
2018-11-05 21:25:56 WARN ConsumerCoordinator:376 - The following subscribed topics are not assigned to any members: [storetopic]
2018-11-05 21:25:56 INFO AbstractCoordinator:409 - Successfully joined group with generation 3
2018-11-05 21:25:56 INFO ConsumerCoordinator:256 - Setting newly assigned partitions []

The store works after this, but it is not shown.

Any input is appreciated.

best regards

Patrik

PS: The customer will also forward this to Confluent support, but I'm
asking here for public visibility.
Matthias J. Sax
2018-11-06 19:07:28 UTC
The topics of global stores are not included, by design.

The "problem" is that each instance needs to consume *all*
topic-partitions of these topics, and thus they cannot be included
in the consumer group, which would assign each partition to exactly one
instance. Hence, an additional consumer is used that uses manual partition
assignment (instead of subscription), and this consumer does not commit
any offsets to Kafka.
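To make the difference concrete, here is a small plain-Java model contrasting group-style assignment, where each partition goes to exactly one instance, with the global-store pattern, where every instance reads every partition. The class and method names are hypothetical and this is not Kafka's actual assignor code; round-robin is an assumption chosen for simplicity, and Kafka's real assignors are more involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Kafka's assignor: why a global store's topic
// cannot be part of a normal consumer group.
final class AssignmentSketch {

    // Group-style: each partition goes to exactly one instance (round-robin here).
    static Map<String, List<Integer>> groupAssignment(int partitions, List<String> instances) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String instance : instances) {
            result.put(instance, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            result.get(instances.get(p % instances.size())).add(p);
        }
        return result;
    }

    // Global-store-style: every instance gets every partition. This models the
    // extra consumer that assigns partitions manually instead of subscribing,
    // so no group membership is involved.
    static Map<String, List<Integer>> globalAssignment(int partitions, List<String> instances) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String instance : instances) {
            List<Integer> all = new ArrayList<>();
            for (int p = 0; p < partitions; p++) {
                all.add(p);
            }
            result.put(instance, all);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> instances = List.of("instance-a", "instance-b");
        System.out.println(groupAssignment(4, instances));
        System.out.println(globalAssignment(4, instances));
    }
}
```

With two instances and four partitions, the group model gives each instance two partitions, while the global model gives both instances all four, so there is no single group whose committed offsets could describe the global store.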

Note, though, that global stores are bootstrapped before processing begins,
and they are expected to be low-throughput topics anyway.
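Since the global consumer commits nothing, catch-up has to be measured outside the consumer-group tooling. A minimal sketch, assuming you already have each partition's log-end offset (for example from `KafkaConsumer#endOffsets`) and the store's current position, tracked by the application itself: per-partition lag is end offset minus position. The class `GlobalStoreLag` is hypothetical, and how positions are obtained is left open; restore-progress callbacks such as Kafka Streams' `StateRestoreListener` may help during bootstrap, depending on your version.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: per-partition lag for a store that commits no offsets.
// endOffsets: log-end offset per partition (the next offset to be written).
// positions:  next offset the store will read, tracked by the application.
final class GlobalStoreLag {

    static Map<Integer, Long> lag(Map<Integer, Long> endOffsets, Map<Integer, Long> positions) {
        Map<Integer, Long> result = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            // Simplification: a partition with no recorded position counts as unread from offset 0.
            long position = positions.getOrDefault(e.getKey(), 0L);
            result.put(e.getKey(), Math.max(0L, e.getValue() - position));
        }
        return result;
    }

    // Caught up means zero lag on every partition.
    static boolean caughtUp(Map<Integer, Long> endOffsets, Map<Integer, Long> positions) {
        return lag(endOffsets, positions).values().stream().allMatch(l -> l == 0L);
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = Map.of(0, 1_000L, 1, 1_200L);
        Map<Integer, Long> pos = Map.of(0, 1_000L, 1, 950L);
        System.out.println(lag(end, pos));      // {0=0, 1=250}
        System.out.println(caughtUp(end, pos)); // false
    }
}
```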


-Matthias
Patrik Kleindl
2018-11-07 09:32:09 UTC
Thanks for the response.
How "low" is the expected low throughput? We are using GlobalKTables
for IQ on several topics, with single-digit millions of unique messages and
usually fewer changes per day.

Just to verify: for this IQ setup (a Streams app that only builds a single
table to be queried), we have tried the alternative approach of using a normal
KTable in combination with a unique application ID per application instance.
This seemed to work quite well, including faster (parallel) startup etc.
Is this approach valid, or would you expect some pitfalls?
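For reference, the per-instance application ID approach described above might be configured along these lines. This is a sketch with hypothetical names (`PerInstanceConfig`, `forInstance`, the `iq-app` prefix); the property keys `application.id` and `bootstrap.servers` are the standard Kafka Streams settings. Because Kafka Streams uses `application.id` as its consumer group id, each instance forms its own group and reads all partitions of the table's topic.

```java
import java.util.Properties;

// Hypothetical helper for the "one application.id per instance" approach.
// Kafka Streams uses application.id as the consumer group id, so every
// instance configured this way ends up in its own group.
final class PerInstanceConfig {

    static Properties forInstance(String baseAppId, String instanceId, String bootstrapServers) {
        Properties props = new Properties();
        // Same keys as StreamsConfig.APPLICATION_ID_CONFIG / BOOTSTRAP_SERVERS_CONFIG.
        props.setProperty("application.id", baseAppId + "-" + instanceId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        Properties p = forInstance("iq-app", "host-1", "broker:9092");
        System.out.println(p.getProperty("application.id")); // iq-app-host-1
    }
}
```

A side effect is that each instance's group commits offsets for the KTable's input topic, so `kafka-consumer-groups --describe` for that group should show per-instance lag, unlike the global store case.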

We have not used this approach more widely because it doesn't work for global
stores inside a Streams application, but it might be beneficial to split
that up again.

best regards

Patrik
Matthias J. Sax
2018-11-18 19:21:26 UTC
Because each instance needs to consume all data, throughput is limited by what a
single instance can consume; a hard bound is the network. Note that the
network is shared, so don't plan with the maximum network speed.
Also, it's not the number of unique messages but the number of updates
that matters here.
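A rough way to apply this rule is to estimate the steady-state ingress each instance must sustain from the update rate (not the key count) and compare it with the share of the network the instance can actually use. The class name and example numbers below are assumptions for illustration, and the estimate ignores the cold-start case where the whole topic has to be read once.

```java
// Hypothetical back-of-the-envelope estimate, not a Kafka API.
final class GlobalStoreBudget {

    // Every instance must read every update, so per-instance ingress scales
    // with the update rate, not with the number of distinct keys.
    static double bytesPerSecond(long updatesPerDay, long avgRecordBytes) {
        return (double) updatesPerDay * avgRecordBytes / 86_400.0;
    }

    // Compare against the share of the (shared) network an instance can realistically use.
    static boolean fits(long updatesPerDay, long avgRecordBytes, double usableBytesPerSecond) {
        return bytesPerSecond(updatesPerDay, avgRecordBytes) <= usableBytesPerSecond;
    }

    public static void main(String[] args) {
        // Assumed numbers: 5 million updates per day at roughly 1 KB each.
        double bps = bytesPerSecond(5_000_000L, 1_000L);
        System.out.printf("%.0f bytes/s%n", bps); // 57870 bytes/s
    }
}
```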
Post by Patrik Kleindl
Just to verify, for this IQ setup (streams app which only builds a single
table to be queried) we have tried the alternative approach to use a normal
KTable in combination with a unique application ID per application instance.
This seemed to work quite well, including faster (parallel) startup etc.
Is this approach valid or would you expect some pitfalls?
I guess, for your use case, this might be OK. There is one difference on
startup: if no local state has been built up yet, in the GlobalKTable case
the GlobalKTable will be fully populated from the topic before you can start
querying. In the KTable case, you can query from the very beginning,
while data is still being loaded into the table.
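The startup difference can be illustrated with a toy model (hypothetical class, not Kafka Streams code): a store that refuses queries until it has applied everything up to the end of its log, versus one that answers queries while still loading and may therefore return missing or stale values.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the startup difference. blockUntilLoaded = true behaves
// like the GlobalKTable case; false behaves like the KTable case.
final class StartupModel {

    private final List<String[]> log;        // key/value records in offset order
    private final boolean blockUntilLoaded;
    private final Map<String, String> store = new HashMap<>();
    private int position = 0;                // next offset to apply

    StartupModel(List<String[]> log, boolean blockUntilLoaded) {
        this.log = log;
        this.blockUntilLoaded = blockUntilLoaded;
    }

    // Apply up to n further records from the log.
    void load(int n) {
        for (int i = 0; i < n && position < log.size(); i++, position++) {
            store.put(log.get(position)[0], log.get(position)[1]);
        }
    }

    boolean queryable() {
        return !blockUntilLoaded || position == log.size();
    }

    String get(String key) {
        if (!queryable()) {
            throw new IllegalStateException("store is still bootstrapping");
        }
        return store.get(key);
    }

    public static void main(String[] args) {
        List<String[]> log = List.of(new String[]{"a", "1"}, new String[]{"b", "2"});
        StartupModel global = new StartupModel(log, true);
        StartupModel table = new StartupModel(log, false);
        global.load(1);
        table.load(1);
        System.out.println(global.queryable()); // false
        System.out.println(table.get("b"));     // null: not loaded yet
    }
}
```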

Also, with this approach, if you add other processing, that processing
would not be parallelized but duplicated.
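The cost of that duplication can be quantified under an idealized assumption that records are spread evenly across partitions and partitions evenly across instances. The helper below is hypothetical and only does the arithmetic.

```java
// Hypothetical arithmetic model of shared vs. per-instance application IDs.
final class ProcessingCost {

    // Records each instance processes. A shared application.id splits the work
    // (ideal even split, rounded up); a unique application.id per instance
    // means every instance reads and processes every record.
    static long perInstance(long records, int instances, boolean uniqueAppIdPerInstance) {
        if (uniqueAppIdPerInstance) {
            return records;
        }
        return (records + instances - 1) / instances;
    }

    // Records processed across the whole deployment.
    static long total(long records, int instances, boolean uniqueAppIdPerInstance) {
        return uniqueAppIdPerInstance ? records * instances : records;
    }

    public static void main(String[] args) {
        System.out.println(perInstance(1_000_000L, 4, false)); // 250000
        System.out.println(perInstance(1_000_000L, 4, true));  // 1000000
        System.out.println(total(1_000_000L, 4, true));        // 4000000
    }
}
```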


-Matthias
Patrik Kleindl
2018-11-18 20:12:51 UTC
Thanks for the reply.
It would be interesting to hear who else is using IQ with or without GlobalKTables, and what problems and solutions they have come across.
Best regards
Patrik