Stateful processing with Kafka

Kafka is a distributed streaming platform that stores data in topics and scales through topic partitions. Even though Kafka is used extensively to connect our microservices through publish/subscribe, we usually resort to another database for storing and querying the data.

KTables and KStreams are an attempt by Confluent to bring stateful processing into Kafka.

I experimented with KTables in an application and found some nice features and some not-so-good ones. In this post, I explain what I learnt from that experience.

Let me first introduce the key concepts.

Duality of a Table and a Stream: a stream can be seen as a table, and a table as a stream. The stream is the changelog of the table, while the table is a snapshot of the stream containing the latest value per record key. For example, the records (alice, 1), (bob, 2), (alice, 3) produce the table {alice → 3, bob → 2}.

Locality of Data: In every application instance, both the stream and the table are populated with data from only a subset of the partitions of the input topic. The data of the entire topic is only available collectively, across all application instances.

KStream & KTable

KStream is an abstraction over the partitioned record stream, allowing stateless transformations like map, filter, etc.
KTable is a view over the subscribed partitions, materialized into a local store that serves as an embedded database. The default store backing a KTable is RocksDB, which mainly works in memory but may flush to disk as well. KTable provides fault tolerance by recreating the table from its changelog topic (the duality between the changelog stream and the table).
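
As a small illustration (the topic names here are placeholders, unrelated to the retry example below), the same builder can expose a topic either as a stream or as a table:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.{Consumed, KStream, KTable}

val builder = new StreamsBuilder

// KStream: every record is an independent event
val clicks: KStream[String, String] =
  builder.stream("click-events", Consumed.`with`(Serdes.String(), Serdes.String()))

// KTable: only the latest value per key is kept; a null value removes the key
val profiles: KTable[String, String] =
  builder.table("user-profiles", Consumed.`with`(Serdes.String(), Serdes.String()))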

Interactive Queries: KTable data can be queried via Interactive Queries. However, due to data locality, an instance can only query the data of its subscribed partitions. There are ways to get data from across instances (an RPC layer to query remote stores, or a GlobalKTable), but from others' experience I have heard they are not very optimal.

Working with this query locality, I tried to implement a dead-letter retry mechanism in which every retry instance is concerned only with its own data. This also lets us scale by merely adding instances, without worrying about locks and transactions on a centralized database.

Retry Application using KTable

Let dead-letters be a Kafka topic that contains all the failed messages, and retry-topic the topic that contains messages that should be retried now. We need to connect these two topics so that messages from dead-letters flow into retry-topic following some backoff strategy.

There are multiple ways to implement the retry mechanism. We could have enumerated retry topics that retry after fixed delays (5, 10, 15, etc.). However, if we want custom logic, or want to retry messages out of order, we need some state to store them. Let's try the KTable here.

1. Consume messages from the dead-letters topic

  
val appName = "retry-example"
val bootstrapServers = "localhost:9092"
val deadletterTopic = "dead-letters-topic"
val retryTopic = "retry-topic"

val builder = new StreamsBuilder

val dlqStream: KStream[JLong, String] = builder.stream(deadletterTopic,
    Consumed.`with`[JLong, String](Serdes.Long(), Serdes.String()))

// Re-key every failed message with the timestamp at which it should be retried,
// and bump its retry attempt counter
val retryStream: KStream[JLong, String] = dlqStream.map {
    (key: JLong, value: String) =>
    val now = Instant.now.getEpochSecond
    val optMsg = decode[Message](value)          // circe JSON decoding

    val msg = optMsg.toOption.get                // assumes the payload is valid JSON
    val retryAttempt = msg.retryAttempt
    val newMessage = msg.copy(retryAttempt = retryAttempt + 1)

    // nextRetryTs applies the backoff strategy and returns the next retry timestamp
    KeyValue.pair(Custom.nextRetryTs(now, retryAttempt), newMessage.asJson.noSpaces)
}
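
The Message model, the Custom.nextRetryTs backoff helper and the JSON codecs (decode and asJson come from circe) are not shown in the post; here is a minimal sketch of what they could look like. The fields and the backoff strategy are placeholders, not the only way to do it:

import java.lang.{Long => JLong}
import io.circe.generic.auto._   // needed in scope for decode/asJson to derive Message codecs

// Payload stored in the dead-letter topic: the original record plus a retry counter
case class Message(topic: String, payload: String, retryAttempt: Int)

object Custom {
  // Exponential backoff: wait 2^attempt minutes, capped at one hour
  def nextRetryTs(now: Long, retryAttempt: Int): JLong = {
    val delayMinutes = math.min(1L << math.min(retryAttempt, 6), 60L)
    now + delayMinutes * 60L
  }
}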

Since RocksDB organizes data in sorted order by key, we can store the messages to be retried with their retry timestamp as the key, enabling range lookups.

2. Persist the messages to the internal store to be retried with a backoff strategy

   
val storeDir = "/tmp/retry-state/"   // used as state.dir in the streams config (step 4)
val storeName = "retry-message-store"
// Persistent, RocksDB-backed store supplier from the public Stores factory
val storeSupplier = Stores.persistentKeyValueStore(storeName)

// A dummy aggregation: keep only the latest value per retry timestamp,
// materialized into the RocksDB store
retryStream.groupByKey.reduce(
    new Reducer[String] {
        def apply(oldValue: String, newValue: String) = newValue
    },
    Materialized.as[JLong, String](storeSupplier))
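
One caveat that is not in the original snippet: if the application's default serdes are not Long/String, the grouping and the materialization need them spelled out explicitly (Grouped exists since Kafka 2.1), e.g.:

retryStream
    .groupByKey(Grouped.`with`(Serdes.Long(), Serdes.String()))
    .reduce(
        new Reducer[String] {
            def apply(oldValue: String, newValue: String) = newValue
        },
        Materialized.as[JLong, String](storeSupplier)
            .withKeySerde(Serdes.Long())
            .withValueSerde(Serdes.String()))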

Now that we have persisted the messages, we need to publish them to the retry-topic once their retry timestamp has elapsed.

3. A scheduled actor to replay messages from this internal store

    
class RetryActor(kvStore: ReadOnlyKeyValueStore[JLong, String], retryTopic: String, producerProperties: Properties) extends Actor {

  import RetryMessages.Tick

  val producer = new KafkaProducer[Array[Byte], String](producerProperties)
  val batchSize = 1000

  // Retry timestamp up to which messages have already been replayed
  var lastRetry: JLong = 0L

  override def receive: Receive = {
    case Tick =>
      // Keys are the epoch-second retry timestamps assigned in step 1
      val currentTs: JLong = Instant.now.getEpochSecond

      // Range scan over the sorted store: everything whose retry timestamp has elapsed
      val iterator = kvStore.range(lastRetry + 1, currentTs)
      val msgs = iterator.asScala.take(batchSize).toList
      iterator.close()

      msgs.map(kv => new ProducerRecord[Array[Byte], String](retryTopic, kv.value)).foreach(producer.send)

      lastRetry = if (msgs.nonEmpty) msgs.last.key else lastRetry
  }
}

object RetryMessages {
  case object Tick
}
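
producerProperties is also left out of the original snippets; a minimal version for this actor's producer (byte-array keys, string values), reusing the bootstrapServers constant from step 1, might be:

val producerProperties = new Properties()
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])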

4. Get the stream up and running

  
val streams = new KafkaStreams(builder.build(), streamConfig)
streams.start()
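
The streamConfig used here is not shown in the original snippets either; a minimal version, reusing the constants from steps 1 and 2, could be:

val streamConfig = new Properties()
streamConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appName)
streamConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
streamConfig.put(StreamsConfig.STATE_DIR_CONFIG, storeDir)
// Default serdes matching the JLong keys and String values used throughout
streamConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass)
streamConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)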

5. Schedule the RetryActor

val keyValueStore: ReadOnlyKeyValueStore[JLong, String] =
    waitUntilStoreIsQueryable(storeName, QueryableStoreTypes.keyValueStore[JLong, String], streams)
val system = ActorSystem(appName)
import system.dispatcher   // execution context for the scheduler

// Constructor arguments must be in the same order as in RetryActor: store, topic, producer properties
val retryActor = system.actorOf(
    Props.create(classOf[RetryActor], keyValueStore, retryTopic, producerProperties),
    name = "retry-actor")

// Replay due messages every 5 minutes
system.scheduler.schedule(0 seconds, 5 minutes, retryActor, RetryMessages.Tick)

The waitUntilStoreIsQueryable helper blocks until the state store becomes queryable; right after startup (or during a rebalance) the store is not yet available and querying it throws an InvalidStateStoreException.
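
This helper is not part of the Kafka Streams API; a minimal sketch of it, following the retry-until-ready pattern used in the Confluent examples, could look like this:

def waitUntilStoreIsQueryable[T](storeName: String,
                                 queryableStoreType: QueryableStoreType[T],
                                 streams: KafkaStreams): T =
  try {
    // Throws InvalidStateStoreException while the store is still initializing or migrating
    streams.store(storeName, queryableStoreType)
  } catch {
    case _: InvalidStateStoreException =>
      Thread.sleep(100)
      waitUntilStoreIsQueryable(storeName, queryableStoreType, streams)
  }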

We now have a distributed, in-memory DLQ retry mechanism. Yabba Dabba Doo!!

However, there are some aspects where KTables cannot provide what a traditional database does. The part I was not very happy about was the cleanup of data.

Once our retry loop is complete, we would like to clear the processed messages from this table. The Kafka mechanism that comes to mind is retention; however, a KTable has infinite retention, meaning clients have to manage deletion themselves. Okay, no problem, let's delete a message from the store when we push it into the retry topic. Unfortunately we can't, since all we get through Interactive Queries is a read-only handle to the state store. The only way to delete a record from the KTable is to push a record with the same key and a null value into the same input topic.

RocksDB supports a deleteRange, but we can't use it here because Kafka records have no equivalent of a delete-range operation. So, for every key we retry, we need to publish a delete record back to the dead-letter topic.

Adding this delete workflow:

6. Publish delete messages (tombstones)

     
// A record with a null value acts as a tombstone for its key in the KTable
msgs.map(kv =>
    new ProducerRecord[JLong, String](deadletterTopic, kv.key, null)
).foreach(deadLetterProducer.send)
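
The deadLetterProducer is not defined in the post; it has to be a separate producer because, unlike the retry producer, it serializes JLong keys. A minimal sketch:

val deadLetterProducerProps = new Properties()
deadLetterProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
deadLetterProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[LongSerializer])
deadLetterProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

val deadLetterProducer = new KafkaProducer[JLong, String](deadLetterProducerProps)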

7. Consume the delete messages from the input topic

     
val retryStream: KStream[JLong, String] = dlqStream.map {
    (key: JLong, value: String) =>
    val now = Instant.now.getEpochSecond

    // Pass tombstones through unchanged (same key, null value)
    if (value == null) {
        KeyValue.pair[JLong, String](key, null)
    } else {
        ... // same as the mapping in step 1
    }
}

KTables are a great concept: they bring the distribution, fault tolerance and scalable architecture of Kafka while also making the data queryable. The class of problems they solve is quite different from that of traditional databases. The abstraction is still relatively young, so we can expect more features in the near future.

Updated Mar 25, 2020
This post is licensed under CC BY 4.0