I'm planning to do aggregation over metrics, and when 'flush' happens, it
emits an aggregation to the downstream (e.g. alarming)
late update. This is quite difficult to manage: every downstream system
would need to be stateful. I'd like the alarming system to be stateless,
and I wonder how this should be handled.
Post by Matthias J. Sax
First, I want to mention that you do not see "duplicates" -- you see late
updates. Kafka Streams embraces "change" and there is no such thing as a
final aggregate, but each agg output record is an update/refinement of
the result.
Strict filtering of "late updates" is hard in Kafka Streams.
If you want to have such filtering, you would need to use
aggregate(...).toStream().transform()
with an attached state store for transform() to implement this filter
manually. The state holds all emitted records per key. When a record
arrives, you check whether it is in the state or not. If not, you add it
to the state and emit it. If yes, you just drop the record.
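
A minimal sketch of that filter logic, in plain Java rather than the Kafka Streams API. The in-memory set only stands in for the state store that would be attached to transform(); the class and method names are hypothetical, and identifying a result by its key plus window start is an assumption.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: models the "emit once" filter in plain Java.
// In a real topology the set would be a fault-tolerant state store
// attached to transform(), not an in-memory collection.
final class EmitOnceFilter {
    // Assumption: a result is identified by record key plus window start.
    private final Set<String> emitted = new HashSet<>();

    // Returns true if the result should be forwarded downstream,
    // false if an earlier result for the same key and window was already emitted.
    boolean shouldEmit(String key, long windowStartMs) {
        // Set.add returns false when the element is already present.
        return emitted.add(key + "@" + windowStartMs);
    }
}
```

With this sketch, the first result for ("cpu", window 0) passes and every later update for the same key and window is dropped, so whatever is emitted first is what downstream sees.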
However, this will still not be perfect, because each time a commit is
triggered, the current window is flushed even if "stream time" did not
pass the "window end" timestamp -- that is, even though the window is not
completed yet. You would therefore also need to consider the current
"stream time", which you can indirectly access via .punctuate(). For
incomplete windows you might want to filter those "intermediate results"
and not add them to the store. This is hard to get right (I am not even
sure if it is possible at all to get right).
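
As a rough sketch of the stream-time check, in plain Java and not the Kafka Streams API: intermediate results are held back and only released once stream time has reached the window end. The class and method names are hypothetical, a single key is assumed for brevity, and "stream time" is modelled here as the largest record timestamp seen so far, which is an assumption about how it would be tracked.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: holds back per-window results until the window is complete.
final class CompletedWindowGate {
    private final long windowSizeMs;
    // Assumption: "stream time" is the largest record timestamp seen so far.
    private long streamTimeMs = Long.MIN_VALUE;
    // Latest aggregate per window start, for windows that are still open.
    private final TreeMap<Long, Long> pending = new TreeMap<>();

    CompletedWindowGate(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Called for every aggregate update; a newer update replaces the older one.
    void onUpdate(long windowStartMs, long aggregate, long recordTimestampMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        pending.put(windowStartMs, aggregate);
    }

    // Removes and returns the results of all windows whose end is at or before stream time.
    Map<Long, Long> drainCompleted() {
        Map<Long, Long> completed = new TreeMap<>();
        Iterator<Map.Entry<Long, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> e = it.next();
            if (e.getKey() + windowSizeMs <= streamTimeMs) {
                completed.put(e.getKey(), e.getValue());
                it.remove();
            }
        }
        return completed;
    }
}
```

This sketch does not deal with late updates that arrive after a window has been drained; those would show up as a new pending entry and would still need the emitted-record state described earlier.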
Even if this works however, this will only give you no duplicates (in
the strong sense of duplicate) as long as no error occurs. Kafka Streams
does not (yet) support exactly once processing and thus, in case of a
failure, you might get duplicate outputs.
I am not sure what kind of alerting you are doing, but you should
remember in some other way whether you already raised an alert, and if a
late update (or real duplicate) occurs, don't alert a second time.
Hope this helps.
-Matthias
Post by Eno Thereska
Hi Kohki,
Kafka Streams windows use so-called "segments" internally, and their
retention time cannot be lower than some minimum. Your configuration is set
to less than this minimum and is therefore not accepted. Even Windows#until
* Set the window maintain duration (retention time) in milliseconds.
* This retention time is a guaranteed <i>lower bound</i> for how long a
window will be maintained.
For more info consider reading [this](https://github.com/confluentinc/examples/issues/76) issue.
Regards, Jozef
-------- Original Message --------
Subject: Re: Immutable Record with Kafka Stream
Local Time: February 24, 2017 7:11 PM
UTC Time: February 24, 2017 7:11 PM
Guozhang, thanks for the reply, but I'm having trouble understanding.
Here is the statement from the documentation:
Windowing operations are available in the Kafka Streams DSL
<http://docs.confluent.io/3.0.0/streams/developer-guide.html#streams-developer-guide-dsl>,
where users can specify a *retention period* for the window. This allows
Kafka Streams to retain old window buckets for a period of time in order to
wait for the late arrival of records whose timestamps fall within the
window interval. If a record arrives after the retention period has passed,
the record cannot be processed and is dropped.
And I believe I can set the retention period by using 'until':
TimeWindows.of(60000).until(60000)
After receiving data from 00:06:00, I don't know why it still continues
to receive data from 00:00:00. What is 'until' supposed to do?
Thanks
-Kohki