
Implementing a Health Check Mechanism for Kafka

Kafka is a well-known distributed streaming platform that stores data in topics and provides scalability through topic partitions.

At Veon, we use Kafka extensively to publish and consume events across different microservices. As almost all of our microservices emit events to and consume events from Kafka, we wanted to build a health-check mechanism for verifying connectivity to Kafka.

Some key concepts about Kafka used in the health-check
Partition: Kafka provides parallelism by storing its data across multiple partitions.
Consumer Group: A named group of consumers that subscribes to a particular topic.
Consumers in a Consumer Group: Partitions of a topic are balanced between the consumers of a consumer group. This enables parallel consumption of data from the topic. The number of consumers can scale horizontally up to the number of partitions.
Offset: Each message in a partition is indexed with an identifier called an offset. Kafka stores the last committed offset of each partition for every consumer group. This offset determines where to resume processing in case an instance (consumer) fails and restarts (or is rebalanced).

Some important configuration parameters for the health-check
offsets.retention.minutes: The time for which Kafka remembers the committed offset of a partition. Defaults to 24 hours.
auto.offset.reset: When a new consumer group appears, or when offsets.retention.minutes has elapsed, Kafka needs a way to decide where to start processing messages. Two options are provided: start from the beginning of the topic (earliest) or from the end (latest). Our services default to earliest so as not to miss any data.
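
For reference, this is how the reset strategy can be overridden on an Alpakka Kafka consumer (the Kafka library used throughout the snippets below). This is only a sketch; baseSettings and the helper name are placeholders, not code from our services:

import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig

// Sketch: start reading at the end of the topic when no committed offset exists
// (or when the retained offset has expired). `baseSettings` is a placeholder for
// whatever base consumer settings the service already uses.
def withLatestReset(baseSettings: ConsumerSettings[String, String]): ConsumerSettings[String, String] =
  baseSettings.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")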

Building a health-check mechanism

At Veon, all application instances run as Docker containers in swarm mode.
The Docker health check can be thought of as just a simple curl call to the /health endpoint inside the container.

HEALTHCHECK CMD curl --fail http://localhost/health || exit 1

A variety of health checks can be plugged into this /health endpoint as required. One of them is the KafkaHealthCheck.
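
The StatusCheck, check, Ok, and Ko names used in the snippets below come from an internal health-check library that this post does not show. Purely for illustration, a minimal stand-in could look like this (an assumption, not the real API):

import scala.concurrent.Future

object StatusChecks {
  // Hypothetical stand-ins: a check result is either Ok or Ko with a reason,
  // and a check pairs a name with an effectful run function.
  sealed trait CheckResult
  case object Ok extends CheckResult
  final case class Ko(reason: String) extends CheckResult

  final case class StatusCheck[F[_]](name: String, run: () => F[CheckResult])

  def check[F[_]](name: String)(body: => F[CheckResult]): StatusCheck[F] =
    StatusCheck(name, () => body)
}

The /health endpoint then simply runs every registered StatusCheck and reports healthy only if all of them return Ok.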

Kafka health-check

There are multiple ways to design a health check for Kafka. The simplest, and the one we decided to implement, is to produce a message to Kafka and consume it back within a threshold time.

1. A Singleton KafkaHealthCheck Object

object KafkaHealthCheck {

  def apply(checkName: String = "kafka-health-check")(implicit system: ActorSystem): StatusCheck[Future] = {
    start(checkName, HealthCheckConfig.apply)
  }
}
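
Using it is then a one-liner at service start-up (a sketch; the surrounding wiring is assumed):

import akka.actor.ActorSystem
import scala.concurrent.Future

// Sketch: creating the check starts the producer and consumer streams as a
// side effect; whatever serves /health evaluates the returned StatusCheck.
implicit val system: ActorSystem = ActorSystem("my-service")
val kafkaCheck: StatusCheck[Future] = KafkaHealthCheck()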

2. Configuring our healthchecks

case class HealthCheckConfig(
  tickTime: FiniteDuration,
  failureFactor: Double,
  serviceName: String,
  identifier: String
)

What does each of these health-check config fields mean?

tickTime - How often to produce a health-check message.
failureFactor - How many ticks to wait before the alarm triggers.
serviceName - For logging purposes.
identifier - Instance identifier, which is also used as the consumer group id and needs to be unique among the active offsets Kafka remembers. An example configuration is sketched below.
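
For example, a plausible instantiation (the values are illustrative, not our production settings; the closing notes describe how we actually derive the identifier from the Docker container id):

import java.util.UUID
import scala.concurrent.duration._

// Illustrative values: produce a ping every 10 seconds and report unhealthy
// after 3 missed ticks (30 seconds). The identifier must be unique per instance.
val healthCheckConfig = HealthCheckConfig(
  tickTime = 10.seconds,
  failureFactor = 3.0,
  serviceName = "order-service",
  identifier = UUID.randomUUID().toString
)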

3. Health Check Message(Ping) Producer

def runHealthCheckProducer(
  producerSettings: ProducerSettings[String, String],
  healthCheckConfig: HealthCheckConfig
)(implicit clock: Clock, materializer: Materializer): Unit = {
  Source
    .tick(healthCheckConfig.tickTime, healthCheckConfig.tickTime, ())
    .map(_ => HealthCheckPing(healthCheckConfig.serviceName, healthCheckConfig.identifier, clock.instant()))
    .map(HealthCheckPing.toJson)
    .via(debugLog(msg => s"Sending health check $msg to kafka"))
    .map(new ProducerRecord[String, String](topic, _))
    .to(Producer.plainSink(producerSettings))
    .run()
}

This produces a HealthCheckPing into the health-check topic on every tick. The ping itself encapsulates the who and the when of the message.
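
The debugLog stage used above is not shown in the post; a minimal sketch of such a pass-through logging stage (the logger name is an assumption):

import akka.NotUsed
import akka.stream.scaladsl.Flow
import org.slf4j.LoggerFactory

val log = LoggerFactory.getLogger("kafka-health-check")

// Sketch of the debugLog helper: log each element at debug level and pass it
// through unchanged.
def debugLog[T](message: T => String): Flow[T, T, NotUsed] =
  Flow[T].map { element =>
    log.debug(message(element))
    element
  }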

4. Health Check Ping Consumer

private def runHealthCheckConsumer(
  consumerSettings: ConsumerSettings[String, String],
  healthCheckConfig: HealthCheckConfig
)(onCheckReceive: HealthCheckPing => Unit)(implicit system: ActorSystem, materializer: Materializer): Unit = {
  committableSource(consumerSettings, Subscriptions.topics(topic))
    .alsoTo(kafkaCommitSink)
    .map(msg => HealthCheckPing.fromJson(msg.record.value()))
    .collect { case Some(msg: HealthCheckPing) if msg.isMine(healthCheckConfig) => msg }
    .via(debugLog(msg => s"Received health check from kafka $msg"))
    .toMat(Sink.foreach(onCheckReceive))(Keep.left)
    .run()
}

The consumer subscribes to the health-check topic and keeps only the messages that this very instance produced (based on the identifier encapsulated in the ping). A callback (onCheckReceive) is invoked whenever such a ping is successfully received.
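
The kafkaCommitSink referenced above is also not shown in the post. A plausible sketch using Alpakka Kafka's Committer, assuming the offsets of all health-check messages are simply committed as they arrive:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.kafka.CommitterSettings
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.scaladsl.Committer
import akka.stream.scaladsl.{Flow, Sink}

// Sketch: extract each message's committable offset and commit it in batches
// via Alpakka's Committer sink.
def kafkaCommitSink(implicit system: ActorSystem): Sink[CommittableMessage[String, String], NotUsed] =
  Flow[CommittableMessage[String, String]]
    .map(_.committableOffset)
    .to(Committer.sink(CommitterSettings(system)))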

5. The HealthCheck Ping

private[healthcheck] case class HealthCheckPing(serviceName: String, identifier: String, timestamp: Instant) {
  def isMine(config: HealthCheckConfig): Boolean =
    serviceName == config.serviceName && identifier == config.identifier
}
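
The toJson and fromJson used by the producer and consumer are not shown in the post either. One possible implementation, assuming spray-json (the post does not name the actual serialization library):

import java.time.Instant
import scala.util.Try
import spray.json._

private[healthcheck] object HealthCheckPing extends DefaultJsonProtocol {

  // spray-json has no built-in Instant support; encode it as an ISO-8601 string.
  implicit object InstantFormat extends JsonFormat[Instant] {
    def write(instant: Instant): JsValue = JsString(instant.toString)
    def read(json: JsValue): Instant = json match {
      case JsString(value) => Instant.parse(value)
      case other           => deserializationError(s"Expected ISO-8601 timestamp, got $other")
    }
  }

  implicit val pingFormat: RootJsonFormat[HealthCheckPing] = jsonFormat3(HealthCheckPing.apply)

  def toJson(ping: HealthCheckPing): String = ping.toJson.compactPrint

  def fromJson(json: String): Option[HealthCheckPing] =
    Try(json.parseJson.convertTo[HealthCheckPing]).toOption
}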

6. Connecting the pieces together

We use a simple piece of mutable state to hold the time at which the last successfully produced message was received back.

def start(
  checkName: String,
  healthCheckConfig: HealthCheckConfig
)(implicit system: ActorSystem, clock: Clock = Clock.systemUTC()): StatusCheck[Future] = {
  implicit val mat: ActorMaterializer = materializer
  import system.dispatcher

  val producerConsumerSettings = ProducerConsumerSettings(healthCheckConfig)
  var lastHealthCheckAt = clock.instant()

  info(s"Starting kafka health check with config $healthCheckConfig")

  runHealthCheckProducer(producerConsumerSettings.producerSettings, healthCheckConfig)
  runHealthCheckConsumer(producerConsumerSettings.consumerSettings, healthCheckConfig)(
    message => lastHealthCheckAt = message.timestamp
  )

  // The StatusCheck value returned here is the check constructed in step 7.
}

On every successfully received ping, this callback updates the lastHealthCheckAt state.
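
ProducerConsumerSettings, which builds the Kafka settings from the health-check config, is not shown in the post. A plausible sketch, with a placeholder broker address, that also bakes in the two gotchas discussed below (a unique consumer group per instance and auto.offset.reset=latest):

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, ProducerSettings}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

final case class ProducerConsumerSettings(
  producerSettings: ProducerSettings[String, String],
  consumerSettings: ConsumerSettings[String, String]
)

object ProducerConsumerSettings {
  // Sketch: the identifier doubles as the consumer group id, and the consumer
  // starts at the latest offset. "kafka:9092" is a placeholder broker address.
  def apply(config: HealthCheckConfig)(implicit system: ActorSystem): ProducerConsumerSettings =
    ProducerConsumerSettings(
      ProducerSettings(system, new StringSerializer, new StringSerializer)
        .withBootstrapServers("kafka:9092"),
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("kafka:9092")
        .withGroupId(config.identifier)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    )
}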

7. The Health-check check

check[Future](checkName) {
  Future.successful {
    val failAfter = healthCheckConfig.tickTime * healthCheckConfig.failureFactor
    if (lastHealthCheckAt.isAfter(clock.instant().minus(failAfter.toSeconds, ChronoUnit.SECONDS))) Ok
    else Ko(s"Last health check received at $lastHealthCheckAt and it's older than $failAfter")
  }
}

The check is invoked whenever Docker calls the /health endpoint. healthCheckConfig.tickTime * healthCheckConfig.failureFactor determines the threshold to wait for. If the last successful health check was received longer ago than this threshold, the check returns a failure (Ko); otherwise everything is Ok. For example, with tickTime = 10 seconds and failureFactor = 3, the instance is reported unhealthy once no ping has been consumed for 30 seconds.

Some points to remember while setting up the health check. It is necessary to set auto.offset.reset=latest for this topic. Since every new instance is its own consumer group, a consumer left on the earliest strategy reads all messages from the start. This can cause the threshold to elapse, triggering Docker Swarm to replace the unhealthy containers. The replacement containers then repeat the process of reading from the start and missing the threshold, and the cycle of failures and restarts goes on indefinitely.

As our services run over Docker Swarm, we decided to use the container id provided by Docker as the identifier. (Note: we saw that offsets.retention.minutes defaults to 24 hours.) Within these 24 hours, the consumer group id must not repeat; otherwise an instance might start receiving older messages produced by some other instance, appear unhealthy, and trigger the indefinite restarts described above.
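
One way to obtain the container id from inside the container (a sketch of an approach, not necessarily what our code does): Docker sets a container's hostname to its short container id by default, so the JVM can simply read the hostname.

import java.net.InetAddress

// Docker defaults the container hostname to the short container id, which makes
// the hostname a convenient per-instance identifier.
val identifier: String = InetAddress.getLocalHost.getHostName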

That's all, folks! This was the simple health check on Kafka we added at Veon.

Updated Mar 25, 2020
This post is licensed under CC BY 4.0