Takeaway
Configuration
- To convert a property name to a CLI option, replace . with - and use -- as the prefix. For example, log.dirs becomes --log-dirs.
- To convert a property name to an environment variable name, replace . with _, upper-case it, and use KAFKA_ as the prefix. For example, log.dirs becomes KAFKA_LOG_DIRS.
Cheatsheet
Broker - List all topics
confluent kafka topic list --no-auth
OR
kcat -L -b $BROKER
Broker - Create a new topic
confluent kafka topic create $topic --no-auth --replication-factor 1
CLI - Consume messages
- CLI
  - kafka-console-consumer
  - kafka-avro-console-consumer
  - kafka-json-schema-console-consumer
  - kafka-protobuf-console-consumer
- Run one of the above consumer commands, e.g.:
      kafka-protobuf-console-consumer \
        --bootstrap-server localhost:9092 \
        --from-beginning \
        --topic t1-p \
        --property schema.registry.url=http://localhost:8081
- It consumes all messages from the beginning of the topic.
Setup
Docker
- Confluent Docs - Install Confluent Platform using Docker
- Docker Configuration Parameters for Confluent Platform - Required Kafka configurations for KRaft mode
- Docker Configuration Parameters for Confluent Platform - Required Kafka configurations for ZooKeeper mode
- Confluent Docs - Docker Image Reference for Confluent Platform
Docker - Container Images
- Broker
- Schema Registry
  - confluentinc/cp-schema-registry - Docker Image | Docker Hub
  - Only compatible with the confluentinc/cp-kafka image
Docker - Environment Variables
Docker - KAFKA_ADVERTISED_HOST_NAME
The container's IP within docker0's subnet, mapping to advertised.host.name.
docker0 is a bridge interface:
$ ip l show type bridge
3: br-0e97881e642e: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP mode DEFAULT group default
link/ether 02:42:7f:ce:06:b0 brd ff:ff:ff:ff:ff:ff
4: docker0: <NO-CARRIER,BROADCAST,MULTICAST,UP> mtu 1500 qdisc noqueue state DOWN mode DEFAULT group default
link/ether 02:42:26:bc:e3:8d brd ff:ff:ff:ff:ff:ff
Docker - KAFKA_LISTENERS
Brokers have two listeners: one for communicating within the Docker network, and one for connecting from the host machine.
Because Kafka clients connect directly to individual brokers after the initial bootstrap connection, one listener uses the container name, which is resolvable by all containers on the Docker network.
This listener is also used for inter-broker communication.
The second listener uses localhost on a unique port that gets mapped on the host (29092 for broker-1, 39092 for broker-2, and 49092 for broker-3).
With one node, a single listener on localhost works because the localhost name is conveniently correct from within the container and from the host machine, but this doesn't apply in a multi-node setup.
Docker - KAFKA_PROCESS_ROLES
Maps to process.roles; in KRaft mode each node acts as a broker, a controller, or both (combined mode).
Docker - KAFKA_CONTROLLER_QUORUM_VOTERS
A comma-separated list of the three controllers, in the form node_id@host:port.
Docker - Multi Node example
# docker-compose.yaml
# Reference: https://hub.docker.com/r/apache/kafka-native
services:
  controller-1:
    image: apache/kafka-native:latest
    container_name: controller-1
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: controller
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  controller-2:
    image: apache/kafka-native:latest
    container_name: controller-2
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: controller
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  controller-3:
    image: apache/kafka-native:latest
    container_name: controller-3
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: controller
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  broker-1:
    image: apache/kafka-native:latest
    container_name: broker-1
    ports:
      - 29092:9092
    environment:
      KAFKA_NODE_ID: 4
      KAFKA_PROCESS_ROLES: broker
      KAFKA_LISTENERS: "PLAINTEXT://:19092,PLAINTEXT_HOST://:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker-1:19092,PLAINTEXT_HOST://localhost:29092"
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    depends_on:
      - controller-1
      - controller-2
      - controller-3
  broker-2:
    image: apache/kafka-native:latest
    container_name: broker-2
    ports:
      - 39092:9092
    environment:
      KAFKA_NODE_ID: 5
      KAFKA_PROCESS_ROLES: broker
      KAFKA_LISTENERS: "PLAINTEXT://:19092,PLAINTEXT_HOST://:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker-2:19092,PLAINTEXT_HOST://localhost:39092"
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    depends_on:
      - controller-1
      - controller-2
      - controller-3
  broker-3:
    image: apache/kafka-native:latest
    container_name: broker-3
    ports:
      - 49092:9092
    environment:
      KAFKA_NODE_ID: 6
      KAFKA_PROCESS_ROLES: broker
      KAFKA_LISTENERS: "PLAINTEXT://:19092,PLAINTEXT_HOST://:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker-3:19092,PLAINTEXT_HOST://localhost:49092"
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    depends_on:
      - controller-1
      - controller-2
      - controller-3
Kubernetes
CLI
kcat
Confluent CLI
- GitHub - Confluent CLI
  - Image: confluentinc/confluent-cli
- Confluent Docs - Confluent CLI Command Reference
- Confluent Docs - Confluent CLI compatibility with Confluent Platform
confluent-hub
- Deploys Kafka connector plugins (JAR files) to the Kafka Connect worker embedded within the ksqlDB server
- Image: confluentinc/cp-ksqldb-server
ksqlDB
- Image: confluentinc/cp-ksqldb-server
Resources
Broker
Embedded Kafka - Java
- Confluent
- Spring Kafka
Broker - Config
Broker - Topic Config
Partitioning Strategy
Replication factor and brokers
- Replication factor cannot exceed the number of brokers.
Serialization / Deserialization
Serializers and Deserializers
Serializers and Deserializers - Testing
- Option 1 - Multiple instances
  - Uses mock serializers and deserializers, which use a mock schema registry client under the hood.
- Option 2 - Singleton
  - Uses singleton serializers and deserializers, which share a singleton mock schema registry under the hood.
  - All the necessary singleton Spring bean definitions must be created so that the whole configuration stays consistent, e.g.:
      @Slf4j
      @Profile("kafka")
      @Configuration
      public class KafkaConfig {

          @Bean
          ContainerCustomizer<String, SmsRequested, ConcurrentMessageListenerContainer<String, SmsRequested>> containerCustomizer(
                  ConcurrentKafkaListenerContainerFactory<String, SmsRequested> factory,
                  ConsumerAwareRebalanceListener rebalanceListener
          ) {
              ContainerCustomizer<String, SmsRequested, ConcurrentMessageListenerContainer<String, SmsRequested>> cust = container -> {
                  container.getContainerProperties().setDeliveryAttemptHeader(true);
                  container.getContainerProperties().setConsumerRebalanceListener(rebalanceListener);
              };
              factory.setContainerCustomizer(cust);
              return cust;
          }

          @Bean
          SchemaProvider avroSchemaProvider() {
              return new AvroSchemaProvider();
          }

          @Bean
          AbstractKafkaSchemaSerDeConfig schemaSerDeConfig(KafkaProperties kafkaProperties) {
              var producerProps = kafkaProperties.buildProducerProperties(null);
              return new AbstractKafkaSchemaSerDeConfig(AbstractKafkaSchemaSerDeConfig.baseConfigDef(), producerProps, false);
          }

          @Bean
          SchemaRegistryClient schemaRegistryClient(
                  @Value("${spring.kafka.properties.schema.registry.url}") String url,
                  SchemaProvider provider,
                  AbstractKafkaSchemaSerDeConfig schemaSerDeConfig
          ) {
              KafkaUtils.printSerDeConfig(schemaSerDeConfig);
              return SchemaRegistryClientFactory.newClient(
                      Collections.singletonList(url),
                      schemaSerDeConfig.getMaxSchemasPerSubject(),
                      Collections.singletonList(provider),
                      schemaSerDeConfig.originals(),
                      schemaSerDeConfig.requestHeaders()
              );
          }

          @Bean
          KafkaAvroSerializer avroValueSerializer(SchemaRegistryClient schemaRegistryClient) {
              return new KafkaAvroSerializer(schemaRegistryClient);
          }

          @Bean
          KafkaAvroDeserializer valueDeserializer(SchemaRegistryClient schemaRegistryClient) {
              return new KafkaAvroDeserializer(schemaRegistryClient);
          }

          @SuppressWarnings({"rawtypes", "unchecked"})
          @Bean
          DefaultKafkaProducerFactoryCustomizer producerFactoryCustomizer(KafkaAvroSerializer avroSerializer) {
              return producerFactory -> {
                  producerFactory.setValueSerializer((Serializer) avroSerializer);
                  producerFactory.addListener(new CustomProducerFactoryListener<>());
                  producerFactory.addPostProcessor(new CustomProducerPostProcessor<>());
              };
          }

          @SuppressWarnings({"rawtypes", "unchecked"})
          @Bean
          DefaultKafkaConsumerFactoryCustomizer consumerFactoryCustomizer(KafkaAvroDeserializer avroDeserializer) {
              return consumerFactory -> {
                  consumerFactory.setValueDeserializer((Deserializer) avroDeserializer);
                  consumerFactory.addListener(new CustomConsumerFactoryListener());
                  consumerFactory.addPostProcessor(new CustomConsumerPostProcessor());
              };
          }
      }
Schema Registry
- Confluent Schema Registry is a Confluent product that stores and manages schemas for Kafka messages; it is not part of Apache Kafka.
- Subject
  - A serializer registers a schema in Schema Registry under a subject name, which defines a namespace in the registry.
  - The subject name is determined by the subject name strategy:
    - io.confluent.kafka.serializers.subject.TopicNameStrategy (default)
      - Subject name is $topic_name-key or $topic_name-value, depending on whether the key or the value is being serialized.
    - io.confluent.kafka.serializers.subject.RecordNameStrategy
      - Subject name is $fully_qualified_record_name.
      - The subject name is the fully-qualified name of the record type, so Schema Registry checks compatibility for a particular record type regardless of topic.
    - io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
      - Subject name is <topic>-<type>, i.e. $topic_name-$fully_qualified_record_name, where <topic> is the Kafka topic name and <type> is the fully-qualified name of the Avro record type of the message.
      - This strategy allows any number of event types in the same topic and further constrains the compatibility check to the current topic only.
Testing
Resources
Schema Registry Client
- Challenges
  - CachedSchemaRegistryClient is a REST client that retrieves schemas from a remote Schema Registry, which centrally manages all schemas.
  - MockSchemaRegistryClient uses a single-instance in-memory store as the Schema Registry and doesn't share it among instances.
    - MockSchemaRegistryClient is not thread safe, so be careful when using it as a singleton in test cases.
- Implementations
  - Confluent
    - io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
      - Thread safe, can be used as a singleton Spring bean
  - Mock
    - io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry
    - io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
      - Not thread safe
      - In-memory store per instance
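A hedged test sketch of sharing one mock registry between a serializer and a deserializer, assuming the kafka-avro-serializer and Avro libraries are on the classpath; the scope, topic, and record names below are illustrative. Clients created for the same mock:// scope resolve to the same in-memory store.

```java
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class MockSchemaRegistryExample {
    public static void main(String[] args) {
        // All clients created for the same scope share one in-memory schema store.
        SchemaRegistryClient client = MockSchemaRegistry.getClientForScope("test-scope");
        Map<String, Object> config = Map.of("schema.registry.url", "mock://test-scope");

        try (KafkaAvroSerializer serializer = new KafkaAvroSerializer(client, config);
             KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(client, config)) {
            Schema schema = SchemaBuilder.record("SmsRequested").fields().requiredString("body").endRecord();
            GenericRecord message = new GenericRecordBuilder(schema).set("body", "hello").build();

            byte[] bytes = serializer.serialize("t1", message);          // registers the schema in the mock store
            Object roundTripped = deserializer.deserialize("t1", bytes); // looks the schema up from the same store
            System.out.println(roundTripped);
        } finally {
            MockSchemaRegistry.dropScope("test-scope"); // reset the shared store between tests
        }
    }
}
```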
org.apache.kafka.common.errors.SerializationException: Unknown magic byte
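This error typically means the consumer tried to deserialize a record that was not produced with a schema-registry-aware serializer: the payload does not start with the magic byte (0x0) and 4-byte schema ID that the Confluent deserializers expect.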
Schema Registry API - List all subjects
curl -s http://$SCHEMA_REGISTRY_URL/subjects | jq
Schema Registry API - List all schemas
curl -s http://$SCHEMA_REGISTRY_URL/schemas | jq
Schema Registry API - Get a schema with a specified schema ID
curl -s http://$SCHEMA_REGISTRY_URL/schemas/ids/$SCHEMA_ID | jq
Avro
Protobuf
Producer
- FAQ: Producer side settings explained: linger.ms and batch.size
- Hevo - Critical Kafka Producer Config Parameters Simplified
- Medium - Building a lossless Kafka Producer
- GitHub - hevoio/recoverable-kafka-producer
- Hands On: Tuning the Apache Kafka Producer Client
Producer - Config reference
Producer - Message Delivery Guarantees
Producer - Message Compression
- IBM Developer - Message compression in Apache Kafka
- Cloudflare - Squeezing the firehose: getting the most from Kafka compression
Producer - Record Headers
- Design
      public interface Header {
          String key();
          byte[] value();
      }
- Use cases (a producer sketch follows this list)
  - Automated routing of messages between clusters based on header information
  - Enterprise APM tools (e.g., AppDynamics or Dynatrace) need to stitch "magic" transaction IDs into messages to provide end-to-end transaction flow monitoring.
  - Audit metadata recorded with the message, for example the client-id that produced the record.
  - The business payload needs to be encrypted end to end and signed against tampering, while ecosystem components still need access to metadata to do their work.
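A minimal producer sketch showing headers riding alongside the payload; the broker address, topic, and header names are illustrative assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class HeaderProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("t1", "key-1", "hello");
            // Headers carry metadata (e.g. a trace id, the producing client) without touching the payload.
            record.headers().add("trace-id", "6dbb61ea".getBytes(StandardCharsets.UTF_8));
            record.headers().add("producer-client-id", "demo-producer".getBytes(StandardCharsets.UTF_8));
            producer.send(record);
        }
    }
}
```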
Producer - Cheatsheet
Producer - Produce messages with CLI
- CLI
  - kafka-console-producer
  - kafka-avro-console-producer
  - kafka-json-schema-console-producer
  - kafka-protobuf-console-producer
- Use one of the above CLIs to produce messages. The topic is created automatically if auto.create.topics.enable=true.
      kafka-protobuf-console-producer \
        --bootstrap-server ${BOOTSTRAP_SERVER} \
        --property schema.registry.url=${SCHEMA_REGISTRY_URL} \
        --topic t1-p \
        --property parse.key=true \
        --property key.schema.id=$SCHEMA_ID \
        --property value.schema='syntax = "proto3"; message MyRecord { string f1 = 1; }'
- You will then be prompted on stdin for the message payload. Enter as many messages as you want, pressing ENTER after each one to send it and starting a new line for the next. Quit with Ctrl + C.
      { "f1": "value1-p" }
  Alternatively, read the message input from a file:
      kafka-protobuf-console-producer \
        --bootstrap-server ${BOOTSTRAP_SERVER} \
        --property schema.registry.url=${SCHEMA_REGISTRY_URL} \
        --topic t1-p \
        --property value.schema='syntax = "proto3"; message MyRecord { string f1 = 1; }' \
        < topic-input.txt
- Each message is sent to the broker and can be seen in the Control Center - Topics view.
Consumer
- The number of consumers in a consumer group shouldn't exceed the number of partitions in the topic, because each partition is assigned to at most one consumer in the group; extra consumers sit idle.
- At the heart of the Consumer API is a simple loop for polling the server for more data.
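A minimal sketch of that loop using the plain Java consumer; the broker address, group id, and topic name are assumptions for illustration.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollLoopExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // assumed group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("t1"));
            while (true) {
                // poll() fetches a batch of records; the batch size is bounded by
                // max.poll.records, fetch.min.bytes, fetch.max.wait.ms, etc.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
```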
Consumer - Spring Kafka - MessageListenerContainer
There are two implementations of MessageListenerContainer: KafkaMessageListenerContainer (KMLC) and ConcurrentMessageListenerContainer (CMLC).
The CMLC is simply a wrapper for one or more KMLCs, with the number of KMLCs specified by the concurrency. @KafkaListener always uses a CMLC.
Each KMLC gets one Consumer (and one thread). The thread continually poll()s the consumer, with the specified pollTimeout.
How the topics/partitions are distributed across the KMLCs depends on
- How many partitions the topic(s) have
- The consumer's partition.assignment.strategy property
If you have multiple topics with fewer partitions than the concurrency, you will likely need an alternate partition assignor, such as the round robin assignor; otherwise you will have idle containers with no assignment.
- Q: If we are using one @KafkaListener with two topics, Spring Kafka creates a single MessageListenerContainer; if I use separate @KafkaListener annotations, does Spring Kafka create two MessageListenerContainers?
  A: That is correct; if you explicitly want a different container for each topic, you can provide multiple @KafkaListener annotations on the same method.
- Q: Does MessageListenerContainer mean consumer?
  A: See the explanation above.
- Q: If I set concurrency to 4 in ConcurrentKafkaListenerContainerFactory, does that mean that for every @KafkaListener I open 4 threads to the broker, and that the coordinator sees them as 4 different consumers?
  A: That is correct - it's the only way to get concurrency with Kafka (without adding very complicated logic to manage offsets). A sketch follows this list.
- Q: How does polling work with @KafkaListener? Does it get only one ConsumerRecord from the broker every time?
  A: The number of records returned by each poll depends on a number of consumer properties: max.poll.records, fetch.min.bytes, and fetch.max.wait.ms.
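A minimal Spring Kafka sketch of the concurrency behaviour described above, assuming spring-kafka is on the classpath and the topic has at least 4 partitions; the topic and group names are illustrative.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class SmsRequestedListener {

    // concurrency = "4": the ConcurrentMessageListenerContainer wraps 4 KafkaMessageListenerContainers,
    // each with its own Consumer and polling thread, so the group coordinator sees 4 consumers.
    @KafkaListener(topics = "t1", groupId = "sms-group", concurrency = "4")
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.printf("thread=%s partition=%d offset=%d value=%s%n",
                Thread.currentThread().getName(), record.partition(), record.offset(), record.value());
    }
}
```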
Consumer - Config reference
Kafka Admin
- Spring Kafka
Kafka Connect
- Resources
  - Confluent Docs - Connect REST Interface
  - Confluent blog - Easy Ways to Generate Test Data in Kafka
    - Use the Kafka Connect Datagen Connector to generate test data
Kafka Connect - Config reference
Kafka Connect - Datagen Source Connector to generate mock data
- Datagen Source Connector for Confluent Platform
- How to generate mock data to a Kafka topic using the Datagen Source Connector
Kafka Connect - Cheatsheet
Kafka Connect - Get worker cluster ID, version, and git source code commit ID
curl ${KAFKA_CONNECT_HOST}:8083/ | jq
Kafka Connect - List the connector plugins available on a worker
- Connector plugins are JARs deployed to Kafka Connect; they are inactive but available to be used.
curl ${KAFKA_CONNECT_HOST}:8083/connector-plugins | jq
Kafka Connect - List active connectors on a worker
- Connectors are connector plugins that are currently running.
curl ${KAFKA_CONNECT_HOST}:8083/connectors | jq
REST Proxy
Authentication
- REST Proxy default admin credentials: admin/admin_pw
Observability
Observability - JMX
Spring Kafka
@EnableKafka
- Enables detection of @KafkaListener annotations on any Spring-managed bean
@KafkaListener
- A method annotated with @KafkaListener is called a MethodKafkaListenerEndpoint, which is an implementation of KafkaListenerEndpoint.
Topics
- A topic is a logical grouping of partitions.
- Each partition has its own offsets, which are referenced by consumer groups to track their own message consumption progress.
Auto creating topics
- Define a NewTopic bean.
      @Bean
      NewTopic topic(@Value("${spring.kafka.template.default-topic}") String defaultTopic) {
          return TopicBuilder.name(defaultTopic)
              .partitions(1)
              .replicas(1)
              .build();
      }
- NewTopic beans are picked up by KafkaAdmin only when auto-create is on.
      # Default value is true
      spring.kafka.admin.auto-create=true
Disable auto creating topics
- Broker
  - auto.create.topics.enable
    - Docker: KAFKA_AUTO_CREATE_TOPICS_ENABLE: false
- Consumer
  - allow.auto.create.topics
  - auto.offset.reset
    - This configuration is mostly applicable when a consumer group (usually a new one) reads from a topic for the very first time and the consumer instances within the group need to know exactly where to start reading from the topic's partitions.
- Admin client
- Spring Kafka
  - Spring automatically creates topics during context initialization by looking up all NewTopic beans; disable this with:
        spring.kafka.admin.auto-create=false
Spring Kafka - Serde
- Deserializer config comes from ConsumerFactory
- Serializer config comes from ProducerFactory
Spring Kafka - Logging
Spring Kafka - Logging - Producer
logging.level.org.springframework.kafka.core=trace
2025-01-23T21:27:15.110+11:00 TRACE 796460 --- [ main] o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord(topic=ccom.notification-sms-v1.dev, partition=0, headers=RecordHeaders(headers = [], isReadOnly = false), key=5366cbc9-26e4-4e63-9512-2d9d14d7ffbb, value={"metadata": {"nabEventVersion": "1.0.0", "type": "SMS", "source": "Source", "subject": "Subject", "id": "a66c5a52-67a8-43e6-a049-e1803a2b4c18", "time": "2022-01-01T00:00:00Z", "correlationId": "1af65815-3509-4b64-bbb8-a9a6d11302b6", "producer": "Producer", "traceId": "6dbb61ea-9522-4883-8037-6422f80fae8b"}, "data": {"apiMetaData": null, "messageId": "bd596ee0-6980-4d3c-8600-a4bec7fb3f5a", "account": "1234567890", "destination": "1234567890", "message": {"body": "Hello World", "template": {"path": "/path/to/template", "context": null}}}}, timestamp=null)
2025-01-23T21:27:15.263+11:00 TRACE 796460 --- [ main] o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=ccom.notification-sms-v1.dev, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=5366cbc9-26e4-4e63-9512-2d9d14d7ffbb, value={"metadata": {"nabEventVersion": "1.0.0", "type": "SMS", "source": "Source", "subject": "Subject", "id": "a66c5a52-67a8-43e6-a049-e1803a2b4c18", "time": "2022-01-01T00:00:00Z", "correlationId": "1af65815-3509-4b64-bbb8-a9a6d11302b6", "producer": "Producer", "traceId": "6dbb61ea-9522-4883-8037-6422f80fae8b"}, "data": {"apiMetaData": null, "messageId": "bd596ee0-6980-4d3c-8600-a4bec7fb3f5a", "account": "1234567890", "destination": "1234567890", "message": {"body": "Hello World", "template": {"path": "/path/to/template", "context": null}}}}, timestamp=null)
2025-01-23T21:27:15.277+11:00 TRACE 796460 --- [ad | producer-1] o.s.kafka.core.KafkaTemplate : Sent ok: ProducerRecord(topic=ccom.notification-sms-v1.dev, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=5366cbc9-26e4-4e63-9512-2d9d14d7ffbb, value={"metadata": {"nabEventVersion": "1.0.0", "type": "SMS", "source": "Source", "subject": "Subject", "id": "a66c5a52-67a8-43e6-a049-e1803a2b4c18", "time": "2022-01-01T00:00:00Z", "correlationId": "1af65815-3509-4b64-bbb8-a9a6d11302b6", "producer": "Producer", "traceId": "6dbb61ea-9522-4883-8037-6422f80fae8b"}, "data": {"apiMetaData": null, "messageId": "bd596ee0-6980-4d3c-8600-a4bec7fb3f5a", "account": "1234567890", "destination": "1234567890", "message": {"body": "Hello World", "template": {"path": "/path/to/template", "context": null}}}}, timestamp=null), metadata: ccom.notification-sms-v1.dev-0@0
References
- Kafka, Kafka Streams and Kafka Connect: What's the difference?
- Confluent Docs - Supported Versions and Interoperability
  - Confluent Platform and Apache Kafka compatibility matrix
- IBM Developer - Apache Kafka - Resources and Tools
  - Great collection of articles and resources
Resources - Examples
- GitHub - confluentinc/cp-demo
- GitHub - confluentinc/examples
Interview
Interview Questions
- How to implement deduplication of messages?
- How to ensure ordered message delivery?
  - Produce related messages to the same partition, e.g. by giving them the same key; a sketch follows.
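A minimal producer sketch of that idea, assuming a local broker, a topic named orders, and String serialization. Records sharing a key hash to the same partition, so they are consumed in send order; enabling idempotence keeps the ordering even across retries.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class OrderedProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // preserves ordering across retries

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Same key -> same partition -> per-key ordering is preserved.
            producer.send(new ProducerRecord<>("orders", "customer-42", "order-created"));
            producer.send(new ProducerRecord<>("orders", "customer-42", "order-paid"));
        }
    }
}
```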