Event-Driven Architecture with Apache Kafka: A Comprehensive Guide


Apache Kafka has emerged as the de facto standard for building event-driven architectures at scale. Originally developed at LinkedIn and now maintained by the Apache Software Foundation, Kafka provides a distributed streaming platform that enables real-time data processing, event sourcing, and microservices communication. This comprehensive guide explores Kafka's fundamentals and demonstrates practical implementations for modern distributed systems.

In this article, we'll dive deep into Kafka's architecture, explore real-world patterns like CQRS and event sourcing, and provide production-ready code examples in both Java and Node.js.


Kafka Fundamentals

Apache Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerant messaging. At its core, Kafka acts as a distributed commit log where events are stored durably and can be consumed by multiple applications.

Core Components

  1. Brokers: Kafka servers that store and manage events
  2. Topics: Logical channels for organizing events
  3. Partitions: Physical divisions of topics for parallelism
  4. Producers: Applications that publish events
  5. Consumers: Applications that subscribe to events
  6. ZooKeeper/KRaft: Cluster coordination and metadata management (KRaft replaces ZooKeeper in newer Kafka releases)
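
To make these components concrete, the following sketch uses the Java Admin API to list a cluster's brokers and topics. It is a minimal, illustrative example: it assumes a broker reachable at localhost:9092, and ClusterInspector is a hypothetical class name.

// Inspecting brokers and topics with the Java Admin API
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Node;
import java.util.Properties;

public class ClusterInspector {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            // Brokers: the servers that store partitions and serve client requests
            DescribeClusterResult cluster = adminClient.describeCluster();
            for (Node node : cluster.nodes().get()) {
                System.out.printf("Broker %d at %s:%d%n", node.id(), node.host(), node.port());
            }

            // Topics: the logical channels that producers write to and consumers read from
            for (String topic : adminClient.listTopics().names().get()) {
                System.out.println("Topic: " + topic);
            }
        }
    }
}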

Topics and Partitions

Topics are the fundamental unit of organization in Kafka. Each topic is divided into partitions, which are ordered, immutable sequences of records.

// Creating a topic with Java Admin API
import org.apache.kafka.clients.admin.*;
import java.util.*;

public class TopicManager {
    public static void createTopic(String topicName, int partitions, short replicationFactor) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        try (AdminClient adminClient = AdminClient.create(props)) {
            NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
            
            // Configure topic settings
            Map<String, String> configs = new HashMap<>();
            configs.put("retention.ms", "604800000"); // 7 days
            configs.put("segment.ms", "86400000"); // 1 day
            configs.put("compression.type", "snappy");
            newTopic.configs(configs);
            
            CreateTopicsResult result = adminClient.createTopics(
                Collections.singleton(newTopic)
            );
            
            result.all().get();
            System.out.println("Topic created successfully: " + topicName);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Message Keys and Partitioning

Kafka uses message keys to determine partition assignment. Messages with the same key always go to the same partition, ensuring order preservation for related events.

// Node.js producer with custom partitioner
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['localhost:9092']
});

const producer = kafka.producer({
  createPartitioner: () => {
    return ({ topic, partitionMetadata, message }) => {
      // Custom partitioning logic based on customer ID
      if (message.key) {
        const customerId = message.key.toString();
        const numPartitions = partitionMetadata.length;
        const hash = customerId.split('').reduce((a, b) => {
          return ((a << 5) - a) + b.charCodeAt(0);
        }, 0);
        return Math.abs(hash) % numPartitions;
      }
      // Fall back to a random partition for messages without keys
      return Math.floor(Math.random() * partitionMetadata.length);
    };
  }
});

async function publishOrder(order) {
  // Connecting per call keeps the example simple; in practice, reuse one connected producer
  await producer.connect();
  
  try {
    await producer.send({
      topic: 'orders',
      messages: [
        {
          key: order.customerId,
          value: JSON.stringify(order),
          headers: {
            'correlation-id': order.orderId,
            'event-type': 'order.created'
          }
        }
      ]
    });
  } finally {
    await producer.disconnect();
  }
}

Producers and Consumers

Producer Configuration and Best Practices

Producers are responsible for publishing events to Kafka topics. Proper configuration ensures reliability and performance.

// Java producer with advanced configuration
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class EventProducer {
    private final KafkaProducer<String, String> producer;
    
    public EventProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        
        // Reliability settings
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // Wait for all in-sync replicas
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        // Performance settings
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32MB
        
        this.producer = new KafkaProducer<>(props);
    }
    
    public void publishEvent(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.err.println("Failed to send message: " + exception.getMessage());
                } else {
                    System.out.printf("Sent message to topic=%s partition=%d offset=%d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
                }
            }
        });
    }
    
    public void close() {
        producer.close();
    }
}

Consumer Groups and Offset Management

Consumers work together in consumer groups to process messages from topics. Each partition is assigned to only one consumer within a group.

// Node.js consumer with manual offset management
const { Kafka } = require('kafkajs');

class EventConsumer {
  constructor(groupId) {
    this.kafka = new Kafka({
      clientId: 'event-processor',
      brokers: ['localhost:9092']
    });
    
    this.consumer = this.kafka.consumer({
      groupId: groupId,
      sessionTimeout: 30000,
      heartbeatInterval: 3000,
      maxBytesPerPartition: 1048576, // 1MB
      retry: {
        initialRetryTime: 100,
        retries: 8
      }
    });
  }
  
  async start(topics, handler) {
    await this.consumer.connect();
    await this.consumer.subscribe({ 
      topics, 
      fromBeginning: false 
    });
    
    await this.consumer.run({
      autoCommit: false, // Manual commit for better control
      eachMessage: async ({ topic, partition, message }) => {
        try {
          const event = {
            key: message.key?.toString(),
            value: JSON.parse(message.value.toString()),
            headers: message.headers,
            timestamp: message.timestamp,
            offset: message.offset,
            partition: partition
          };
          
          // Process the event
          await handler(event);
          
          // Commit offset after successful processing
          await this.consumer.commitOffsets([{
            topic: topic,
            partition: partition,
            offset: (parseInt(message.offset) + 1).toString()
          }]);
        } catch (error) {
          console.error('Error processing message:', error);
          // Implement retry logic or send to DLQ
        }
      }
    });
  }
  
  async stop() {
    await this.consumer.disconnect();
  }
}

// Usage example
const consumer = new EventConsumer('order-processing-group');
consumer.start(['orders'], async (event) => {
  console.log('Processing order:', event.value);
  // Process order logic here
});

Microservices Communication with Kafka

Kafka excels at enabling asynchronous communication between microservices, providing loose coupling and scalability.

Event-Driven Choreography Pattern

In choreography there is no central coordinator: each service publishes domain events and reacts to the events of others, which keeps services loosely coupled.

// Order Service - Publishing domain events
@Service
public class OrderService {
    private final OrderRepository orderRepository;
    private final EventProducer eventProducer;
    
    @Transactional
    public Order createOrder(CreateOrderRequest request) {
        // Create order in database
        Order order = orderRepository.save(new Order(request));
        
        // Publish order created event
        OrderCreatedEvent event = OrderCreatedEvent.builder()
            .orderId(order.getId())
            .customerId(order.getCustomerId())
            .items(order.getItems())
            .totalAmount(order.getTotalAmount())
            .timestamp(Instant.now())
            .build();
        
        eventProducer.publish("order-events", order.getId(), event);
        
        return order;
    }
}

// Inventory Service - Reacting to order events
@Component
public class InventoryEventHandler {
    private final InventoryService inventoryService;
    private final EventProducer eventProducer;

    @KafkaListener(topics = "order-events", groupId = "inventory-service")
    public void handleOrderCreated(OrderCreatedEvent event) {
        try {
            // Reserve inventory
            for (OrderItem item : event.getItems()) {
                inventoryService.reserveStock(item.getProductId(), item.getQuantity());
            }
            
            // Publish inventory reserved event
            InventoryReservedEvent reservedEvent = InventoryReservedEvent.builder()
                .orderId(event.getOrderId())
                .reservationId(UUID.randomUUID().toString())
                .items(event.getItems())
                .build();
            
            eventProducer.publish("inventory-events", event.getOrderId(), reservedEvent);
        } catch (InsufficientStockException e) {
            // Publish inventory shortage event
            publishInventoryShortageEvent(event.getOrderId(), e.getProductId());
        }
    }
}

Saga Pattern Implementation

A saga breaks a distributed transaction into a sequence of local steps coordinated through events; when a step fails, previously completed steps are undone with compensating actions.

// Saga coordinator for distributed transactions
const { v4: uuidv4 } = require('uuid');

class OrderSaga {
  constructor(producer, consumer) {
    this.producer = producer;
    this.consumer = consumer;
    this.sagaState = new Map();
  }
  
  async startOrderSaga(order) {
    const sagaId = uuidv4();
    
    // Initialize saga state
    this.sagaState.set(sagaId, {
      sagaId: sagaId,
      orderId: order.id,
      status: 'STARTED',
      completedSteps: [],
      compensatedSteps: []
    });
    
    // Start the saga
    await this.producer.send({
      topic: 'order-saga-events',
      messages: [{
        key: sagaId,
        value: JSON.stringify({
          type: 'SAGA_STARTED',
          sagaId: sagaId,
          orderId: order.id,
          steps: ['RESERVE_INVENTORY', 'PROCESS_PAYMENT', 'CREATE_SHIPMENT']
        })
      }]
    });
  }
  
  async handleSagaEvent(event) {
    const saga = this.sagaState.get(event.sagaId);
    
    switch (event.type) {
      case 'STEP_COMPLETED':
        saga.completedSteps.push(event.step);
        await this.executeNextStep(saga);
        break;
        
      case 'STEP_FAILED':
        saga.status = 'COMPENSATING';
        await this.startCompensation(saga, event.failedStep);
        break;
        
      case 'COMPENSATION_COMPLETED':
        saga.compensatedSteps.push(event.step);
        await this.continueCompensation(saga);
        break;
    }
  }
  
  async executeNextStep(saga) {
    // getNextStep, getTopicForStep and the compensation helpers are omitted for brevity
    const nextStep = this.getNextStep(saga);
    
    if (!nextStep) {
      // All steps completed successfully
      saga.status = 'COMPLETED';
      await this.publishSagaCompleted(saga);
      return;
    }
    
    // Execute next step
    await this.producer.send({
      topic: this.getTopicForStep(nextStep),
      messages: [{
        key: saga.orderId,
        value: JSON.stringify({
          sagaId: saga.sagaId,
          orderId: saga.orderId,
          action: 'EXECUTE',
          step: nextStep
        })
      }]
    });
  }
}

CQRS and Event Sourcing

Implementing CQRS with Kafka

Command Query Responsibility Segregation (CQRS) separates read and write models, with Kafka as the event backbone.

// Command side - Write model
@RestController
@RequestMapping("/api/products")
public class ProductCommandController {
    private final KafkaProducer<String, ProductCommand> commandProducer;
    
    @PostMapping
    public ResponseEntity<Void> createProduct(@RequestBody CreateProductCommand command) {
        command.setCommandId(UUID.randomUUID().toString());
        command.setTimestamp(Instant.now());
        
        ProducerRecord<String, ProductCommand> record = 
            new ProducerRecord<>("product-commands", command.getProductId(), command);
        
        commandProducer.send(record);
        
        return ResponseEntity.accepted()
            .header("X-Command-Id", command.getCommandId())
            .build();
    }
}

// Command processor
@Component
public class ProductCommandProcessor {
    private final ProductRepository productRepository;
    private final EventProducer eventProducer;

    @KafkaListener(topics = "product-commands")
    public void processCommand(ProductCommand command) {
        switch (command.getType()) {
            case CREATE:
                Product product = new Product(command);
                productRepository.save(product);
                publishProductCreatedEvent(product);
                break;
            case UPDATE:
                updateProduct(command);
                break;
            case DELETE:
                deleteProduct(command);
                break;
        }
    }
    
    private void publishProductCreatedEvent(Product product) {
        ProductCreatedEvent event = new ProductCreatedEvent(
            product.getId(),
            product.getName(),
            product.getPrice(),
            product.getDescription()
        );
        
        eventProducer.send("product-events", product.getId(), event);
    }
}

// Query side - Read model projection
@Component
public class ProductProjection {
    private final ProductViewRepository viewRepository;
    
    @KafkaListener(topics = "product-events")
    public void handleProductEvent(ProductEvent event) {
        switch (event.getType()) {
            case "ProductCreated":
                createProductView((ProductCreatedEvent) event);
                break;
            case "ProductUpdated":
                updateProductView((ProductUpdatedEvent) event);
                break;
            case "ProductDeleted":
                deleteProductView((ProductDeletedEvent) event);
                break;
        }
    }
    
    private void createProductView(ProductCreatedEvent event) {
        ProductView view = ProductView.builder()
            .id(event.getProductId())
            .name(event.getName())
            .price(event.getPrice())
            .description(event.getDescription())
            .lastUpdated(event.getTimestamp())
            .build();
        
        viewRepository.save(view);
    }
}

Event Sourcing Implementation

// Event store using Kafka
class EventStore {
  constructor(kafka) {
    this.kafka = kafka;
    this.producer = kafka.producer();
    this.admin = kafka.admin();
  }
  
  async saveEvents(aggregateId, events, expectedVersion) {
    const messages = events.map((event, index) => ({
      key: aggregateId,
      value: JSON.stringify({
        aggregateId: aggregateId,
        eventType: event.eventType || event.constructor.name,
        eventData: event,
        eventVersion: expectedVersion + index + 1,
        timestamp: new Date().toISOString()
      }),
      headers: {
        'aggregate-type': event.aggregateType,
        'event-version': (expectedVersion + index + 1).toString()
      }
    }));
    
    await this.producer.send({
      topic: 'event-store',
      messages: messages,
      acks: -1 // Wait for all in-sync replicas
    });
  }
  
  async loadEvents(aggregateId, fromVersion = 0) {
    // Fetch the current end offsets first so we know when the replay is complete
    const endOffsets = await this.admin.fetchTopicOffsets('event-store');
    
    const consumer = this.kafka.consumer({ 
      groupId: `event-store-reader-${Date.now()}` 
    });
    
    await consumer.connect();
    await consumer.subscribe({ 
      topic: 'event-store', 
      fromBeginning: true 
    });
    
    const events = [];
    let resolvePromise;
    const eventsPromise = new Promise(resolve => resolvePromise = resolve);
    
    await consumer.run({
      eachMessage: async ({ partition, message }) => {
        const event = JSON.parse(message.value.toString());
        
        if (event.aggregateId === aggregateId && event.eventVersion > fromVersion) {
          events.push(event);
        }
        
        // Resolve once this partition has been read up to its end offset
        // (a simplification; a multi-partition event store would track every partition)
        const end = endOffsets.find(o => o.partition === partition);
        if (end && parseInt(message.offset, 10) >= parseInt(end.high, 10) - 1) {
          resolvePromise();
        }
      }
    });
    
    await eventsPromise;
    await consumer.disconnect();
    
    return events.sort((a, b) => a.eventVersion - b.eventVersion);
  }
}

// Aggregate implementation
class OrderAggregate {
  constructor(id) {
    this.id = id;
    this.version = 0;
    this.uncommittedEvents = [];
    this.status = 'PENDING';
    this.items = [];
    this.totalAmount = 0;
  }
  
  static async load(id, eventStore) {
    const aggregate = new OrderAggregate(id);
    const events = await eventStore.loadEvents(id);
    
    for (const event of events) {
      aggregate.applyEvent(event.eventData, false);
      aggregate.version = event.eventVersion;
    }
    
    return aggregate;
  }
  
  createOrder(customerId, items) {
    if (this.status !== 'PENDING') {
      throw new Error('Order already created');
    }
    
    const event = {
      eventType: 'OrderCreated',
      aggregateType: 'Order',
      customerId: customerId,
      items: items,
      totalAmount: items.reduce((sum, item) => sum + item.price * item.quantity, 0)
    };
    
    this.applyEvent(event, true);
  }
  
  applyEvent(event, isNew = true) {
    switch (event.eventType) {
      case 'OrderCreated':
        this.status = 'CREATED';
        this.customerId = event.customerId;
        this.items = event.items;
        this.totalAmount = event.totalAmount;
        break;
      // Handle other events...
    }
    
    if (isNew) {
      this.uncommittedEvents.push(event);
    }
  }
  
  async save(eventStore) {
    if (this.uncommittedEvents.length === 0) return;
    
    await eventStore.saveEvents(this.id, this.uncommittedEvents, this.version);
    this.version += this.uncommittedEvents.length;
    this.uncommittedEvents = [];
  }
}

Kafka Streams

Kafka Streams provides a powerful library for building streaming applications that process data in real-time.

Stream Processing Fundamentals

// Real-time order analytics with Kafka Streams
public class OrderAnalyticsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Stream of orders (orderSerde, customerSerde and enrichedOrderSerde are assumed
        // to be custom serdes defined elsewhere)
        KStream<String, Order> orders = builder.stream("orders",
            Consumed.with(Serdes.String(), orderSerde));
        
        // Calculate order totals by customer
        KTable<String, Double> customerTotals = orders
            .groupBy((key, order) -> order.getCustomerId(),
                Grouped.with(Serdes.String(), orderSerde))
            .aggregate(
                () -> 0.0,
                (customerId, order, total) -> total + order.getTotalAmount(),
                Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("customer-totals")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Double())
            );
        
        // Windowed aggregations - Orders per hour
        KTable<Windowed<String>, Long> ordersPerHour = orders
            .groupBy((key, order) -> "all", Grouped.with(Serdes.String(), orderSerde))
            .windowedBy(TimeWindows.of(Duration.ofHours(1)))
            .count(Materialized.as("orders-per-hour"));
        
        // Join with customer data
        KTable<String, Customer> customers = builder.table("customers",
            Consumed.with(Serdes.String(), customerSerde));
        
        KStream<String, EnrichedOrder> enrichedOrders = orders
            .join(customers,
                (order, customer) -> new EnrichedOrder(order, customer),
                Joined.with(Serdes.String(), orderSerde, customerSerde)
            );
        
        // Output enriched orders
        enrichedOrders.to("enriched-orders",
            Produced.with(Serdes.String(), enrichedOrderSerde));
        
        // Build and start the streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        
        // Add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Stateful Stream Processing

// Fraud detection with stateful processing
public class FraudDetectionProcessor {
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        
        // Transaction stream
        KStream<String, Transaction> transactions = builder.stream("transactions");
        
        // State store for tracking user behavior
        StoreBuilder<KeyValueStore<String, UserProfile>> storeBuilder = 
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("user-profiles"),
                Serdes.String(),
                userProfileSerde
            );
        builder.addStateStore(storeBuilder);
        
        // Process transactions with state
        KStream<String, FraudAlert> fraudAlerts = transactions
            .transform(() -> new FraudDetectionTransformer(), "user-profiles");
        
        fraudAlerts.to("fraud-alerts");
        
        return builder.build();
    }
    
    static class FraudDetectionTransformer 
        implements Transformer<String, Transaction, KeyValue<String, FraudAlert>> {
        
        private KeyValueStore<String, UserProfile> stateStore;
        private ProcessorContext context;
        
        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            this.stateStore = (KeyValueStore<String, UserProfile>) 
                context.getStateStore("user-profiles");
        }
        
        @Override
        public KeyValue<String, FraudAlert> transform(String key, Transaction transaction) {
            String userId = transaction.getUserId();
            UserProfile profile = stateStore.get(userId);
            
            if (profile == null) {
                profile = new UserProfile(userId);
            }
            
            // Update profile with new transaction
            profile.addTransaction(transaction);
            
            // Check for fraud patterns
            if (isSuspicious(transaction, profile)) {
                FraudAlert alert = FraudAlert.builder()
                    .userId(userId)
                    .transactionId(transaction.getId())
                    .reason(detectFraudPattern(transaction, profile))
                    .riskScore(calculateRiskScore(transaction, profile))
                    .timestamp(context.timestamp())
                    .build();
                
                // Update state
                stateStore.put(userId, profile);
                
                return KeyValue.pair(userId, alert);
            }
            
            // Update state even if no fraud detected
            stateStore.put(userId, profile);
            return null; // No alert
        }
        
        private boolean isSuspicious(Transaction transaction, UserProfile profile) {
            // Velocity check - too many transactions in short time
            if (profile.getTransactionCount(Duration.ofMinutes(5)) > 5) {
                return true;
            }
            
            // Location anomaly
            if (profile.getLastLocation() != null && 
                calculateDistance(profile.getLastLocation(), transaction.getLocation()) > 1000) {
                return true;
            }
            
            // Amount anomaly
            if (transaction.getAmount() > profile.getAverageAmount() * 3) {
                return true;
            }
            
            return false;
        }
        
        @Override
        public void close() {
            // Nothing to release; required by the Transformer interface
        }
    }
}

Kafka Connect

Kafka Connect provides a framework for streaming data between Kafka and external systems.

Source Connector Configuration

{
  "name": "postgres-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "password",
    "database.dbname": "orders_db",
    "database.server.name": "postgres",
    "table.include.list": "public.orders,public.customers",
    "plugin.name": "pgoutput",
    "publication.name": "orders_publication",
    "slot.name": "orders_slot",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "unwrap,route",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "postgres.public.(.*)",
    "transforms.route.replacement": "$1-events"
  }
}
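
A connector configuration like this is registered with the Kafka Connect REST API rather than deployed as code. The sketch below is one illustrative way to do that from Java, assuming a Connect worker listening on http://localhost:8083 and the JSON above saved as postgres-source-connector.json (both values are assumptions, not part of this setup):

// Registering a connector definition via the Connect REST API
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

public class ConnectorRegistrar {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        
        // POST the full definition (name + config) to the /connectors endpoint
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8083/connectors"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofFile(Path.of("postgres-source-connector.json")))
            .build();
        
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("Connect responded with " + response.statusCode() + ": " + response.body());
    }
}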

Custom Sink Connector

// Custom Elasticsearch sink connector
public class ElasticsearchSinkConnector extends SinkConnector {
    private Map<String, String> configProps;
    
    @Override
    public void start(Map<String, String> props) {
        this.configProps = props;
    }
    
    @Override
    public Class<? extends Task> taskClass() {
        return ElasticsearchSinkTask.class;
    }
    
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            configs.add(configProps);
        }
        return configs;
    }
    
    @Override
    public void stop() {
        // Nothing to clean up
    }
    
    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }
    
    @Override
    public String version() {
        return "1.0";
    }
}

public class ElasticsearchSinkTask extends SinkTask {
    private RestHighLevelClient esClient;
    
    @Override
    public void start(Map<String, String> props) {
        String esHosts = props.get("elasticsearch.hosts");
        this.esClient = new RestHighLevelClient(
            RestClient.builder(HttpHost.create(esHosts))
        );
    }
    
    @Override
    public void put(Collection<SinkRecord> records) {
        BulkRequest bulkRequest = new BulkRequest();
        
        for (SinkRecord record : records) {
            String index = getIndex(record);
            String id = record.key().toString();
            String document = record.value().toString();
            
            IndexRequest indexRequest = new IndexRequest(index)
                .id(id)
                .source(document, XContentType.JSON);
            
            bulkRequest.add(indexRequest);
        }
        
        try {
            BulkResponse response = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            if (response.hasFailures()) {
                throw new ConnectException("Failed to index documents: " + 
                    response.buildFailureMessage());
            }
        } catch (IOException e) {
            throw new ConnectException("Error indexing to Elasticsearch", e);
        }
    }
    
    @Override
    public void stop() {
        // Release the Elasticsearch client when the task shuts down
        try {
            esClient.close();
        } catch (IOException e) {
            // Ignore failures while closing on shutdown
        }
    }
    
    @Override
    public String version() {
        return "1.0";
    }
}

ksqlDB for Stream Processing

ksqlDB provides a SQL interface for stream processing on top of Kafka.

Creating Streams and Tables

-- Create a stream from Kafka topic
CREATE STREAM orders_stream (
  order_id VARCHAR KEY,
  customer_id VARCHAR,
  product_id VARCHAR,
  quantity INT,
  price DOUBLE,
  order_timestamp TIMESTAMP
) WITH (
  KAFKA_TOPIC = 'orders',
  VALUE_FORMAT = 'JSON',
  TIMESTAMP = 'order_timestamp'
);

-- Create a table for customer data
CREATE TABLE customers_table (
  customer_id VARCHAR PRIMARY KEY,
  name VARCHAR,
  email VARCHAR,
  tier VARCHAR,
  total_spent DOUBLE
) WITH (
  KAFKA_TOPIC = 'customers',
  VALUE_FORMAT = 'JSON'
);

-- Join stream with table
CREATE STREAM enriched_orders AS
  SELECT 
    o.order_id,
    o.customer_id,
    c.name AS customer_name,
    c.tier AS customer_tier,
    o.product_id,
    o.quantity,
    o.price,
    o.quantity * o.price AS total_amount
  FROM orders_stream o
  INNER JOIN customers_table c 
    ON o.customer_id = c.customer_id
  EMIT CHANGES;

-- Windowed aggregation: orders per customer per hour
-- (ksqlDB requires a GROUP BY key; WINDOWSTART/WINDOWEND are available in the projection)
CREATE TABLE orders_per_hour AS
  SELECT 
    customer_id,
    WINDOWSTART AS window_start,
    WINDOWEND AS window_end,
    COUNT(*) AS order_count,
    SUM(quantity * price) AS total_revenue
  FROM orders_stream
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY customer_id
  EMIT CHANGES;

-- Real-time anomaly detection
CREATE STREAM high_value_orders AS
  SELECT * FROM enriched_orders
  WHERE total_amount > 1000
    AND customer_tier != 'PREMIUM'
  EMIT CHANGES;

Push Queries for Real-time Updates

// Node.js client for ksqlDB push queries
const axios = require('axios');

class KsqlDBClient {
  constructor(host = 'http://localhost:8088') {
    this.host = host;
  }
  
  async pushQuery(query) {
    const response = await axios({
      method: 'post',
      url: `${this.host}/query-stream`,
      headers: {
        // The /query-stream endpoint streams back newline-delimited JSON in this format
        'Content-Type': 'application/vnd.ksqlapi.delimited.v1'
      },
      data: {
        sql: query,
        properties: {
          'auto.offset.reset': 'latest'
        }
      },
      responseType: 'stream'
    });
    
    return response.data;
  }
  
  async subscribeToHighValueOrders(callback) {
    const stream = await this.pushQuery(
      'SELECT * FROM high_value_orders EMIT CHANGES;'
    );
    
    let buffer = '';
    stream.on('data', chunk => {
      buffer += chunk.toString();
      const lines = buffer.split('\n');
      buffer = lines.pop(); // Keep incomplete line in buffer
      
      for (const line of lines) {
        if (line.trim()) {
          try {
            const data = JSON.parse(line);
            callback(data);
          } catch (e) {
            console.error('Failed to parse:', line);
          }
        }
      }
    });
  }
}

// Usage
const ksqlClient = new KsqlDBClient();
ksqlClient.subscribeToHighValueOrders(order => {
  console.log('High value order detected:', order);
  // Send notification or trigger workflow
});

Production Best Practices

Cluster Configuration

# Kafka broker configuration for production
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
advertised.listeners=PLAINTEXT://kafka-1.example.com:9092,SSL://kafka-1.example.com:9093

# Replication and durability
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false

# Performance tuning
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# Log configuration
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
compression.type=snappy

# Transaction support
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

Security Configuration

// SSL/TLS configuration for producers
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-1.example.com:9093");
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/var/kafka/ssl/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "truststore-password");
props.put("ssl.keystore.location", "/var/kafka/ssl/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "keystore-password");
props.put("ssl.key.password", "key-password");

// SASL authentication (SASL_SSL replaces the SSL-only protocol configured above)
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config", 
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"producer-user\" " +
    "password=\"producer-password\";");

Monitoring and Alerting

// Prometheus metrics exporter
const { register, Counter, Histogram, Gauge } = require('prom-client');
const express = require('express');

// Kafka metrics
const kafkaMessagesProduced = new Counter({
  name: 'kafka_messages_produced_total',
  help: 'Total number of messages produced to Kafka',
  labelNames: ['topic', 'status']
});

const kafkaProducerLatency = new Histogram({
  name: 'kafka_producer_latency_seconds',
  help: 'Kafka producer latency in seconds',
  labelNames: ['topic'],
  buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5]
});

const kafkaConsumerLag = new Gauge({
  name: 'kafka_consumer_lag',
  help: 'Kafka consumer lag in messages',
  labelNames: ['topic', 'partition', 'consumer_group']
});

// Monitoring wrapper for producer
class MonitoredProducer {
  constructor(kafkaProducer) {
    this.producer = kafkaProducer;
  }
  
  async send(topic, messages) {
    const start = Date.now();
    
    try {
      const result = await this.producer.send({
        topic,
        messages
      });
      
      kafkaMessagesProduced.inc({ topic, status: 'success' }, messages.length);
      kafkaProducerLatency.observe({ topic }, (Date.now() - start) / 1000);
      
      return result;
    } catch (error) {
      kafkaMessagesProduced.inc({ topic, status: 'error' }, messages.length);
      throw error;
    }
  }
}

// Expose metrics endpoint
const app = express();
app.get('/metrics', async (req, res) => {
  res.set('Content-Type', register.contentType);
  res.end(await register.metrics());
});
app.listen(9091); // Port for Prometheus to scrape (illustrative choice)
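
The consumer-lag gauge above needs to be populated from Kafka itself. One way is to compare each partition's committed offset with its end offset using the Java Admin API; the sketch below is a minimal, illustrative version that assumes a broker at localhost:9092 and the order-processing-group consumer group used earlier:

// Computing consumer lag with the Java Admin API
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
import java.util.stream.Collectors;

public class ConsumerLagReporter {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        
        try (AdminClient adminClient = AdminClient.create(props)) {
            String groupId = "order-processing-group";
            
            // Offsets the group has committed, per partition
            Map<TopicPartition, OffsetAndMetadata> committed = adminClient
                .listConsumerGroupOffsets(groupId)
                .partitionsToOffsetAndMetadata()
                .get();
            
            // Latest (end) offsets for the same partitions
            Map<TopicPartition, OffsetSpec> request = committed.keySet().stream()
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
                adminClient.listOffsets(request).all().get();
            
            // Lag per partition = end offset - committed offset; feed this into the gauge
            committed.forEach((tp, offsetAndMetadata) -> {
                long lag = endOffsets.get(tp).offset() - offsetAndMetadata.offset();
                System.out.printf("%s-%d lag=%d%n", tp.topic(), tp.partition(), lag);
            });
        }
    }
}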

Performance Tuning Guidelines

  1. Producer Optimization

    • Batch messages with linger.ms and batch.size
    • Use compression (snappy or lz4)
    • Enable idempotence for exactly-once semantics
    • Tune buffer.memory and max.block.ms
  2. Consumer Optimization (illustrated in the sketch after this list)

    • Increase fetch.min.bytes for better batching
    • Tune max.poll.records based on processing time
    • Use parallel processing with consumer groups
    • Consider manual offset management for critical applications
  3. Broker Optimization

    • Use dedicated disks for Kafka logs
    • Rely on the OS page cache: leave roughly half of the machine's RAM unallocated so the cache can hold recent log segments
    • Tune JVM heap size (typically 4-8GB)
    • Monitor and balance partition leadership
  4. Network Optimization

    • Place Kafka brokers close to producers/consumers
    • Use rack awareness for multi-datacenter deployments
    • Enable compression at producer level
    • Monitor network utilization
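
To make the consumer-side guidelines concrete, here is a minimal Java sketch of a throughput-oriented consumer with manual commits. The broker address, topic, and specific values are illustrative starting points rather than universal recommendations:

// Consumer tuned for batching and manual offset commits (illustrative values)
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TunedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        
        // Better batching: wait for at least 64KB or fetch.max.wait.ms before returning a fetch
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 65536);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        
        // Cap records per poll so processing stays within max.poll.interval.ms
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
        
        // Manual offset management for at-least-once processing
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("orders"));
            
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Process each record here
                    System.out.printf("offset=%d key=%s%n", record.offset(), record.key());
                }
                // Commit only after the whole batch has been processed
                consumer.commitSync();
            }
        }
    }
}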

Conclusion

Apache Kafka provides a robust foundation for building event-driven architectures at scale. Its distributed nature, high throughput, and rich ecosystem make it ideal for modern microservices architectures, real-time analytics, and event sourcing implementations.

Key takeaways:

  • Design topics and partitions based on your scalability requirements
  • Use appropriate serialization formats and schemas
  • Implement proper error handling and monitoring
  • Leverage Kafka Streams and ksqlDB for stream processing
  • Follow security best practices in production
  • Monitor consumer lag and broker health continuously

As organizations continue to embrace event-driven patterns, Kafka remains a critical component for building resilient, scalable, and real-time data pipelines. Whether you're implementing CQRS, event sourcing, or simple pub-sub messaging, Kafka's flexibility and performance make it an excellent choice for modern distributed systems.