- Serialization, Deserialization, and Message Conversion
- Configuring Topics
@EnableKafka
- Enables detection of `@KafkaListener` annotations on any Spring-managed bean.
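A minimal sketch of wiring this up (the topic and group names are illustrative assumptions; note that Spring Boot's auto-configuration applies `@EnableKafka` for you):

```java
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// @EnableKafka turns on detection of @KafkaListener methods
// on Spring-managed beans.
@Configuration
@EnableKafka
class KafkaConfig {
}

@Component
class DemoListener {

    // Called for each record consumed from the topic;
    // "demo-topic" and "demo-group" are placeholder names.
    @KafkaListener(topics = "demo-topic", groupId = "demo-group")
    void listen(String message) {
        System.out.println("Received: " + message);
    }
}
```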
Topics
- A topic is a logical grouping of partitions.
- Each partition has its own offsets, which are referenced by consumer groups to track their own message consumption progress.
Auto creating topics
- Define a `NewTopic` bean.

  ```java
  @Bean
  NewTopic topic(@Value("${spring.kafka.template.default-topic}") String defaultTopic) {
      return TopicBuilder.name(defaultTopic)
              .partitions(1)
              .replicas(1)
              .build();
  }
  ```

- `NewTopic` beans are picked up by `KafkaAdmin` only when `auto-create` is on.

  ```properties
  # Default value is true
  spring.kafka.admin.auto-create=true
  ```
Disable auto creating topics
- Broker: `auto.create.topics.enable`
  - Docker: `KAFKA_AUTO_CREATE_TOPICS_ENABLE: false`
- Consumer: `allow.auto.create.topics`
  - Related: `auto.offset.reset`. This configuration is mostly applicable when a consumer group (usually a new one) reads from a topic for the very first time and the consumer instances within the group need to know exactly where to start reading from the topic's partitions.
- Admin client
- Spring Kafka: for Spring only, topics are created automatically during context initialization by looking up all `NewTopic` beans. Disable this with:

  ```properties
  spring.kafka.admin.auto-create=false
  ```
-
Serde
- `Deserializer` config comes from the `ConsumerFactory`.
- `Serializer` config comes from the `ProducerFactory`.
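For example, a JSON serde can be configured into those factories via Spring Boot properties; a sketch (the trusted-packages wildcard is a deliberately permissive assumption and should be narrowed in real code):

```properties
# Producer -> ProducerFactory
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# Consumer -> ConsumerFactory
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# Allow JsonDeserializer to instantiate classes from any package (permissive!)
spring.kafka.consumer.properties.spring.json.trusted.packages=*
```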
Logging
Logging - Producer
```properties
logging.level.org.springframework.kafka.core=trace
```

```text
2025-01-23T21:27:15.110+11:00 TRACE 796460 --- [ main] o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord(topic=ccom.notification-sms-v1.dev, partition=0, headers=RecordHeaders(headers = [], isReadOnly = false), key=5366cbc9-26e4-4e63-9512-2d9d14d7ffbb, value={"metadata": {"nabEventVersion": "1.0.0", "type": "SMS", "source": "Source", "subject": "Subject", "id": "a66c5a52-67a8-43e6-a049-e1803a2b4c18", "time": "2022-01-01T00:00:00Z", "correlationId": "1af65815-3509-4b64-bbb8-a9a6d11302b6", "producer": "Producer", "traceId": "6dbb61ea-9522-4883-8037-6422f80fae8b"}, "data": {"apiMetaData": null, "messageId": "bd596ee0-6980-4d3c-8600-a4bec7fb3f5a", "account": "1234567890", "destination": "1234567890", "message": {"body": "Hello World", "template": {"path": "/path/to/template", "context": null}}}}, timestamp=null)
2025-01-23T21:27:15.263+11:00 TRACE 796460 --- [ main] o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=ccom.notification-sms-v1.dev, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=5366cbc9-26e4-4e63-9512-2d9d14d7ffbb, value={"metadata": {"nabEventVersion": "1.0.0", "type": "SMS", "source": "Source", "subject": "Subject", "id": "a66c5a52-67a8-43e6-a049-e1803a2b4c18", "time": "2022-01-01T00:00:00Z", "correlationId": "1af65815-3509-4b64-bbb8-a9a6d11302b6", "producer": "Producer", "traceId": "6dbb61ea-9522-4883-8037-6422f80fae8b"}, "data": {"apiMetaData": null, "messageId": "bd596ee0-6980-4d3c-8600-a4bec7fb3f5a", "account": "1234567890", "destination": "1234567890", "message": {"body": "Hello World", "template": {"path": "/path/to/template", "context": null}}}}, timestamp=null)
2025-01-23T21:27:15.277+11:00 TRACE 796460 --- [ad | producer-1] o.s.kafka.core.KafkaTemplate : Sent ok: ProducerRecord(topic=ccom.notification-sms-v1.dev, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=5366cbc9-26e4-4e63-9512-2d9d14d7ffbb, value={"metadata": {"nabEventVersion": "1.0.0", "type": "SMS", "source": "Source", "subject": "Subject", "id": "a66c5a52-67a8-43e6-a049-e1803a2b4c18", "time": "2022-01-01T00:00:00Z", "correlationId": "1af65815-3509-4b64-bbb8-a9a6d11302b6", "producer": "Producer", "traceId": "6dbb61ea-9522-4883-8037-6422f80fae8b"}, "data": {"apiMetaData": null, "messageId": "bd596ee0-6980-4d3c-8600-a4bec7fb3f5a", "account": "1234567890", "destination": "1234567890", "message": {"body": "Hello World", "template": {"path": "/path/to/template", "context": null}}}}, timestamp=null), metadata: ccom.notification-sms-v1.dev-0@0
```

Consumer - MessageListenerContainer
There are two implementations of MessageListenerContainer - the KafkaMessageListenerContainer (KMLC) and ConcurrentMessageListenerContainer (CMLC).
The CMLC is simply a wrapper for one or more KMLCs, with the number of KMLCs specified by the concurrency.
@KafkaListener always uses a CMLC.
Each KMLC gets one Consumer (and one thread). The thread continually poll()s the consumer, with the specified pollTimeout.
How the topics/partitions are distributed across the KMLCs depends on:

- How many partitions the topic(s) have
- The consumer's `partition.assignment.strategy` property

If you have multiple topics with fewer partitions than the concurrency, you will likely need an alternate partition assigner, such as the round-robin assigner; otherwise you will have idle containers with no assignment.
- Q: If we are using one `@KafkaListener` with two topics, then Spring Kafka creates a single `MessageListenerContainer`; and if I use separate `@KafkaListener` annotations, then Spring Kafka creates two `MessageListenerContainer`s?

  A: That is correct; if you explicitly want a different container for each topic, you can provide multiple `@KafkaListener` annotations on the same method.

- Q: Does `MessageListenerContainer` mean `consumer`?

  A: See the explanation above.

- Q: If I give concurrency as 4 in `ConcurrentKafkaListenerContainerFactory`, does that mean that for every `@KafkaListener` I open 4 threads with the broker, and that the coordinator sees them as 4 different consumers?

  A: That is correct - it's the only way to get concurrency with Kafka (without adding very complicated logic to manage offsets).

- Q: How does polling work with `@KafkaListener`? Does it get only one `ConsumerRecord` from the broker every time?

  A: The number of records returned by each poll depends on a number of consumer properties: `max.poll.records`, `fetch.min.bytes`, and `fetch.max.wait.ms`.
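The concurrency discussed above is set on the container factory; a sketch (bean wiring and the value 4 are illustrative assumptions):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
class ListenerFactoryConfig {

    // concurrency = 4 makes the CMLC create 4 KMLCs
    // (4 consumers, 4 threads) per @KafkaListener using this factory.
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(4);
        return factory;
    }
}
```

Remember that concurrency above the partition count leaves some containers idle, as noted earlier.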
Consumer - @KafkaListener
- A method annotated with `@KafkaListener` is called a `MethodKafkaListenerEndpoint`, which is an implementation of `KafkaListenerEndpoint`.
Error Handling
Blocking retry
Retries when a retriable exception occurs while consuming a message, blocking consumption of the next message.
The default behavior is to attempt to consume a message at most 10 times; if it still fails, an error is logged and the next message is consumed. See `org.springframework.kafka.listener.DefaultErrorHandler` for details.
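The retry characteristics can be overridden by defining a `DefaultErrorHandler` bean; a sketch (the back-off values here are assumptions, not the defaults):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
class ErrorHandlingConfig {

    // FixedBackOff(1000L, 2L): wait 1s between attempts, retry twice,
    // i.e. 3 attempts in total instead of the default 10.
    @Bean
    DefaultErrorHandler errorHandler() {
        return new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
    }
}
```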
Non-blocking retry
Sends the message to a separate retry topic once it exceeds the blocking-retry max attempts limit.
With `@RetryableTopic`, Spring Kafka builds the retry topics for you with the broker default settings. It may create multiple topics if there are several retry attempts, with each attempt sent to a different topic (configurable via the `fixedDelayTopicStrategy` property), e.g. `origintopic-retry-1`, `origintopic-retry-2`, etc.
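A sketch of a listener using `@RetryableTopic` (the topic name, attempt count, and delay are illustrative assumptions):

```java
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Component
class OrderListener {

    // A failure here re-publishes the record to a retry topic;
    // after 3 failed attempts it goes to the dead letter topic.
    @RetryableTopic(attempts = "3", backoff = @Backoff(delay = 2000))
    @KafkaListener(topics = "orders", groupId = "orders-group")
    void listen(String message) {
        throw new IllegalStateException("simulated failure for " + message);
    }

    // Invoked once the retries are exhausted.
    @DltHandler
    void handleDlt(String message) {
        System.out.println("Dead-lettered: " + message);
    }
}
```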
Dead letter queue and handler
Sends the message to a separate dead letter topic when it exceeds the non-blocking-retry max attempts limit, or when the exception is not retriable.
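Without `@RetryableTopic`, a dead letter topic can be wired up manually by combining `DefaultErrorHandler` with a `DeadLetterPublishingRecoverer`; a sketch (the back-off values are assumptions):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
class DeadLetterConfig {

    // By default the recoverer publishes failed records
    // to "<originalTopic>.DLT".
    @Bean
    DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
        return new DefaultErrorHandler(
                new DeadLetterPublishingRecoverer(template),
                new FixedBackOff(1000L, 2L));
    }
}
```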