Lago el Bolsón in Popayán, Colombia, my home town :)

Lago el Bolsón in Popayán, Colombia, my home town :)

In this post I show a method to address the computation of the arithmetic mean of an stream of values (say sensor readings) using the Kafka Streams DSL. The estimation of the average in a stream processing setting implies keeping track of other two measurements, namely the count of incoming records and the sum of their corresponding values. Let’s consider a Kafka streams application consuming messages from a topic to which the readings of multiple temperature sensors are being posted (temperature-readings). The messages from said topic are keyed by the sensor ID, and we want to compute the rolling average of the temperature sensed by each device.

Let’s first create a KGroupedStream to group the sensor readings according to their corresponding sensor ID:

public class KafkaStreamsAggregator {
// ...
public static void main(String[] args) throws Exception {
// ...
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Double> sourceStream = builder.stream("temperature-readings",
Consumed.with(Serdes.String(), Serdes.Double()));
KGroupedStream<String, Double> perSensorStream = sourceStream.groupByKey();
// ...
}
}

Now we can use the KGroupedStream::aggregate method to compute the rolling average on the perSensorStream we got above. But before this, according to the Kafka documentation this method requires an Initializer and an Aggregator as arguments:

The specified Initializer is applied once directly before the first input record is processed to provide an initial intermediate aggregation result that is used to process the first record. The specified Aggregator is applied for each input record and computes a new aggregate using the current aggregate (or for the very first record using the intermediate aggregation result provided via the Initializer) and the record’s value. Thus, aggregate(Initializer, Aggregator) can be used to compute aggregate functions like count (c.f. count()).

Let’s create a POJO to provide such initial aggregation and to hold the intermediate aggregation values:

public class AggregateTuple {
public String sensorId;
public Long count;
public Double sum;
public Double avg;
public AggregateTuple() {
}
public AggregateTuple(String sensorId, Long count, Double sum, Double avg) {
this.sensorId = sensorId;
this.count = count;
this.sum = sum;
this.avg = avg;
}
}

Now we need to provide an implementation of the apply method for the Aggregator argument, which would be in charge of computing a new aggregate from the key and value of an incoming record and the current aggregate of the same key:

public class KafkaStreamsAggregator {
public static AggregateTuple temperatureAggregator(String key, Double value, AggregateTuple aggregate) {
aggregate.sensorId = key;
aggregate.count = aggregate.count + 1; // increment by 1 the current record count
aggregate.sum = aggregate.sum + value; // add the incoming value to the current sum
aggregate.avg = aggregate.sum / aggregate.count; // update the average
return aggregate;
}
public static void main(String[] args) throws Exception {
// ...
}
}

We can finally call the aggregate method on the perSensorStream:

public class KafkaStreamsAggregator {
// ...
public static void main(String[] args) throws Exception {
// ...
KTable<String, AggregateTuple> perSensorAggregate = perSensorStream.aggregate(
() -> new AggregateTuple("", 0L, 0.0, 0.0), // Lambda expression for the Initializer
(key, value, aggregate) -> temperatureAggregator(key, value, aggregate), // Lambda expression for the Aggregator
// we can optionally materialize the resulting KTable:
Materialized.<String, AggregateTuple, KeyValueStore<Bytes, byte[]>>as("temperature-state-store").withValueSerde(aggregateTupleSerde)
);
// ...
}
}

Below you can find the relevant code for the aggregation method outlined in this post.

import org.apache.kafka.streams.*;
import util.serdes.JsonPOJODeserializer;
import util.serdes.JsonPOJOSerializer;
import org.apache.commons.cli.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.*;
import java.util.concurrent.CountDownLatch;
public class AggregateTuple {
public String sensorId;
public Long count;
public Double sum;
public Double avg;
public AggregateTuple() {
}
public AggregateTuple(String sensorId, Long count, Double sum, Double avg) {
this.sensorId = sensorId;
this.count = count;
this.sum = sum;
this.avg = avg;
}
}
public class KafkaStreamsAggregator {
public static AggregateTuple temperatureAggregator(String key, Double value, AggregateTuple aggregate) {
aggregate.sensorId = key;
aggregate.count = aggregate.count + 1; // increment by 1 the current record count
aggregate.sum = aggregate.sum + value; // add the incoming value to the current sum
aggregate.avg = aggregate.sum / aggregate.count; // update the average
return aggregate;
}
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "temperature-aggregator");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
final StreamsBuilder builder = new StreamsBuilder();
// Set up Serializers and Deserializers
Map<String, Object> serdeProps = new HashMap<>();
final Serializer<AggregateTuple> aggSerializer = new JsonPOJOSerializer<>();
serdeProps.put("JsonPOJOClass", AggregateTuple.class);
aggSerializer.configure(serdeProps, false);
final Deserializer<AggregateTuple> aggDeserializer = new JsonPOJODeserializer<>();
serdeProps.put("JsonPOJOClass", AggregateTuple.class);
aggDeserializer.configure(serdeProps, false);
final Serde<AggregateTuple> aggregateTupleSerde = Serdes.serdeFrom(aggSerializer, aggDeserializer);
// Set streaming topology and transformations
KStream<String, Double> sourceStream = builder.stream("temperature-readings",
Consumed.with(Serdes.String(), Serdes.Double()));
KGroupedStream<String, Double> perSensorStream = sourceStream.groupByKey();
KTable<String, AggregateTuple> perSensorAggregate = perSensorStream.aggregate(
() -> new AggregateTuple("", 0L, 0.0, 0.0), // Lambda expression for the Initializer
(key, value, aggregate) -> temperatureAggregator(key, value, aggregate), // Lambda expression for the Aggregator
// we can optionally materialize the resulting KTable:
Materialized.<String, AggregateTuple, KeyValueStore<Bytes, byte[]>>as("temperature-state-store").withValueSerde(aggregateTupleSerde)
);
Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
try {
streams.close();
} catch (Throwable e) {
System.exit(1);
}
latch.countDown();
}
});
latch.await();
System.exit(0);
}
}

I didn’t include the code for handling serialization in this post, but you can find it here: JsonPOJOSerializer and JsonPOJODeserializer.