Discussion:
Converting a Stream to a Table - groupBy/reduce vs. stream.to/builder.table
Patrik Kleindl
2018-10-25 17:36:33 UTC
Hello

Recently we noticed a lot of warning messages in the logs which pointed to
this method (we are running 2.0):

KStreamReduce:

    public void process(final K key, final V value) {
        // If the key or value is null we don't need to proceed
        if (key == null || value == null) {
            LOG.warn(
                "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                key, value, context().topic(), context().partition(), context().offset()
            );
            metrics.skippedRecordsSensor().record();
            return;
        }

This was triggered for every record from a stream with an existing key but
a null value, which we put through groupBy/reduce to get a KTable.
My assumption was that this was the correct way to get a KTable inside a
streams application, but it prevents deletion of records from working.

Our alternative is to send the stream back to a named topic and build a new
table from it, but this is rather cumbersome and requires a separate topic,
which also can't be cleaned up by the streams reset tool.
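
Roughly, the workaround looks like this (sketch only; the topic name and
serdes are placeholders, not our real setup):

    // Two-step workaround (sketch): write the stream out, read it back as a table.
    stream.to("some-named-topic", Produced.with(keySerde, valueSerde));
    KTable<String, String> table = builder.table("some-named-topic");
    // builder.table() treats null values as deletes, unlike groupBy/reduce.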

Did I miss anything relevant here?
Would it be possible to create a separate method for KStream to achieve
this directly?

best regards

Patrik
Matthias J. Sax
2018-10-25 22:07:18 UTC
Patrik,

`null` values in a KStream don't have delete semantics (it's not a
changelog stream). That's why we drop them in the KStream#reduce
implementation.

If you want to explicitly remove the result for a key from the result
KTable, your `Reducer#apply()` implementation must return `null` -- the
result of #apply() has changelog/KTable semantics, and `null` is
interpreted as a delete in this case.

If you want a `null` in your KStream to trigger a delete in reduce(), you
will need to use a surrogate value for it, i.e., do a mapValues() before
the groupByKey() call and replace `null` values with a surrogate delete
marker that you can check for in `Reducer#apply()` to return `null` in
that case.
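
A rough, untested sketch of that idea (it assumes String values and a
marker string that never occurs as a real value):

    // Surrogate-delete-marker sketch: replace `null` before groupByKey(),
    // and map the marker back to `null` inside the Reducer.
    final String DELETE_MARKER = "__delete__"; // assumption: never a legitimate value

    KTable<String, String> table = stream
        .mapValues(v -> v == null ? DELETE_MARKER : v)
        .groupByKey()
        .reduce((oldValue, newValue) ->
            DELETE_MARKER.equals(newValue) ? null : newValue); // null deletes the key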

Hope this helps.

-Matthias
Patrik Kleindl
2018-10-26 08:27:49 UTC
Hello Matthias,
thank you for the explanation.
Streaming back to a topic and consuming it as a KTable does respect null values as deletes, correct? But at the price of some overhead.
Is there any (historical, technical, or emotional ;-)) reason that no simple one-step stream-to-table operation exists?
Best regards
Patrik
John Roesler
2018-10-26 16:30:39 UTC
Hi Patrik,

Just to drop one observation in... Streaming to a topic and then consuming
it as a table does create overhead, but so does reducing a stream to a
table, and I think it's actually the same in either case.

They both require a store to collect the table state, and in both cases,
the stores need to have a changelog topic. For the "reduce" version, it's
an internal changelog topic, and for the "topic-to-table" version, the
store can use the intermediate topic as its changelog.

This doesn't address your ergonomic concern, but it seemed worth pointing
out that, as far as I can tell, there doesn't seem to be a difference in
overhead.

Hope this helps!
-John
Matthias J. Sax
2018-10-26 22:35:56 UTC
I don't know your overall application setup. However, a KStream
semantically models immutable facts, and there is no update semantics.
Thus, it seems semantically questionable to allow changing the
semantics from facts to updates (the other way around is easier IMHO, and
thus supported via KTable#toStream()).

Does this make sense?

Having said this: you _can_ write a KStream into a topic and read it back
as a KTable. But it's semantically questionable to do so, IMHO. Maybe it
makes sense for your specific application, but in general I don't think
it does.


-Matthias
John Roesler
2018-10-27 21:49:58 UTC
Hi again Patrik,

Actually, this is a good question... Can you share some context about why
you need to convert a stream to a table (including nulls as retractions)?

Thanks,
-John
Patrik Kleindl
2018-10-29 09:08:23 UTC
Hi John and Matthias
thanks for the questions, maybe explaining our use case helps a bit:
We are receiving CDC records (row-level insert/update/delete) in one topic
per table. The key is derived from the DB records, the value is null in
case of deletes. Those would be the immutable facts I guess.
These topics are first streamed through a deduplication Transformer to drop
changes on irrelevant fields.
The results are translated to KTables and joined to each other to represent
the same result as the SQL queries on the database, but faster. At this stage
the delete/null records matter, because if a record gets deleted we want it
to drop out of the join too. -> Our reduce approach produced unexpected
results here.
We took the deduplication step separately because in some cases we only
need the KStream for processing.
If you see a simpler/cleaner approach here I'm open to suggestions, of
course.

Regarding the overhead:
1) Named topics create management/maintenance overhead because they have to
be created/treated separately (auto-create is not an option) and be
considered in future changes, topology changes/resets and so on. The
internal topic removes most of those issues.
2) One of our developers raised the question whether the traffic to/from
the broker is actually the same in both scenarios. We expect that the same
data is written to the broker in the named-topic case as in the reduce case,
but if the KTable is maintained inside a streams topology, does it have to
read back everything it sends to the broker, or can it keep the table
internally? I hope it is understandable what I mean; otherwise I can try
to explain it more clearly.

best regards

Patrik
Guozhang Wang
2018-11-19 00:57:47 UTC
Hi Patrik,

Thanks for explaining your use case to us. While we can still discuss how
KStream should interpret null values in aggregations, one workaround for now:
if your deduplication logic can be written as a transformValues operation,
you can do the following:


    builder.table("source-topic")
           .transformValues(..., Materialized.as("store-name"))
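
Spelled out a bit more, it could look roughly like this (sketch only;
`Row` and `DedupTransformer` are made-up names for your own types, and the
dedup logic itself is elided):

    // Hypothetical sketch: deduplication as a value transformer on the source table.
    // Returning null from the transformer would drop/delete the record.
    KTable<String, Row> deduped = builder.<String, Row>table("source-topic")
        .transformValues(
            DedupTransformer::new,              // your ValueTransformerWithKeySupplier
            Materialized.as("store-name"));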

Note that in a recent PR that we are merging, the source KTable from
builder.table() will not be materialized if users do not specify a
materialized store name; only the value-transformed KTable will be
materialized:

https://github.com/apache/kafka/pull/5779


Would that work for you?

Guozhang
John Roesler
2018-11-20 17:19:03 UTC
Hi again, Patrik,

You'll probably be interested in this recent Jira:
https://issues.apache.org/jira/browse/KAFKA-7658

You have a good point about the overhead of going through an intermediate
topic... I can see how explicit topic management is an operational burden,
and you're also right that the changelog topic only gets read on state
restoration. That was an oversight on my part.

I think that with KAFKA-7658 and https://github.com/apache/kafka/pull/5779,
you'll have two good options in the future.

To solve your problem *right now*, you can circumvent the null filtering by
wrapping the values of your stream. For example, immediately before the
reduce, you could mapValues and wrap the values with Optional. Then, your
reduce function can unwrap the Optional and return null if it's empty. Does
that make sense?
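
In code, that could look roughly like this (untested sketch; it assumes
String values, and the serde for the Optional-wrapped values is omitted):

    // Optional-wrapping sketch: nulls survive the null filter as Optional.empty(),
    // and the Reducer turns the empty marker back into a delete.
    KTable<String, Optional<String>> table = stream
        .mapValues(Optional::ofNullable)
        .groupByKey() // a serde for Optional<String> would be needed here
        .reduce((oldValue, newValue) ->
            newValue.isPresent() ? newValue : null); // null deletes the key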

This comes with an important caveat, though, which is part of the
motivation for this roadblock to begin with:
if your incoming data gets repartitioned in your topology, then the order
of records for the key is not deterministic. This would break the semantics
of your reduce-to-latest function, and, indeed, any non-commutative reduce
function.

For example, if you have a topic like:

    dummykey1: {realkey: A, value: 4}
    dummykey2: {realkey: A, value: 5}

and you do a groupBy( select realkey ) and then reduce( keep latest value ),
then, if dummykey1 and dummykey2 are in different partitions, the result
would be either A:4 or A:5, depending on which input partition is processed
first.

We have discussed solutions to this issue several times, but it's
quite complex in the details.

Nevertheless, if you're careful and ensure that you don't have multiple
threads producing the same key into the input topic, and also that you
don't have a repartition in the middle, then this should work for you.

Hope this helps!
-john
Patrik Kleindl
2018-11-20 20:00:35 UTC
Hi John and Guozhang
Thanks to both of you.
I will check with our developers if they want to adopt your suggestions.
Using the same ValueTransformer for deduplication on both streams and tables might simplify things.
We have eased the operational burden a bit by improving our topic provisioning, so we can also hold out for a while.
KAFKA-7658 sounds great and made me chuckle, because I just asked for this; now I see that there were some discussions/emotions about this a lot earlier ;-)
Best regards
Patrik