Apache Kafka

Takeaway

Configuration

  • To convert a property name to a CLI option, replace . with - and use -- as prefix.

    For example, log.dirs becomes --log-dirs.

  • To convert a property name to an environment variable name, replace . with _, uppercase the result, and use KAFKA_ as prefix.

    For example, log.dirs becomes KAFKA_LOG_DIRS.

Cheatsheet

Broker - List all topics

confluent kafka topic list --no-auth

OR

kcat -L -b $BROKER

Broker - Create a new topic

confluent kafka topic create $topic --no-auth --replication-factor 1

CLI - Consume messages

  • CLI

    • kafka-console-consumer
    • kafka-avro-console-consumer
    • kafka-json-schema-console-consumer
    • kafka-protobuf-console-consumer
  1. Run one of the above consumer commands

    kafka-protobuf-console-consumer \
       --bootstrap-server localhost:9092 \
       --from-beginning \
       --topic t1-p \
       --property schema.registry.url=http://localhost:8081
  2. Because of --from-beginning, it consumes all messages from the start of the topic rather than only new ones.

Setup

Docker

Docker - Container Images

Docker - Environment Variables

Docker - KAFKA_ADVERTISED_HOST_NAME

Maps to advertised.host.name (deprecated in favor of advertised.listeners in newer Kafka versions); set it to the container's IP within docker0's subnet.

docker0 is a bridge interface:

$ ip l show type bridge
 
3: br-0e97881e642e: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP mode DEFAULT group default
    link/ether 02:42:7f:ce:06:b0 brd ff:ff:ff:ff:ff:ff
4: docker0: <NO-CARRIER,BROADCAST,MULTICAST,UP> mtu 1500 qdisc noqueue state DOWN mode DEFAULT group default
    link/ether 02:42:26:bc:e3:8d brd ff:ff:ff:ff:ff:ff
Docker - KAFKA_LISTENERS

Brokers have two listeners: one for communicating within the Docker network, and one for connecting from the host machine.

Because Kafka clients connect directly to individual brokers after the initial bootstrap connection, one listener uses the container name, which is resolvable by all containers on the Docker network.

This listener is also used for inter-broker communication.

The second listener uses localhost on a unique port that gets mapped on the host (29092 for broker-1, 39092 for broker-2, and 49092 for broker-3).

With one node, a single listener on localhost works because the localhost name is conveniently correct both from within the container and from the host machine, but this doesn't apply in a multi-node setup (see the multi-node example below).

Docker - KAFKA_PROCESS_ROLES

Maps to process.roles, the KRaft role(s) of the node: broker, controller, or broker,controller for a combined-mode node.

Docker - KAFKA_CONTROLLER_QUORUM_VOTERS

A comma-separated list of the controllers, each entry in the form nodeId@host:port, e.g. 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093.

Docker - Multi Node example

# docker-compose.yaml
# Reference: https://hub.docker.com/r/apache/kafka-native
services:
  controller-1:
    image: apache/kafka-native:latest
    container_name: controller-1
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: controller
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
 
  controller-2:
    image: apache/kafka-native:latest
    container_name: controller-2
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: controller
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
 
  controller-3:
    image: apache/kafka-native:latest
    container_name: controller-3
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: controller
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
 
  broker-1:
    image: apache/kafka-native:latest
    container_name: broker-1
    ports:
      - 29092:9092
    environment:
      KAFKA_NODE_ID: 4
      KAFKA_PROCESS_ROLES: broker
      KAFKA_LISTENERS: "PLAINTEXT://:19092,PLAINTEXT_HOST://:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker-1:19092,PLAINTEXT_HOST://localhost:29092"
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    depends_on:
      - controller-1
      - controller-2
      - controller-3
 
  broker-2:
    image: apache/kafka-native:latest
    container_name: broker-2
    ports:
      - 39092:9092
    environment:
      KAFKA_NODE_ID: 5
      KAFKA_PROCESS_ROLES: broker
      KAFKA_LISTENERS: "PLAINTEXT://:19092,PLAINTEXT_HOST://:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker-2:19092,PLAINTEXT_HOST://localhost:39092"
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    depends_on:
      - controller-1
      - controller-2
      - controller-3
 
  broker-3:
    image: apache/kafka-native:latest
    container_name: broker-3
    ports:
      - 49092:9092
    environment:
      KAFKA_NODE_ID: 6
      KAFKA_PROCESS_ROLES: broker
      KAFKA_LISTENERS: "PLAINTEXT://:19092,PLAINTEXT_HOST://:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker-3:19092,PLAINTEXT_HOST://localhost:49092"
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    depends_on:
      - controller-1
      - controller-2
      - controller-3

Kubernetes

CLI

kcat

Confluent CLI

confluent-hub

  • Deploy Kafka connector plugins (JAR files) to Kafka Connect embedded within the ksqlDB server

  • Image: confluentinc/cp-ksqldb-server

ksqlDB

Broker

Embedded Kafka - Java

Broker - Config

Broker - Topic Config

Partitioning Strategy

Replication factor and brokers

  • Replication factor cannot exceed the number of brokers; attempting to do so fails with an InvalidReplicationFactorException (see the sketch below).
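
A sketch of guarding against this with the AdminClient; the broker address, topic name, and partition count below are illustrative, not from the original notes:

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            // Replication factor must be <= the current broker count,
            // otherwise createTopics fails with InvalidReplicationFactorException.
            int brokerCount = admin.describeCluster().nodes().get().size();
            short replicationFactor = (short) Math.min(3, brokerCount);
            admin.createTopics(List.of(new NewTopic("demo-topic", 3, replicationFactor)))
                    .all().get();
        }
    }
}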

Serialization / Deserialization

Serializers and Deserializers

Serializers and Deserializers - Testing

  • Option 1 - Multiple instances

    Use mock serializers and deserializers, which use a mock schema registry client under the hood.

  • Option 2 - Singleton

    Use singleton serializers and deserializers, which share a singleton mock schema registry under the hood.

    • All necessary singleton Spring bean definitions must be created to keep the whole configuration consistent, as in the example below.

      @Slf4j
      @Profile("kafka")
      @Configuration
      public class KafkaConfig {
          @Bean
          ContainerCustomizer<String, SmsRequested, ConcurrentMessageListenerContainer<String, SmsRequested>> containerCustomizer(
                  ConcurrentKafkaListenerContainerFactory<String, SmsRequested> factory,
                  ConsumerAwareRebalanceListener rebalanceListener
          ) {
              ContainerCustomizer<String, SmsRequested, ConcurrentMessageListenerContainer<String, SmsRequested>> cust =
                      container -> {
                          container.getContainerProperties().setDeliveryAttemptHeader(true);
                          container.getContainerProperties().setConsumerRebalanceListener(rebalanceListener);
                      };
              factory.setContainerCustomizer(cust);
              return cust;
          }
       
          @Bean
          SchemaProvider avroSchemaProvider() {
              return new AvroSchemaProvider();
          }
       
          @Bean
          AbstractKafkaSchemaSerDeConfig schemaSerDeConfig(KafkaProperties kafkaProperties) {
              var producerProps = kafkaProperties.buildProducerProperties(null);
              return new AbstractKafkaSchemaSerDeConfig(AbstractKafkaSchemaSerDeConfig.baseConfigDef(), producerProps, false);
          }
       
          @Bean
          SchemaRegistryClient schemaRegistryClient(
                  @Value("${spring.kafka.properties.schema.registry.url}") String url,
                  SchemaProvider provider,
                  AbstractKafkaSchemaSerDeConfig schemaSerDeConfig
          ) {
              KafkaUtils.printSerDeConfig(schemaSerDeConfig);
              return SchemaRegistryClientFactory.newClient(
                      Collections.singletonList(url),
                      schemaSerDeConfig.getMaxSchemasPerSubject(),
                      Collections.singletonList(provider),
                      schemaSerDeConfig.originals(),
                      schemaSerDeConfig.requestHeaders()
              );
          }
       
          @Bean
          KafkaAvroSerializer avroValueSerializer(SchemaRegistryClient schemaRegistryClient) {
              return new KafkaAvroSerializer(schemaRegistryClient);
          }
       
          @Bean
          KafkaAvroDeserializer valueDeserializer(SchemaRegistryClient schemaRegistryClient) {
              return new KafkaAvroDeserializer(schemaRegistryClient);
          }
       
          @SuppressWarnings({"rawtypes", "unchecked"})
          @Bean
          DefaultKafkaProducerFactoryCustomizer producerFactoryCustomizer(KafkaAvroSerializer avroSerializer) {
              return producerFactory -> {
                  producerFactory.setValueSerializer((Serializer) avroSerializer);
                  producerFactory.addListener(new CustomProducerFactoryListener<>());
                  producerFactory.addPostProcessor(new CustomProducerPostProcessor<>());
              };
          }
       
          @SuppressWarnings({"rawtypes", "unchecked"})
          @Bean
          DefaultKafkaConsumerFactoryCustomizer consumerFactoryCustomizer(KafkaAvroDeserializer avroDeserializer) {
              return consumerFactory -> {
                  consumerFactory.setValueDeserializer((Deserializer) avroDeserializer);
                  consumerFactory.addListener(new CustomConsumerFactoryListener());
                  consumerFactory.addPostProcessor(new CustomConsumerPostProcessor());
              };
          }
      }

Schema Registry

Schema Registry Client

org.apache.kafka.common.errors.SerializationException: Unknown magic byte

Thrown when a schema-registry-aware deserializer reads a record that was not produced with a schema-registry-aware serializer: the payload does not start with the magic byte (0x0) that opens the Confluent wire format.
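
The wire format frames every payload as one magic byte (0x0), a 4-byte schema ID, then the serialized data. A minimal sketch of inspecting that framing; the helper class is hypothetical, not part of any Kafka API:

import java.nio.ByteBuffer;

public class WireFormat {
    // Confluent wire format: [magic byte 0x0][4-byte schema id][serialized data].
    // "Unknown magic byte" means the first byte was not 0x0, i.e. the record
    // was not produced with a schema-registry-aware serializer.
    static int schemaId(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        byte magic = buf.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buf.getInt(); // the schema ID registered in Schema Registry
    }
}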

Schema Registry API - List all subjects

curl -s http://$SCHEMA_REGISTRY_URL/subjects | jq

Schema Registry API - List all schemas

curl -s http://$SCHEMA_REGISTRY_URL/schemas | jq

Schema Registry API - Get a schema with a specified schema ID

curl -s http://$SCHEMA_REGISTRY_URL/schemas/ids/$SCHEMA_ID | jq

Avro

Protobuf

Producer

Producer - Config reference

Producer - Message Delivery Guarantees

Producer - Message Compression

Producer - Record Headers

Producer - Cheatsheet

Producer - Produce messages with CLI

  • CLI

    • kafka-console-producer
    • kafka-avro-console-producer
    • kafka-json-schema-console-producer
    • kafka-protobuf-console-producer
  1. Use one of the CLIs above to produce messages. The topic is created automatically if auto.create.topics.enable=true.

    kafka-protobuf-console-producer \
       --bootstrap-server ${BOOTSTRAP_SERVER} \
       --property schema.registry.url=${SCHEMA_REGISTRY_URL} \
       --topic t1-p \
       --property parse.key=true \
       --property key.schema.id=$SCHEMA_ID \
       --property value.schema='syntax = "proto3"; message MyRecord { string f1 = 1; }'
  2. You will be prompted on stdin to input the message payload.

    Enter as many messages as you want, pressing ENTER after each one to send it. To quit, press Ctrl + C.

    { "f1": "value1-p" }

    An alternative is to use a file for message input.

    kafka-protobuf-console-producer \
       --bootstrap-server ${BOOTSTRAP_SERVER} \
       --property schema.registry.url=${SCHEMA_REGISTRY_URL} \
       --topic t1-p \
       --property value.schema='syntax = "proto3"; message MyRecord { string f1 = 1; }' \
    < topic-input.txt
  3. Each message is sent to the broker and can be seen in the Control Center Topics view.

Consumer

  • The number of consumers in a consumer group shouldn't exceed the number of partitions in the topic, because each partition is assigned to at most one consumer in the group; extra consumers sit idle.
  • At the heart of the Consumer API is a simple loop for polling the server for more data (see the sketch after this list).
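
A minimal poll-loop sketch with the plain Java client; the broker address, topic name, and group id are illustrative:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("t1"));
            while (true) {
                // Each poll() returns a (possibly empty) batch of records.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}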

Consumer - Spring Kafka - MessageListenerContainer

There are two implementations of MessageListenerContainer - the KafkaMessageListenerContainer (KMLC) and ConcurrentMessageListenerContainer (CMLC).

The CMLC is simply a wrapper for one or more KMLCs, with the number of KMLCs specified by the concurrency.

@KafkaListener always uses a CMLC.

Each KMLC gets one Consumer (and one thread). The thread continually poll()s the consumer, with the specified pollTimeout.
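
A sketch of what that looks like in configuration; the bean and type parameters are illustrative:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class ListenerConcurrencyConfig {
    // concurrency = 4 creates 4 KMLCs, i.e. 4 consumers (and 4 threads)
    // for each @KafkaListener that uses this factory.
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(4);
        return factory;
    }
}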

How the topics/partitions are distributed across the KMLCs depends on the configured partition assignment strategy (partition.assignment.strategy).

If you have multiple topics with fewer partitions than the concurrency, you will likely need an alternate partition assignor, such as the round robin assignor, otherwise you will have idle containers with no assignment.
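
A minimal sketch of switching to the round robin assignor via consumer properties:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;

public class AssignorConfig {
    static Properties consumerProps() {
        Properties props = new Properties();
        // RoundRobinAssignor spreads the partitions of all subscribed topics
        // across the consumers, avoiding idle containers when a topic has
        // fewer partitions than the configured concurrency.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                RoundRobinAssignor.class.getName());
        return props;
    }
}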

  • If we use one @KafkaListener with two topics, then Spring Kafka creates a single MessageListenerContainer, and if we use separate @KafkaListener annotations, then Spring Kafka creates two MessageListenerContainers?

    A: That is correct; if you explicitly want a different container for each topic, you can provide multiple @KafkaListener annotations on the same method.

  • Does MessageListenerContainer mean consumer?

    A: See my explanation above.

  • If I give concurrency as 4 in ConcurrentKafkaListenerContainerFactory, then that means for every @KafkaListener, I open 4 threads with the broker? That means coordinator sees them as 4 different consumers?

    A: That is correct - it's the only way to get concurrency with Kafka (without adding very complicated logic to manage offsets).

  • How does polling work with @KafkaListener? Does it get only one ConsumerRecord from the broker every time?

    A: The number of records returned by each poll depends on several consumer properties: max.poll.records, fetch.min.bytes, and fetch.max.wait.ms.
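
A sketch of tuning those properties with the plain Java client; the values shown are the defaults:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class PollTuning {
    static Properties pollProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // max records returned per poll()
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);    // broker replies once this much data is ready...
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // ...or after this long, whichever comes first
        return props;
    }
}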

Consumer - Config reference

Kafka Admin

Kafka Connect

Kafka Connect - Config reference

Kafka Connect - Datagen Source Connector to generate mock data

Kafka Connect - Cheatsheet

Kafka Connect - Get worker cluster ID, version, and git source code commit ID

curl ${KAFKA_CONNECT_HOST}:8083/ | jq

Kafka Connect - List the connector plugins available on a worker

  • Connector plugins are JARs deployed to Kafka Connect, inactive but available to be used.

    curl ${KAFKA_CONNECT_HOST}:8083/connector-plugins | jq

Kafka Connect - List active connectors on a worker

  • Connectors are running instances of connector plugins.

    curl ${KAFKA_CONNECT_HOST}:8083/connectors | jq

REST Proxy

Authentication

  • REST Proxy default admin credentials: admin/admin_pw

Observability

Observability - JMX

Spring Kafka

@EnableKafka

  • Enables detection of @KafkaListener annotations on any Spring-managed bean

@KafkaListener

  • A method annotated with @KafkaListener is called a MethodKafkaListenerEndpoint, which is an implementation of KafkaListenerEndpoint (a minimal listener example follows).
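
A minimal listener sketch; the topic and group id are illustrative:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DemoListener {
    // Picked up by the @EnableKafka infrastructure and wrapped in a
    // MethodKafkaListenerEndpoint backed by a concurrent container.
    @KafkaListener(topics = "t1", groupId = "demo-group")
    void onMessage(String message) {
        System.out.println("Received: " + message);
    }
}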

Topics

  • A topic is a logical grouping of partitions.
  • Each partition has its own offsets, which are referenced by consumer groups to track their own message consumption progress.

Auto creating topics

  • Define a NewTopic bean.

    @Bean
    NewTopic topic(@Value("${spring.kafka.template.default-topic}") String defaultTopic) {
        return TopicBuilder.name(defaultTopic)
            .partitions(1)
            .replicas(1)
            .build();
    }
  • NewTopic beans are picked up by KafkaAdmin only when auto-create is on.

    # Default value is true
    spring.kafka.admin.auto-create=true

Disable auto creating topics

Spring Kafka - Serde

  • Deserializer config comes from ConsumerFactory
  • Serializer config comes from ProducerFactory

Spring Kafka - Logging

Spring Kafka - Logging - Producer

logging.level.org.springframework.kafka.core=trace
2025-01-23T21:27:15.110+11:00 TRACE 796460 --- [           main] o.s.kafka.core.KafkaTemplate             : Sending: ProducerRecord(topic=ccom.notification-sms-v1.dev, partition=0, headers=RecordHeaders(headers = [], isReadOnly = false), key=5366cbc9-26e4-4e63-9512-2d9d14d7ffbb, value={"metadata": {"nabEventVersion": "1.0.0", "type": "SMS", "source": "Source", "subject": "Subject", "id": "a66c5a52-67a8-43e6-a049-e1803a2b4c18", "time": "2022-01-01T00:00:00Z", "correlationId": "1af65815-3509-4b64-bbb8-a9a6d11302b6", "producer": "Producer", "traceId": "6dbb61ea-9522-4883-8037-6422f80fae8b"}, "data": {"apiMetaData": null, "messageId": "bd596ee0-6980-4d3c-8600-a4bec7fb3f5a", "account": "1234567890", "destination": "1234567890", "message": {"body": "Hello World", "template": {"path": "/path/to/template", "context": null}}}}, timestamp=null)
2025-01-23T21:27:15.263+11:00 TRACE 796460 --- [           main] o.s.kafka.core.KafkaTemplate             : Sent: ProducerRecord(topic=ccom.notification-sms-v1.dev, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=5366cbc9-26e4-4e63-9512-2d9d14d7ffbb, value={"metadata": {"nabEventVersion": "1.0.0", "type": "SMS", "source": "Source", "subject": "Subject", "id": "a66c5a52-67a8-43e6-a049-e1803a2b4c18", "time": "2022-01-01T00:00:00Z", "correlationId": "1af65815-3509-4b64-bbb8-a9a6d11302b6", "producer": "Producer", "traceId": "6dbb61ea-9522-4883-8037-6422f80fae8b"}, "data": {"apiMetaData": null, "messageId": "bd596ee0-6980-4d3c-8600-a4bec7fb3f5a", "account": "1234567890", "destination": "1234567890", "message": {"body": "Hello World", "template": {"path": "/path/to/template", "context": null}}}}, timestamp=null)
2025-01-23T21:27:15.277+11:00 TRACE 796460 --- [ad | producer-1] o.s.kafka.core.KafkaTemplate             : Sent ok: ProducerRecord(topic=ccom.notification-sms-v1.dev, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=5366cbc9-26e4-4e63-9512-2d9d14d7ffbb, value={"metadata": {"nabEventVersion": "1.0.0", "type": "SMS", "source": "Source", "subject": "Subject", "id": "a66c5a52-67a8-43e6-a049-e1803a2b4c18", "time": "2022-01-01T00:00:00Z", "correlationId": "1af65815-3509-4b64-bbb8-a9a6d11302b6", "producer": "Producer", "traceId": "6dbb61ea-9522-4883-8037-6422f80fae8b"}, "data": {"apiMetaData": null, "messageId": "bd596ee0-6980-4d3c-8600-a4bec7fb3f5a", "account": "1234567890", "destination": "1234567890", "message": {"body": "Hello World", "template": {"path": "/path/to/template", "context": null}}}}, timestamp=null), metadata: ccom.notification-sms-v1.dev-0@0

References

Resources - Examples

Interview

Interview Questions