Building Real-time Analytics with Apache Kafka and ClickHouse
Building real-time analytics systems that can process millions of events per second while providing sub-second query performance requires careful architecture and the right tools. Apache Kafka and ClickHouse form a powerful combination for handling high-throughput event processing and analytics at scale.
In this guide, we'll explore how to build a robust real-time analytics pipeline using Kafka for stream processing and ClickHouse for analytical queries. We'll cover everything from data ingestion to visualization, with practical examples and performance optimization techniques.
Key Components of Real-time Analytics
- Event Ingestion: Kafka producers and topics
- Stream Processing: Kafka Streams applications
- Data Storage: ClickHouse tables and engines
- Query Optimization: Materialized views and aggregations
- Visualization: Real-time dashboards
1. Event Ingestion with Kafka
Set up Kafka producers to ingest events at scale.
Kafka Producer Implementation
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class EventProducer {
    private final KafkaProducer<String, String> producer;
    private final String topic;

    public EventProducer(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");             // wait for all in-sync replicas
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);            // brief delay lets batches fill
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);       // 16 KB batches
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // compress batches on the wire

        this.producer = new KafkaProducer<>(props);
        this.topic = topic;
    }

    public void sendEvent(String key, String value) {
        ProducerRecord<String, String> record =
            new ProducerRecord<>(topic, key, value);
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Error sending event: " + exception.getMessage());
            }
        });
    }

    public void close() {
        producer.flush(); // push out any buffered records before closing
        producer.close();
    }
}
Best Practice: Configure batching (linger.ms, batch.size) and compression (compression.type) for optimal throughput; the producer above sets all three.
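For example, a minimal usage sketch (the broker address, raw-events topic, and JSON payload here are illustrative):

// Hypothetical usage: send a JSON page-view event keyed by user ID
EventProducer producer = new EventProducer("localhost:9092", "raw-events");
producer.sendEvent("user-123", "{\"event_type\":\"page_view\",\"value\":1.0}");
producer.close(); // flush any buffered batches before shutdown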
2. Stream Processing with Kafka Streams
Process and enrich events in real-time using Kafka Streams.
Stream Processing Implementation
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

import java.util.Properties;

public class EventProcessor {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "analytics-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Default serdes so groupBy and count can (de)serialize keys and values
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Read from input topic
        KStream<String, String> events = builder.stream("raw-events");

        // Re-key each event by a dimension extracted from its payload, then count per key
        KTable<String, Long> aggregated = events
            .groupBy((key, value) -> extractDimension(value))
            .count();

        // Write results to the output topic; counts are Longs, so declare the value serde
        aggregated.toStream().to("aggregated-events",
            Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Close state stores cleanly on shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    // Placeholder: pull the grouping dimension (e.g. event type) out of the payload
    private static String extractDimension(String value) {
        return value; // replace with real parsing logic
    }
}
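A running total over all history is rarely what a live dashboard wants. One hedged variant is a tumbling-window count; the one-minute window size is an assumption, and TimeWindows.ofSizeWithNoGrace requires Kafka Streams 3.0+ plus imports of java.time.Duration and org.apache.kafka.streams.kstream.Windowed:

// Hypothetical variant: count events per dimension in one-minute tumbling windows
KTable<Windowed<String>, Long> windowedCounts = events
    .groupBy((key, value) -> extractDimension(value))
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
    .count();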
3. Data Storage in ClickHouse
Design efficient ClickHouse tables for real-time analytics.
Table Schema Definition
CREATE TABLE events
(
    timestamp DateTime,
    user_id String,
    event_type String,
    properties String,
    value Float64
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (timestamp, user_id)
SETTINGS index_granularity = 8192;

-- Materialized view for real-time per-minute aggregations
CREATE MATERIALIZED VIEW events_mv
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (event_type, timestamp)
AS SELECT
    event_type,
    toStartOfMinute(timestamp) AS timestamp,
    count() AS event_count,
    sum(value) AS total_value
FROM events
GROUP BY event_type, timestamp;
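One detail the schema leaves open is how rows arrive from Kafka. A common option is ClickHouse's built-in Kafka table engine drained by a materialized view; the sketch below assumes JSON-encoded events on the raw-events topic and an illustrative consumer group name:

-- Kafka engine table acts as a streaming consumer of the raw-events topic
CREATE TABLE events_queue
(
    timestamp DateTime,
    user_id String,
    event_type String,
    properties String,
    value Float64
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'raw-events',
         kafka_group_name = 'clickhouse-events-consumer',
         kafka_format = 'JSONEachRow';

-- Materialized view copies each consumed batch into the MergeTree table
CREATE MATERIALIZED VIEW events_queue_mv TO events
AS SELECT timestamp, user_id, event_type, properties, value
FROM events_queue;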
4. Query Optimization
Implement efficient query patterns for real-time analytics.
ClickHouse Query Optimization
-- Efficient time-series query against the pre-aggregated view
SELECT
    toStartOfHour(timestamp) AS hour,
    event_type,
    sum(event_count) AS total_events,
    sum(total_value) AS value_sum
FROM events_mv
WHERE timestamp >= now() - INTERVAL 24 HOUR
GROUP BY hour, event_type
ORDER BY hour DESC;

-- Using PREWHERE for better performance
SELECT
    user_id,
    count() AS event_count
FROM events
PREWHERE timestamp >= now() - INTERVAL 1 HOUR
WHERE event_type = 'purchase'
GROUP BY user_id
HAVING event_count > 10;
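When filters on event_type dominate, a data-skipping index can prune granules that the (timestamp, user_id) primary key cannot. A sketch; the set index type and granularity here are assumptions to validate against real data:

-- Hypothetical skip index: lets ClickHouse skip granules with no matching event_type
ALTER TABLE events ADD INDEX idx_event_type event_type TYPE set(100) GRANULARITY 4;
ALTER TABLE events MATERIALIZE INDEX idx_event_type; -- build it for existing parts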
5. Real-time Dashboard Implementation
Create a real-time dashboard using WebSocket connections.
WebSocket Server Implementation
import { WebSocket, Server } from 'ws';
import { ClickHouse } from 'clickhouse';

class AnalyticsDashboard {
  private clickhouse: ClickHouse;
  private wss: Server;

  constructor() {
    this.clickhouse = new ClickHouse({
      url: 'http://localhost:8123',
      database: 'analytics'
    });
    this.wss = new Server({ port: 8080 });
    this.setupWebSocket();
    this.startMetricsPolling();
  }

  private setupWebSocket() {
    this.wss.on('connection', (ws) => {
      console.log('New client connected');
      ws.on('close', () => console.log('Client disconnected'));
    });
  }

  // Poll ClickHouse once per second and push the result to every client
  private startMetricsPolling() {
    setInterval(async () => {
      const metrics = await this.fetchLatestMetrics();
      this.broadcastMetrics(metrics);
    }, 1000);
  }

  private async fetchLatestMetrics() {
    const query = `
      SELECT
        event_type,
        sum(event_count) as count,
        max(timestamp) as last_update
      FROM events_mv
      WHERE timestamp >= now() - INTERVAL 1 MINUTE
      GROUP BY event_type
    `;
    return await this.clickhouse.query(query).toPromise();
  }

  private broadcastMetrics(metrics: any) {
    const data = JSON.stringify(metrics);
    // Send only to sockets that are still open
    this.wss.clients.forEach((client) => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(data);
      }
    });
  }
}
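On the consuming side, a minimal browser client sketch (the endpoint is the server above; the rendering is a placeholder):

// Hypothetical dashboard client: parse and render each metrics push
const socket = new WebSocket('ws://localhost:8080');
socket.onmessage = (event) => {
  const metrics = JSON.parse(event.data.toString());
  console.log('metrics update', metrics); // swap in chart updates here
};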
Performance Monitoring and Optimization
Kafka Monitoring
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;

public class KafkaMonitor {
    public static void monitorLag(String groupId) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            ListConsumerGroupOffsetsResult result =
                adminClient.listConsumerGroupOffsets(groupId);
            Map<TopicPartition, OffsetAndMetadata> offsets =
                result.partitionsToOffsetAndMetadata().get();

            // Lag per partition = latest log-end offset minus committed consumer offset
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                long consumerOffset = entry.getValue().offset();
                long producerOffset = getLatestOffset(adminClient, entry.getKey());
                long lag = producerOffset - consumerOffset;
                System.out.printf("Partition %s: lag = %d%n", entry.getKey(), lag);
            }
        }
    }
}
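The getLatestOffset helper above is only referenced, not defined. A sketch of one way to implement it with the AdminClient listOffsets API (requires java.util.Collections; error handling omitted):

// Sketch: resolve the log-end (latest) offset for a single partition
private static long getLatestOffset(AdminClient adminClient, TopicPartition partition)
        throws Exception {
    ListOffsetsResult result = adminClient.listOffsets(
        Collections.singletonMap(partition, OffsetSpec.latest()));
    return result.partitionResult(partition).get().offset();
}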
ClickHouse Performance Metrics
-- Monitor query performance
SELECT
    query,
    formatReadableSize(memory_usage) AS memory,
    query_duration_ms,
    read_rows,
    written_rows
FROM system.query_log
WHERE type = 'QueryFinish'
ORDER BY query_duration_ms DESC
LIMIT 10;
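Query latency is only half the picture; part counts and on-disk size per table are worth watching too, since many small parts signal merge pressure (a sketch against system.parts):

-- Active data parts and on-disk size per table
SELECT
    table,
    count() AS parts,
    sum(rows) AS total_rows,
    formatReadableSize(sum(bytes_on_disk)) AS disk_size
FROM system.parts
WHERE active
GROUP BY table
ORDER BY parts DESC;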
System Architecture Overview
| Component     | Technology    | Purpose               | Scale           |
|---------------|---------------|-----------------------|-----------------|
| Ingestion     | Kafka         | Event collection      | Millions/sec    |
| Processing    | Kafka Streams | Real-time aggregation | Sub-second      |
| Storage       | ClickHouse    | Analytics queries     | Petabyte-scale  |
| Visualization | WebSocket     | Real-time updates     | Milliseconds    |
Conclusion
Building a real-time analytics system with Apache Kafka and ClickHouse provides a robust foundation for handling high-throughput event processing and analytics. By following the patterns and practices outlined in this guide, you can create a scalable system that processes millions of events while maintaining sub-second query performance.
Remember to monitor system performance, optimize queries, and scale components based on your specific requirements. Start with these foundational patterns and iterate based on your actual usage patterns and performance metrics.