Kafka is a distributed streaming platform that stores data in topics and scales through topic partitions. Even though Kafka is used extensively to connect our microservices through publish/subscribe, we usually resort to another database for storing and querying the data.
KTable & KStream are an attempt by Confluent to bring stateful processing into Kafka.
I experimented with KTable in an application and found some nice features and some not-so-good ones. In this post I explain what I learnt from that experience.
Let me first introduce the key concepts.
Duality of a Table and a Stream: a stream can be seen as a table, and a table as a stream. A stream can be considered the changelog of a table, and a table a snapshot of the stream containing the latest value for each record key.
Locality of Data: in every application instance, both the stream and the table are populated with data from only a subset of the partitions of the input topic. The data of the entire topic is available only collectively, across all application instances.
KStream & KTable
KStream is thus an abstraction over the partitioned record stream, allowing stateless transformations like map, filter, etc.
KTable is a partition's data materialized locally, serving as an embedded database. The default store backing a KTable is RocksDB, which works mainly in memory but may flush to disk as well. KTable provides fault tolerance by recreating the table from its changelog topic (the duality between the changelog stream and the table).
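To make this duality concrete, here is a minimal sketch (the topic name user-events and the String serdes are placeholders, not from the retry application): the same topic can be read either as a stream of every change or as a table holding the latest value per key.

val builder = new StreamsBuilder

// Stream view: every record of the topic is an independent event.
val asStream: KStream[String, String] =
  builder.stream("user-events", Consumed.`with`[String, String](Serdes.String(), Serdes.String()))

// Table view: the topic is treated as a changelog; only the latest value per key is kept.
val asTable: KTable[String, String] =
  builder.table("user-events", Consumed.`with`[String, String](Serdes.String(), Serdes.String()))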
Interactive Queries: KTable data can be queried via Interactive Queries. However, due to the locality described above, we can only query the data of the partitions this instance is subscribed to. There are ways to get data from across instances (an RPC layer to query remote stores, or a GlobalKTable), but I have heard from others' experience that it is not very optimal.
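As a minimal illustration, assuming a running KafkaStreams instance and the store name used later in this post, a local point lookup looks roughly like this (the key literal is just an example):

// Fetch the local view of the table and look up a single key.
// Only keys from the partitions assigned to this instance are visible.
val localStore: ReadOnlyKeyValueStore[JLong, String] =
  streams.store("retry-message-store", QueryableStoreTypes.keyValueStore[JLong, String])

val maybeValue: String = localStore.get(1620000000L) // null if the key is not held locally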
Considering the locality of the queries, I tried to implement a dead-letter retry mechanism. Every retry instance is concerned only with its own data. This also lets us scale by merely adding instances, without worrying about locks and transactions on a centralized database.
Retry Application using KTable
Let dead-letters be a Kafka topic that contains all the failed messages, and retry-topic be the topic that contains messages to be retried now. We now need to connect these two topics so that messages from dead-letters are added to retry-topic with some backoff strategy.
There are multiple ways to implement the retry mechanism. We could have enumerated retry topics that retry after fixed delays (5, 10, 15, and so on). However, if we want some custom logic, or to retry messages out of order, then we need some state to store them. Let's try the KTable here.
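The snippets below decode the dead-letter payload into a Message with circe; the post does not show that class, so here is a hypothetical minimal version (any field beyond retryAttempt is my assumption):

import io.circe.generic.auto._   // derives the circe Encoder/Decoder used by decode/asJson below
import io.circe.parser.decode
import io.circe.syntax._

// Hypothetical dead-letter envelope; a real application would likely carry more
// fields (original topic, partition, error details, ...).
case class Message(payload: String, retryAttempt: Int)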
1. Consume messages from dead-letters topic
val appName = "retry-example"
val bootstrapServers = "localhost:9092"
val deadletterTopic = "dead-letters-topic"
val retryTopic = "retry-topic"

val builder = new StreamsBuilder

// Read the dead-letter topic; keys are timestamps (JLong = java.lang.Long), values are JSON messages.
val dlqStream: KStream[JLong, String] =
  builder.stream(deadletterTopic, Consumed.`with`[JLong, String](Serdes.Long(), Serdes.String()))

// Re-key every failed message with the timestamp at which it should be retried,
// and bump its retry attempt counter.
val retryStream: KStream[JLong, String] = dlqStream.map {
  (key: JLong, value: String) =>
    val now = Instant.now.getEpochSecond
    val optMsg = decode[Message](value)
    val msg = optMsg.toOption.get // for brevity; a real application should handle decode failures
    val retryAttempt = msg.retryAttempt
    val newMessage = msg.copy(retryAttempt = retryAttempt + 1)
    KeyValue.pair(Custom.nextRetryTs(now, retryAttempt), newMessage.asJson.noSpaces)
}
Since RocksDB organizes data in sorted order, we can store the messages to be retried with their retry timestamp as the key, enabling range lookups.
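The post does not show Custom.nextRetryTs (used in the map above); one possible backoff helper, assuming a simple exponential backoff on top of the current epoch-second, could look like this:

object Custom {
  // Assumed base delay of 5 minutes; the delay doubles with every attempt,
  // so later attempts sort further out in the key space of the store.
  val baseDelaySeconds = 300L

  def nextRetryTs(nowEpochSeconds: Long, retryAttempt: Int): Long =
    nowEpochSeconds + baseDelaySeconds * (1L << retryAttempt)
}

Note that two messages scheduled for the same second would collide on the key and the reduce below would keep only the latest one, so a real key would presumably also carry a unique, time-ordered component (the TimeOrderedIdGen used later hints at this).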
2. Persist the messages to the internal store to be retried with a backoff strategy
val storeDir = "/tmp/retry-state/"
val storeName = "retry-message-store"
val storeSupplier = new RocksDbKeyValueBytesStoreSupplier(storeName)

// A dummy aggregation: the reduce simply keeps the latest value per key,
// but it materializes the stream into a queryable KTable backed by RocksDB.
retryStream.groupByKey.reduce(
  new Reducer[String] {
    def apply(oldValue: String, newValue: String) = newValue
  },
  Materialized.as[JLong, String](storeSupplier))
Now that we have persisted the messages, we need to publish them to the retry-topic after their retry timestamps have elapsed.
3. A scheduled actor to replay messages from this internal store
class RetryActor(kvStore: ReadOnlyKeyValueStore[JLong, String],
                 retryTopic: String,
                 producerProperties: Properties) extends Actor {
  import RetryMessages.Tick

  val producer = new KafkaProducer[Array[Byte], String](producerProperties)
  val batchSize = 1000

  // Key of the last message we replayed; everything up to it has already been published.
  var lastRetry: JLong = 0L

  override def receive: Receive = {
    case Tick =>
      val currentTs = TimeOrderedIdGen.prefix(Instant.now)
      // Range scan over the sorted store: every message whose retry timestamp has elapsed.
      val iterator = kvStore.range(lastRetry + 1, currentTs)
      val msgs = try iterator.asScala.take(batchSize).toList finally iterator.close()
      msgs
        .map(kv => new ProducerRecord[Array[Byte], String](retryTopic, kv.value))
        .foreach(producer.send)
      lastRetry = if (msgs.nonEmpty) msgs.last.key else lastRetry
  }
}

object RetryMessages {
  case object Tick
}
4. Get the stream up and running
val streams = new KafkaStreams(builder.build(), streamConfig)
streams.start()
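The streamConfig used here is not shown in the snippets; a plausible minimal version, reusing the values defined in step 1 (the default serdes and exact settings are assumptions):

val streamConfig = new Properties()
streamConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appName)
streamConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
streamConfig.put(StreamsConfig.STATE_DIR_CONFIG, storeDir)
// Default serdes matching the Long keys and String values used above.
streamConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass)
streamConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)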
5. Schedule the RetryActor
// Wait until the local state store is ready, then hand it to the actor.
val keyValueStore: ReadOnlyKeyValueStore[JLong, String] =
  waitUntilStoreIsQueryable(storeName, QueryableStoreTypes.keyValueStore[JLong, String], streams)

val system = ActorSystem(appName)
import system.dispatcher // execution context for the scheduler

val retryActor = system.actorOf(
  Props.create(classOf[RetryActor], keyValueStore, retryTopic, producerProperties),
  name = "retry-actor")

// Replay due messages every 5 minutes.
system.scheduler.schedule(0.seconds, 5.minutes, retryActor, Tick)
The waitUntilStoreIsQueryable helper waits for the store to become available on this instance before we query it.
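It is not part of the Kafka Streams API itself; a sketch of it, following the usual retry-until-ready pattern (the polling interval is arbitrary):

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.errors.InvalidStateStoreException
import org.apache.kafka.streams.state.QueryableStoreType

def waitUntilStoreIsQueryable[T](storeName: String,
                                 storeType: QueryableStoreType[T],
                                 streams: KafkaStreams): T = {
  var store: Option[T] = None
  while (store.isEmpty) {
    try {
      store = Some(streams.store(storeName, storeType))
    } catch {
      // The store is not ready yet, or a rebalance is in progress; retry shortly.
      case _: InvalidStateStoreException => Thread.sleep(100)
    }
  }
  store.get
}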
We now have a distributed, in-memory DLQ retry mechanism. Yabba Dabba Doo!!
However, there are some aspects where KTables cannot provide what traditional databases do. The part I was not very happy about was cleaning up data.
Once our retry loop is complete, we would like to clear this table after we process a message. The Kafka mechanism that comes to mind is retention. However, a KTable has infinite retention, meaning clients have to manage deletion themselves. Okay, no problem. Let's delete the messages when we push them into the retry topic. Unfortunately we can't, since all we can get is a read-only handle to the state store. The only way to delete messages from a KTable is to push a record into the same input topic with the same key and the value set to null.
RocksDB supports a deleteRange operation, but we can't use it here because Kafka records don't have a DeleteRecord kind of implementation. That means for every key we retry, we need to publish a delete back to the dead-letter topic.
Adding this delete workflow:
6. Publish delete messages
// For every message we retried, send a tombstone (null value) back to the
// dead-letters topic with the same key. deadLetterProducer is a
// KafkaProducer[JLong, String] (not shown in these snippets).
msgs
  .map(kv => new ProducerRecord[JLong, String](deadletterTopic, kv.key, null))
  .foreach(deadLetterProducer.send)
7. Consume the delete messages from the input topic
val retryStream: KStream[JLong, String] = dlqStream.map {
  (key: JLong, value: String) =>
    if (value == null) {
      // A delete message: keep the same key and the null value (tombstone).
      KeyValue.pair(key, null)
    } else {
      val now = Instant.now.getEpochSecond
      ... // Similar to the previous code
    }
}
KTables are a great concept in that they bring the distribution, fault tolerance and scalable architecture of Kafka while also making the data queryable. The kind of problem they can solve is quite different from what other stores address. The abstraction is relatively young, so we can expect more features in the near future.