Event-Driven Architecture with Apache Kafka: A Comprehensive Guide
Apache Kafka has emerged as the de facto standard for building event-driven architectures at scale. Originally developed at LinkedIn and now maintained by the Apache Software Foundation, Kafka provides a distributed streaming platform that enables real-time data processing, event sourcing, and microservices communication. This comprehensive guide explores Kafka's fundamentals and demonstrates practical implementations for modern distributed systems.
In this article, we'll dive deep into Kafka's architecture, explore real-world patterns like CQRS and event sourcing, and provide production-ready code examples in both Java and Node.js.
Kafka Fundamentals
Apache Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerant messaging. At its core, Kafka acts as a distributed commit log where events are stored durably and can be consumed by multiple applications.
Core Components
- Brokers: Kafka servers that store and manage events
- Topics: Logical channels for organizing events
- Partitions: Physical divisions of topics for parallelism
- Producers: Applications that publish events
- Consumers: Applications that subscribe to events
- ZooKeeper/KRaft: Cluster coordination service (newer Kafka releases replace ZooKeeper with the built-in KRaft consensus layer)
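To see how these pieces fit together before going deeper, here is a minimal sketch of a produce/consume roundtrip using the plain Java clients. It assumes a local broker at localhost:9092 and an existing topic named demo-events; both names are illustrative placeholders.
// Minimal produce/consume roundtrip (sketch; broker address and topic name are placeholders)
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.*;
import java.time.Duration;
import java.util.*;
public class QuickstartDemo {
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            // The producer appends the record to one partition of the topic
            producer.send(new ProducerRecord<>("demo-events", "user-42", "signed-up"));
        }
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            // Consumers in the same group share the topic's partitions
            consumer.subscribe(Collections.singletonList("demo-events"));
            // A single poll is enough for a demo; production code polls in a loop
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                    record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}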
Topics and Partitions
Topics are the fundamental unit of organization in Kafka. Each topic is divided into partitions, which are ordered, immutable sequences of records.
// Creating a topic with Java Admin API
import org.apache.kafka.clients.admin.*;
import java.util.*;
public class TopicManager {
public static void createTopic(String topicName, int partitions, short replicationFactor) {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient adminClient = AdminClient.create(props)) {
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
// Configure topic settings
Map<String, String> configs = new HashMap<>();
configs.put("retention.ms", "604800000"); // 7 days
configs.put("segment.ms", "86400000"); // 1 day
configs.put("compression.type", "snappy");
newTopic.configs(configs);
CreateTopicsResult result = adminClient.createTopics(
Collections.singleton(newTopic)
);
result.all().get();
System.out.println("Topic created successfully: " + topicName);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Message Keys and Partitioning
Kafka uses message keys to determine partition assignment. Messages with the same key always go to the same partition, ensuring order preservation for related events.
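As a quick illustration of the default partitioner, the hedged Java sketch below sends two records that share a key and prints the partition each one lands in; the broker address, topic name, and key are placeholders. The Node.js example that follows then replaces this default behavior with a custom partitioner.
// Default partitioner demo: records with the same key land in the same partition (sketch)
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KeyPartitioningDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (String value : new String[] {"order-1", "order-2"}) {
                // Both records use the key "customer-123", so they hash to the same partition
                RecordMetadata metadata = producer.send(
                    new ProducerRecord<>("orders", "customer-123", value)).get();
                System.out.printf("value=%s -> partition=%d%n", value, metadata.partition());
            }
        }
    }
}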
// Node.js producer with custom partitioner
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'order-service',
brokers: ['localhost:9092']
});
const producer = kafka.producer({
createPartitioner: () => {
return ({ topic, partitionMetadata, message }) => {
// Custom partitioning logic based on customer ID
if (message.key) {
const customerId = message.key.toString();
const numPartitions = partitionMetadata.length;
const hash = customerId.split('').reduce((a, b) => {
return ((a << 5) - a) + b.charCodeAt(0);
}, 0);
return Math.abs(hash) % numPartitions;
}
// Fall back to a random partition for messages without keys
return Math.floor(Math.random() * partitionMetadata.length);
};
}
});
async function publishOrder(order) {
await producer.connect();
try {
await producer.send({
topic: 'orders',
messages: [
{
key: order.customerId,
value: JSON.stringify(order),
headers: {
'correlation-id': order.orderId,
'event-type': 'order.created'
}
}
]
});
} finally {
await producer.disconnect();
}
}
Producers and Consumers
Producer Configuration and Best Practices
Producers are responsible for publishing events to Kafka topics. Proper configuration ensures reliability and performance.
// Java producer with advanced configuration
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class EventProducer {
private final KafkaProducer<String, String> producer;
public EventProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Reliability settings
props.put(ProducerConfig.ACKS_CONFIG, "all"); // Wait for all in-sync replicas
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Performance settings
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32MB
this.producer = new KafkaProducer<>(props);
}
public void publishEvent(String topic, String key, String value) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("Failed to send message: " + exception.getMessage());
} else {
System.out.printf("Sent message to topic=%s partition=%d offset=%d%n",
metadata.topic(), metadata.partition(), metadata.offset());
}
}
});
}
public void close() {
producer.close();
}
}
Consumer Groups and Offset Management
Consumers work together in consumer groups to process messages from topics. Each partition is assigned to only one consumer within a group.
// Node.js consumer with manual offset management
const { Kafka } = require('kafkajs');
class EventConsumer {
constructor(groupId) {
this.kafka = new Kafka({
clientId: 'event-processor',
brokers: ['localhost:9092']
});
this.consumer = this.kafka.consumer({
groupId: groupId,
sessionTimeout: 30000,
heartbeatInterval: 3000,
maxBytesPerPartition: 1048576, // 1MB
retry: {
initialRetryTime: 100,
retries: 8
}
});
}
async start(topics, handler) {
await this.consumer.connect();
await this.consumer.subscribe({
topics,
fromBeginning: false
});
await this.consumer.run({
autoCommit: false, // Manual commit for better control
eachMessage: async ({ topic, partition, message }) => {
try {
const event = {
key: message.key?.toString(),
value: JSON.parse(message.value.toString()),
headers: message.headers,
timestamp: message.timestamp,
offset: message.offset,
partition: partition
};
// Process the event
await handler(event);
// Commit offset after successful processing
await this.consumer.commitOffsets([{
topic: topic,
partition: partition,
offset: (parseInt(message.offset) + 1).toString()
}]);
} catch (error) {
console.error('Error processing message:', error);
// Implement retry logic or send to DLQ
}
}
});
}
async stop() {
await this.consumer.disconnect();
}
}
// Usage example
const consumer = new EventConsumer('order-processing-group');
consumer.start(['orders'], async (event) => {
console.log('Processing order:', event.value);
// Process order logic here
});
Microservices Communication with Kafka
Kafka excels at enabling asynchronous communication between microservices, providing loose coupling and scalability.
Event-Driven Choreography Pattern
// Order Service - Publishing domain events
@Service
public class OrderService {
private final EventProducer eventProducer;
@Transactional
public Order createOrder(CreateOrderRequest request) {
// Create order in database
Order order = orderRepository.save(new Order(request));
// Publish order created event
OrderCreatedEvent event = OrderCreatedEvent.builder()
.orderId(order.getId())
.customerId(order.getCustomerId())
.items(order.getItems())
.totalAmount(order.getTotalAmount())
.timestamp(Instant.now())
.build();
eventProducer.publish("order-events", order.getId(), event);
return order;
}
}
// Inventory Service - Reacting to order events
@Component
public class InventoryEventHandler {
@KafkaListener(topics = "order-events", groupId = "inventory-service")
public void handleOrderCreated(OrderCreatedEvent event) {
try {
// Reserve inventory
for (OrderItem item : event.getItems()) {
inventoryService.reserveStock(item.getProductId(), item.getQuantity());
}
// Publish inventory reserved event
InventoryReservedEvent reservedEvent = InventoryReservedEvent.builder()
.orderId(event.getOrderId())
.reservationId(UUID.randomUUID().toString())
.items(event.getItems())
.build();
eventProducer.publish("inventory-events", event.getOrderId(), reservedEvent);
} catch (InsufficientStockException e) {
// Publish inventory shortage event
publishInventoryShortageEvent(event.getOrderId(), e.getProductId());
}
}
}
Saga Pattern Implementation
// Saga coordinator for distributed transactions
const { v4: uuidv4 } = require('uuid');
class OrderSaga {
constructor(producer, consumer) {
this.producer = producer;
this.consumer = consumer;
this.sagaState = new Map();
}
async startOrderSaga(order) {
const sagaId = uuidv4();
// Initialize saga state
this.sagaState.set(sagaId, {
  sagaId: sagaId,
  orderId: order.id,
  status: 'STARTED',
  completedSteps: [],
  compensatedSteps: []
});
// Start the saga
await this.producer.send({
topic: 'order-saga-events',
messages: [{
key: sagaId,
value: JSON.stringify({
type: 'SAGA_STARTED',
sagaId: sagaId,
orderId: order.id,
steps: ['RESERVE_INVENTORY', 'PROCESS_PAYMENT', 'CREATE_SHIPMENT']
})
}]
});
}
async handleSagaEvent(event) {
const saga = this.sagaState.get(event.sagaId);
switch (event.type) {
case 'STEP_COMPLETED':
saga.completedSteps.push(event.step);
await this.executeNextStep(saga);
break;
case 'STEP_FAILED':
saga.status = 'COMPENSATING';
await this.startCompensation(saga, event.failedStep);
break;
case 'COMPENSATION_COMPLETED':
saga.compensatedSteps.push(event.step);
await this.continueCompensation(saga);
break;
}
}
async executeNextStep(saga) {
const nextStep = this.getNextStep(saga);
if (!nextStep) {
// All steps completed successfully
saga.status = 'COMPLETED';
await this.publishSagaCompleted(saga);
return;
}
// Execute next step
await this.producer.send({
topic: this.getTopicForStep(nextStep),
messages: [{
key: saga.orderId,
value: JSON.stringify({
sagaId: saga.sagaId,
orderId: saga.orderId,
action: 'EXECUTE',
step: nextStep
})
}]
});
}
}
CQRS and Event Sourcing
Implementing CQRS with Kafka
Command Query Responsibility Segregation (CQRS) separates read and write models, with Kafka as the event backbone.
// Command side - Write model
@RestController
@RequestMapping("/api/products")
public class ProductCommandController {
private final KafkaProducer<String, ProductCommand> commandProducer;
@PostMapping
public ResponseEntity<Void> createProduct(@RequestBody CreateProductCommand command) {
command.setCommandId(UUID.randomUUID().toString());
command.setTimestamp(Instant.now());
ProducerRecord<String, ProductCommand> record =
new ProducerRecord<>("product-commands", command.getProductId(), command);
commandProducer.send(record);
return ResponseEntity.accepted()
.header("X-Command-Id", command.getCommandId())
.build();
}
}
// Command processor
@Component
public class ProductCommandProcessor {
@KafkaListener(topics = "product-commands")
public void processCommand(ProductCommand command) {
switch (command.getType()) {
case CREATE:
Product product = new Product(command);
productRepository.save(product);
publishProductCreatedEvent(product);
break;
case UPDATE:
updateProduct(command);
break;
case DELETE:
deleteProduct(command);
break;
}
}
private void publishProductCreatedEvent(Product product) {
ProductCreatedEvent event = new ProductCreatedEvent(
product.getId(),
product.getName(),
product.getPrice(),
product.getDescription()
);
eventProducer.send("product-events", product.getId(), event);
}
}
// Query side - Read model projection
@Component
public class ProductProjection {
private final ProductViewRepository viewRepository;
@KafkaListener(topics = "product-events")
public void handleProductEvent(ProductEvent event) {
switch (event.getType()) {
case "ProductCreated":
createProductView((ProductCreatedEvent) event);
break;
case "ProductUpdated":
updateProductView((ProductUpdatedEvent) event);
break;
case "ProductDeleted":
deleteProductView((ProductDeletedEvent) event);
break;
}
}
private void createProductView(ProductCreatedEvent event) {
ProductView view = ProductView.builder()
.id(event.getProductId())
.name(event.getName())
.price(event.getPrice())
.description(event.getDescription())
.lastUpdated(event.getTimestamp())
.build();
viewRepository.save(view);
}
}
Event Sourcing Implementation
// Event store using Kafka
class EventStore {
constructor(kafka) {
  this.kafka = kafka;
  this.producer = kafka.producer();
  this.admin = kafka.admin();
}
async saveEvents(aggregateId, events, expectedVersion) {
const messages = events.map((event, index) => ({
key: aggregateId,
value: JSON.stringify({
aggregateId: aggregateId,
eventType: event.eventType || event.constructor.name,
eventData: event,
eventVersion: expectedVersion + index + 1,
timestamp: new Date().toISOString()
}),
headers: {
'aggregate-type': event.aggregateType,
'event-version': (expectedVersion + index + 1).toString()
}
}));
await this.producer.send({
topic: 'event-store',
messages: messages,
acks: -1 // Wait for all in-sync replicas
});
}
async loadEvents(aggregateId, fromVersion = 0) {
  const consumer = this.kafka.consumer({
    groupId: `event-store-reader-${Date.now()}`
  });
  // Snapshot the end offset of every partition so we know when we have read everything
  await this.admin.connect();
  const endOffsets = new Map();
  for (const { partition, high } of await this.admin.fetchTopicOffsets('event-store')) {
    endOffsets.set(partition, BigInt(high));
  }
  await this.admin.disconnect();
  await consumer.connect();
  await consumer.subscribe({
    topics: ['event-store'],
    fromBeginning: true
  });
  const events = [];
  let resolvePromise;
  const eventsPromise = new Promise(resolve => (resolvePromise = resolve));
  let partitionsRemaining = [...endOffsets.values()].filter(offset => offset > 0n).length;
  if (partitionsRemaining === 0) resolvePromise();
  await consumer.run({
    eachMessage: async ({ partition, message }) => {
      const event = JSON.parse(message.value.toString());
      if (event.aggregateId === aggregateId && event.eventVersion > fromVersion) {
        events.push(event);
      }
      // Stop once every partition has been read up to its snapshotted end offset
      if (BigInt(message.offset) + 1n >= endOffsets.get(partition)) {
        partitionsRemaining -= 1;
        if (partitionsRemaining <= 0) resolvePromise();
      }
    }
  });
  await eventsPromise;
  await consumer.disconnect();
  return events.sort((a, b) => a.eventVersion - b.eventVersion);
}
}
// Aggregate implementation
class OrderAggregate {
constructor(id) {
this.id = id;
this.version = 0;
this.uncommittedEvents = [];
this.status = 'PENDING';
this.items = [];
this.totalAmount = 0;
}
static async load(id, eventStore) {
const aggregate = new OrderAggregate(id);
const events = await eventStore.loadEvents(id);
for (const event of events) {
aggregate.applyEvent(event.eventData, false);
aggregate.version = event.eventVersion;
}
return aggregate;
}
createOrder(customerId, items) {
if (this.status !== 'PENDING') {
throw new Error('Order already created');
}
const event = {
  eventType: 'OrderCreated',
  aggregateType: 'Order',
customerId: customerId,
items: items,
totalAmount: items.reduce((sum, item) => sum + item.price * item.quantity, 0)
};
this.applyEvent(event, true);
}
applyEvent(event, isNew = true) {
switch (event.eventType || event.constructor.name) {
case 'OrderCreated':
this.status = 'CREATED';
this.customerId = event.customerId;
this.items = event.items;
this.totalAmount = event.totalAmount;
break;
// Handle other events...
}
if (isNew) {
this.uncommittedEvents.push(event);
}
}
async save(eventStore) {
if (this.uncommittedEvents.length === 0) return;
await eventStore.saveEvents(this.id, this.uncommittedEvents, this.version);
this.version += this.uncommittedEvents.length;
this.uncommittedEvents = [];
}
}
Kafka Streams
Kafka Streams provides a powerful library for building streaming applications that process data in real-time.
Stream Processing Fundamentals
// Real-time order analytics with Kafka Streams
public class OrderAnalyticsApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// Stream of orders
KStream<String, Order> orders = builder.stream("orders",
Consumed.with(Serdes.String(), orderSerde));
// Calculate order totals by customer
KTable<String, Double> customerTotals = orders
  .groupBy((key, order) -> order.getCustomerId(), Grouped.with(Serdes.String(), orderSerde))
  .aggregate(
() -> 0.0,
(customerId, order, total) -> total + order.getTotalAmount(),
Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("customer-totals")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Double())
);
// Windowed aggregations - Orders per hour
KTable<Windowed<String>, Long> ordersPerHour = orders
  .groupBy((key, order) -> "all", Grouped.with(Serdes.String(), orderSerde))
  .windowedBy(TimeWindows.of(Duration.ofHours(1)))
.count(Materialized.as("orders-per-hour"));
// Join with customer data
KTable<String, Customer> customers = builder.table("customers",
Consumed.with(Serdes.String(), customerSerde));
KStream<String, EnrichedOrder> enrichedOrders = orders
.join(customers,
(order, customer) -> new EnrichedOrder(order, customer),
Joined.with(Serdes.String(), orderSerde, customerSerde)
);
// Output enriched orders
enrichedOrders.to("enriched-orders",
Produced.with(Serdes.String(), enrichedOrderSerde));
// Build and start the streams application
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
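Because the aggregation above is materialized as the customer-totals state store, the running application can also answer point lookups through Kafka Streams interactive queries. The snippet below is a minimal sketch against the streams instance created above; the key customer-42 is a placeholder, and readiness and error handling are omitted.
// Interactive query against the materialized "customer-totals" store (sketch)
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
// ...inside the application, once streams.start() has been called and the store is queryable:
ReadOnlyKeyValueStore<String, Double> customerTotalsStore = streams.store(
    StoreQueryParameters.fromNameAndType("customer-totals", QueryableStoreTypes.keyValueStore()));
Double total = customerTotalsStore.get("customer-42"); // null if this customer has no orders yet
System.out.println("Running total for customer-42: " + total);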
Stateful Stream Processing
// Fraud detection with stateful processing
public class FraudDetectionProcessor {
public static Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();
// Transaction stream
KStream<String, Transaction> transactions = builder.stream("transactions");
// State store for tracking user behavior
StoreBuilder<KeyValueStore<String, UserProfile>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("user-profiles"),
Serdes.String(),
userProfileSerde
);
builder.addStateStore(storeBuilder);
// Process transactions with state
KStream<String, FraudAlert> fraudAlerts = transactions
.transform(() -> new FraudDetectionTransformer(), "user-profiles");
fraudAlerts.to("fraud-alerts");
return builder.build();
}
static class FraudDetectionTransformer
implements Transformer<String, Transaction, KeyValue<String, FraudAlert>> {
private KeyValueStore<String, UserProfile> stateStore;
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
this.stateStore = (KeyValueStore<String, UserProfile>)
context.getStateStore("user-profiles");
}
@Override
public KeyValue<String, FraudAlert> transform(String key, Transaction transaction) {
String userId = transaction.getUserId();
UserProfile profile = stateStore.get(userId);
if (profile == null) {
profile = new UserProfile(userId);
}
// Update profile with new transaction
profile.addTransaction(transaction);
// Check for fraud patterns
if (isSuspicious(transaction, profile)) {
FraudAlert alert = FraudAlert.builder()
.userId(userId)
.transactionId(transaction.getId())
.reason(detectFraudPattern(transaction, profile))
.riskScore(calculateRiskScore(transaction, profile))
.timestamp(context.timestamp())
.build();
// Update state
stateStore.put(userId, profile);
return KeyValue.pair(userId, alert);
}
// Update state even if no fraud detected
stateStore.put(userId, profile);
return null; // No alert
}
private boolean isSuspicious(Transaction transaction, UserProfile profile) {
// Velocity check - too many transactions in short time
if (profile.getTransactionCount(Duration.ofMinutes(5)) > 5) {
return true;
}
// Location anomaly
if (profile.getLastLocation() != null &&
calculateDistance(profile.getLastLocation(), transaction.getLocation()) > 1000) {
return true;
}
// Amount anomaly
if (transaction.getAmount() > profile.getAverageAmount() * 3) {
return true;
}
return false;
}
@Override
public void close() {
// No resources to release; the state store is managed by Kafka Streams
}
}
}
Kafka Connect
Kafka Connect provides a framework for streaming data between Kafka and external systems.
Source Connector Configuration
{
"name": "postgres-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "password",
"database.dbname": "orders_db",
"database.server.name": "postgres",
"table.include.list": "public.orders,public.customers",
"plugin.name": "pgoutput",
"publication.name": "orders_publication",
"slot.name": "orders_slot",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "postgres.public.(.*)",
"transforms.route.replacement": "$1-events"
}
}
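Connector configurations like this are usually registered with a Connect worker through its REST API (POST /connectors). Below is a minimal Java sketch; the worker URL http://localhost:8083 and the file name postgres-source-connector.json are assumptions for illustration.
// Registering the connector with a Connect worker's REST API (sketch; URL and file path are placeholders)
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;
public class ConnectorRegistrar {
    public static void main(String[] args) throws Exception {
        String connectorJson = Files.readString(Path.of("postgres-source-connector.json"));
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8083/connectors"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
            .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        // 201 Created means the connector was registered; 409 means it already exists
        System.out.println(response.statusCode() + " " + response.body());
    }
}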
Custom Sink Connector
// Custom Elasticsearch sink connector
public class ElasticsearchSinkConnector extends SinkConnector {
private Map<String, String> configProps;
@Override
public void start(Map<String, String> props) {
this.configProps = props;
}
@Override
public Class<? extends Task> taskClass() {
return ElasticsearchSinkTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> configs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
configs.add(configProps);
}
return configs;
}
@Override
public void stop() {
// Nothing to clean up
}
@Override
public ConfigDef config() {
// Minimal config definition; a real connector would validate more settings
return new ConfigDef().define("elasticsearch.hosts", ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, "Elasticsearch hosts, e.g. http://localhost:9200");
}
@Override
public String version() {
return "1.0.0";
}
}
public class ElasticsearchSinkTask extends SinkTask {
private RestHighLevelClient esClient;
@Override
public void start(Map<String, String> props) {
String esHosts = props.get("elasticsearch.hosts");
this.esClient = new RestHighLevelClient(
RestClient.builder(HttpHost.create(esHosts))
);
}
@Override
public void put(Collection<SinkRecord> records) {
BulkRequest bulkRequest = new BulkRequest();
for (SinkRecord record : records) {
String index = getIndex(record);
String id = record.key().toString();
String document = record.value().toString();
IndexRequest indexRequest = new IndexRequest(index)
.id(id)
.source(document, XContentType.JSON);
bulkRequest.add(indexRequest);
}
try {
BulkResponse response = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
if (response.hasFailures()) {
throw new ConnectException("Failed to index documents: " +
response.buildFailureMessage());
}
} catch (IOException e) {
throw new ConnectException("Error indexing to Elasticsearch", e);
}
}
@Override
public void stop() {
try {
esClient.close();
} catch (IOException e) {
// Ignore close failures during shutdown
}
}
@Override
public String version() {
return "1.0.0";
}
}
ksqlDB for Stream Processing
ksqlDB provides a SQL interface for stream processing on top of Kafka.
Creating Streams and Tables
-- Create a stream from Kafka topic
CREATE STREAM orders_stream (
order_id VARCHAR KEY,
customer_id VARCHAR,
product_id VARCHAR,
quantity INT,
price DOUBLE,
order_timestamp TIMESTAMP
) WITH (
KAFKA_TOPIC = 'orders',
VALUE_FORMAT = 'JSON',
TIMESTAMP = 'order_timestamp'
);
-- Create a table for customer data
CREATE TABLE customers_table (
customer_id VARCHAR PRIMARY KEY,
name VARCHAR,
email VARCHAR,
tier VARCHAR,
total_spent DOUBLE
) WITH (
KAFKA_TOPIC = 'customers',
VALUE_FORMAT = 'JSON'
);
-- Join stream with table
CREATE STREAM enriched_orders AS
SELECT
o.order_id,
o.customer_id,
c.name AS customer_name,
c.tier AS customer_tier,
o.product_id,
o.quantity,
o.price,
o.quantity * o.price AS total_amount
FROM orders_stream o
INNER JOIN customers_table c
ON o.customer_id = c.customer_id
EMIT CHANGES;
-- Windowed aggregation: orders per customer per hour
CREATE TABLE orders_per_customer_hour AS
SELECT
customer_id,
WINDOWSTART AS window_start,
WINDOWEND AS window_end,
COUNT(*) AS order_count,
SUM(quantity * price) AS total_revenue
FROM orders_stream
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY customer_id
EMIT CHANGES;
-- Real-time anomaly detection
CREATE STREAM high_value_orders AS
SELECT * FROM enriched_orders
WHERE total_amount > 1000
AND customer_tier != 'PREMIUM'
EMIT CHANGES;
Push Queries for Real-time Updates
// Node.js client for ksqlDB push queries
const axios = require('axios');
class KsqlDBClient {
constructor(host = 'http://localhost:8088') {
this.host = host;
}
async pushQuery(query) {
const response = await axios({
method: 'post',
url: `${this.host}/query-stream`,
headers: {
'Content-Type': 'application/vnd.ksql.v1+json'
},
data: {
sql: query,
properties: {
'auto.offset.reset': 'latest'
}
},
responseType: 'stream'
});
return response.data;
}
async subscribeToHighValueOrders(callback) {
const stream = await this.pushQuery(
'SELECT * FROM high_value_orders EMIT CHANGES;'
);
let buffer = '';
stream.on('data', chunk => {
buffer += chunk.toString();
const lines = buffer.split('\n');
buffer = lines.pop(); // Keep incomplete line in buffer
for (const line of lines) {
if (line.trim()) {
try {
const data = JSON.parse(line);
callback(data);
} catch (e) {
console.error('Failed to parse:', line);
}
}
}
});
}
}
// Usage
const ksqlClient = new KsqlDBClient();
ksqlClient.subscribeToHighValueOrders(order => {
console.log('High value order detected:', order);
// Send notification or trigger workflow
});
Production Best Practices
Cluster Configuration
# Kafka broker configuration for production
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
advertised.listeners=PLAINTEXT://kafka-1.example.com:9092,SSL://kafka-1.example.com:9093
# Replication and durability
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false
# Performance tuning
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Log configuration
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
compression.type=snappy
# Transaction support
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
Security Configuration
// SSL/TLS configuration for producers
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-1.example.com:9093");
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/var/kafka/ssl/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "truststore-password");
props.put("ssl.keystore.location", "/var/kafka/ssl/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "keystore-password");
props.put("ssl.key.password", "key-password");
// Alternatively, SASL authentication over TLS (overrides the security.protocol set above)
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"producer-user\" " +
"password=\"producer-password\";");
Monitoring and Alerting
// Prometheus metrics exporter
const { register, Counter, Histogram, Gauge } = require('prom-client');
const express = require('express');
// Kafka metrics
const kafkaMessagesProduced = new Counter({
name: 'kafka_messages_produced_total',
help: 'Total number of messages produced to Kafka',
labelNames: ['topic', 'status']
});
const kafkaProducerLatency = new Histogram({
name: 'kafka_producer_latency_seconds',
help: 'Kafka producer latency in seconds',
labelNames: ['topic'],
buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5]
});
const kafkaConsumerLag = new Gauge({
name: 'kafka_consumer_lag',
help: 'Kafka consumer lag in messages',
labelNames: ['topic', 'partition', 'consumer_group']
});
// Monitoring wrapper for producer
class MonitoredProducer {
constructor(kafkaProducer) {
this.producer = kafkaProducer;
}
async send(topic, messages) {
const start = Date.now();
try {
const result = await this.producer.send({
topic,
messages
});
kafkaMessagesProduced.inc({ topic, status: 'success' }, messages.length);
kafkaProducerLatency.observe({ topic }, (Date.now() - start) / 1000);
return result;
} catch (error) {
kafkaMessagesProduced.inc({ topic, status: 'error' }, messages.length);
throw error;
}
}
}
// Expose metrics endpoint
const app = express();
app.get('/metrics', async (req, res) => {
res.set('Content-Type', register.contentType);
res.end(await register.metrics());
});
app.listen(3000); // Any free port works; this is where Prometheus scrapes /metrics
Performance Tuning Guidelines
- Producer Optimization (see the configuration sketch after this list)
  - Batch messages with linger.ms and batch.size
  - Use compression (snappy or lz4)
  - Enable idempotence for exactly-once semantics
  - Tune buffer.memory and max.block.ms
- Consumer Optimization
  - Increase fetch.min.bytes for better batching
  - Tune max.poll.records based on processing time
  - Use parallel processing with consumer groups
  - Consider manual offset management for critical applications
- Broker Optimization
  - Use dedicated disks for Kafka logs
  - Leave roughly half of RAM free for the OS page cache
  - Tune JVM heap size (typically 4-8 GB)
  - Monitor and balance partition leadership
- Network Optimization
  - Place Kafka brokers close to producers and consumers
  - Use rack awareness for multi-datacenter deployments
  - Enable compression at the producer level
  - Monitor network utilization
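As a concrete reference for the producer and consumer settings above, here is a hedged Java sketch of tuned client properties; the specific values are illustrative starting points rather than recommendations for every workload.
// Tuning-oriented producer and consumer properties (sketch; values are illustrative starting points)
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
public class TunedClientConfigs {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20); // Wait up to 20 ms to fill batches
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024); // 64 KB batches
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024); // 64 MB send buffer
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30000); // Fail fast if the buffer stays full
        // Serializers omitted for brevity
        return props;
    }
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // Let the broker batch small fetches
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200); // Size polls to fit processing time
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Manual offset management
        // Deserializers omitted for brevity
        return props;
    }
}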
Conclusion
Apache Kafka provides a robust foundation for building event-driven architectures at scale. Its distributed nature, high throughput, and rich ecosystem make it ideal for modern microservices architectures, real-time analytics, and event sourcing implementations.
Key takeaways:
- Design topics and partitions based on your scalability requirements
- Use appropriate serialization formats and schemas
- Implement proper error handling and monitoring
- Leverage Kafka Streams and ksqlDB for stream processing
- Follow security best practices in production
- Monitor consumer lag and broker health continuously
As organizations continue to embrace event-driven patterns, Kafka remains a critical component for building resilient, scalable, and real-time data pipelines. Whether you're implementing CQRS, event sourcing, or simple pub-sub messaging, Kafka's flexibility and performance make it an excellent choice for modern distributed systems.