MLOps Best Practices: From Model Development to Production
MLOps (Machine Learning Operations) combines machine learning, DevOps, and data engineering to deploy and maintain ML models in production. This guide explores best practices and tools for building robust ML pipelines.
Key Topics
- Model Versioning and Reproducibility
- Continuous Training and Deployment
- Model Serving and Scaling
- Monitoring and Observability
- A/B Testing and Progressive Rollouts
1. Model Versioning and Reproducibility
Ensure your ML experiments are reproducible and traceable.
Using DVC for Model Versioning
# Initialize DVC
!dvc init
# Add data to DVC
!dvc add data/training_data.csv
!dvc add models/model.pkl
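Optionally, configure a DVC remote so tracked data and models can be pushed to shared storage and pulled by teammates or CI jobs (the S3 URL below is a placeholder):
# Configure a default remote (placeholder bucket URL)
!dvc remote add -d storage s3://my-bucket/dvc-store
# Push tracked data and models to the remote
!dvc push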
# Create ML pipeline
# dvc.yaml
stages:
  preprocess:
    cmd: python src/preprocess.py
    deps:
      - src/preprocess.py
      - data/raw_data.csv
    outs:
      - data/processed_data.csv
  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/processed_data.csv
    outs:
      - models/model.pkl
    metrics:
      - metrics.json:
          cache: false
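With the stages defined, DVC can reproduce the pipeline (re-running only stale stages) and report the tracked metrics:
# Reproduce the pipeline end to end
!dvc repro
# Display tracked metrics
!dvc metrics show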
# Track experiments with MLflow
import mlflow

mlflow.set_experiment("customer_churn")

with mlflow.start_run():
    # Log parameters
    mlflow.log_param("learning_rate", 0.01)
    mlflow.log_param("n_estimators", 100)

    # Train model
    model.fit(X_train, y_train)

    # Log metrics
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1)

    # Log model
    mlflow.sklearn.log_model(model, "model")
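When a tracking server with the Model Registry is available, the logged model can also be promoted into the registry. A minimal sketch, assuming run_id was captured from the run above via run.info.run_id, and using a placeholder registry name:
# Register the logged model under a named registry entry (name is a placeholder)
mlflow.register_model(f"runs:/{run_id}/model", "customer_churn_model")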
Environment Management
# environment.yml
name: ml-project
channels:
  - conda-forge
  - defaults
dependencies:
  - python=3.9
  - pandas
  - scikit-learn
  - tensorflow
  - mlflow
  - dvc
  - pip
  - pip:
      - wandb
      - great-expectations
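Create and activate the environment from this file, and export the fully resolved versions if you want stricter reproducibility:
# Create the environment from the spec
conda env create -f environment.yml
# Activate it
conda activate ml-project
# Export the exact resolved versions
conda env export > environment.lock.yml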
2. Continuous Training and Deployment
Implement automated ML pipelines with Kubeflow.
Kubeflow Pipeline Definition
import kfp
from kfp import dsl


@dsl.pipeline(
    name='Training Pipeline',
    description='End-to-end ML training pipeline'
)
def training_pipeline(
    data_path: str,
    model_path: str,
    hyperparameters: dict
):
    # Data preprocessing
    preprocess_op = dsl.ContainerOp(
        name='preprocess',
        image='preprocess-image:latest',
        arguments=[
            '--data-path', data_path,
            '--output-path', 'processed_data'
        ],
        # Expose the processed-data location so downstream steps can consume it
        # (the container path is a placeholder for where your image writes it)
        file_outputs={'output': '/tmp/processed_data'}
    )

    # Model training
    train_op = dsl.ContainerOp(
        name='train',
        image='train-image:latest',
        arguments=[
            '--data-path', preprocess_op.output,
            '--model-path', model_path,
            '--hyperparameters', hyperparameters
        ],
        # Expose the trained-model location for the evaluation step
        file_outputs={'output': '/tmp/model_path'}
    )

    # Model evaluation
    evaluate_op = dsl.ContainerOp(
        name='evaluate',
        image='evaluate-image:latest',
        arguments=[
            '--model-path', train_op.output,
            '--test-data', 'test_data'
        ]
    )


# Compile and run pipeline
kfp.compiler.Compiler().compile(
    training_pipeline,
    'pipeline.yaml'
)
Automated Retraining Trigger
import time

import kfp
import pandas as pd
from sklearn.metrics import accuracy_score


def check_model_performance(
    model,
    new_data: pd.DataFrame,
    threshold: float = 0.9
) -> bool:
    """Return True if accuracy on fresh labelled data falls below the threshold."""
    predictions = model.predict(new_data.drop('target', axis=1))
    current_accuracy = accuracy_score(new_data['target'], predictions)
    return current_accuracy < threshold


def trigger_retraining(data_drift: bool, performance_drop: bool) -> bool:
    """Decision logic for model retraining."""
    return data_drift or performance_drop


# Monitoring script
# fetch_new_data(), detect_data_drift() and current_model are placeholders for
# your own data access, drift detection and model loading code.
while True:
    # Get new data
    new_data = fetch_new_data()

    # Check for data drift
    drift_detected = detect_data_drift(new_data)

    # Check model performance
    performance_drop = check_model_performance(current_model, new_data)

    if trigger_retraining(drift_detected, performance_drop):
        # Trigger Kubeflow pipeline
        client = kfp.Client()
        client.create_run_from_pipeline_package(
            'pipeline.yaml',
            arguments={
                'data_path': 'new_data_path',
                'model_path': 'new_model_path',
                'hyperparameters': {'learning_rate': 0.01}
            }
        )

    time.sleep(3600)  # Check every hour
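The drift check itself is left to you. A minimal sketch of detect_data_drift, using a two-sample Kolmogorov-Smirnov test per numeric feature and assuming a reference_data DataFrame captured at training time, might look like this:
from scipy.stats import ks_2samp
import pandas as pd


def detect_data_drift(
    new_data: pd.DataFrame,
    reference_data: pd.DataFrame,
    p_value_threshold: float = 0.05
) -> bool:
    """Flag drift if any numeric feature's distribution differs significantly."""
    for column in reference_data.select_dtypes(include='number').columns:
        if column == 'target':
            continue
        _, p_value = ks_2samp(reference_data[column], new_data[column])
        if p_value < p_value_threshold:
            return True
    return False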
3. Model Serving and Scaling
Deploy models with TensorFlow Serving and Kubernetes.
TensorFlow Serving Configuration
# Save model in SavedModel format
import tensorflow as tf

model.save('saved_model/1/')

# Model serving configuration
# serving_config.config
model_config_list {
  config {
    name: "model"
    base_path: "/models/model"
    model_platform: "tensorflow"
    model_version_policy {
      specific {
        versions: 1
        versions: 2
      }
    }
  }
}
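A quick way to smoke-test the SavedModel locally is to run the TensorFlow Serving container and hit its REST API (the mount path and feature vector below are placeholders):
# Serve the local SavedModel directory on the REST port
docker run -d -p 8501:8501 \
  -v "$(pwd)/saved_model:/models/model" \
  -e MODEL_NAME=model \
  tensorflow/serving:latest

# Query the model's REST prediction endpoint
curl -X POST http://localhost:8501/v1/models/model:predict \
  -H "Content-Type: application/json" \
  -d '{"instances": [[0.5, 1.2, 3.4]]}'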
Kubernetes Deployment
# model-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: model-serving
spec:
  replicas: 3
  selector:
    matchLabels:
      app: model-serving
  template:
    metadata:
      labels:
        app: model-serving
    spec:
      containers:
        - name: tensorflow-serving
          image: tensorflow/serving:latest
          ports:
            - containerPort: 8501
          resources:
            limits:
              cpu: "2"
              memory: "4Gi"
            requests:
              cpu: "1"
              memory: "2Gi"
          volumeMounts:
            - name: model-volume
              mountPath: /models
      volumes:
        - name: model-volume
          persistentVolumeClaim:
            claimName: model-pvc
---
# model-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: model-service
spec:
  selector:
    app: model-serving
  ports:
    - port: 8501
      targetPort: 8501
  type: LoadBalancer
Horizontal Pod Autoscaling
# hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: model-serving-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: model-serving
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
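Apply the manifests and confirm the autoscaler is tracking the deployment (standard kubectl commands):
kubectl apply -f model-deployment.yaml
kubectl apply -f model-service.yaml
kubectl apply -f hpa.yaml

# Watch replica counts change under load
kubectl get hpa model-serving-hpa --watch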
4. Monitoring and Observability
Implement comprehensive monitoring using Prometheus and Grafana.
Model Monitoring
import time
from functools import wraps

from flask import Flask, jsonify, request
from prometheus_client import Counter, Histogram, start_http_server

app = Flask(__name__)

# Define metrics
prediction_counter = Counter(
    'model_predictions_total',
    'Total number of predictions',
    ['model_version', 'prediction']
)

latency_histogram = Histogram(
    'prediction_latency_seconds',
    'Time spent processing prediction'
)


# Monitoring wrapper
def monitor_predictions(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)

        # Record metrics
        prediction_counter.labels(
            model_version='v1',
            prediction=str(result)
        ).inc()
        latency_histogram.observe(time.time() - start_time)

        return result
    return wrapper


# Apply to prediction endpoint (model is assumed to be loaded elsewhere)
@app.route('/predict', methods=['POST'])
@monitor_predictions
def predict():
    data = request.get_json()
    prediction = model.predict(data['features'])
    return jsonify({'prediction': prediction.tolist()})


# Expose the metrics on a separate port for Prometheus to scrape
start_http_server(8000)
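For Prometheus to collect these metrics, add a scrape job pointing at the metrics port. A minimal prometheus.yml fragment (the target address is a placeholder):
# prometheus.yml
scrape_configs:
  - job_name: 'model-serving'
    scrape_interval: 15s
    static_configs:
      - targets: ['model-serving:8000']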
Monitoring Dashboard
# grafana-dashboard.json
{
  "dashboard": {
    "title": "Model Monitoring",
    "panels": [
      {
        "title": "Predictions per Second",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(model_predictions_total[5m])",
            "legendFormat": "{{prediction}}"
          }
        ]
      },
      {
        "title": "Prediction Latency",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(prediction_latency_seconds_bucket[5m]))",
            "legendFormat": "p95"
          }
        ]
      }
    ]
  }
}
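Alerting closes the loop: a Prometheus alerting rule like the one below (thresholds are illustrative) can notify the team when tail latency degrades:
# alerts.yml
groups:
  - name: model-serving-alerts
    rules:
      - alert: HighPredictionLatency
        expr: histogram_quantile(0.95, rate(prediction_latency_seconds_bucket[5m])) > 0.5
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "p95 prediction latency above 500ms for 10 minutes"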
5. A/B Testing and Progressive Rollouts
Implement controlled model rollouts using Istio.
Istio Virtual Service Configuration
# virtual-service.yaml
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: model-routing
spec:
  hosts:
    - model-service
  http:
    - route:
        - destination:
            host: model-service
            subset: v1
          weight: 90
        - destination:
            host: model-service
            subset: v2
          weight: 10
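The v1 and v2 subsets referenced above must be declared in a DestinationRule. A minimal example, assuming the pods for each model version carry a version label:
# destination-rule.yaml
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: model-service
spec:
  host: model-service
  subsets:
    - name: v1
      labels:
        version: v1
    - name: v2
      labels:
        version: v2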
Canary Deployment Script
import time

from kubernetes import client, config


def update_traffic_split(v1_weight: int, v2_weight: int):
    """Update traffic distribution between model versions."""
    config.load_kube_config()
    api_client = client.CustomObjectsApi()

    virtual_service = {
        "apiVersion": "networking.istio.io/v1alpha3",
        "kind": "VirtualService",
        "metadata": {"name": "model-routing"},
        "spec": {
            "hosts": ["model-service"],
            "http": [{
                "route": [
                    {
                        "destination": {
                            "host": "model-service",
                            "subset": "v1"
                        },
                        "weight": v1_weight
                    },
                    {
                        "destination": {
                            "host": "model-service",
                            "subset": "v2"
                        },
                        "weight": v2_weight
                    }
                ]
            }]
        }
    }

    api_client.replace_namespaced_custom_object(
        group="networking.istio.io",
        version="v1alpha3",
        namespace="default",
        plural="virtualservices",
        name="model-routing",
        body=virtual_service
    )


# Progressive rollout
def canary_rollout(steps: int = 5, interval: int = 3600):
    """Gradually increase traffic to the new model version."""
    for i in range(steps + 1):
        v2_weight = (i * 100) // steps
        v1_weight = 100 - v2_weight
        update_traffic_split(v1_weight, v2_weight)
        print(f"Updated weights: V1={v1_weight}%, V2={v2_weight}%")
        time.sleep(interval)


# Execute canary deployment
canary_rollout(steps=5, interval=3600)  # 5 steps, 1 hour each
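In practice each step should be gated on health metrics before shifting more traffic. A sketch that queries Prometheus for the new version's error rate (the Prometheus URL and the model_prediction_errors_total metric name are assumptions, not metrics defined earlier):
import requests


def v2_error_rate(prometheus_url: str = "http://prometheus:9090") -> float:
    """Fetch the 5-minute error rate for the v2 model from Prometheus."""
    # Metric name is a placeholder for an error counter you expose yourself
    query = 'rate(model_prediction_errors_total{model_version="v2"}[5m])'
    response = requests.get(f"{prometheus_url}/api/v1/query", params={"query": query})
    results = response.json()["data"]["result"]
    return float(results[0]["value"][1]) if results else 0.0


# Inside canary_rollout, roll back instead of advancing if v2 misbehaves:
# if v2_error_rate() > 0.05:
#     update_traffic_split(100, 0)
#     break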
Best Practices Summary
- Version Control and Reproducibility
  - Use DVC for data versioning
  - Track experiments with MLflow
  - Maintain reproducible environments
- Continuous Training
  - Automate training pipelines
  - Monitor data drift
  - Implement automated retraining
- Model Serving
  - Use container orchestration
  - Implement autoscaling
  - Ensure high availability
- Monitoring
  - Track model performance
  - Monitor system metrics
  - Set up alerting
- Deployment
  - Use progressive rollouts
  - Implement A/B testing
  - Monitor deployment metrics
Conclusion
MLOps is crucial for successfully deploying and maintaining machine learning models in production. By following these best practices, you can:
- Ensure reproducibility and traceability
- Automate model training and deployment
- Scale your ML infrastructure effectively
- Monitor and maintain model performance
- Safely roll out model updates
Remember that MLOps is an iterative process. Start with the basics and gradually implement more sophisticated practices as your needs grow.