Mastering Observability with OpenTelemetry: A Comprehensive Guide
In today's distributed systems landscape, understanding what's happening inside your applications is crucial. OpenTelemetry has emerged as the industry standard for observability, providing a vendor-neutral way to collect telemetry data. This guide walks through implementing observability with OpenTelemetry across multiple languages and backends.
Understanding the Three Pillars of Observability
Before diving into implementation, let's understand the three fundamental pillars of observability that OpenTelemetry addresses:
1. Traces
Traces represent the journey of a request through your distributed system. They consist of spans that show the path and timing of operations across services. Think of traces as the story of what happened to a specific request.
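As a mental model (plain Python, not the OpenTelemetry SDK), a trace is just a tree of timed spans, and "finding where the time went" is a tree walk:

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class Span:
    """A toy span: one timed operation within a trace."""
    name: str
    start_ms: float
    end_ms: float
    children: List["Span"] = field(default_factory=list)

    @property
    def duration_ms(self) -> float:
        return self.end_ms - self.start_ms

def slowest_leaf(span: Span) -> Span:
    """Walk the span tree and return the leaf operation with the longest duration."""
    if not span.children:
        return span
    candidates = [slowest_leaf(child) for child in span.children]
    return max(candidates, key=lambda s: s.duration_ms)

# A request that fans out into a cache lookup and a database query
root = Span("GET /api/users/42", 0, 130, [
    Span("cache_lookup", 5, 15),
    Span("database_query", 20, 120, [
        Span("connection_acquire", 20, 25),
        Span("execute_sql", 25, 115),
    ]),
])
```

Here `slowest_leaf(root)` points straight at `execute_sql`, which is exactly the question a tracing backend answers for you at scale.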
2. Metrics
Metrics are numerical measurements of your system's behavior over time. They include counters, gauges, and histograms that help you understand performance trends and system health.
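The instrument kinds behave differently, which matters when you pick one. A stdlib-only sketch (names and bucket bounds are illustrative, not the SDK's API): counters only go up, while histograms count observations into explicit buckets the way Prometheus does.

```python
from bisect import bisect_left

class Counter:
    """Monotonic counter: only ever goes up."""
    def __init__(self):
        self.value = 0

    def add(self, amount):
        if amount < 0:
            raise ValueError("counters are monotonic; use a gauge for values that go down")
        self.value += amount

class Histogram:
    """Explicit-bucket histogram: one count per upper bound, plus a +Inf bucket."""
    def __init__(self, bounds):
        self.bounds = sorted(bounds)
        self.counts = [0] * (len(bounds) + 1)  # last slot is the +Inf bucket

    def record(self, value):
        # bisect_left finds the first bound >= value, i.e. the "le" bucket
        self.counts[bisect_left(self.bounds, value)] += 1

requests = Counter()
requests.add(3)

latency = Histogram(bounds=[10, 50, 100])
for ms in [4, 12, 48, 200]:
    latency.record(ms)
```

After the four recordings, `latency.counts` is `[1, 2, 0, 1]`: one request under 10 ms, two between 10 and 50 ms, and one that overflowed into the +Inf bucket.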
3. Logs
Logs are timestamped text records of discrete events. When correlated with traces and metrics, they provide detailed context about what was happening at specific moments.
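In practice, correlation means stamping each log line with the active trace and span IDs. A minimal illustration of the formatting (hand-rolled, not the SDK's log bridge; the IDs below are the W3C specification's example values):

```python
def format_with_trace(message: str, trace_id: int, span_id: int) -> str:
    """Render a log line carrying trace context so log searches can pivot to the trace."""
    # OpenTelemetry renders trace IDs as 32 hex characters and span IDs as 16
    return f"trace_id={trace_id:032x} span_id={span_id:016x} {message}"

line = format_with_trace(
    "payment declined",
    trace_id=0x4BF92F3577B34DA6A3CE929D0E0E4736,
    span_id=0x00F067AA0BA902B7,
)
```

With the ID in every log line, a backend can jump from a single error log to the full trace that produced it.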
Setting Up OpenTelemetry
Let's start by implementing OpenTelemetry in three popular languages: Node.js, Python, and Go.
Node.js Implementation
First, install the required packages:
npm install @opentelemetry/api \
@opentelemetry/sdk-node \
@opentelemetry/auto-instrumentations-node \
@opentelemetry/exporter-trace-otlp-http \
@opentelemetry/exporter-metrics-otlp-http \
@opentelemetry/instrumentation-http \
@opentelemetry/instrumentation-express
Create a telemetry configuration file:
// telemetry.js
const { NodeSDK } = require('@opentelemetry/sdk-node');
const { getNodeAutoInstrumentations } = require('@opentelemetry/auto-instrumentations-node');
const { PeriodicExportingMetricReader } = require('@opentelemetry/sdk-metrics');
const { Resource } = require('@opentelemetry/resources');
const { SemanticResourceAttributes } = require('@opentelemetry/semantic-conventions');
const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-http');
const { OTLPMetricExporter } = require('@opentelemetry/exporter-metrics-otlp-http');

// Configure resource attributes
const resource = Resource.default().merge(
  new Resource({
    [SemanticResourceAttributes.SERVICE_NAME]: 'node-api-service',
    [SemanticResourceAttributes.SERVICE_VERSION]: '1.0.0',
    environment: process.env.NODE_ENV || 'development',
  }),
);

// Configure trace exporter
const traceExporter = new OTLPTraceExporter({
  url: 'http://localhost:4318/v1/traces',
  headers: {},
});

// Configure metric exporter
const metricExporter = new OTLPMetricExporter({
  url: 'http://localhost:4318/v1/metrics',
  headers: {},
});

// Create SDK instance
const sdk = new NodeSDK({
  resource,
  traceExporter,
  metricReader: new PeriodicExportingMetricReader({
    exporter: metricExporter,
    exportIntervalMillis: 10000,
  }),
  instrumentations: [
    getNodeAutoInstrumentations({
      '@opentelemetry/instrumentation-fs': {
        enabled: false, // Disable fs instrumentation to reduce noise
      },
    }),
  ],
});

// Initialize the SDK
sdk.start();

// Graceful shutdown
process.on('SIGTERM', () => {
  sdk.shutdown()
    .then(() => console.log('Telemetry terminated'))
    .catch((error) => console.error('Error terminating telemetry', error))
    .finally(() => process.exit(0));
});

module.exports = sdk;
Now, implement it in your Express application:
// app.js
require('./telemetry'); // Initialize telemetry before anything else
const express = require('express');
const { trace, metrics, SpanStatusCode } = require('@opentelemetry/api');

const app = express();
const PORT = process.env.PORT || 3000;

// Get tracer and meter
const tracer = trace.getTracer('node-api-service');
const meter = metrics.getMeter('node-api-service');

// Create custom metrics
const requestCounter = meter.createCounter('http_requests_total', {
  description: 'Total number of HTTP requests',
});
const requestDuration = meter.createHistogram('http_request_duration_ms', {
  description: 'Duration of HTTP requests in milliseconds',
});

// Middleware to track request metrics
app.use((req, res, next) => {
  const startTime = Date.now();
  res.on('finish', () => {
    const duration = Date.now() - startTime;
    const labels = {
      method: req.method,
      route: req.route?.path || req.path,
      status_code: res.statusCode.toString(),
    };
    requestCounter.add(1, labels);
    requestDuration.record(duration, labels);
  });
  next();
});

// Example endpoint with custom spans
app.get('/api/users/:id', async (req, res) => {
  const span = tracer.startSpan('get_user_handler');
  try {
    // Add custom attributes to the span
    span.setAttributes({
      'user.id': req.params.id,
      'http.method': req.method,
      'http.url': req.url,
    });

    // Simulate database call with nested span
    const user = await tracer.startActiveSpan('database_query', async (dbSpan) => {
      try {
        dbSpan.setAttributes({
          'db.system': 'postgresql',
          'db.operation': 'SELECT',
          'db.statement': 'SELECT * FROM users WHERE id = $1',
        });
        // Simulate database latency
        await new Promise(resolve => setTimeout(resolve, 50));
        return {
          id: req.params.id,
          name: 'John Doe',
          email: '[email protected]',
        };
      } finally {
        dbSpan.end(); // End the span even if the query throws
      }
    });

    // Add event to span
    span.addEvent('user_data_retrieved', {
      'user.id': user.id,
    });
    res.json(user);
  } catch (error) {
    span.recordException(error);
    span.setStatus({ code: SpanStatusCode.ERROR, message: error.message });
    res.status(500).json({ error: 'Internal server error' });
  } finally {
    span.end();
  }
});

app.listen(PORT, () => {
  console.log(`Server running on port ${PORT}`);
});
Python Implementation
Install the required packages:
pip install opentelemetry-distro \
opentelemetry-exporter-otlp \
opentelemetry-instrumentation-fastapi \
opentelemetry-instrumentation-requests \
opentelemetry-instrumentation-sqlalchemy
Create the telemetry setup:
# telemetry.py
from opentelemetry import trace, metrics
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def configure_telemetry(service_name: str, service_version: str):
    """Configure OpenTelemetry for the application."""
    # Create resource
    resource = Resource.create({
        ResourceAttributes.SERVICE_NAME: service_name,
        ResourceAttributes.SERVICE_VERSION: service_version,
        "environment": "production",
    })

    # Configure tracing
    trace_provider = TracerProvider(resource=resource)
    trace.set_tracer_provider(trace_provider)

    # Configure OTLP trace exporter
    otlp_trace_exporter = OTLPSpanExporter(
        endpoint="localhost:4317",
        insecure=True,
    )
    span_processor = BatchSpanProcessor(otlp_trace_exporter)
    trace_provider.add_span_processor(span_processor)

    # Configure metrics
    metric_reader = PeriodicExportingMetricReader(
        exporter=OTLPMetricExporter(
            endpoint="localhost:4317",
            insecure=True,
        ),
        export_interval_millis=10000,
    )
    meter_provider = MeterProvider(
        resource=resource,
        metric_readers=[metric_reader],
    )
    metrics.set_meter_provider(meter_provider)

    # Enable auto-instrumentation (instrument() is an instance method)
    FastAPIInstrumentor().instrument()
    RequestsInstrumentor().instrument()

    logger.info(f"Telemetry configured for {service_name} v{service_version}")
    return trace.get_tracer(service_name), metrics.get_meter(service_name)
Implement in a FastAPI application:
# main.py
from fastapi import FastAPI, HTTPException
from telemetry import configure_telemetry
from opentelemetry import trace, metrics
from opentelemetry.trace import Status, StatusCode
from contextlib import asynccontextmanager
import asyncio
import random
import httpx
from datetime import datetime
from typing import Dict, Any

# Configure telemetry
tracer, meter = configure_telemetry("python-api-service", "1.0.0")

# Create custom metrics
request_counter = meter.create_counter(
    name="api_requests_total",
    description="Total number of API requests",
    unit="1",
)
request_duration = meter.create_histogram(
    name="api_request_duration_seconds",
    description="API request duration in seconds",
    unit="s",
)

# Create FastAPI app with lifespan management
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    yield
    # Shutdown - cleanup if needed

app = FastAPI(lifespan=lifespan)

# Custom middleware for metrics
@app.middleware("http")
async def add_metrics(request, call_next):
    start_time = datetime.now()
    response = await call_next(request)
    duration = (datetime.now() - start_time).total_seconds()
    labels = {
        "method": request.method,
        "endpoint": request.url.path,
        "status": str(response.status_code),
    }
    request_counter.add(1, labels)
    request_duration.record(duration, labels)
    return response

@app.get("/api/products/{product_id}")
async def get_product(product_id: str) -> Dict[str, Any]:
    """Get product details with distributed tracing."""
    with tracer.start_as_current_span("get_product_handler") as span:
        # Add span attributes
        span.set_attributes({
            "product.id": product_id,
            "http.method": "GET",
            "service.layer": "api",
        })
        try:
            # Simulate fetching from cache
            cache_result = await fetch_from_cache(product_id)
            if cache_result:
                span.add_event("cache_hit", {"product.id": product_id})
                return cache_result
            span.add_event("cache_miss", {"product.id": product_id})

            # Fetch from database
            product = await fetch_from_database(product_id)

            # Call inventory service
            inventory = await check_inventory_service(product_id)

            result = {
                **product,
                "inventory": inventory,
            }

            # Update cache
            await update_cache(product_id, result)
            return result
        except Exception as e:
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR))
            raise HTTPException(status_code=500, detail=str(e))

async def fetch_from_cache(product_id: str) -> Dict[str, Any] | None:
    """Simulate cache lookup with tracing."""
    with tracer.start_as_current_span("cache_lookup") as span:
        span.set_attributes({
            "cache.type": "redis",
            "cache.operation": "get",
            "cache.key": f"product:{product_id}",
        })
        # Simulate cache latency
        await asyncio.sleep(0.01)
        # Simulate 30% cache hit rate
        if random.random() < 0.3:
            return {
                "id": product_id,
                "name": "Cached Product",
                "price": 99.99,
            }
        return None

async def fetch_from_database(product_id: str) -> Dict[str, Any]:
    """Simulate database query with tracing."""
    with tracer.start_as_current_span("database_query") as span:
        span.set_attributes({
            "db.system": "postgresql",
            "db.operation": "SELECT",
            "db.table": "products",
        })
        # Simulate database latency
        await asyncio.sleep(0.05)
        return {
            "id": product_id,
            "name": "Sample Product",
            "price": 149.99,
            "description": "High-quality product",
        }

async def check_inventory_service(product_id: str) -> Dict[str, Any]:
    """Call external inventory service with tracing."""
    with tracer.start_as_current_span("inventory_service_call") as span:
        span.set_attributes({
            "http.method": "GET",
            "http.url": f"http://inventory-service/api/stock/{product_id}",
            "peer.service": "inventory-service",
        })
        async with httpx.AsyncClient() as client:
            try:
                # Simulate external service call (a real call would use client.get)
                await asyncio.sleep(0.03)
                # Mock response
                return {
                    "in_stock": True,
                    "quantity": 42,
                    "warehouse": "main",
                }
            except Exception as e:
                span.record_exception(e)
                raise

async def update_cache(product_id: str, data: Dict[str, Any]):
    """Update cache with tracing."""
    with tracer.start_as_current_span("cache_update") as span:
        span.set_attributes({
            "cache.type": "redis",
            "cache.operation": "set",
            "cache.key": f"product:{product_id}",
        })
        # Simulate cache update
        await asyncio.sleep(0.01)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Go Implementation
Create the Go module and install dependencies:
go mod init observability-demo
go get go.opentelemetry.io/otel
go get go.opentelemetry.io/otel/exporters/otlp/otlptrace
go get go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp
go get go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp
go get go.opentelemetry.io/otel/sdk/trace
go get go.opentelemetry.io/otel/sdk/metric
go get go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp
Implement the telemetry setup:
// telemetry/telemetry.go
package telemetry
import (
"context"
"fmt"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.opentelemetry.io/otel/trace"
)
type Telemetry struct {
TracerProvider *sdktrace.TracerProvider
MeterProvider *sdkmetric.MeterProvider
Tracer trace.Tracer
Meter metric.Meter
}
func InitTelemetry(ctx context.Context, serviceName, serviceVersion string) (*Telemetry, error) {
// Create resource
res, err := resource.Merge(
resource.Default(),
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(serviceName),
semconv.ServiceVersion(serviceVersion),
attribute.String("environment", "production"),
),
)
if err != nil {
return nil, fmt.Errorf("failed to create resource: %w", err)
}
// Initialize trace exporter
traceExporter, err := otlptrace.New(
ctx,
otlptracehttp.NewClient(
otlptracehttp.WithEndpoint("localhost:4318"),
otlptracehttp.WithInsecure(),
),
)
if err != nil {
return nil, fmt.Errorf("failed to create trace exporter: %w", err)
}
// Create trace provider
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(traceExporter),
sdktrace.WithResource(res),
sdktrace.WithSampler(sdktrace.AlwaysSample()),
)
otel.SetTracerProvider(tp)
// Initialize metric exporter
metricExporter, err := otlpmetrichttp.New(
ctx,
otlpmetrichttp.WithEndpoint("localhost:4318"),
otlpmetrichttp.WithInsecure(),
)
if err != nil {
return nil, fmt.Errorf("failed to create metric exporter: %w", err)
}
// Create meter provider
mp := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(
sdkmetric.NewPeriodicReader(
metricExporter,
sdkmetric.WithInterval(10*time.Second),
),
),
sdkmetric.WithResource(res),
)
otel.SetMeterProvider(mp)
return &Telemetry{
TracerProvider: tp,
MeterProvider: mp,
Tracer: tp.Tracer(serviceName),
Meter: mp.Meter(serviceName),
}, nil
}
func (t *Telemetry) Shutdown(ctx context.Context) error {
var err error
if err = t.TracerProvider.Shutdown(ctx); err != nil {
return fmt.Errorf("failed to shutdown tracer provider: %w", err)
}
if err = t.MeterProvider.Shutdown(ctx); err != nil {
return fmt.Errorf("failed to shutdown meter provider: %w", err)
}
return nil
}
Create the main application:
// main.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"observability-demo/telemetry"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)
type Server struct {
tel *telemetry.Telemetry
requestCounter metric.Int64Counter
requestDuration metric.Float64Histogram
}
type Order struct {
ID string `json:"id"`
CustomerID string `json:"customer_id"`
TotalAmount float64 `json:"total_amount"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
}
func main() {
ctx := context.Background()
// Initialize telemetry
tel, err := telemetry.InitTelemetry(ctx, "go-order-service", "1.0.0")
if err != nil {
log.Fatalf("failed to initialize telemetry: %v", err)
}
defer func() {
if err := tel.Shutdown(ctx); err != nil {
log.Printf("failed to shutdown telemetry: %v", err)
}
}()
// Create metrics
requestCounter, err := tel.Meter.Int64Counter(
"order_service_requests_total",
metric.WithDescription("Total number of requests to order service"),
)
if err != nil {
log.Fatalf("failed to create request counter: %v", err)
}
requestDuration, err := tel.Meter.Float64Histogram(
"order_service_request_duration_seconds",
metric.WithDescription("Duration of requests in seconds"),
)
if err != nil {
log.Fatalf("failed to create request duration histogram: %v", err)
}
server := &Server{
tel: tel,
requestCounter: requestCounter,
requestDuration: requestDuration,
}
// Setup routes with OpenTelemetry instrumentation
mux := http.NewServeMux()
mux.HandleFunc("/api/orders", server.handleOrders)
mux.HandleFunc("/api/orders/", server.handleOrderByID)
handler := otelhttp.NewHandler(mux, "order-service")
log.Println("Starting server on :8080")
if err := http.ListenAndServe(":8080", handler); err != nil {
log.Fatalf("failed to start server: %v", err)
}
}
func (s *Server) handleOrders(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
startTime := time.Now()
// Record metrics
defer func() {
duration := time.Since(startTime).Seconds()
s.requestCounter.Add(ctx, 1, metric.WithAttributes(
attribute.String("method", r.Method),
attribute.String("endpoint", "/api/orders"),
attribute.Int("status", http.StatusOK), // simplification: a real service would record the actual status code
))
s.requestDuration.Record(ctx, duration, metric.WithAttributes(
attribute.String("method", r.Method),
attribute.String("endpoint", "/api/orders"),
))
}()
if r.Method == http.MethodPost {
s.createOrder(w, r)
} else if r.Method == http.MethodGet {
s.listOrders(w, r)
} else {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
func (s *Server) createOrder(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
span := trace.SpanFromContext(ctx)
span.SetAttributes(attribute.String("operation", "create_order"))
// Parse request body
var order Order
if err := json.NewDecoder(r.Body).Decode(&order); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "Failed to parse request")
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}
// Validate order with tracing
ctx, validateSpan := s.tel.Tracer.Start(ctx, "validate_order")
if err := s.validateOrder(ctx, &order); err != nil {
validateSpan.RecordError(err)
validateSpan.SetStatus(codes.Error, "Validation failed")
validateSpan.End()
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
validateSpan.End()
// Process payment with tracing
ctx, paymentSpan := s.tel.Tracer.Start(ctx, "process_payment")
paymentSpan.SetAttributes(
attribute.Float64("payment.amount", order.TotalAmount),
attribute.String("payment.currency", "USD"),
)
if err := s.processPayment(ctx, &order); err != nil {
paymentSpan.RecordError(err)
paymentSpan.SetStatus(codes.Error, "Payment failed")
paymentSpan.End()
http.Error(w, "Payment processing failed", http.StatusPaymentRequired)
return
}
paymentSpan.AddEvent("payment_completed", trace.WithAttributes(
attribute.String("transaction_id", fmt.Sprintf("txn_%d", time.Now().Unix())),
))
paymentSpan.End()
// Save order with tracing
ctx, saveSpan := s.tel.Tracer.Start(ctx, "save_order")
saveSpan.SetAttributes(
attribute.String("db.operation", "INSERT"),
attribute.String("db.table", "orders"),
)
order.ID = fmt.Sprintf("order_%d", time.Now().Unix())
order.CreatedAt = time.Now()
order.Status = "confirmed"
// Simulate database save
time.Sleep(20 * time.Millisecond)
saveSpan.End()
// Send response
span.SetStatus(codes.Ok, "Order created successfully")
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(order)
}
func (s *Server) validateOrder(ctx context.Context, order *Order) error {
span := trace.SpanFromContext(ctx)
span.SetAttributes(
attribute.String("customer.id", order.CustomerID),
attribute.Float64("order.amount", order.TotalAmount),
)
// Simulate validation
time.Sleep(10 * time.Millisecond)
if order.TotalAmount <= 0 {
return fmt.Errorf("invalid order amount")
}
if order.CustomerID == "" {
return fmt.Errorf("customer ID is required")
}
return nil
}
func (s *Server) processPayment(ctx context.Context, order *Order) error {
// Simulate an external payment service call; the parent span travels in ctx
_, paymentSpan := s.tel.Tracer.Start(ctx, "payment_service_call")
paymentSpan.SetAttributes(
attribute.String("service.name", "payment-gateway"),
attribute.String("http.method", "POST"),
attribute.String("http.url", "https://payment-gateway.com/charge"),
)
// Simulate network latency
time.Sleep(50 * time.Millisecond)
paymentSpan.End()
return nil
}
func (s *Server) listOrders(w http.ResponseWriter, r *http.Request) {
// Implementation for listing orders
orders := []Order{
{
ID: "order_123",
CustomerID: "customer_456",
TotalAmount: 299.99,
Status: "shipped",
CreatedAt: time.Now().Add(-24 * time.Hour),
},
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(orders)
}
func (s *Server) handleOrderByID(w http.ResponseWriter, r *http.Request) {
// Implementation for getting order by ID
orderID := r.URL.Path[len("/api/orders/"):]
order := Order{
ID: orderID,
CustomerID: "customer_789",
TotalAmount: 149.99,
Status: "delivered",
CreatedAt: time.Now().Add(-48 * time.Hour),
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(order)
}
Distributed Tracing Across Microservices
One of OpenTelemetry's most powerful features is distributed tracing across multiple services. Let's implement a trace context propagation example:
// Service A - API Gateway
const axios = require('axios');
const { trace, context, propagation, SpanStatusCode } = require('@opentelemetry/api');

app.get('/api/checkout', async (req, res) => {
  const tracer = trace.getTracer('api-gateway');
  await tracer.startActiveSpan('checkout_flow', async (span) => {
    try {
      // Prepare headers with trace context
      const headers = {};
      propagation.inject(context.active(), headers);

      // Call Order Service
      const orderResponse = await axios.post(
        'http://order-service:8080/api/orders',
        req.body,
        { headers }
      );

      // Call Notification Service
      await axios.post(
        'http://notification-service:8081/api/notify',
        {
          orderId: orderResponse.data.id,
          customerId: req.body.customerId,
        },
        { headers }
      );

      res.json({ success: true, orderId: orderResponse.data.id });
    } catch (error) {
      span.recordException(error);
      span.setStatus({ code: SpanStatusCode.ERROR });
      res.status(500).json({ error: 'Checkout failed' });
    } finally {
      span.end();
    }
  });
});

// Service B - Order Service
const tracer = trace.getTracer('order-service');

app.post('/api/orders', (req, res) => {
  // Extract trace context from incoming request
  const parentContext = propagation.extract(context.active(), req.headers);
  context.with(parentContext, async () => {
    const span = tracer.startSpan('process_order');
    try {
      // Process order logic
      const order = await processOrder(req.body);
      span.setStatus({ code: SpanStatusCode.OK });
      res.json(order);
    } catch (error) {
      span.recordException(error);
      span.setStatus({ code: SpanStatusCode.ERROR });
      res.status(500).json({ error: 'Order processing failed' });
    } finally {
      span.end();
    }
  });
});
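On the wire, `propagation.inject` and `propagation.extract` speak the W3C Trace Context format by default: a single `traceparent` header laid out as `version-traceid-spanid-flags`. A stdlib sketch of that header (the IDs are the W3C specification's example values):

```python
def build_traceparent(trace_id: int, span_id: int, sampled: bool) -> str:
    """Build a W3C traceparent header: version-traceid-spanid-flags."""
    flags = 0x01 if sampled else 0x00
    return f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"

def parse_traceparent(header: str):
    """Parse a traceparent header back into (trace_id, span_id, sampled)."""
    version, trace_id, span_id, flags = header.split("-")
    return int(trace_id, 16), int(span_id, 16), bool(int(flags, 16) & 0x01)

header = build_traceparent(
    0x4BF92F3577B34DA6A3CE929D0E0E4736,
    0x00F067AA0BA902B7,
    sampled=True,
)
```

Because every instrumented service reads and rewrites this one header, the backend can stitch spans from the gateway, order service, and notification service into a single trace.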
Integration with Observability Backends
Jaeger Setup
Deploy Jaeger for trace visualization:
# docker-compose.yml
version: '3.8'
services:
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686" # Jaeger UI
      - "14268:14268" # Accept jaeger.thrift
      - "4317:4317"   # OTLP gRPC
      - "4318:4318"   # OTLP HTTP
    environment:
      - COLLECTOR_OTLP_ENABLED=true
Prometheus Configuration
Configure Prometheus to scrape OpenTelemetry metrics:
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'otel-collector'
    static_configs:
      - targets: ['otel-collector:8889']
  - job_name: 'node-app'
    static_configs:
      - targets: ['node-app:9464']
Grafana Dashboards
Create a comprehensive dashboard for your services:
{
  "dashboard": {
    "title": "Microservices Observability",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [
          { "expr": "sum(rate(http_requests_total[5m])) by (service, method)" }
        ]
      },
      {
        "title": "P95 Latency",
        "targets": [
          { "expr": "histogram_quantile(0.95, sum(rate(http_request_duration_ms_bucket[5m])) by (le, service))" }
        ]
      },
      {
        "title": "Error Rate",
        "targets": [
          { "expr": "sum(rate(http_requests_total{status_code=~\"5..\"}[5m])) by (service)" }
        ]
      }
    ]
  }
}
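The P95 panel leans on `histogram_quantile`, which estimates a quantile from cumulative bucket counts by interpolating linearly inside the bucket containing the target rank. A stdlib approximation of the calculation (simplified; PromQL also normalizes rates and handles degenerate bucket layouts):

```python
def histogram_quantile(q: float, buckets) -> float:
    """Estimate a quantile from cumulative histogram buckets, PromQL-style.

    buckets: sorted (upper_bound, cumulative_count) pairs ending with float('inf').
    """
    total = buckets[-1][1]
    rank = q * total
    prev_bound, prev_count = 0.0, 0
    for bound, count in buckets:
        if count >= rank:
            if bound == float("inf"):
                # Quantile falls past the last finite bucket: return its bound
                return prev_bound
            # Linear interpolation inside the bucket that contains the rank
            fraction = (rank - prev_count) / (count - prev_count)
            return prev_bound + (bound - prev_bound) * fraction
        prev_bound, prev_count = bound, count
    return prev_bound

# 100 requests: 60 under 100ms, 90 under 250ms, 99 under 500ms
buckets = [(100.0, 60), (250.0, 90), (500.0, 99), (float("inf"), 100)]
p95 = histogram_quantile(0.95, buckets)
```

This is why bucket boundaries matter: the estimate can only be as precise as the bucket the quantile lands in.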
Custom Instrumentation Best Practices
1. Semantic Conventions
Always use OpenTelemetry semantic conventions for consistent naming:
const { SemanticAttributes } = require('@opentelemetry/semantic-conventions');

span.setAttributes({
  [SemanticAttributes.HTTP_METHOD]: req.method,
  [SemanticAttributes.HTTP_URL]: req.url,
  [SemanticAttributes.HTTP_STATUS_CODE]: res.statusCode,
  [SemanticAttributes.HTTP_USER_AGENT]: req.headers['user-agent'],
});
2. Sampling Strategies
Implement intelligent sampling to reduce overhead:
// Custom sampler based on endpoint (needs math/rand plus the sdktrace import)
type EndpointSampler struct {
	defaultRate float64
	endpoints   map[string]float64
}

func (s *EndpointSampler) ShouldSample(parameters sdktrace.SamplingParameters) sdktrace.SamplingResult {
	// SamplingParameters.Attributes is a []attribute.KeyValue slice, so scan it
	rate := s.defaultRate
	for _, attr := range parameters.Attributes {
		if attr.Key == "http.target" {
			if r, ok := s.endpoints[attr.Value.AsString()]; ok {
				rate = r
			}
			break
		}
	}
	if rand.Float64() < rate {
		return sdktrace.SamplingResult{Decision: sdktrace.RecordAndSample}
	}
	return sdktrace.SamplingResult{Decision: sdktrace.Drop}
}

// Description is required to satisfy the sdktrace.Sampler interface
func (s *EndpointSampler) Description() string {
	return "EndpointSampler"
}
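The SDK's built-in `TraceIdRatioBased` sampler takes a related but deterministic approach: the decision is derived from the trace ID itself, so every service that sees the same trace makes the same keep-or-drop call. A plain-Python sketch of the idea (the SDK's exact bit layout may differ):

```python
def should_sample(trace_id: int, ratio: float) -> bool:
    """Deterministic head sampling in the spirit of TraceIdRatioBased.

    Compares the low 64 bits of the trace ID against ratio * 2**64, so the
    decision is a pure function of the trace ID: no coordination needed
    between services, and no randomness at decision time.
    """
    bound = int(ratio * (1 << 64))
    return (trace_id & ((1 << 64) - 1)) < bound
```

Determinism is the point: with per-request `rand.Float64()` style sampling, different services could keep different halves of the same trace, leaving you with broken partial traces.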
3. Context Propagation
Ensure proper context propagation across async operations:
async def process_batch(items):
    # Capture current context
    current_context = trace.get_current_span().get_span_context()

    async def process_item(item):
        # Create child span with parent context
        with tracer.start_as_current_span(
            "process_item",
            context=trace.set_span_in_context(
                trace.NonRecordingSpan(current_context)
            ),
        ) as span:
            span.set_attributes({"item.id": item.id})
            # Process item
            return await perform_processing(item)

    # Process items concurrently
    results = await asyncio.gather(*[process_item(item) for item in items])
    return results
Production Deployment Considerations
Performance Impact
Minimize observability overhead:
- Batch Export: Use batch processors for efficient data export
- Sampling: Implement head-based sampling for high-traffic services
- Resource Detection: Cache resource attributes to avoid repeated detection
- Async Processing: Use non-blocking exporters
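The first bullet is worth making concrete: a batch processor amortizes one export call over many spans instead of paying the network cost per span. A toy illustration (plain Python; `export_fn` is a stand-in for an exporter, and the real `BatchSpanProcessor` also flushes on a timer and bounds its queue):

```python
class BatchQueue:
    """Buffer telemetry items and hand them to an exporter in batches."""

    def __init__(self, export_fn, max_batch_size=512):
        self.export_fn = export_fn
        self.max_batch_size = max_batch_size
        self.buffer = []

    def add(self, item):
        self.buffer.append(item)
        if len(self.buffer) >= self.max_batch_size:
            self.flush()

    def flush(self):
        if self.buffer:
            self.export_fn(self.buffer)  # one export call for many items
            self.buffer = []

exported = []
queue = BatchQueue(export_fn=exported.append, max_batch_size=3)
for i in range(7):
    queue.add(i)
queue.flush()  # drain the partial final batch
```

Seven items cost three export calls here; at production batch sizes the per-span overhead becomes negligible.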
Configuration Management
Use environment variables for flexible configuration:
const config = {
  serviceName: process.env.OTEL_SERVICE_NAME || 'unknown-service',
  exporterEndpoint: process.env.OTEL_EXPORTER_OTLP_ENDPOINT || 'http://localhost:4318',
  samplingRate: parseFloat(process.env.OTEL_SAMPLING_RATE || '1.0'),
  enableMetrics: process.env.OTEL_METRICS_ENABLED !== 'false',
  enableTraces: process.env.OTEL_TRACES_ENABLED !== 'false',
};
Security Considerations
- Sensitive Data: Implement attribute filtering to prevent logging sensitive information
- Authentication: Use secure endpoints with proper authentication for exporters
- Network Security: Encrypt data in transit using TLS
# Filter sensitive attributes
from opentelemetry.sdk.trace import SpanProcessor

class SensitiveDataFilter(SpanProcessor):
    def on_start(self, span, parent_context=None):
        # Overwrite sensitive attributes before the span is exported
        sensitive_keys = ("password", "api_key", "credit_card")
        for key in sensitive_keys:
            if span.attributes and key in span.attributes:
                span.set_attribute(key, "[REDACTED]")

# Register it alongside the exporter: trace_provider.add_span_processor(SensitiveDataFilter())
Conclusion
OpenTelemetry provides a comprehensive framework for implementing observability across your entire stack. By following these patterns and best practices, you can gain deep insights into your distributed systems while maintaining performance and security.
Key takeaways:
- Start with auto-instrumentation and add custom instrumentation as needed
- Use semantic conventions for consistency
- Implement proper context propagation for distributed tracing
- Choose appropriate sampling strategies for your traffic patterns
- Monitor the performance impact of your observability implementation
- Secure your telemetry data pipeline
With OpenTelemetry, you're not just collecting data; you're building a foundation for understanding and improving your systems at scale. The investment in proper observability pays dividends when troubleshooting issues, optimizing performance, and ensuring reliability in production environments.