Kafka 101 – how does it achieve high throughput?

Messaging queues – like Apache Kafka, Apache ActiveMQ, and RabbitMQ – are used in various scenarios to address challenges related to communication, coordination, and data flow between different components of a system.

Problems without Messaging Queues

A typical organization has multiple sources of data with disparate data formats. Data integration involves combining data from these multiple sources into one unified view of the business. Each integration comes with difficulties around:

  • Protocol – how the data is transported (TCP, HTTP, REST, FTP, JDBC…)
  • Data format – how the data is parsed (Binary, CSV, JSON, Avro…)
  • Data schema & evolution – how the data is shaped and may change
Data Integration without Kafka

Consider an e-commerce platform that receives a high volume of orders from customers. The system is composed of several components, including a website for order placement, a payment processing service, an inventory management system, and a shipping service.

The problems that are usually faced without a messaging queue in this setup would be:

  1. Order Congestion:
    • Without a messaging queue, the order placement component might directly communicate with the payment processing, inventory management, and shipping services. As the number of orders increases, this direct communication can lead to congestion, delays, and potential system failures.
  2. Scalability Issues:
    • Scaling individual components independently becomes challenging. For instance, if the payment processing service experiences a surge in requests, scaling it without affecting other services can be complex.
  3. Fault Tolerance:
    • In a direct communication model, if one component fails (e.g., the inventory management service), it can impact the entire order processing pipeline, resulting in lost orders or errors.

A messaging queue can address these issues efficiently by decoupling the services, something like this:

Data Integration with Kafka

Now let's dig deep and see how it achieves such high throughput and resiliency. There are many messaging queue products on the market, but we'll keep our focus limited to Kafka in this blog.

Before that, let's build (or revise, if you're already familiar) some concepts that will help you understand everything better.

What is Kafka?

Apache Kafka is an open-source distributed event streaming platform used for building real-time data pipelines and streaming applications.

Kafka is designed to handle large volumes of data in a scalable and fault-tolerant manner, making it ideal for use cases such as real-time analytics, data ingestion, and event-driven architectures.

At its core, Kafka is a distributed publish-subscribe messaging system.

Components of Kafka

Have a look at this somewhat complex architecture:

Kafka components

No worries, we’ll break it down together:

  • Record
    • a record in Kafka is a self-contained unit of data that flows through the Kafka system, from producers to consumers. A record is immutable.
  • Topic
    • is a category or feed name to which records are published by producers.
    • Topics act as channels through which data is organized and distributed.
  • Producer
    • responsible for publishing (producing) records to a Kafka topic
  • Consumer
    • subscribe to Kafka topics and process the records published to those topics.
    • Kafka supports parallel consumption across multiple consumer instances.
  • Broker
    • is a Kafka server that stores data and serves client requests.
  • Kafka Cluster
    • Kafka operates as a cluster of brokers (servers) to provide fault tolerance and scalability.
  • Partition
    • Each topic is divided into partitions, which are ordered and immutable sequences of records.
    • this is done to reduce the load on one single server. (Kind of like sharding – keys A-M on Partition0, N-Z on Partition1)
  • Offset
    • a unique identifier (an increasing number starting at 0) assigned to each record within a partition.
    • Offsets allow consumers to keep track of their position in a partition.
    • Consumers can commit offsets to Kafka to record their progress, enabling reliable, at-least-once message delivery (see the consumer sketch after this list).
  • Consumer Group
    • A consumer group consists of multiple consumers.
    • This enables parallel processing and load balancing.
    • A consumer group is a subscriber of a Kafka topic. A published record is delivered to every consumer group subscribed to that topic.
    • Among the consumers in a consumer group, only one consumer will consume a given record.
    • Each partition within a topic is consumed by only one consumer within a group, but multiple partitions can be processed simultaneously by different consumers in the group. Huh? In simple terms – one partition is consumed by one consumer, but one consumer can consume from many partitions.
Kafka partitions and consumers
  • Zookeeper
    • Kafka relies on Apache ZooKeeper for distributed coordination and management of its cluster.
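
To make these concepts concrete, here is a minimal sketch using the plain Java consumer client. The broker address, topic name ("orders"), and group id are illustrative: the consumer joins a consumer group, subscribes to a topic, reads records along with their partition and offset, and commits its progress back to Kafka.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OrderConsumerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");             // the consumer group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");               // commit offsets manually

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders"));                                  // the topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
            consumer.commitSync();  // record this group's progress in its partitions
        }
    }
}
Java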

Features of Kafka

  • Distributed
    • Being distributed in nature helps Kafka achieve scalability via horizontal scaling, high availability via data replication, high throughput via parallel processing, geographic distribution of servers, etc.
  • Fault Tolerant
    • replicating data across multiple nodes, ensuring data durability and availability.
  • Scalability
    • can scale horizontally by adding more broker nodes to the cluster
  • High Throughput
    • It can handle a very high volume of data very efficiently. We’ll dive more into this in a later section.
  • Durability
    • Data in Kafka is persisted to disk, providing durability even in the case of node failures.
  • Exactly-Once Semantics
    • supports exactly-once semantics for message delivery, ensuring that messages are processed and delivered exactly once.
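
To illustrate the last point, the producer side of exactly-once is switched on through idempotence and transactions. Here is a minimal sketch with the plain Java producer client; the broker address, transactional id, topic, key, and value are illustrative. (On the consumer side, isolation.level=read_committed is needed to see only committed records.)

import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExactlyOnceProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");               // retries cannot create duplicates
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-tx");     // enables transactions

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("orders", "order-42", "ORDER_CREATED"));
            producer.commitTransaction();  // all records in the transaction become visible, or none do
        }
    }
}
Java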

Why is Kafka so fast?

Fast here means high throughput, not low latency.

The design choices made by Kafka are what contribute to its high throughput.

  1. Distributed Architecture
    • It can scale horizontally and distribute the load.
  2. Partitioning
    • data in a topic is partitioned across multiple brokers.
    • Each partition is an ordered and immutable sequence of records
    • these partitions are distributed across multiple servers -> hence enabling parallel processing
  3. Log-centric Storage Model
    • data is written sequentially to an immutable log.
    • highly efficient for both write and read operations, as it minimizes disk seeks and allows for sequential disk I/O
    • sequential I/O is much faster than random I/O (by a factor of roughly 100–1000)
    • reads and writes on an SSD are roughly 3–4 times faster than on an HDD
  4. Batching
    • supports the concept of batched writes, where multiple records are grouped and written as a batch.
    • Batching reduces the overhead of individual record writes and improves disk I/O efficiency (see the producer settings sketched after this list).
  5. Zero-Copy
    • Kafka minimizes data copying during data transfer.
    • Instead of copying data between buffers, Kafka utilizes the operating system’s page cache and direct memory access (DMA) techniques to read and write data efficiently.
    • This zero-copy approach reduces CPU and memory overhead, contributing to faster performance.
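
Batching is tunable on the producer. Here is a minimal sketch of the relevant settings (the values are illustrative) that could be added to the producer properties shown elsewhere in this post:

// Accumulate up to 32 KB of records per partition before sending a batch...
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);
// ...or wait at most 10 ms for more records to arrive, whichever comes first
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
// Compressing whole batches further reduces network and disk I/O
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
Java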

Use cases of Kafka

  • Messaging systems
  • Activity Tracking
  • Gather metrics from many different locations, for example, IoT devices
  • Application logs analysis
  • De-coupling of system dependencies
  • Integration with Big Data technologies like Spark, Flink, Storm, Hadoop.
  • Event-sourcing store

What Kafka is NOT?

  • a database
    • even though it doesn't delete data after processing, it has no analytical capabilities and supports no query model
  • Work queues
    • Kafka is made of topics, not queues (unlike RabbitMQ, ActiveMQ, SQS).
    • Queues are meant to scale to millions of consumers and to delete messages once processed.
    • In Kafka data is not deleted once processed and consumers cannot scale beyond the number of partitions in a topic.
  • Kafka as a blockchain
    • Kafka topics share some characteristics of a blockchain: data is appended to a log, and Kafka topics can be immutable,
    • but they lack key properties of blockchains, such as cryptographic verification of the data and full history preservation.

Kafka as a Queue vs Pub-Sub

Data in a queue is popped off as soon as it is consumed. Unlike a queue, a record in Kafka is not deleted after it has been consumed by all the consumer groups. Instead, deletion is controlled by a retention policy defined in the configuration (say 48 hours – then any record older than 48 hours would be deleted).
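
Retention is a topic-level configuration. For example, with the KafkaAdmin setup shown later in this post, a 48-hour retention could be declared when creating the topic – a minimal sketch where the topic name, partition count, and retention value are illustrative:

@Bean
public NewTopic ordersTopic() {
    // 3 partitions, replication factor 1, records kept for 48 hours whether consumed or not
    return new NewTopic("orders", 3, (short) 1)
            .configs(Map.of("retention.ms", String.valueOf(48L * 60 * 60 * 1000)));
}
Java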

  • As a Pub-Sub model:
    • Have multiple consumers in one consumer group, so data from one partition is consumed by only one consumer in that group,
    • but consumers in another consumer group can still read the same records (pub-sub).
  • As a Queue:
    • Have a single consumer group subscribed to the topic (with one or more consumers in it);
    • now each record is read by only one consumer (see the listener sketch below).
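
Here is a minimal Spring sketch of both modes – the topic, group ids, and concurrency are illustrative, and the listener container setup from the Spring section below is assumed:

@Service
public class OrderListeners {

    // Pub-sub behaviour: different group ids, so BOTH methods receive every record
    @KafkaListener(topics = "orders", groupId = "billing")
    public void billing(String message) {
        System.out.println("billing saw: " + message);
    }

    @KafkaListener(topics = "orders", groupId = "analytics")
    public void analytics(String message) {
        System.out.println("analytics saw: " + message);
    }

    // Queue-like behaviour: several consumers share ONE group id,
    // so each record is handled by exactly one of them
    @KafkaListener(topics = "orders", groupId = "order-workers", concurrency = "3")
    public void worker(String message) {
        System.out.println("worker handled: " + message);
    }
}
Java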

Push vs Pull

There are pros and cons to both approaches. However, a push-based system has difficulty dealing with diverse consumers as the broker controls the rate at which data is transferred. The goal is generally for the consumer to be able to consume at the maximum possible rate; unfortunately, in a push system, this means the consumer tends to be overwhelmed when its rate of consumption falls below the rate of production (a denial of service attack, in essence). A pull-based system has the nicer property that the consumer simply falls behind and catches up when it can.

The deficiency of a naive pull-based system is that if the broker has no data the consumer may end up polling in a tight loop, effectively busy-waiting for data to arrive.

To avoid this, Kafka has parameters in the pull request that allow the consumer request to block in a “long poll” waiting until data arrives (and optionally waiting until a given number of bytes is available to ensure large transfer sizes).
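
These knobs surface as consumer configuration. A minimal sketch of the two relevant settings (the values are illustrative), which could be added to the consumer properties from the earlier sketch:

// Don't answer a fetch until at least 64 KB of data is available...
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 64 * 1024);
// ...but never make the consumer wait longer than 500 ms for it
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
Java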

Kafka Rebalancing

  • Re-balance is the re-assignment of partition ownership among consumers within a given consumer group.
  • A Re-balance happens when:
    • a consumer JOINS the group
    • a consumer SHUTS DOWN cleanly
    • a consumer is considered DEAD by the group coordinator (e.g., the consumer crashed or is stuck in long-running processing) -> it sent no heartbeats to the group coordinator within the configured session timeout
    • new partitions are added

For a Consumer Group:

  • Group Coordinator: is one of the brokers in the cluster
  • Group Leader: is the first consumer that joins a group

A rebalance can be described, more or less, as follows:

  • The leader receives a list of all consumers in the group from the group coordinator.
  • This list includes all consumers that sent a heartbeat recently and are therefore considered alive.
  • The leader is responsible for assigning a subset of partitions to each consumer.
  • After deciding on the partition assignment (Kafka has a couple of built-in partition assignment policies), the group leader sends the list of assignments to the group coordinator, which sends this information to all the consumers.
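
With the plain Java client, a rebalance can be observed – and progress committed before partitions are taken away – by passing a ConsumerRebalanceListener when subscribing. A minimal sketch reusing the consumer from the earlier sketch (the topic name is illustrative):

consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // called before a rebalance takes these partitions away - commit progress now
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // called after the rebalance, with the partitions this consumer now owns
        System.out.println("Assigned: " + partitions);
    }
});
Java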

How to ensure the ordering of messages

Determine whether ordering is really necessary. Consider two events where order matters: insert a name -> update the name. The update shouldn't happen before the insert.

To ensure this, write events related to one order to one partition only. This ensures that only one consumer processes this data.
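
A minimal sketch of this, assuming a KafkaProducer<String, String> named producer configured with String serializers (as in the earlier sketch); the topic name, key, and values are illustrative. Using the order ID as the message key routes every event for that order to the same partition:

String orderId = "order-42";
// same key -> same partition -> the insert event is stored (and consumed) before the update event
producer.send(new ProducerRecord<>("orders", orderId, "NAME_INSERTED"));
producer.send(new ProducerRecord<>("orders", orderId, "NAME_UPDATED"));
Java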

How do we ensure order in the face of failure? A failure might be due to deserialization issues, etc. Let's say the insert operation failed; in that case we would not want to proceed with the update operation.

This can be handled with a bit of a sanity check:

In this example, check whether the data has gone through the insert phase:

  • check in the cache/DB where (or if) you're maintaining the state
  • add this piece of info while adding the next event to the queue
void insertConsumer(OrderData data) {
    // ... perform the insert ...
    data.isCreated = true;          // record that the insert phase has completed
    createEvent(UPDATE, data);      // attach this flag to the next event placed on the queue
}

void updateConsumer(OrderData data) {
    if (data.isCreated) {
        // safe to apply the update
    } else {
        // log / throw / instrument / fail the message
    }
}
Java

How does a producer decide which partition to write to?

The producer decides the target partition for each message based on the following (see the sketch after this list):

  • The partition id, if it's specified within the message
  • hash(key) % number of partitions, if no partition id is specified but a message key is present
  • Round robin, if neither a partition id nor a message key is present (i.e., only the value is available)
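
These three cases map directly onto the ProducerRecord constructors – a minimal sketch reusing the producer from the earlier sketches, with illustrative topic, partition, key, and values:

// 1) Explicit partition id: this record goes to partition 0, regardless of the key
producer.send(new ProducerRecord<>("orders", 0, "order-42", "ORDER_CREATED"));

// 2) Key, no partition id: partition = hash(key) % number of partitions
producer.send(new ProducerRecord<>("orders", "order-42", "ORDER_CREATED"));

// 3) Value only: the partitioner spreads records across partitions (round robin / sticky)
producer.send(new ProducerRecord<>("orders", "ORDER_CREATED"));
Java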

Setup

  • Extract the downloaded Kafka archive
$ tar -xzf kafka.tgz
$ cd kafka_2.13-3.6.1
  • Start the ZooKeeper service
$ bin/zookeeper-server-start.sh config/zookeeper.properties
  • Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties

Spring Implementation

  • Add the dependency in the POM
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.0</version>
</dependency>
XML
  • Creating Topics Programmatically

We need to add the KafkaAdmin Spring bean, which will automatically add topics for all beans of type NewTopic.

@Configuration
public class KafkaTopicConfig {
    
    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }
    
    @Bean
    public NewTopic topic1() {
         return new NewTopic("topic_1", 1, (short) 1);
    }
}
Java
  • Create producer configs
package com.neatcode.kafka.config;

@Configuration
class KafkaProducerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * BOOTSTRAP_SERVERS_CONFIG - Host and port on which Kafka is running.
     * KEY_SERIALIZER_CLASS_CONFIG - Serializer class to be used for the key.
     * VALUE_SERIALIZER_CLASS_CONFIG - Serializer class to be used for the value.
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * ProducerFactory is responsible for creating Kafka Producer instances.
     * Producer instances are thread safe.
     * Using a single instance throughout an application context will give higher performance.
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * we need a KafkaTemplate, which wraps a Producer instance and
     * provides convenience methods for sending messages to Kafka topics.
     *
     * Consequently, KafkaTemplate instances are also thread safe, and use of one instance is recommended.
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Java
  • Create consumer configs
@EnableKafka
@Configuration
class KafkaConsumerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value(value = "${spring.consumer.group-id}")
    private String consumerGroupId;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Java
  • Create a Kafka producer
@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * The send API returns a CompletableFuture object.
     * If we want to block the sending thread and get the result about the sent message, we can call the get API of the CompletableFuture object.
     * The thread will wait for the result, but it will slow down the producer.
     *
     * Kafka is a fast-stream processing platform.
     * Therefore, it’s better to handle the results asynchronously so that the subsequent messages do not wait for the result of the previous message.
     * We can do this through a callback:
     */
    public void sendMessage(String topicName, String message) {
        CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("Sent message=[" + message +
                    "] with offset=[" + result.getRecordMetadata().offset() + "]");
            } else {
                System.out.println("Unable to send message=[" +
                    message + "] due to : " + ex.getMessage());
            }
        });
    }
}
Java
  • Create a Kafka consumer
@Service
public class KafkaConsumer {

    @KafkaListener(topics = "topic_1", groupId = "foo")
    public void listenGroupFoo(String message) {
        System.out.println("Received Message in group foo: " + message);
    }
}
Java

Optionally, we can create an API that accepts data from the user and passes it to the producer to post to the topic.

package com.neatcode.kafka.controller;

import com.neatcode.kafka.service.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/kafka")
public class ProducerController {

    @Autowired
    KafkaProducer kafkaProducer;

    @GetMapping("/{message}")
    public String sendEvent(@PathVariable String message) {
        kafkaProducer.sendMessage("topic_1", message);
        return "Success";
    }
}
Java

That's it! Feel free to suggest any improvements.
