Apache Kafka

Takeaway

Configuration

  • To convert a property name to a CLI option, replace . with - and use -- as prefix.

    For example, log.dirs becomes --log-dirs.

  • To convert a property name to an environment variable name, replace . with _, uppercase the result, and use KAFKA_ as prefix.

    For example, log.dirs becomes KAFKA_LOG_DIRS.

Cheatsheet

Broker - List all topics

confluent kafka topic list --no-auth

or

kcat -L -b $BROKER

Broker - Create a new topic

confluent kafka topic create $topic --no-auth --replication-factor 1

CLI - Consume messages

  • CLI

    • kafka-console-consumer
    • kafka-avro-console-consumer
    • kafka-json-schema-console-consumer
    • kafka-protobuf-console-consumer
  1. Run one of the above consumer commands

    kafka-protobuf-console-consumer \
       --bootstrap-server localhost:9092 \
       --from-beginning \
       --topic t1-p \
       --property schema.registry.url=http://localhost:8081
  2. Because of --from-beginning, it consumes all messages from the start of the topic, not only new ones.

Setup

Docker

Docker - Container Images

Docker - Environment Variables

Docker - KAFKA_ADVERTISED_HOST_NAME

The container's IP within docker0's subnet; maps to the legacy advertised.host.name property (superseded by advertised.listeners).

docker0 is a Linux bridge:

$ ip l show type bridge
 
3: br-0e97881e642e: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP mode DEFAULT group default
    link/ether 02:42:7f:ce:06:b0 brd ff:ff:ff:ff:ff:ff
4: docker0: <NO-CARRIER,BROADCAST,MULTICAST,UP> mtu 1500 qdisc noqueue state DOWN mode DEFAULT group default
    link/ether 02:42:26:bc:e3:8d brd ff:ff:ff:ff:ff:ff
Docker - KAFKA_LISTENERS

Brokers have two listeners: one for communicating within the Docker network, and one for connecting from the host machine.

Kafka clients bootstrap through any broker and then connect directly to individual brokers at their advertised addresses, so one listener uses the container name, which is resolvable by every container on the Docker network.

This listener is also used for inter-broker communication.

The second listener uses localhost on a unique port that gets mapped on the host (29092 for broker-1, 39092 for broker-2, and 49092 for broker-3).

With a single node, one listener on localhost works, because localhost happens to resolve correctly both inside the container and on the host machine; this shortcut does not hold in a multi-node setup.

Docker - KAFKA_PROCESS_ROLES

Maps to process.roles. In KRaft mode each node runs as broker, controller, or both (broker,controller).

Docker - KAFKA_CONTROLLER_QUORUM_VOTERS

A comma-separated list of the three controllers, each entry in the form nodeId@host:port (e.g., 1@controller-1:9093); maps to controller.quorum.voters.

Docker - Multi Node example

# docker-compose.yaml
# Reference: https://hub.docker.com/r/apache/kafka-native
services:
  controller-1:
    image: apache/kafka-native:latest
    container_name: controller-1
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: controller
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
 
  controller-2:
    image: apache/kafka-native:latest
    container_name: controller-2
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: controller
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
 
  controller-3:
    image: apache/kafka-native:latest
    container_name: controller-3
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: controller
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
 
  broker-1:
    image: apache/kafka-native:latest
    container_name: broker-1
    ports:
      - 29092:9092
    environment:
      KAFKA_NODE_ID: 4
      KAFKA_PROCESS_ROLES: broker
      KAFKA_LISTENERS: "PLAINTEXT://:19092,PLAINTEXT_HOST://:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker-1:19092,PLAINTEXT_HOST://localhost:29092"
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    depends_on:
      - controller-1
      - controller-2
      - controller-3
 
  broker-2:
    image: apache/kafka-native:latest
    container_name: broker-2
    ports:
      - 39092:9092
    environment:
      KAFKA_NODE_ID: 5
      KAFKA_PROCESS_ROLES: broker
      KAFKA_LISTENERS: "PLAINTEXT://:19092,PLAINTEXT_HOST://:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker-2:19092,PLAINTEXT_HOST://localhost:39092"
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    depends_on:
      - controller-1
      - controller-2
      - controller-3
 
  broker-3:
    image: apache/kafka-native:latest
    container_name: broker-3
    ports:
      - 49092:9092
    environment:
      KAFKA_NODE_ID: 6
      KAFKA_PROCESS_ROLES: broker
      KAFKA_LISTENERS: "PLAINTEXT://:19092,PLAINTEXT_HOST://:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker-3:19092,PLAINTEXT_HOST://localhost:49092"
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    depends_on:
      - controller-1
      - controller-2
      - controller-3

Kubernetes

CLI

kcat

Confluent CLI

confluent-hub

  • Deploy Kafka connector plugins (JAR files) to Kafka Connect embedded within the ksqlDB server

  • Image: confluentinc/cp-ksqldb-server

ksqlDB

Features

Idempotency / Exactly Once

  • Producer
    • enable.idempotence=true (default)

    • acks=all (default)

    • max.in.flight.requests.per.connection=5 (default)

      Enabling idempotence requires max.in.flight.requests.per.connection <= 5, because the broker retains producer state for at most 5 batches per producer; with more batches in flight, state for earlier batches can be dropped on the broker side and deduplication breaks. A sketch spelling these settings out follows this list.
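
A minimal sketch that spells these settings out explicitly; the broker address is an assumption, and all three values are already the defaults in recent clients:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class IdempotentProducerConfig {
        public static Properties props() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);  // default in recent clients
            props.put(ProducerConfig.ACKS_CONFIG, "all");               // required for idempotence
            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // must stay <= 5
            return props;
        }
    }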

Transaction

  • Producer
    • transactional.id=<unique_id_per_producer_instance> (default: null)
  • Consumer
    • isolation.level=read_committed (default: read_uncommitted)

      If set to read_committed, consumer.poll() only returns transactional messages that have been committed (non-transactional messages are always returned). A minimal transactional-producer sketch follows this list.
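
A minimal transactional-producer sketch, following the pattern from the KafkaProducer javadoc; the broker address, topic, and transactional.id are assumptions:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.ProducerFencedException;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class TransactionalProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "orders-tx-1"); // unique per producer instance

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.initTransactions(); // registers transactional.id and fences older instances
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("t1-p", "k1", "v1"));
                producer.send(new ProducerRecord<>("t1-p", "k2", "v2"));
                producer.commitTransaction(); // both records become visible atomically
            } catch (ProducerFencedException e) {
                producer.close(); // fatal: another instance took over this transactional.id
            } catch (KafkaException e) {
                producer.abortTransaction(); // read_committed consumers never see the records
            }
            producer.close();
        }
    }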

Concepts

Concept - Topic

  • Definition:

    A named stream of records in Kafka — a logical category to which producers write and from which consumers read.

  • Purpose:

    Logical organization: organizes data by use case (e.g., "orders", "user-events").

Concepts - Partition

  • Definition:

    An ordered, immutable, append-only sequence of records within a topic. A topic is split into one or more partitions.

  • Purpose:

    • Parallelism

      Different partitions can be consumed concurrently by different consumers in the same consumer group.

  • Note:

    Number of partitions is set when creating a topic and can be increased later (but not decreased).

Concepts - Replication Factor

  • Definition:

    The number of copies of each partition maintained across different brokers, i.e., the number of replicas.

  • Purpose:

    1. Fault tolerance and availability

      If the leader broker for a partition fails, a replica can be promoted to leader.

  • Note:

    Replication factor cannot exceed the number of broker instances.

Workflow

  • A single topic is divided into partitions, and each partition is stored on a broker.
  • Each partition has exactly one leader at a time and zero or more followers (replicas) on other brokers.
  • Producers and consumers interact only with the partition's leader.
  • The ISR (in-sync replicas) is the set of a partition's replicas, including the leader, that are caught up to the leader's log within the configured lag limits; the sketch after this list prints each partition's leader and ISR.
  • Only replicas in the ISR are eligible to be elected leader if the current leader fails.
  • Producers with acks=all wait for replication to all ISRs before a write is considered committed.
  • After a producer commits the transaction, messages and offsets become visible to consumers.
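
A sketch inspecting this state with the Admin API; the broker address and topic name are assumptions, and allTopicNames() assumes a reasonably recent kafka-clients (3.x):

    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.TopicDescription;
    import org.apache.kafka.common.TopicPartitionInfo;

    public class DescribeIsr {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
            try (Admin admin = Admin.create(props)) {
                TopicDescription desc = admin.describeTopics(List.of("t1-p"))
                        .allTopicNames().get().get("t1-p");
                // Each partition reports its current leader and the replicas in sync with it.
                for (TopicPartitionInfo p : desc.partitions()) {
                    System.out.printf("partition=%d leader=%s isr=%s%n",
                            p.partition(), p.leader(), p.isr());
                }
            }
        }
    }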

Broker

Embedded Kafka - Java

Broker - Config

Broker - Topic Config Reference

Broker - Partitioning Strategy - How to choose the number of partitions

  • Estimate the max throughput per producer for a single partition
  • Consider how many partitions you will place on each broker, and the available disk space and network bandwidth per broker; see the worked sketch after this list.
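
A rough worked sketch of the usual rule of thumb: the partition count must cover the target throughput at the slower of the per-partition produce and consume rates. All numbers below are illustrative assumptions, not measurements:

    public class PartitionCountSketch {
        public static void main(String[] args) {
            // Measure your own per-partition rates first; these are placeholders.
            double targetMBps = 100.0;             // desired topic throughput
            double producePerPartitionMBps = 10.0; // max produce rate per partition
            double consumePerPartitionMBps = 20.0; // max consume rate per partition

            // The partition count must satisfy the slower side of the pipeline.
            double bottleneck = Math.min(producePerPartitionMBps, consumePerPartitionMBps);
            int partitions = (int) Math.ceil(targetMBps / bottleneck);
            System.out.println("partitions >= " + partitions); // prints: partitions >= 10
        }
    }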

Broker - Replication factor

  • Set the default.replication.factor to at least 1 above the min.insync.replicas setting.
  • For more fault-tolerant settings, if your cluster and hardware are large enough, setting the replication factor to 2 above min.insync.replicas (abbreviated as RF++) can be preferable. RF++ makes maintenance easier and prevents outages, because the replica set can then tolerate one planned outage and one unplanned outage at the same time.
  • For data durability, set min.insync.replicas to 2 so that at least 2 replicas must be caught up and in sync, and pair it with the producer setting acks=all. Together these guarantee that at least 2 replicas (the leader and one follower) acknowledge a write before it succeeds; see the topic-creation sketch after this list.
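
A sketch tying these settings together at topic-creation time with the Admin API; the topic name, partition count, and the broker-1 host port from the compose file above are assumptions:

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateDurableTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092"); // broker-1 host listener
            try (Admin admin = Admin.create(props)) {
                // 6 partitions, replication factor 3 = min.insync.replicas + 1
                NewTopic topic = new NewTopic("orders", 6, (short) 3)
                        .configs(Map.of("min.insync.replicas", "2"));
                admin.createTopics(List.of(topic)).all().get();
            }
        }
    }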

Serialization / Deserialization

Serializers and Deserializers

Serializers and Deserializers - Testing

  • Option 1 - Multiple instances

    Using mock serializers and deserializers, which use a mock schema registry client under the hood (see the mock:// sketch after the Option 2 config below).

  • Option 2 - Singleton

    Using singleton serializers and deserializers, which use a singleton mock schema registry under the hood.

    • Must create all necessary singleton Spring Bean definitions to make the whole config consistent.

      @Slf4j
      @Profile("kafka")
      @Configuration
      public class KafkaConfig {
          @Bean
          ContainerCustomizer<String, SmsRequested, ConcurrentMessageListenerContainer<String, SmsRequested>> containerCustomizer(
                  ConcurrentKafkaListenerContainerFactory<String, SmsRequested> factory,
                  ConsumerAwareRebalanceListener rebalanceListener
          ) {
              ContainerCustomizer<String, SmsRequested, ConcurrentMessageListenerContainer<String, SmsRequested>> cust =
                      container -> {
                          container.getContainerProperties().setDeliveryAttemptHeader(true);
                          container.getContainerProperties().setConsumerRebalanceListener(rebalanceListener);
                      };
              factory.setContainerCustomizer(cust);
              return cust;
          }
       
          @Bean
          SchemaProvider avroSchemaProvider() {
              return new AvroSchemaProvider();
          }
       
          @Bean
          AbstractKafkaSchemaSerDeConfig schemaSerDeConfig(KafkaProperties kafkaProperties) {
              var producerProps = kafkaProperties.buildProducerProperties(null);
              return new AbstractKafkaSchemaSerDeConfig(AbstractKafkaSchemaSerDeConfig.baseConfigDef(), producerProps, false);
          }
       
          @Bean
          SchemaRegistryClient schemaRegistryClient(
                  @Value("${spring.kafka.properties.schema.registry.url}") String url,
                  SchemaProvider provider,
                  AbstractKafkaSchemaSerDeConfig schemaSerDeConfig
          ) {
              KafkaUtils.printSerDeConfig(schemaSerDeConfig);
              return SchemaRegistryClientFactory.newClient(
                      Collections.singletonList(url),
                      schemaSerDeConfig.getMaxSchemasPerSubject(),
                      Collections.singletonList(provider),
                      schemaSerDeConfig.originals(),
                      schemaSerDeConfig.requestHeaders()
              );
          }
       
          @Bean
          KafkaAvroSerializer avroValueSerializer(SchemaRegistryClient schemaRegistryClient) {
              return new KafkaAvroSerializer(schemaRegistryClient);
          }
       
          @Bean
          KafkaAvroDeserializer valueDeserializer(SchemaRegistryClient schemaRegistryClient) {
              return new KafkaAvroDeserializer(schemaRegistryClient);
          }
       
          @SuppressWarnings({"rawtypes", "unchecked"})
          @Bean
          DefaultKafkaProducerFactoryCustomizer producerFactoryCustomizer(KafkaAvroSerializer avroSerializer) {
              return producerFactory -> {
                  producerFactory.setValueSerializer((Serializer) avroSerializer);
                  producerFactory.addListener(new CustomProducerFactoryListener<>());
                  producerFactory.addPostProcessor(new CustomProducerPostProcessor<>());
              };
          }
       
          @SuppressWarnings({"rawtypes", "unchecked"})
          @Bean
          DefaultKafkaConsumerFactoryCustomizer consumerFactoryCustomizer(KafkaAvroDeserializer avroDeserializer) {
              return consumerFactory -> {
                  consumerFactory.setValueDeserializer((Deserializer) avroDeserializer);
                  consumerFactory.addListener(new CustomConsumerFactoryListener());
                  consumerFactory.addPostProcessor(new CustomConsumerPostProcessor());
              };
          }
      }
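
For Option 1, a minimal round-trip sketch using Confluent's mock:// pseudo-URL (the scope name serde-test is arbitrary); a serializer and deserializer configured with the same scope share one in-memory mock schema registry, so no registry process is needed:

    import java.util.Map;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;

    public class MockRegistryRoundTrip {
        public static void main(String[] args) {
            // Both serdes point at the same mock:// scope and therefore share
            // one in-memory schema registry client.
            Map<String, Object> cfg = Map.of("schema.registry.url", "mock://serde-test");
            KafkaAvroSerializer serializer = new KafkaAvroSerializer();
            serializer.configure(cfg, false); // false = configure as a value serde
            KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
            deserializer.configure(cfg, false);

            Schema schema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"MyRecord\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}");
            GenericRecord record = new GenericData.Record(schema);
            record.put("f1", "value1");

            byte[] bytes = serializer.serialize("t1-p", record); // auto-registers the schema
            Object roundTripped = deserializer.deserialize("t1-p", bytes);
            System.out.println(roundTripped); // {"f1": "value1"}
        }
    }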

Avro

Protobuf

Producer

Producer - Config reference

Producer - Message Delivery Guarantees

Producer - Message Compression

Producer - Record Headers

Producer - Cheatsheet

Producer - Produce messages with CLI

  • CLI

    • kafka-console-producer
    • kafka-avro-console-producer
    • kafka-json-schema-console-producer
    • kafka-protobuf-console-producer
  1. Use one of the above CLIs to produce messages. The topic will be created automatically if auto.create.topics.enable=true.

    kafka-protobuf-console-producer \
       --bootstrap-server ${BOOTSTRAP_SERVER} \
       --property schema.registry.url=${SCHEMA_REGISTRY_URL} \
       --topic t1-p \
       --property parse.key=true \
       --property key.schema.id=$SCHEMA_ID \
       --property value.schema='syntax = "proto3"; message MyRecord { string f1 = 1; }'
  2. You will be prompted on stdin to input the message payload

    Enter as many messages as you want, pressing ENTER after each one to send it and starting a new line for the next. To quit, press Ctrl + C. Note that with parse.key=true (as above), each line must contain the key and the value separated by key.separator (a tab by default).

    { "f1": "value1-p" }

    An alternative is to use a file for message input.

    kafka-protobuf-console-producer \
       --bootstrap-server ${BOOTSTRAP_SERVER} \
       --property schema.registry.url=${SCHEMA_REGISTRY_URL} \
       --topic t1-p \
       --property value.schema='syntax = "proto3"; message MyRecord { string f1 = 1; }' \
    < topic-input.txt
  3. Each message is sent to the broker and can be seen in the Control Center Topics view.


Consumer

  • The number of consumers in a consumer group shouldn't exceed the number of partitions in the topic, because each partition is assigned to at most one consumer in the group; extra consumers sit idle.
  • At the heart of the Consumer API is a simple loop that polls the broker for more data; a minimal sketch follows this list.
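
A minimal sketch of that loop; the broker address, group id, and topic are assumptions:

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class PollLoopSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("t1-p"));
                while (true) { // the poll loop: fetch, process, repeat
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> r : records) {
                        System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                                r.partition(), r.offset(), r.key(), r.value());
                    }
                }
            }
        }
    }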

Consumer - Config reference

Consumer - Metrics

Micrometer metrics, what they tell you, and the warning signs to watch for:

  • kafka.consumer.fetch.manager.records.lag: number of messages the consumer is behind. Warning sign: steady increase over time.
  • kafka.consumer.fetch.manager.records.lag.max: maximum lag across all partitions. Warning sign: spikes during peak hours.
  • kafka.consumer.fetch.manager.records.consumed.rate: consumption throughput. Warning sign: drops during lag increases.
  • kafka.consumer.fetch.manager.bytes.consumed.rate: data volume throughput. Warning sign: lower than the producer rate.
  • kafka.consumer.fetch.manager.fetch.rate: how often the consumer polls. Warning sign: stalls or inconsistency.

Kafka Admin

Kafka Connect

Kafka Connect - Config reference

Kafka Connect - Datagen Source Connector to generate mock data

Kafka Connect - Cheatsheet

Kafka Connect - Get worker cluster ID, version, and git source code commit ID

curl ${KAFKA_CONNECT_HOST}:8083/ | jq

Kafka Connect - List the connector plugins available on a worker

  • Connector plugins are JARs deployed to a Kafka Connect worker: inactive, but available to be instantiated as connectors.

    curl ${KAFKA_CONNECT_HOST}:8083/connector-plugins | jq

Kafka Connect - List active connectors on a worker

  • Connectors are configured, currently running instances of connector plugins.

    curl ${KAFKA_CONNECT_HOST}:8083/connectors | jq

REST Proxy

Authentication

  • REST Proxy default admin credentials: admin/admin_pw

Observability

Observability - JMX

Testing

Resources

Resources - Examples

Resources - Visualization

Interview

Interview Questions

Demo