Building Real-time Analytics with Apache Kafka and ClickHouse


Building real-time analytics systems that can process millions of events per second while providing sub-second query performance requires careful architecture and the right tools. Apache Kafka and ClickHouse form a powerful combination for handling high-throughput event processing and analytics at scale.

In this guide, we'll build a real-time analytics pipeline that uses Kafka for event ingestion and stream processing and ClickHouse for analytical queries. We'll cover each stage from data ingestion to visualization, with practical examples and performance optimization techniques.


Key Components of Real-time Analytics

  1. Event Ingestion: Kafka producers and topics
  2. Stream Processing: Kafka Streams applications
  3. Data Storage: ClickHouse tables and engines
  4. Query Optimization: Materialized views and aggregations
  5. Visualization: Real-time dashboards

1. Event Ingestion with Kafka

Set up Kafka producers to ingest events at scale.

Kafka Producer Implementation

import java.util.Properties;

import org.apache.kafka.clients.producer.*;

public class EventProducer {
    private final KafkaProducer<String, String> producer;
    private final String topic;

    public EventProducer(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                 "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                 "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

        this.producer = new KafkaProducer<>(props);
        this.topic = topic;
    }

    public void sendEvent(String key, String value) {
        ProducerRecord<String, String> record = 
            new ProducerRecord<>(topic, key, value);
        
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Error sending event: " + exception.getMessage());
            }
        });
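    }

    // Flush buffered records and release network resources on shutdown
    public void close() {
        producer.close();
    }
}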

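Best Practice: Batch and compress records for throughput. The producer above keeps batch.size at its 16 KB default with a 1 ms linger and sets no compression; raising linger.ms and batch.size and setting compression.type usually increases throughput at the cost of a few milliseconds of added latency.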
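As a rough illustration of the batching and compression advice, the sketch below builds a throughput-oriented producer configuration using the standard Kafka property names. The class name ProducerTuning and the specific values (lz4, 20 ms, 64 KB) are assumptions chosen for illustration, not tuned recommendations; measure against your own workload before adopting them.

```java
import java.util.Properties;

final class ProducerTuning {
    // Builds producer settings that trade a little latency for throughput.
    // All values here are illustrative assumptions, not benchmarked defaults.
    static Properties throughputProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Compress whole batches before they are sent to the broker
        props.setProperty("compression.type", "lz4");
        // Wait up to 20 ms so more records can join each batch
        props.setProperty("linger.ms", "20");
        // Allow batches of up to 64 KB per partition
        props.setProperty("batch.size", String.valueOf(64 * 1024));
        // Keep the durability setting used by the producer above
        props.setProperty("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        throughputProps("localhost:9092")
            .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These properties can be merged into the Properties object passed to KafkaProducer alongside the serializer settings.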


2. Stream Processing with Kafka Streams

Process and enrich events in real-time using Kafka Streams.

Stream Processing Implementation

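import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

public class EventProcessor {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "analytics-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Keys and values are plain strings unless overridden per operation
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Read from input topic
        KStream<String, String> events = builder.stream("raw-events");

        // Re-key each event by its dimension and count events per key
        KTable<String, Long> aggregated = events
            .groupBy((key, value) -> extractDimension(value))
            .count();

        // Counts are Longs, so the output topic needs an explicit value serde
        aggregated.toStream().to("aggregated-events",
            Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Close the topology cleanly when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }

    // Placeholder: replace with parsing that returns the grouping dimension
    private static String extractDimension(String value) {
        return value;
    }
}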
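The extractDimension call in the processor depends entirely on the event format, which this guide does not specify. One possible implementation is sketched below, assuming each event value is a flat JSON object with an event_type string field (the field name is borrowed from the ClickHouse schema later in this guide). The class name Dimensions and the "unknown" fallback are hypothetical, and a production system would more likely use a real JSON parser than a regular expression.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class Dimensions {
    // Matches "event_type":"<value>" in a flat JSON object.
    // Assumption: values contain no escaped quotes.
    private static final Pattern EVENT_TYPE =
        Pattern.compile("\"event_type\"\\s*:\\s*\"([^\"]*)\"");

    // Returns the event type to group by, or "unknown" when it is missing.
    static String extractDimension(String json) {
        if (json == null) {
            return "unknown";
        }
        Matcher matcher = EVENT_TYPE.matcher(json);
        return matcher.find() ? matcher.group(1) : "unknown";
    }

    public static void main(String[] args) {
        String event = "{\"user_id\":\"u1\",\"event_type\":\"purchase\",\"value\":9.5}";
        System.out.println(extractDimension(event));
    }
}
```

Routing malformed events to a fixed "unknown" key keeps the topology running, but it also concentrates them on one partition, so a dead-letter topic may be a better fit at high error rates.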

3. Data Storage in ClickHouse

Design efficient ClickHouse tables for real-time analytics.

Table Schema Definition

CREATE TABLE events
(
    timestamp DateTime,
    user_id String,
    event_type String,
    properties String,
    value Float64
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (timestamp, user_id)
SETTINGS index_granularity = 8192;

-- Materialized view for per-minute aggregations.
-- SummingMergeTree combines rows in background merges, so always read it with sum().
CREATE MATERIALIZED VIEW events_mv
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (event_type, timestamp)
AS SELECT
    event_type,
    toStartOfMinute(timestamp) as timestamp,
    count() as event_count,
    sum(value) as total_value
FROM events
GROUP BY event_type, timestamp;
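To make the materialized view's semantics concrete, here is a small in-memory Java equivalent of what it computes: events are bucketed by event type and start-of-minute, and each bucket accumulates a count and a sum of values. This is only an illustration of the aggregation logic. ClickHouse does the work incrementally on insert and during merges, and the MinuteRollup, Event, and Totals names are hypothetical.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class MinuteRollup {
    record Event(Instant timestamp, String eventType, double value) {}

    record Totals(long eventCount, double totalValue) {
        // Combines two partial aggregates, like a SummingMergeTree merge
        Totals plus(Totals other) {
            return new Totals(eventCount + other.eventCount,
                              totalValue + other.totalValue);
        }
    }

    // Groups events by (event_type, start of minute) and sums count and value.
    static Map<String, Totals> rollup(List<Event> events) {
        Map<String, Totals> buckets = new TreeMap<>();
        for (Event event : events) {
            // Same idea as toStartOfMinute(timestamp) in the view
            Instant minute = event.timestamp().truncatedTo(ChronoUnit.MINUTES);
            String key = event.eventType() + "@" + minute;
            buckets.merge(key, new Totals(1, event.value()), Totals::plus);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event(Instant.parse("2024-01-01T10:15:05Z"), "purchase", 2.0),
            new Event(Instant.parse("2024-01-01T10:15:40Z"), "purchase", 3.0),
            new Event(Instant.parse("2024-01-01T10:16:01Z"), "purchase", 1.0));
        rollup(events).forEach((key, totals) -> System.out.println(key + " -> " + totals));
    }
}
```

Because partial aggregates can be combined in any order, the same bucket may exist as several unmerged rows in ClickHouse at query time, which is why queries against the view re-aggregate with sum().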

4. Query Optimization

Implement efficient query patterns for real-time analytics.

ClickHouse Query Optimization

-- Efficient time-series query
SELECT
    toStartOfHour(timestamp) as hour,
    event_type,
    sum(event_count) as total_events,
    sum(total_value) as value_sum
FROM events_mv
WHERE timestamp >= now() - INTERVAL 24 HOUR
GROUP BY
    hour,
    event_type
ORDER BY hour DESC;

-- Using prewhere for better performance
SELECT
    user_id,
    count() as event_count
FROM events
PREWHERE timestamp >= now() - INTERVAL 1 HOUR
WHERE event_type = 'purchase'
GROUP BY user_id
HAVING event_count > 10;

5. Real-time Dashboard Implementation

Create a real-time dashboard using WebSocket connections.

WebSocket Server Implementation

import { WebSocket, Server } from 'ws';
import { ClickHouse } from 'clickhouse';

class AnalyticsDashboard {
    private clickhouse: ClickHouse;
    private wss: Server;

    constructor() {
        this.clickhouse = new ClickHouse({
            url: 'http://localhost:8123',
            database: 'analytics'
        });

        this.wss = new Server({ port: 8080 });
        this.setupWebSocket();
        this.startMetricsPolling();
    }

    private setupWebSocket() {
        this.wss.on('connection', (ws) => {
            console.log('New client connected');
            ws.on('close', () => console.log('Client disconnected'));
        });
    }

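    private startMetricsPolling() {
        setInterval(async () => {
            try {
                const metrics = await this.fetchLatestMetrics();
                this.broadcastMetrics(metrics);
            } catch (err) {
                // Keep polling if a single query fails
                console.error('Failed to fetch metrics:', err);
            }
        }, 1000);
    }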

    private async fetchLatestMetrics() {
        const query = `
            SELECT
                event_type,
                sum(event_count) as count,
                max(timestamp) as last_update
            FROM events_mv
            WHERE timestamp >= now() - INTERVAL 1 MINUTE
            GROUP BY event_type
        `;

        return await this.clickhouse.query(query).toPromise();
    }

    private broadcastMetrics(metrics: any) {
        const data = JSON.stringify(metrics);
        this.wss.clients.forEach((client) => {
            if (client.readyState === WebSocket.OPEN) {
                client.send(data);
            }
        });
    }
}

// Start the dashboard server
new AnalyticsDashboard();

Performance Monitoring and Optimization

  1. Kafka Monitoring

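    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.stream.Collectors;

    import org.apache.kafka.clients.admin.*;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class KafkaMonitor {
        public static void monitorLag(String groupId)
                throws ExecutionException, InterruptedException {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient adminClient = AdminClient.create(props)) {
                // Offsets the consumer group has committed, per partition
                Map<TopicPartition, OffsetAndMetadata> committed = adminClient
                    .listConsumerGroupOffsets(groupId)
                    .partitionsToOffsetAndMetadata().get();

                // Latest (log-end) offsets for the same partitions
                Map<TopicPartition, OffsetSpec> request = committed.keySet().stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest =
                    adminClient.listOffsets(request).all().get();

                // Lag = log-end offset minus committed offset
                for (Map.Entry<TopicPartition, OffsetAndMetadata> entry :
                     committed.entrySet()) {
                    if (entry.getValue() == null) continue; // nothing committed yet
                    long consumerOffset = entry.getValue().offset();
                    long producerOffset = latest.get(entry.getKey()).offset();
                    long lag = producerOffset - consumerOffset;

                    System.out.printf("Partition %s: lag = %d%n",
                        entry.getKey(), lag);
                }
            }
        }
    }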
    
  2. ClickHouse Performance Metrics

    -- Monitor query performance
    SELECT
        query,
        formatReadableSize(memory_usage) as memory,
        query_duration_ms,
        read_rows,
        written_rows
    FROM system.query_log
    WHERE type = 'QueryFinish'
    ORDER BY query_duration_ms DESC
    LIMIT 10;
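Returning to the consumer-lag check in the Kafka monitoring item, the arithmetic itself can be isolated from the admin client so it is easy to unit test. The sketch below assumes partitions are identified by plain strings such as "raw-events-0" and that a partition with no committed offset counts from 0. Both are simplifications, and the LagCalculator name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class LagCalculator {
    // committed: last committed offset per partition
    // logEnd:    latest (log-end) offset per partition
    // Returns lag per partition, clamped at zero.
    static Map<String, Long> computeLag(Map<String, Long> committed,
                                        Map<String, Long> logEnd) {
        Map<String, Long> lag = new HashMap<>();
        for (Map.Entry<String, Long> entry : logEnd.entrySet()) {
            // Assumption: a partition without a committed offset starts at 0
            long committedOffset = committed.getOrDefault(entry.getKey(), 0L);
            // Offsets are read at slightly different times, so clamp negatives
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - committedOffset));
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = Map.of("raw-events-0", 90L);
        Map<String, Long> logEnd = Map.of("raw-events-0", 100L, "raw-events-1", 5L);
        computeLag(committed, logEnd)
            .forEach((partition, value) -> System.out.println(partition + ": lag = " + value));
    }
}
```

Summing the per-partition values gives a single group-level lag figure that is convenient to alert on.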
    

System Architecture Overview

Component      Technology     Purpose                 Scale
Ingestion      Kafka          Event collection        Millions/sec
Processing     Kafka Streams  Real-time aggregation   Sub-second
Storage        ClickHouse     Analytics queries       Petabyte-scale
Visualization  WebSocket      Real-time updates       Milliseconds

Conclusion

Building a real-time analytics system with Apache Kafka and ClickHouse provides a robust foundation for handling high-throughput event processing and analytics. By following the patterns and practices outlined in this guide, you can create a scalable system that processes millions of events while maintaining sub-second query performance.

Remember to monitor system performance, optimize queries, and scale components based on your specific requirements. Start with these foundational patterns and iterate based on your actual usage patterns and performance metrics.