Discussion:
Immutable Record with Kafka Stream
Kohki Nishio
2017-02-24 15:20:13 UTC
Hello Kafka experts,

I'm trying to do a windowed aggregation with Kafka Streams, but I'm
getting multiple messages for the same time window. I know this is
expected behavior, but I really want a single message per time window.

My test code looks like this:

builder.stream("test-stream")
  .groupByKey()
  .aggregate(
    new DataPointsInitializer,
    new DataPointsAggregator,
    TimeWindows.of(60000).until(60000),
    new DataPointsSerde,
    "test-stream")
  .toStream()
  .print()

But if data arrives like this (each record has its own time field):

01:38:20,Metric1,10
01:38:21,Metric1,10

< long pause >

01:38:22,Metric1,10

Then I get output like this

[KTABLE-TOSTREAM-0000000002]: [***@1487813880000] , Map(10.0 -> 2)
[KTABLE-TOSTREAM-0000000002]: [***@1487813880000] , Map(10.0 -> 3)

I want to drop the last one so that I don't have duplicate messages. Thanks.
--
Kohki Nishio
Eno Thereska
2017-02-24 16:33:09 UTC
Hi Kohki,

As you mentioned, this is expected behavior. However, if you are willing to tolerate some more latency, you can improve the chance that a message with the same key is overwritten by increasing the commit interval. By default it is 30 seconds, but you can increase it:

streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 40000);

This will make the dedup cache work better (for documentation see http://docs.confluent.io/3.1.2/streams/developer-guide.html#memory-management). However, this does not guarantee that duplicates will not happen.
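
A minimal sketch of the two settings that influence how often the dedup cache forwards updates. It uses plain string keys in a java.util.Properties object rather than the StreamsConfig constants so that it runs without Kafka on the classpath; the class name and the chosen values are illustrative assumptions, not recommendations.

```java
import java.util.Properties;

// Sketch only: builds the configuration entries discussed above.
// "commit.interval.ms" and "cache.max.bytes.buffering" are the string
// keys behind the corresponding StreamsConfig constants.
class DedupCacheConfig {

    static Properties build(long commitIntervalMs, long cacheBytes) {
        Properties props = new Properties();
        // A longer commit interval means the cache is flushed less often,
        // so more updates to the same key are overwritten before forwarding.
        props.put("commit.interval.ms", Long.toString(commitIntervalMs));
        // A larger cache has the same effect; a full cache forces an
        // earlier flush regardless of the commit interval.
        props.put("cache.max.bytes.buffering", Long.toString(cacheBytes));
        return props;
    }

    public static void main(String[] args) {
        // 40 seconds and 10 MB are example values only.
        System.out.println(build(40_000L, 10L * 1024 * 1024));
    }
}
```

Neither setting removes the intermediate updates entirely; they only make them less frequent.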

Thanks
Eno
Kohki Nishio
2017-02-24 17:59:12 UTC
Thanks for the info. However, we have alarming functionality downstream,
and duplicate messages are tricky to manage there. I thought the
'retention period' could work for that purpose, but here's the result.

My TimeWindow is

TimeWindows.of(60000).until(60000),

And here's the input

00:00:00,metric,1
00:01:00,metric,1
00:03:00,metric,1
00:04:00,metric,1
00:05:00,metric,1
00:06:00,metric,1

<long pause>

00:00:00,metric,2

<long pause>

00:00:00,metric,3

The output is below:

[***@1487894400000] , Map(1.0 -> 1)
[***@1487894460000] , Map(1.0 -> 1)
[***@1487894580000] , Map(1.0 -> 1)
[***@1487894640000] , Map(1.0 -> 1)
[***@1487894700000] , Map(1.0 -> 1)
[***@1487894760000] , Map(1.0 -> 1)
[***@1487894400000] , Map(2.0 -> 1, 1.0 -> 1) <======== ??
[***@1487894400000] , Map(2.0 -> 1, 1.0 -> 1, 3.0 -> 1) <====== ??

I don't understand why the last two happen. I'm looking into the source
code, but I wonder if I'm doing something wrong.
Guozhang Wang
2017-02-24 18:52:01 UTC
Hi Kohki,

Note that Streams executes operations based on the "timestamp" of the
record, i.e. in your case the "event time", not the processing time.
When you received


00:00:00,metric,2


after the long pause, it was treated as a "late-arriving record": one that
happened at 00:00:00 but was received late. Hence it is still aggregated
under the old window bucket of 1487894400000 based on its timestamp, and
the result for that window now contains two records.
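
A small sketch of why the late record lands in the old bucket: for tumbling windows aligned to the epoch, the bucket depends only on the record's own timestamp, not on when it arrives. The class and method names are hypothetical, and the sketch assumes non-negative timestamps in milliseconds.

```java
// Sketch only: maps an event timestamp to the start of its tumbling window.
class WindowBuckets {

    // Rounds the event time down to a multiple of the window size.
    static long windowStart(long eventTimeMs, long windowSizeMs) {
        return eventTimeMs - (eventTimeMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long size = 60_000L;               // TimeWindows.of(60000)
        long midnight = 1487894400000L;    // timestamp of 00:00:00 in the example
        // The first record and the two late records carry the same event
        // time, so all three resolve to the same window start.
        System.out.println(windowStart(midnight, size));
        // A record at 00:06:00 resolves to a different window.
        System.out.println(windowStart(midnight + 6 * size, size));
    }
}
```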


Does that make sense?

Guozhang
Kohki Nishio
2017-02-24 19:11:27 UTC
Guozhang, thanks for the reply, but I'm having trouble understanding.
Here's the statement from the documentation:

Windowing operations are available in the Kafka Streams DSL
<http://docs.confluent.io/3.0.0/streams/developer-guide.html#streams-developer-guide-dsl>,
where users can specify a *retention period* for the window. This allows
Kafka Streams to retain old window buckets for a period of time in order to
wait for the late arrival of records whose timestamps fall within the
window interval. If a record arrives after the retention period has passed,
the record cannot be processed and is dropped.
And I believe I can set the retention period by using 'until':

TimeWindows.of(60000).until(60000)


After receiving data for 00:06:00, I don't know why it still accepts
data with a timestamp of 00:00:00. What is 'until' supposed to do?

Thanks
-Kohki
Jozef.koval
2017-02-24 22:16:51 UTC
Hi Kohki,

Kafka Streams windows use so-called "segments" internally, and their retention time cannot be lower than some minimum. Your configuration is set to less than this minimum and is therefore not accepted. Even the Windows#until javadoc specifies it:

* Set the window maintain duration (retention time) in milliseconds.

* This retention time is a guaranteed <i>lower bound</i> for how long a window will be maintained.

For more info, consider reading this issue: https://github.com/confluentinc/examples/issues/76

Regards, Jozef


Matthias J. Sax
2017-02-24 22:39:54 UTC
First, I want to mention that you do not see "duplicates" -- you see late
updates. Kafka Streams embraces "change" and there is no such thing as a
final aggregate; each aggregation output record is an update/refinement of
the result.

Strict filtering of "late updates" is hard in Kafka Streams.

If you want to have such filtering, you would need to use

aggregate(...).toStream().transform()

with an attached state for transform() to implement this filter
manually. The state holds all emitted records per key. When a record
arrives, you check whether it is in the state or not. If not, you add it
to the state and emit it. If yes, you just drop the record.
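
The check described above can be sketched without any Kafka classes. Here an in-memory HashSet stands in for the state store attached to transform(), and the key is the combination of record key and window start; the class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch only: remembers which (key, window) results were already emitted
// and lets only the first one through.
class FirstResultFilter {

    // Stand-in for a state store; it grows without bound in this sketch.
    private final Set<String> emitted = new HashSet<>();

    // Returns true if the record should be forwarded, false if dropped.
    boolean shouldEmit(String key, long windowStartMs) {
        // Set.add returns false when the element is already present.
        return emitted.add(key + "@" + windowStartMs);
    }

    public static void main(String[] args) {
        FirstResultFilter filter = new FirstResultFilter();
        System.out.println(filter.shouldEmit("Metric1", 1487813880000L)); // first update
        System.out.println(filter.shouldEmit("Metric1", 1487813880000L)); // late update
    }
}
```

Note that this keeps the first update for a window, which is not necessarily the most complete one.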

However, this will still not be perfect, because each time a commit is
triggered, the current window is flushed even if "stream time" did not
pass "window end" timestamp -- thus, the window is not completed yet.

Thus, you would also need to consider the current "stream time", which you
can indirectly access via .punctuate(). For incomplete windows you
might want to filter those "intermediate results" and not add them to the
store. This is hard to get right (I am not even sure if it is possible
to get right at all).
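
One possible shape for such a filter, as a rough sketch under strong assumptions: a single key, an in-memory map instead of a fault-tolerant store, and a caller that supplies the current stream time (for example from a punctuate callback). All names are hypothetical. It holds back intermediate results, emits a window once stream time has passed its end, and drops anything that arrives for that window afterwards.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch only: emits one result per window, after the window has closed.
class ClosedWindowFilter {

    private final long windowSizeMs;
    // windowStart -> latest aggregate seen so far (not yet emitted)
    private final Map<Long, String> pending = new LinkedHashMap<>();
    // windows that were already emitted; later updates are dropped
    private final Set<Long> emitted = new HashSet<>();

    ClosedWindowFilter(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Called for every update coming out of the aggregation.
    void onUpdate(long windowStartMs, String aggregate) {
        if (!emitted.contains(windowStartMs)) {
            pending.put(windowStartMs, aggregate);
        }
    }

    // Called when stream time advances; returns the windows that closed.
    List<String> onStreamTime(long streamTimeMs) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<Long, String>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> e = it.next();
            if (e.getKey() + windowSizeMs <= streamTimeMs) {
                out.add(e.getKey() + " -> " + e.getValue());
                emitted.add(e.getKey());
                it.remove();
            }
        }
        return out;
    }
}
```

The trade-off is the one discussed in this thread: late records that arrive after a window has been emitted are silently lost, and the in-memory state does not survive a restart.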

Even if this works, however, it will only give you no duplicates (in
the strong sense of duplicate) as long as no error occurs. Kafka Streams
does not (yet) support exactly-once processing, and thus, in case of a
failure, you might get duplicate outputs.

I am not sure what kind of alerting you are doing, but you should
remember, in some other way, whether you already raised an alert, and if
a late update (or real duplicate) occurs, not alert a second time.

Hope this helps.



-Matthias
Kohki Nishio
2017-02-26 18:29:21 UTC
I'm planning to do aggregation over metrics, and when a 'flush' happens, it
emits an aggregate to the downstream (e.g. alarming).

Let's say the first message says some average is very high and it
triggers an alarm. Later on, a user comes to the system and checks the
number, and it might already have been updated to a normal value due to a
late update. This is quite difficult to manage: all downstream systems need
to be stateful. I'd like the alarming system to be stateless, and I wonder
how this should be handled.

-Kohki
Guozhang Wang
2017-02-27 04:39:07 UTC
Kohki,

From your use case it seems that you'd like to have an "explicit" trigger
for when aggregate data can be forwarded downstream.

But before we dig deep into that feature, I'm wondering: for your monitoring
case, do you want to completely ignore any future updates once a record
has been sent downstream, i.e. to alarming? I can understand that you
want the alarming to be stateless, but I'm not sure whether you'd like to
avoid getting any more data once its "state has been set".

What I'd propose for your case is to remember the data that triggers the
alarm, with a topology like:


KTable table = stream.groupByKey().aggregate(/* windowed aggregation */);

table.toStream().foreach(/* if a record indicates an anomaly, report it to
the alarming system together with its value and the reporting time */);


----

So if you have an update stream from "table" as:

At time t0: window1 @ high value (should trigger alarm)
At time t1: window1 @ normal value (should not trigger alarm)

Then your "table" would contain the final updated results, while your
alarm system can just contain the subset of its changelog that keeps all
the alarmed records, which can be interpreted as:

"window1 was abnormal as reported at t0" ..
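
A sketch of the reporting side of that proposal, with no Kafka dependency. The list stands in for the external alarming system, and the threshold, class name, and message format are assumptions made for illustration. Every update is checked on its own, and only anomalous ones are recorded together with the time they were reported.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: records each anomalous update with its reporting time.
class AlarmReporter {

    private final double threshold;
    // Stand-in for the alarming system: an append-only log of reports.
    private final List<String> reports = new ArrayList<>();

    AlarmReporter(double threshold) {
        this.threshold = threshold;
    }

    // Called for every update in the table's changelog stream.
    void onUpdate(String window, double value, long reportTimeMs) {
        if (value > threshold) {
            reports.add(window + " was abnormal (" + value
                    + ") as reported at " + reportTimeMs);
        }
        // Normal values are ignored; the earlier report is left as-is.
    }

    List<String> reports() {
        return reports;
    }

    public static void main(String[] args) {
        AlarmReporter reporter = new AlarmReporter(100.0);
        reporter.onUpdate("window1", 250.0, 0L); // t0: high value
        reporter.onUpdate("window1", 40.0, 1L);  // t1: normal value after a late update
        System.out.println(reporter.reports());
    }
}
```

Because each report carries its own value and time, a user who later sees a normal value in the table can still tell what the alarm was based on.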


Guozhang