Spring Kafka

@EnableKafka

  • Enables detection of @KafkaListener annotations on any Spring-managed bean
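A minimal configuration sketch (class and bean names are illustrative). Note that with Spring Boot's auto-configuration, @EnableKafka is effectively applied for you; declaring it explicitly is only needed in plain Spring:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
@EnableKafka // enables scanning for @KafkaListener on Spring-managed beans
public class KafkaConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}
```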

Topics

  • Topic is a logical grouping of partitions.
  • Each partition has its own offsets, which are referenced by consumer groups to track their own message consumption progress.
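The per-group offset bookkeeping can be illustrated with a toy model (plain Java, not Kafka code): each group keeps its own committed offset per partition, so two groups can be at different positions in the same partition.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of consumer-group offset tracking: each group tracks its own
// committed offset per partition, independently of every other group.
public class OffsetTracking {
    // group -> (partition -> next offset to read)
    static final Map<String, Map<Integer, Long>> committed = new HashMap<>();

    static void commit(String group, int partition, long offset) {
        committed.computeIfAbsent(group, g -> new HashMap<>()).put(partition, offset);
    }

    static long position(String group, int partition) {
        return committed.getOrDefault(group, Map.of()).getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        commit("billing", 0, 42L); // group "billing" has read up to offset 42 on partition 0
        commit("audit", 0, 7L);    // group "audit" lags behind on the same partition
        System.out.println(position("billing", 0)); // 42
        System.out.println(position("audit", 0));   // 7
    }
}
```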

Auto creating topics

  • Define a NewTopic bean.

    @Bean
    NewTopic topic(@Value("${spring.kafka.template.default-topic}") String defaultTopic) {
        return TopicBuilder.name(defaultTopic)
            .partitions(1)
            .replicas(1)
            .build();
    }
  • NewTopic beans are picked up by KafkaAdmin only when auto-create is on.

    # Default value is true
    spring.kafka.admin.auto-create=true

Disable auto creating topics

  • Set the KafkaAdmin auto-create property to false, so NewTopic beans are no longer acted on.

    spring.kafka.admin.auto-create=false

Serde

  • Deserializer config comes from ConsumerFactory
  • Serializer config comes from ProducerFactory
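With Spring Boot, the (de)serializers can be supplied to both factories through configuration properties. A sketch, assuming JSON values with String keys (the trusted package `com.example.dto` is illustrative):

```properties
# Producer: what the ProducerFactory uses to serialize keys/values
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

# Consumer: what the ConsumerFactory uses to deserialize keys/values
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# JsonDeserializer must be told which packages it may deserialize into
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.dto
```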

Logging

Logging - Producer

Set the following property to log what KafkaTemplate sends:

    logging.level.org.springframework.kafka.core=trace
2025-01-23T21:27:15.110+11:00 TRACE 796460 --- [           main] o.s.kafka.core.KafkaTemplate             : Sending: ProducerRecord(topic=ccom.notification-sms-v1.dev, partition=0, headers=RecordHeaders(headers = [], isReadOnly = false), key=5366cbc9-26e4-4e63-9512-2d9d14d7ffbb, value={"metadata": {"nabEventVersion": "1.0.0", "type": "SMS", "source": "Source", "subject": "Subject", "id": "a66c5a52-67a8-43e6-a049-e1803a2b4c18", "time": "2022-01-01T00:00:00Z", "correlationId": "1af65815-3509-4b64-bbb8-a9a6d11302b6", "producer": "Producer", "traceId": "6dbb61ea-9522-4883-8037-6422f80fae8b"}, "data": {"apiMetaData": null, "messageId": "bd596ee0-6980-4d3c-8600-a4bec7fb3f5a", "account": "1234567890", "destination": "1234567890", "message": {"body": "Hello World", "template": {"path": "/path/to/template", "context": null}}}}, timestamp=null)
2025-01-23T21:27:15.263+11:00 TRACE 796460 --- [           main] o.s.kafka.core.KafkaTemplate             : Sent: ProducerRecord(topic=ccom.notification-sms-v1.dev, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=5366cbc9-26e4-4e63-9512-2d9d14d7ffbb, value={"metadata": {"nabEventVersion": "1.0.0", "type": "SMS", "source": "Source", "subject": "Subject", "id": "a66c5a52-67a8-43e6-a049-e1803a2b4c18", "time": "2022-01-01T00:00:00Z", "correlationId": "1af65815-3509-4b64-bbb8-a9a6d11302b6", "producer": "Producer", "traceId": "6dbb61ea-9522-4883-8037-6422f80fae8b"}, "data": {"apiMetaData": null, "messageId": "bd596ee0-6980-4d3c-8600-a4bec7fb3f5a", "account": "1234567890", "destination": "1234567890", "message": {"body": "Hello World", "template": {"path": "/path/to/template", "context": null}}}}, timestamp=null)
2025-01-23T21:27:15.277+11:00 TRACE 796460 --- [ad | producer-1] o.s.kafka.core.KafkaTemplate             : Sent ok: ProducerRecord(topic=ccom.notification-sms-v1.dev, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=5366cbc9-26e4-4e63-9512-2d9d14d7ffbb, value={"metadata": {"nabEventVersion": "1.0.0", "type": "SMS", "source": "Source", "subject": "Subject", "id": "a66c5a52-67a8-43e6-a049-e1803a2b4c18", "time": "2022-01-01T00:00:00Z", "correlationId": "1af65815-3509-4b64-bbb8-a9a6d11302b6", "producer": "Producer", "traceId": "6dbb61ea-9522-4883-8037-6422f80fae8b"}, "data": {"apiMetaData": null, "messageId": "bd596ee0-6980-4d3c-8600-a4bec7fb3f5a", "account": "1234567890", "destination": "1234567890", "message": {"body": "Hello World", "template": {"path": "/path/to/template", "context": null}}}}, timestamp=null), metadata: ccom.notification-sms-v1.dev-0@0

Consumer - MessageListenerContainer

There are two implementations of MessageListenerContainer - the KafkaMessageListenerContainer (KMLC) and ConcurrentMessageListenerContainer (CMLC).

The CMLC is simply a wrapper for one or more KMLCs, with the number of KMLCs specified by the concurrency.

@KafkaListener always uses a CMLC.

Each KMLC gets one Consumer (and one thread). The thread continually poll()s the consumer, with the specified pollTimeout.

How the topics/partitions are distributed across the KMLCs depends on the number of partitions and the consumer's partition assignment strategy (the partition.assignment.strategy consumer property).

If you have multiple topics with fewer partitions than the concurrency, you will likely need an alternate partition assigner, such as the round robin assigner, otherwise you will have idle containers with no assignment.
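The assignor can be switched through the consumer's partition.assignment.strategy property, shown here as a Spring Boot configuration sketch:

```properties
# Use round robin assignment so partitions from all subscribed topics
# are spread evenly across the concurrent consumers
spring.kafka.consumer.properties.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
```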

  • If we are using 1 @KafkaListener with 2 Topics, then Spring Kafka creates a single MessageListenerContainer, and if I use separate @KafkaListener annotations, then Spring Kafka creates 2 MessageListenerContainers?

    A: That is correct; if you explicitly want a different container for each topic, you can provide multiple @KafkaListener annotations on the same method.

  • Does MessageListenerContainer mean consumer?

    A: See my explanation above.

  • If I give concurrency as 4 in ConcurrentKafkaListenerContainerFactory, then that means for every @KafkaListener, I open 4 threads with the broker? That means coordinator sees them as 4 different consumers?

    A: That is correct - it's the only way to get concurrency with Kafka (without adding very complicated logic to manage offsets).

  • How does polling work with @KafkaListener? Does it get only 1 ConsumerRecord from the broker every time?

    A: The number of records returned by each poll depends on a number of consumer properties: max.poll.records, fetch.min.bytes, and fetch.max.wait.ms.
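Those consumer properties can be tuned via Spring Boot configuration; the values below are the Kafka defaults:

```properties
# Maximum number of records returned by a single poll() (Kafka default: 500)
spring.kafka.consumer.max-poll-records=500
# The broker waits until at least this many bytes are available... (default: 1)
spring.kafka.consumer.fetch-min-size=1
# ...or until this much time has passed (default: 500 ms)
spring.kafka.consumer.fetch-max-wait=500ms
```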

Consumer - @KafkaListener

  • A method annotated with @KafkaListener is called a MethodKafkaListenerEndpoint, which is an implementation of KafkaListenerEndpoint.
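A sketch of such a method (topic, group id, and class names are illustrative); behind the scenes this endpoint is backed by a ConcurrentMessageListenerContainer as described above:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class SmsListener {

    // concurrency = "4" creates 4 KafkaMessageListenerContainers inside
    // the ConcurrentMessageListenerContainer for this endpoint
    @KafkaListener(topics = "notification-sms-v1", groupId = "sms-consumer", concurrency = "4")
    public void onMessage(String payload) {
        System.out.println("Received: " + payload);
    }
}
```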

Error Handling

Blocking retry

Retry when a retriable exception occurs while consuming a message, blocking consumption of the next message until the retries succeed or are exhausted.

The default behavior is to attempt to consume a message at most 10 times (the initial attempt plus 9 retries), then log an error and move on to the next message if it still fails. See org.springframework.kafka.listener.DefaultErrorHandler for details.
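A sketch of customizing the blocking retry behavior (the values are illustrative): replace the default with a DefaultErrorHandler backed by a FixedBackOff. With Spring Boot this bean is applied to the auto-configured listener container factory; in plain Spring, pass it to factory.setCommonErrorHandler(...) yourself.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class ErrorHandlingConfig {

    @Bean
    DefaultErrorHandler errorHandler() {
        // 1 initial attempt + 2 retries, 1 second apart; afterwards the
        // record is logged (or recovered) and the container moves on
        return new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
    }
}
```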

Non-blocking retry

Send the message to another retry topic when the message exceeds the blocking retry max attempts limit.

With @RetryableTopic, Spring Kafka builds the retry topics for you using the broker default settings. It might create multiple topics if we retry many times, with each attempt sent to a different topic (configurable via the fixedDelayTopicStrategy property), e.g. origintopic-retry-1, origintopic-retry-2, etc.
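A sketch of a non-blocking retry listener (topic, group id, and the process helper are illustrative):

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Component
public class RetryingListener {

    // 4 total attempts; failed records are republished to auto-created retry topics
    @RetryableTopic(attempts = "4", backoff = @Backoff(delay = 1000))
    @KafkaListener(topics = "orders-v1", groupId = "orders-consumer")
    public void onMessage(String payload) {
        process(payload); // hypothetical business logic that may throw
    }

    private void process(String payload) {
        // ...
    }
}
```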

Dead letter queue and handler

Send the message to a dead letter topic when the message exceeds the non-blocking retry max attempts limit, or when the exception is not a retriable exception.
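With @RetryableTopic, a @DltHandler method in the same class receives records that end up on the dead letter topic (suffixed -dlt by default). A sketch, with illustrative names:

```java
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.stereotype.Component;

@Component
public class OrdersListener {

    @RetryableTopic(attempts = "3")
    @KafkaListener(topics = "orders-v1", groupId = "orders-consumer")
    public void onMessage(String payload) {
        throw new IllegalStateException("always fails"); // forces retries to be exhausted
    }

    // Receives records that land on the auto-created orders-v1-dlt topic
    @DltHandler
    public void onDeadLetter(String payload) {
        System.out.println("Dead letter: " + payload);
    }
}
```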