In this post I show a method to address the computation of the arithmetic mean of an stream of values (say sensor readings) using the Kafka Streams DSL. The estimation of the average in a stream processing setting implies keeping track of other two measurements, namely the count of incoming records and the sum of their corresponding values. Let’s consider a Kafka streams application consuming messages from a topic to which the readings of multiple temperature sensors are being posted (temperature-readings
). The messages from said topic are keyed by the sensor ID, and we want to compute the rolling average of the temperature sensed by each device.
Let’s first create a KGroupedStream
to group the sensor readings according to their corresponding sensor ID:
Now we can use the KGroupedStream::aggregate
method to compute the rolling average on the perSensorStream
we got above. But before this, according to the Kafka documentation this method requires an Initializer
and an Aggregator
as arguments:
The specified
Initializer
is applied once directly before the first input record is processed to provide an initial intermediate aggregation result that is used to process the first record. The specifiedAggregator
is applied for each input record and computes a new aggregate using the current aggregate (or for the very first record using the intermediate aggregation result provided via the Initializer) and the record’s value. Thus, aggregate(Initializer, Aggregator) can be used to compute aggregate functions like count (c.f. count()).
Let’s create a POJO to provide such initial aggregation and to hold the intermediate aggregation values:
Now we need to provide an implementation of the apply
method for the Aggregator
argument, which would be in charge of computing a new aggregate from the key
and value
of an incoming record and the current aggregate
of the same key:
We can finally call the aggregate
method on the perSensorStream
:
Below you can find the relevant code for the aggregation method outlined in this post.
I didn’t include the code for handling serialization in this post, but you can find it here: JsonPOJOSerializer and JsonPOJODeserializer.