
Lago el Bolsón in Popayán, Colombia, my home town :)
In this post I show how to compute the arithmetic mean of a stream of values (say, sensor readings) using the Kafka Streams DSL. Estimating the average in a stream processing setting means keeping track of two other quantities: the count of incoming records and the sum of their values. Let’s consider a Kafka Streams application consuming messages from a topic to which the readings of multiple temperature sensors are posted (temperature-readings). The messages in this topic are keyed by sensor ID, and we want to compute the rolling average of the temperature sensed by each device.
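For context, here is a minimal sketch of a producer posting such keyed readings; the sensor IDs, the temperature values, and the broker address are made up for illustration:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class TemperatureProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DoubleSerializer.class);
        try (Producer<String, Double> producer = new KafkaProducer<>(props)) {
            // each record is keyed by sensor ID so the stream can be grouped per device
            producer.send(new ProducerRecord<>("temperature-readings", "sensor-1", 21.5));
            producer.send(new ProducerRecord<>("temperature-readings", "sensor-2", 19.8));
            producer.send(new ProducerRecord<>("temperature-readings", "sensor-1", 22.1));
        }
    }
}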
Let’s first create a KGroupedStream to group the sensor readings by their corresponding sensor ID:
public class KafkaStreamsAggregator {
    // ...
    public static void main(String[] args) throws Exception {
        // ...
        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Double> sourceStream = builder.stream("temperature-readings",
                Consumed.with(Serdes.String(), Serdes.Double()));
        KGroupedStream<String, Double> perSensorStream = sourceStream.groupByKey();
        // ...
    }
}
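A side note on serdes: groupByKey here reuses the String/Double serdes declared in Consumed. If they weren’t known upstream, you could pass them explicitly via the Grouped class (available since Kafka 2.1); a sketch:
KGroupedStream<String, Double> perSensorStream =
        sourceStream.groupByKey(Grouped.with(Serdes.String(), Serdes.Double()));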
Now we can use the KGroupedStream::aggregate method to compute the rolling average over the perSensorStream we got above. According to the Kafka documentation, this method requires an Initializer and an Aggregator as arguments:
The specified Initializer is applied once directly before the first input record is processed to provide an initial intermediate aggregation result that is used to process the first record. The specified Aggregator is applied for each input record and computes a new aggregate using the current aggregate (or for the very first record using the intermediate aggregation result provided via the Initializer) and the record’s value. Thus, aggregate(Initializer, Aggregator) can be used to compute aggregate functions like count (c.f. count()).
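To make the quoted passage concrete, a per-sensor count could itself be expressed with aggregate: the Initializer yields 0L and the Aggregator increments it for each record. This is only an illustrative sketch, not how count() is implemented internally:
KTable<String, Long> perSensorCount = perSensorStream.aggregate(
        () -> 0L,                                  // Initializer: start each key at zero
        (key, value, aggregate) -> aggregate + 1L, // Aggregator: one more record for this key
        Materialized.with(Serdes.String(), Serdes.Long())
);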
Let’s create a POJO to serve as that initial aggregation result and to hold the intermediate aggregation values:
public class AggregateTuple {
    public String sensorId;
    public Long count;
    public Double sum;
    public Double avg;

    public AggregateTuple() {
    }

    public AggregateTuple(String sensorId, Long count, Double sum, Double avg) {
        this.sensorId = sensorId;
        this.count = count;
        this.sum = sum;
        this.avg = avg;
    }
}
Now we need to provide an implementation of the apply method for the Aggregator argument, which is in charge of computing a new aggregate from the key and value of an incoming record and the current aggregate for the same key:
public class KafkaStreamsAggregator {

    public static AggregateTuple temperatureAggregator(String key, Double value, AggregateTuple aggregate) {
        aggregate.sensorId = key;
        aggregate.count = aggregate.count + 1;           // increment the record count
        aggregate.sum = aggregate.sum + value;           // add the incoming value to the running sum
        aggregate.avg = aggregate.sum / aggregate.count; // update the average
        return aggregate;
    }

    public static void main(String[] args) throws Exception {
        // ...
    }
}
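Since temperatureAggregator is a plain static method, you can sanity-check the rolling-average logic outside of Kafka entirely; a quick sketch with made-up readings:
AggregateTuple aggregate = new AggregateTuple("", 0L, 0.0, 0.0); // same initial value the Initializer will provide
aggregate = KafkaStreamsAggregator.temperatureAggregator("sensor-1", 20.0, aggregate);
aggregate = KafkaStreamsAggregator.temperatureAggregator("sensor-1", 22.0, aggregate);
aggregate = KafkaStreamsAggregator.temperatureAggregator("sensor-1", 24.0, aggregate);
System.out.println(aggregate.avg); // 22.0, i.e. (20.0 + 22.0 + 24.0) / 3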
We can finally call the aggregate method on the perSensorStream:
public class KafkaStreamsAggregator {
    // ...
    public static void main(String[] args) throws Exception {
        // ...
        KTable<String, AggregateTuple> perSensorAggregate = perSensorStream.aggregate(
                () -> new AggregateTuple("", 0L, 0.0, 0.0),                              // lambda for the Initializer
                (key, value, aggregate) -> temperatureAggregator(key, value, aggregate), // lambda for the Aggregator
                // we can optionally materialize the resulting KTable:
                Materialized.<String, AggregateTuple, KeyValueStore<Bytes, byte[]>>as("temperature-state-store")
                        .withValueSerde(aggregateTupleSerde)
        );
        // ...
    }
}
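Note that this materializes the KTable but doesn’t publish the results anywhere. If downstream consumers should see the running averages, one option is to stream them out to another topic (a sketch; the output topic name average-temperatures is made up):
perSensorAggregate
        .toStream()                            // KTable<String, AggregateTuple> -> KStream
        .mapValues(aggregate -> aggregate.avg) // keep only the running average
        .to("average-temperatures", Produced.with(Serdes.String(), Serdes.Double()));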
Below you can find the full code for the aggregation application outlined in this post (note that AggregateTuple and KafkaStreamsAggregator would live in separate files, since both are public classes).
import org.apache.kafka.streams.*;
import util.serdes.JsonPOJODeserializer;
import util.serdes.JsonPOJOSerializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.*;
import java.util.concurrent.CountDownLatch;

// AggregateTuple.java
public class AggregateTuple {
    public String sensorId;
    public Long count;
    public Double sum;
    public Double avg;

    public AggregateTuple() {
    }

    public AggregateTuple(String sensorId, Long count, Double sum, Double avg) {
        this.sensorId = sensorId;
        this.count = count;
        this.sum = sum;
        this.avg = avg;
    }
}

// KafkaStreamsAggregator.java
public class KafkaStreamsAggregator {

    public static AggregateTuple temperatureAggregator(String key, Double value, AggregateTuple aggregate) {
        aggregate.sensorId = key;
        aggregate.count = aggregate.count + 1;           // increment the record count
        aggregate.sum = aggregate.sum + value;           // add the incoming value to the running sum
        aggregate.avg = aggregate.sum / aggregate.count; // update the average
        return aggregate;
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "temperature-aggregator");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        final StreamsBuilder builder = new StreamsBuilder();

        // Set up serializers and deserializers for the AggregateTuple POJO
        Map<String, Object> serdeProps = new HashMap<>();
        final Serializer<AggregateTuple> aggSerializer = new JsonPOJOSerializer<>();
        serdeProps.put("JsonPOJOClass", AggregateTuple.class);
        aggSerializer.configure(serdeProps, false);
        final Deserializer<AggregateTuple> aggDeserializer = new JsonPOJODeserializer<>();
        serdeProps.put("JsonPOJOClass", AggregateTuple.class);
        aggDeserializer.configure(serdeProps, false);
        final Serde<AggregateTuple> aggregateTupleSerde = Serdes.serdeFrom(aggSerializer, aggDeserializer);

        // Set streaming topology and transformations
        KStream<String, Double> sourceStream = builder.stream("temperature-readings",
                Consumed.with(Serdes.String(), Serdes.Double()));
        KGroupedStream<String, Double> perSensorStream = sourceStream.groupByKey();
        KTable<String, AggregateTuple> perSensorAggregate = perSensorStream.aggregate(
                () -> new AggregateTuple("", 0L, 0.0, 0.0),                              // lambda for the Initializer
                (key, value, aggregate) -> temperatureAggregator(key, value, aggregate), // lambda for the Aggregator
                // we can optionally materialize the resulting KTable:
                Materialized.<String, AggregateTuple, KeyValueStore<Bytes, byte[]>>as("temperature-state-store")
                        .withValueSerde(aggregateTupleSerde)
        );

        Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();

        final CountDownLatch latch = new CountDownLatch(1);
        // attach shutdown handler to catch Ctrl-C
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                try {
                    streams.close();
                } catch (Throwable e) {
                    System.exit(1);
                }
                latch.countDown();
            }
        });
        latch.await();
        System.exit(0);
    }
}
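Because the KTable is materialized as "temperature-state-store", the latest aggregate per sensor can also be queried directly from the running application via interactive queries. A minimal sketch, assuming Kafka Streams 2.5+ (for StoreQueryParameters), that the streams instance has reached the RUNNING state, and a made-up sensor ID; the store types come from org.apache.kafka.streams.state:
ReadOnlyKeyValueStore<String, AggregateTuple> store = streams.store(
        StoreQueryParameters.fromNameAndType("temperature-state-store",
                QueryableStoreTypes.<String, AggregateTuple>keyValueStore()));
AggregateTuple latest = store.get("sensor-1"); // hypothetical sensor ID
if (latest != null) {
    System.out.println(latest.sensorId + ": avg=" + latest.avg + " over " + latest.count + " readings");
}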
I didn’t include the code for handling serialization in this post, but you can find it here: JsonPOJOSerializer and JsonPOJODeserializer.