Kubernetes Operators: Extending Kubernetes with Custom Resources and Controllers
Kubernetes has revolutionized container orchestration, but managing complex stateful applications requires more than just deployments and services. Enter Kubernetes Operators—a method of packaging, deploying, and managing applications that extends Kubernetes' capabilities through custom resources and intelligent controllers. This comprehensive guide will teach you how to build operators that automate operational knowledge into software.
Understanding Kubernetes Operators
An Operator is a software extension to Kubernetes that uses custom resources to manage applications and their components. Operators follow Kubernetes principles, notably the control loop pattern, to maintain the desired state of your applications.
The Operator Pattern
┌─────────────────────────────────────────────────────┐
│                   Kubernetes API                    │
└────────────────┬───────────────┬────────────────────┘
                 │               │
         ┌───────▼───────┐       │
         │    Custom     │       │
         │   Resource    │       │
         │  Definition   │       │
         └───────┬───────┘       │
                 │               │
         ┌───────▼───────────────▼────────┐
         │           Controller           │
         │   ┌─────────────────────────┐  │
         │   │  Reconciliation Loop    │  │
         │   │  1. Observe             │  │
         │   │  2. Analyze             │  │
         │   │  3. Act                 │  │
         │   └─────────────────────────┘  │
         └─────────────┬──────────────────┘
                       │
              ┌────────▼────────┐
              │   Application   │
              │    Resources    │
              └─────────────────┘
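The three phases of the reconciliation loop can be sketched in plain Go with no Kubernetes dependencies. The `State` type and the replica example below are invented purely for illustration; a real controller reacts to watch events instead of looping:

```go
package main

import "fmt"

// State stands in for both a custom resource's spec (desired)
// and the cluster's observed condition (actual).
type State struct{ Replicas int }

// Reconcile runs one observe/analyze/act pass and reports
// whether it had to change anything.
func Reconcile(desired State, actual *State) bool {
	// 1. Observe: read the actual state (here, just *actual).
	// 2. Analyze: diff it against the desired state.
	if actual.Replicas == desired.Replicas {
		return false // already converged; nothing to do
	}
	// 3. Act: move the actual state one step toward the desired state.
	if actual.Replicas < desired.Replicas {
		actual.Replicas++ // e.g. create one more pod
	} else {
		actual.Replicas-- // e.g. delete one pod
	}
	return true
}

func main() {
	desired, actual := State{Replicas: 3}, State{Replicas: 0}
	// A real controller re-runs Reconcile on every watch event or requeue;
	// here we simply loop until the states converge.
	steps := 0
	for Reconcile(desired, &actual) {
		steps++
	}
	fmt.Println(actual.Replicas, steps) // prints "3 3"
}
```

Because the loop is level-triggered (it compares whole states rather than reacting to individual changes), it is naturally idempotent and self-healing: running it again after convergence is a no-op.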
Creating Custom Resource Definitions (CRDs)
CRDs let you define new resource types that the Kubernetes API server stores and serves just like built-in kinds. Let's create a CRD for a database cluster:
# database-crd.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: databases.example.com
spec:
  group: example.com
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              required: ["engine", "version", "replicas"]
              properties:
                engine:
                  type: string
                  enum: ["postgres", "mysql", "mongodb"]
                version:
                  type: string
                replicas:
                  type: integer
                  minimum: 1
                  maximum: 9
                storage:
                  type: object
                  properties:
                    size:
                      type: string
                      pattern: "^[0-9]+Gi$"
                    class:
                      type: string
                backup:
                  type: object
                  properties:
                    enabled:
                      type: boolean
                    schedule:
                      type: string
                    retention:
                      type: integer
            status:
              type: object
              properties:
                phase:
                  type: string
                  enum: ["Creating", "Running", "Failed", "Deleting"]
                ready:
                  type: boolean
                replicas:
                  type: integer
                endpoint:
                  type: string
                conditions:
                  type: array
                  items:
                    type: object
                    properties:
                      type:
                        type: string
                      status:
                        type: string
                      reason:
                        type: string
                      message:
                        type: string
                      lastTransitionTime:
                        type: string
      # Enable the status subresource so controllers can update
      # status independently of spec.
      subresources:
        status: {}
      additionalPrinterColumns:
        - name: Engine
          type: string
          jsonPath: .spec.engine
        - name: Version
          type: string
          jsonPath: .spec.version
        - name: Replicas
          type: integer
          jsonPath: .spec.replicas
        - name: Status
          type: string
          jsonPath: .status.phase
        - name: Age
          type: date
          jsonPath: .metadata.creationTimestamp
  scope: Namespaced
  names:
    plural: databases
    singular: database
    kind: Database
    shortNames:
      - db
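With the CRD installed, users create instances of the new Database kind like any other resource. A sample manifest (the name and values here are illustrative):

```yaml
apiVersion: example.com/v1
kind: Database
metadata:
  name: my-postgres
spec:
  engine: postgres
  version: "14"
  replicas: 3
  storage:
    size: 20Gi
    class: fast-ssd
  backup:
    enabled: true
    schedule: "0 2 * * *"
    retention: 7
```

After `kubectl apply -f`, the printer columns defined above make `kubectl get db` show the engine, version, replicas, and phase at a glance.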
Building a Controller with Kubebuilder
Let's build a controller using Kubebuilder, a framework for building Kubernetes APIs. Note that Kubebuilder generates CRD manifests from Go types and their markers, so in a Kubebuilder project you maintain the Go code below rather than YAML like the above; with `--group apps --domain example.com`, the generated API group is `apps.example.com`.
1. Initialize the Project
# Install kubebuilder
curl -L -o kubebuilder https://go.kubebuilder.io/dl/latest/$(go env GOOS)/$(go env GOARCH)
chmod +x kubebuilder && mv kubebuilder /usr/local/bin/
# Create project
mkdir database-operator && cd database-operator
kubebuilder init --domain example.com --repo github.com/example/database-operator
# Create API
kubebuilder create api --group apps --version v1 --kind Database
2. Define the API Types
// api/v1/database_types.go
package v1

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// DatabaseSpec defines the desired state of Database
type DatabaseSpec struct {
	// Engine is the database engine to use
	// +kubebuilder:validation:Enum=postgres;mysql;mongodb
	Engine string `json:"engine"`

	// Version is the database version
	Version string `json:"version"`

	// Replicas is the number of database instances
	// +kubebuilder:validation:Minimum=1
	// +kubebuilder:validation:Maximum=9
	Replicas int32 `json:"replicas"`

	// Storage configuration
	Storage StorageSpec `json:"storage,omitempty"`

	// Backup configuration
	Backup BackupSpec `json:"backup,omitempty"`

	// Resources for the database pods
	Resources corev1.ResourceRequirements `json:"resources,omitempty"`
}

type StorageSpec struct {
	// Size of the persistent volume
	// +kubebuilder:validation:Pattern="^[0-9]+Gi$"
	Size string `json:"size,omitempty"`

	// StorageClass to use
	Class string `json:"class,omitempty"`
}

type BackupSpec struct {
	// Enabled determines if backups are enabled
	Enabled bool `json:"enabled,omitempty"`

	// Schedule in cron format
	Schedule string `json:"schedule,omitempty"`

	// Retention days
	Retention int32 `json:"retention,omitempty"`
}

// DatabaseStatus defines the observed state of Database
type DatabaseStatus struct {
	// Phase represents the current phase of the database
	Phase DatabasePhase `json:"phase,omitempty"`

	// Ready indicates if the database is ready
	Ready bool `json:"ready,omitempty"`

	// Replicas is the number of ready replicas
	Replicas int32 `json:"replicas,omitempty"`

	// Endpoint for database connections
	Endpoint string `json:"endpoint,omitempty"`

	// Conditions represent the latest available observations
	Conditions []metav1.Condition `json:"conditions,omitempty"`
}

// +kubebuilder:validation:Enum=Creating;Running;Failed;Deleting
type DatabasePhase string

const (
	DatabasePhaseCreating DatabasePhase = "Creating"
	DatabasePhaseRunning  DatabasePhase = "Running"
	DatabasePhaseFailed   DatabasePhase = "Failed"
	DatabasePhaseDeleting DatabasePhase = "Deleting"
)

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Engine",type=string,JSONPath=`.spec.engine`
// +kubebuilder:printcolumn:name="Version",type=string,JSONPath=`.spec.version`
// +kubebuilder:printcolumn:name="Replicas",type=integer,JSONPath=`.spec.replicas`
// +kubebuilder:printcolumn:name="Status",type=string,JSONPath=`.status.phase`
// +kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp`

// Database is the Schema for the databases API
type Database struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   DatabaseSpec   `json:"spec,omitempty"`
	Status DatabaseStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// DatabaseList contains a list of Database; controller-runtime
// requires a list type for every registered kind.
type DatabaseList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []Database `json:"items"`
}

func init() {
	SchemeBuilder.Register(&Database{}, &DatabaseList{})
}
3. Implement the Controller
// controllers/database_controller.go
package controllers

import (
	"context"
	"fmt"

	"github.com/go-logr/logr"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	// Aliased as dbv1 so it does not collide with k8s.io/api/apps/v1 above.
	dbv1 "github.com/example/database-operator/api/v1"
)

// DatabaseReconciler reconciles a Database object
type DatabaseReconciler struct {
	client.Client
	Log    logr.Logger
	Scheme *runtime.Scheme
}

// +kubebuilder:rbac:groups=apps.example.com,resources=databases,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps.example.com,resources=databases/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps.example.com,resources=databases/finalizers,verbs=update
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;patch;delete

func (r *DatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := r.Log.WithValues("database", req.NamespacedName)

	// Fetch the Database instance
	database := &dbv1.Database{}
	if err := r.Get(ctx, req.NamespacedName, database); err != nil {
		if errors.IsNotFound(err) {
			log.Info("Database resource not found. Ignoring since object must be deleted")
			return ctrl.Result{}, nil
		}
		log.Error(err, "Failed to get Database")
		return ctrl.Result{}, err
	}

	// Add finalizer for graceful cleanup
	finalizerName := "database.example.com/finalizer"
	if database.ObjectMeta.DeletionTimestamp.IsZero() {
		if !controllerutil.ContainsFinalizer(database, finalizerName) {
			controllerutil.AddFinalizer(database, finalizerName)
			if err := r.Update(ctx, database); err != nil {
				return ctrl.Result{}, err
			}
		}
	} else {
		// Handle deletion
		if controllerutil.ContainsFinalizer(database, finalizerName) {
			if err := r.deleteExternalResources(database); err != nil {
				return ctrl.Result{}, err
			}
			controllerutil.RemoveFinalizer(database, finalizerName)
			if err := r.Update(ctx, database); err != nil {
				return ctrl.Result{}, err
			}
		}
		return ctrl.Result{}, nil
	}

	// Mark the database as Creating on the first pass only; later
	// passes may promote it to Running below.
	if database.Status.Phase == "" {
		database.Status.Phase = dbv1.DatabasePhaseCreating
		if err := r.Status().Update(ctx, database); err != nil {
			log.Error(err, "Failed to update Database status")
			return ctrl.Result{}, err
		}
	}

	// Create the ConfigMap if it does not exist
	configMap := r.configMapForDatabase(database)
	if err := ctrl.SetControllerReference(database, configMap, r.Scheme); err != nil {
		return ctrl.Result{}, err
	}
	foundConfigMap := &corev1.ConfigMap{}
	err := r.Get(ctx, types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace}, foundConfigMap)
	if err != nil && errors.IsNotFound(err) {
		log.Info("Creating a new ConfigMap", "ConfigMap.Namespace", configMap.Namespace, "ConfigMap.Name", configMap.Name)
		if err := r.Create(ctx, configMap); err != nil {
			return ctrl.Result{}, err
		}
	} else if err != nil {
		return ctrl.Result{}, err
	}

	// Create the Service if it does not exist
	service := r.serviceForDatabase(database)
	if err := ctrl.SetControllerReference(database, service, r.Scheme); err != nil {
		return ctrl.Result{}, err
	}
	foundService := &corev1.Service{}
	err = r.Get(ctx, types.NamespacedName{Name: service.Name, Namespace: service.Namespace}, foundService)
	if err != nil && errors.IsNotFound(err) {
		log.Info("Creating a new Service", "Service.Namespace", service.Namespace, "Service.Name", service.Name)
		if err := r.Create(ctx, service); err != nil {
			return ctrl.Result{}, err
		}
	} else if err != nil {
		return ctrl.Result{}, err
	}

	// Create the StatefulSet if it does not exist
	statefulSet := r.statefulSetForDatabase(database)
	if err := ctrl.SetControllerReference(database, statefulSet, r.Scheme); err != nil {
		return ctrl.Result{}, err
	}
	foundStatefulSet := &appsv1.StatefulSet{}
	err = r.Get(ctx, types.NamespacedName{Name: statefulSet.Name, Namespace: statefulSet.Namespace}, foundStatefulSet)
	if err != nil && errors.IsNotFound(err) {
		log.Info("Creating a new StatefulSet", "StatefulSet.Namespace", statefulSet.Namespace, "StatefulSet.Name", statefulSet.Name)
		if err := r.Create(ctx, statefulSet); err != nil {
			return ctrl.Result{}, err
		}
		return ctrl.Result{Requeue: true}, nil
	} else if err != nil {
		return ctrl.Result{}, err
	}

	// Scale the StatefulSet if the desired replica count changed.
	// (Compare the values, not the pointers.)
	if foundStatefulSet.Spec.Replicas == nil || *foundStatefulSet.Spec.Replicas != database.Spec.Replicas {
		foundStatefulSet.Spec.Replicas = &database.Spec.Replicas
		if err := r.Update(ctx, foundStatefulSet); err != nil {
			return ctrl.Result{}, err
		}
		return ctrl.Result{Requeue: true}, nil
	}

	// Update status
	database.Status.Replicas = foundStatefulSet.Status.ReadyReplicas
	database.Status.Ready = foundStatefulSet.Status.ReadyReplicas == database.Spec.Replicas
	if database.Status.Ready {
		database.Status.Phase = dbv1.DatabasePhaseRunning
		database.Status.Endpoint = fmt.Sprintf("%s.%s.svc.cluster.local:%d",
			service.Name, database.Namespace, r.getPort(database))
	}

	// Update conditions
	r.updateConditions(database, foundStatefulSet)
	if err := r.Status().Update(ctx, database); err != nil {
		log.Error(err, "Failed to update Database status")
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}

// configMapForDatabase, serviceForDatabase, getEnvVars, updateConditions,
// and deleteExternalResources follow the same pattern as the helpers below
// and are omitted for brevity.

func (r *DatabaseReconciler) statefulSetForDatabase(db *dbv1.Database) *appsv1.StatefulSet {
	labels := map[string]string{
		"app":      "database",
		"database": db.Name,
		"engine":   db.Spec.Engine,
	}
	replicas := db.Spec.Replicas

	return &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      db.Name + "-" + db.Spec.Engine,
			Namespace: db.Namespace,
			Labels:    labels,
		},
		Spec: appsv1.StatefulSetSpec{
			Replicas: &replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: labels,
			},
			ServiceName: db.Name + "-" + db.Spec.Engine,
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labels,
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{
						Name:  db.Spec.Engine,
						Image: r.getImage(db),
						Ports: []corev1.ContainerPort{{
							ContainerPort: r.getPort(db),
							Name:          db.Spec.Engine,
						}},
						VolumeMounts: []corev1.VolumeMount{{
							Name:      "data",
							MountPath: r.getDataPath(db),
						}},
						Env:       r.getEnvVars(db),
						Resources: db.Spec.Resources,
					}},
				},
			},
			VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{
				ObjectMeta: metav1.ObjectMeta{
					Name: "data",
				},
				Spec: corev1.PersistentVolumeClaimSpec{
					AccessModes: []corev1.PersistentVolumeAccessMode{
						corev1.ReadWriteOnce,
					},
					StorageClassName: &db.Spec.Storage.Class,
					Resources: corev1.ResourceRequirements{
						Requests: corev1.ResourceList{
							corev1.ResourceStorage: resource.MustParse(db.Spec.Storage.Size),
						},
					},
				},
			}},
		},
	}
}

func (r *DatabaseReconciler) getImage(db *dbv1.Database) string {
	switch db.Spec.Engine {
	case "postgres":
		return fmt.Sprintf("postgres:%s", db.Spec.Version)
	case "mysql":
		return fmt.Sprintf("mysql:%s", db.Spec.Version)
	case "mongodb":
		return fmt.Sprintf("mongo:%s", db.Spec.Version)
	default:
		return ""
	}
}

func (r *DatabaseReconciler) getPort(db *dbv1.Database) int32 {
	switch db.Spec.Engine {
	case "postgres":
		return 5432
	case "mysql":
		return 3306
	case "mongodb":
		return 27017
	default:
		return 0
	}
}

func (r *DatabaseReconciler) getDataPath(db *dbv1.Database) string {
	switch db.Spec.Engine {
	case "postgres":
		return "/var/lib/postgresql/data"
	case "mysql":
		return "/var/lib/mysql"
	case "mongodb":
		return "/data/db"
	default:
		return "/data"
	}
}

func (r *DatabaseReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&dbv1.Database{}).
		Owns(&appsv1.StatefulSet{}).
		Owns(&corev1.Service{}).
		Owns(&corev1.ConfigMap{}).
		Complete(r)
}
Advanced Operator Patterns
1. Backup Controller
// controllers/backup_controller.go
package controllers

import (
	"context"
	"fmt"
	"strings"
	"time"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"

	dbv1 "github.com/example/database-operator/api/v1"
)

// Note: this uses batch/v1 CronJobs (Kubernetes 1.21+); the older
// batch/v1beta1 API was removed in Kubernetes 1.25.
func (r *DatabaseReconciler) ensureBackupCronJob(ctx context.Context, db *dbv1.Database) error {
	if !db.Spec.Backup.Enabled {
		// Delete the CronJob if backups are disabled
		cronJob := &batchv1.CronJob{}
		err := r.Get(ctx, types.NamespacedName{
			Name:      db.Name + "-backup",
			Namespace: db.Namespace,
		}, cronJob)
		if err == nil {
			return r.Delete(ctx, cronJob)
		}
		return nil
	}

	cronJob := &batchv1.CronJob{
		ObjectMeta: metav1.ObjectMeta{
			Name:      db.Name + "-backup",
			Namespace: db.Namespace,
		},
		Spec: batchv1.CronJobSpec{
			Schedule: db.Spec.Backup.Schedule,
			JobTemplate: batchv1.JobTemplateSpec{
				Spec: batchv1.JobSpec{
					Template: corev1.PodTemplateSpec{
						Spec: corev1.PodSpec{
							RestartPolicy: corev1.RestartPolicyOnFailure,
							Containers: []corev1.Container{{
								Name:    "backup",
								Image:   r.getBackupImage(db), // getBackupImage omitted for brevity
								Command: r.getBackupCommand(db),
								Env: []corev1.EnvVar{
									{
										// Status.Endpoint is host:port; the dump tools
										// below take the host alone, so strip the port.
										Name:  "DATABASE_HOST",
										Value: strings.Split(db.Status.Endpoint, ":")[0],
									},
									{
										Name: "DATABASE_PASSWORD",
										ValueFrom: &corev1.EnvVarSource{
											SecretKeyRef: &corev1.SecretKeySelector{
												LocalObjectReference: corev1.LocalObjectReference{
													Name: db.Name + "-secret",
												},
												Key: "password",
											},
										},
									},
									{
										Name:  "S3_BUCKET",
										Value: "database-backups",
									},
									{
										Name:  "RETENTION_DAYS",
										Value: fmt.Sprintf("%d", db.Spec.Backup.Retention),
									},
								},
							}},
						},
					},
				},
			},
		},
	}
	if err := ctrl.SetControllerReference(db, cronJob, r.Scheme); err != nil {
		return err
	}

	found := &batchv1.CronJob{}
	err := r.Get(ctx, types.NamespacedName{Name: cronJob.Name, Namespace: cronJob.Namespace}, found)
	if err != nil && errors.IsNotFound(err) {
		return r.Create(ctx, cronJob)
	}
	return err
}

func (r *DatabaseReconciler) getBackupCommand(db *dbv1.Database) []string {
	// NOTE: this timestamp is computed once, when the CronJob object is
	// created; for per-run backup names, generate it inside the container
	// with date(1) instead.
	timestamp := time.Now().Format("20060102-150405")
	switch db.Spec.Engine {
	case "postgres":
		return []string{
			"/bin/bash",
			"-c",
			fmt.Sprintf(`
				pg_dump -h $DATABASE_HOST -U postgres -d postgres | \
				gzip | \
				aws s3 cp - s3://$S3_BUCKET/%s/backup-%s.sql.gz
			`, db.Name, timestamp),
		}
	case "mysql":
		return []string{
			"/bin/bash",
			"-c",
			fmt.Sprintf(`
				mysqldump -h $DATABASE_HOST -u root -p$DATABASE_PASSWORD --all-databases | \
				gzip | \
				aws s3 cp - s3://$S3_BUCKET/%s/backup-%s.sql.gz
			`, db.Name, timestamp),
		}
	case "mongodb":
		return []string{
			"/bin/bash",
			"-c",
			fmt.Sprintf(`
				mongodump --uri="mongodb://$DATABASE_HOST" --archive | \
				gzip | \
				aws s3 cp - s3://$S3_BUCKET/%s/backup-%s.archive.gz
			`, db.Name, timestamp),
		}
	default:
		return []string{}
	}
}
2. Monitoring Integration
// controllers/monitoring.go
package controllers

import (
	"context"
	"fmt"

	monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	ctrl "sigs.k8s.io/controller-runtime"

	dbv1 "github.com/example/database-operator/api/v1"
)

func (r *DatabaseReconciler) ensureServiceMonitor(ctx context.Context, db *dbv1.Database) error {
	serviceMonitor := &monitoringv1.ServiceMonitor{
		ObjectMeta: metav1.ObjectMeta{
			Name:      db.Name + "-monitor",
			Namespace: db.Namespace,
			Labels: map[string]string{
				"app":      "database",
				"database": db.Name,
			},
		},
		Spec: monitoringv1.ServiceMonitorSpec{
			Selector: metav1.LabelSelector{
				MatchLabels: map[string]string{
					"app":      "database",
					"database": db.Name,
				},
			},
			Endpoints: []monitoringv1.Endpoint{
				{
					Port:     "metrics",
					Interval: "30s",
					Path:     "/metrics",
				},
			},
		},
	}
	if err := ctrl.SetControllerReference(db, serviceMonitor, r.Scheme); err != nil {
		return err
	}

	found := &monitoringv1.ServiceMonitor{}
	err := r.Get(ctx, types.NamespacedName{
		Name:      serviceMonitor.Name,
		Namespace: serviceMonitor.Namespace,
	}, found)
	if err != nil && errors.IsNotFound(err) {
		return r.Create(ctx, serviceMonitor)
	}
	return err
}

// Prometheus rules for alerting
func (r *DatabaseReconciler) ensurePrometheusRule(ctx context.Context, db *dbv1.Database) error {
	rule := &monitoringv1.PrometheusRule{
		ObjectMeta: metav1.ObjectMeta{
			Name:      db.Name + "-rules",
			Namespace: db.Namespace,
		},
		Spec: monitoringv1.PrometheusRuleSpec{
			Groups: []monitoringv1.RuleGroup{
				{
					Name: db.Name + "-alerts",
					Rules: []monitoringv1.Rule{
						{
							Alert: "DatabaseDown",
							Expr:  intstr.FromString(fmt.Sprintf(`up{job="%s-monitor"} == 0`, db.Name)),
							For:   "5m",
							Labels: map[string]string{
								"severity": "critical",
								"database": db.Name,
							},
							Annotations: map[string]string{
								"summary":     "Database {{ $labels.database }} is down",
								"description": "Database {{ $labels.database }} has been down for more than 5 minutes.",
							},
						},
						{
							Alert: "DatabaseHighConnections",
							Expr:  intstr.FromString(fmt.Sprintf(`database_connections{database="%s"} > 80`, db.Name)),
							For:   "10m",
							Labels: map[string]string{
								"severity": "warning",
								"database": db.Name,
							},
							Annotations: map[string]string{
								"summary":     "High connection count on {{ $labels.database }}",
								"description": "Database {{ $labels.database }} has more than 80 active connections.",
							},
						},
					},
				},
			},
		},
	}
	if err := ctrl.SetControllerReference(db, rule, r.Scheme); err != nil {
		return err
	}

	// Create only if it does not already exist, mirroring ensureServiceMonitor;
	// an unconditional Create would fail on every reconcile after the first.
	found := &monitoringv1.PrometheusRule{}
	err := r.Get(ctx, types.NamespacedName{Name: rule.Name, Namespace: rule.Namespace}, found)
	if err != nil && errors.IsNotFound(err) {
		return r.Create(ctx, rule)
	}
	return err
}
3. Webhook Validation
// api/v1/database_webhook.go
package v1

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	logf "sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/webhook"
)

var databaselog = logf.Log.WithName("database-resource")

func (r *Database) SetupWebhookWithManager(mgr ctrl.Manager) error {
	return ctrl.NewWebhookManagedBy(mgr).
		For(r).
		Complete()
}

// +kubebuilder:webhook:path=/mutate-apps-example-com-v1-database,mutating=true,failurePolicy=fail,sideEffects=None,groups=apps.example.com,resources=databases,verbs=create;update,versions=v1,name=mdatabase.kb.io,admissionReviewVersions={v1,v1beta1}

var _ webhook.Defaulter = &Database{}

// Default implements webhook.Defaulter
func (r *Database) Default() {
	databaselog.Info("default", "name", r.Name)

	// Set default values
	if r.Spec.Storage.Size == "" {
		r.Spec.Storage.Size = "10Gi"
	}
	if r.Spec.Storage.Class == "" {
		r.Spec.Storage.Class = "standard"
	}
	if r.Spec.Backup.Enabled && r.Spec.Backup.Schedule == "" {
		r.Spec.Backup.Schedule = "0 2 * * *" // 2 AM daily
	}
	if r.Spec.Backup.Enabled && r.Spec.Backup.Retention == 0 {
		r.Spec.Backup.Retention = 7
	}
}

// +kubebuilder:webhook:path=/validate-apps-example-com-v1-database,mutating=false,failurePolicy=fail,sideEffects=None,groups=apps.example.com,resources=databases,verbs=create;update,versions=v1,name=vdatabase.kb.io,admissionReviewVersions={v1,v1beta1}

var _ webhook.Validator = &Database{}

// ValidateCreate implements webhook.Validator
func (r *Database) ValidateCreate() error {
	databaselog.Info("validate create", "name", r.Name)

	// Validate engine-specific constraints.
	// NOTE: these string comparisons are lexicographic, which works for the
	// simple major versions used here but breaks for values like "9.6".
	switch r.Spec.Engine {
	case "postgres":
		if r.Spec.Version < "12" {
			return fmt.Errorf("PostgreSQL version must be 12 or higher")
		}
	case "mysql":
		if r.Spec.Version < "8.0" {
			return fmt.Errorf("MySQL version must be 8.0 or higher")
		}
	case "mongodb":
		if r.Spec.Version < "4.4" {
			return fmt.Errorf("MongoDB version must be 4.4 or higher")
		}
	}

	// Validate storage size
	size, err := resource.ParseQuantity(r.Spec.Storage.Size)
	if err != nil {
		return fmt.Errorf("invalid storage size: %v", err)
	}
	minSize := resource.MustParse("1Gi")
	if size.Cmp(minSize) < 0 {
		return fmt.Errorf("storage size must be at least 1Gi")
	}
	return nil
}

// ValidateUpdate implements webhook.Validator
func (r *Database) ValidateUpdate(old runtime.Object) error {
	databaselog.Info("validate update", "name", r.Name)
	oldDB := old.(*Database)

	// Prevent engine changes
	if r.Spec.Engine != oldDB.Spec.Engine {
		return fmt.Errorf("database engine cannot be changed")
	}

	// Prevent downgrades
	if r.Spec.Version < oldDB.Spec.Version {
		return fmt.Errorf("database version cannot be downgraded")
	}

	// Prevent storage shrinking
	newSize, _ := resource.ParseQuantity(r.Spec.Storage.Size)
	oldSize, _ := resource.ParseQuantity(oldDB.Spec.Storage.Size)
	if newSize.Cmp(oldSize) < 0 {
		return fmt.Errorf("storage size cannot be reduced")
	}
	return nil
}

// ValidateDelete implements webhook.Validator
func (r *Database) ValidateDelete() error {
	databaselog.Info("validate delete", "name", r.Name)

	// Require an explicit confirmation annotation before allowing deletion;
	// you could also check for active connections here.
	if r.Annotations["confirm-delete"] != "true" {
		return fmt.Errorf("database deletion requires 'confirm-delete: true' annotation")
	}
	return nil
}
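A caveat on the version checks above: comparing version strings with `<` is lexicographic, so `"9.6" < "12"` is false even though 9.6 is the older release. A more robust check compares numeric components. This is a minimal stdlib sketch; production code might prefer a semver library:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// compareVersions returns -1, 0, or 1 if a is older than, equal to,
// or newer than b. Non-numeric components are treated as 0.
func compareVersions(a, b string) int {
	as, bs := strings.Split(a, "."), strings.Split(b, ".")
	for i := 0; i < len(as) || i < len(bs); i++ {
		var ai, bi int
		if i < len(as) {
			ai, _ = strconv.Atoi(as[i]) // missing components stay 0
		}
		if i < len(bs) {
			bi, _ = strconv.Atoi(bs[i])
		}
		if ai != bi {
			if ai < bi {
				return -1
			}
			return 1
		}
	}
	return 0
}

func main() {
	fmt.Println("9.6" < "12")                  // lexicographic: false
	fmt.Println(compareVersions("9.6", "12"))  // numeric: -1 (9.6 is older)
	fmt.Println(compareVersions("14.2", "14")) // 1 (14.2 is newer)
}
```

Inside `ValidateCreate`, the postgres branch would then become `if compareVersions(r.Spec.Version, "12") < 0 { ... }`.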
Testing Your Operator
1. Unit Tests
// controllers/database_controller_test.go
package controllers

import (
	"context"
	"time"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"

	dbv1 "github.com/example/database-operator/api/v1"
)

var _ = Describe("Database Controller", func() {
	const (
		DatabaseName      = "test-database"
		DatabaseNamespace = "default"

		timeout  = time.Second * 10
		duration = time.Second * 10
		interval = time.Millisecond * 250
	)

	Context("When creating Database", func() {
		It("Should create StatefulSet and Service", func() {
			ctx := context.Background()
			database := &dbv1.Database{
				ObjectMeta: metav1.ObjectMeta{
					Name:      DatabaseName,
					Namespace: DatabaseNamespace,
				},
				Spec: dbv1.DatabaseSpec{
					Engine:   "postgres",
					Version:  "14",
					Replicas: 3,
					Storage: dbv1.StorageSpec{
						Size:  "10Gi",
						Class: "fast-ssd",
					},
				},
			}
			Expect(k8sClient.Create(ctx, database)).Should(Succeed())

			databaseLookupKey := types.NamespacedName{Name: DatabaseName, Namespace: DatabaseNamespace}
			createdDatabase := &dbv1.Database{}
			Eventually(func() bool {
				err := k8sClient.Get(ctx, databaseLookupKey, createdDatabase)
				return err == nil
			}, timeout, interval).Should(BeTrue())

			// Check StatefulSet creation
			Eventually(func() bool {
				statefulSet := &appsv1.StatefulSet{}
				err := k8sClient.Get(ctx, types.NamespacedName{
					Name:      DatabaseName + "-postgres",
					Namespace: DatabaseNamespace,
				}, statefulSet)
				if err != nil {
					return false
				}
				return *statefulSet.Spec.Replicas == 3
			}, timeout, interval).Should(BeTrue())

			// Check Service creation
			Eventually(func() bool {
				service := &corev1.Service{}
				err := k8sClient.Get(ctx, types.NamespacedName{
					Name:      DatabaseName + "-postgres",
					Namespace: DatabaseNamespace,
				}, service)
				return err == nil
			}, timeout, interval).Should(BeTrue())

			// Check status update
			Eventually(func() string {
				if err := k8sClient.Get(ctx, databaseLookupKey, createdDatabase); err != nil {
					return ""
				}
				return string(createdDatabase.Status.Phase)
			}, timeout, interval).Should(Equal(string(dbv1.DatabasePhaseCreating)))
		})
	})
})
2. Integration Tests
// test/e2e/database_test.go
package e2e

import (
	"testing"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"

	dbv1 "github.com/example/database-operator/api/v1"
)

// kubeconfig, dbClient (a typed client for Database objects), and the
// waitFor*/test* helpers below are provided by the e2e test harness.
func TestDatabaseOperator(t *testing.T) {
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		t.Fatal(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		t.Fatal(err)
	}

	// Create test database
	database := &dbv1.Database{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "e2e-test-db",
			Namespace: "default",
		},
		Spec: dbv1.DatabaseSpec{
			Engine:   "postgres",
			Version:  "14",
			Replicas: 1,
			Storage: dbv1.StorageSpec{
				Size: "5Gi",
			},
			Backup: dbv1.BackupSpec{
				Enabled:   true,
				Schedule:  "*/5 * * * *", // Every 5 minutes for testing
				Retention: 1,
			},
		},
	}
	if _, err := dbClient.Create(database); err != nil {
		t.Fatal(err)
	}

	// Wait for database to be ready
	if err := waitForDatabaseReady(dbClient, database.Name, 5*time.Minute); err != nil {
		t.Fatal(err)
	}

	// Test database connection
	endpoint, err := getDatabaseEndpoint(dbClient, database.Name)
	if err != nil {
		t.Fatal(err)
	}
	if err := testDatabaseConnection(endpoint); err != nil {
		t.Fatal(err)
	}

	// Test backup creation
	if err := waitForBackupJob(clientset, database.Name, 10*time.Minute); err != nil {
		t.Fatal(err)
	}

	// Clean up
	if err := dbClient.Delete(database.Name, &metav1.DeleteOptions{}); err != nil {
		t.Fatal(err)
	}
}
Deployment and Operations
1. Building and Publishing
# Dockerfile
FROM golang:1.19 as builder
WORKDIR /workspace
COPY go.mod go.sum ./
RUN go mod download
COPY main.go main.go
COPY api/ api/
COPY controllers/ controllers/
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o manager main.go
FROM gcr.io/distroless/static:nonroot
WORKDIR /
COPY --from=builder /workspace/manager .
USER 65532:65532
ENTRYPOINT ["/manager"]
2. Deployment Manifest
# config/manager/manager.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: database-operator-system
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: database-operator-controller-manager
  namespace: database-operator-system
spec:
  selector:
    matchLabels:
      control-plane: controller-manager
  replicas: 1
  template:
    metadata:
      labels:
        control-plane: controller-manager
    spec:
      securityContext:
        runAsNonRoot: true
      containers:
      - command:
        - /manager
        args:
        - --leader-elect
        image: database-operator:latest
        name: manager
        securityContext:
          allowPrivilegeEscalation: false
        livenessProbe:
          httpGet:
            path: /healthz
            port: 8081
          initialDelaySeconds: 15
          periodSeconds: 20
        readinessProbe:
          httpGet:
            path: /readyz
            port: 8081
          initialDelaySeconds: 5
          periodSeconds: 10
        resources:
          limits:
            cpu: 500m
            memory: 128Mi
          requests:
            cpu: 10m
            memory: 64Mi
      serviceAccountName: database-operator-controller-manager
      terminationGracePeriodSeconds: 10
3. Monitoring with Prometheus
# Prometheus ServiceMonitor for operator metrics
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: database-operator-metrics
  namespace: database-operator-system
spec:
  endpoints:
  - path: /metrics
    port: https
    scheme: https
    tlsConfig:
      insecureSkipVerify: true
  selector:
    matchLabels:
      control-plane: controller-manager
Best Practices
- Resource Ownership: Always set owner references to ensure proper garbage collection
- Status Updates: Keep status separate from spec and update it regularly
- Idempotency: Ensure reconciliation logic is idempotent
- Error Handling: Use exponential backoff for retries
- Observability: Export metrics and use structured logging
- Testing: Write comprehensive unit and integration tests
- Security: Run with minimal privileges and use RBAC properly
Conclusion
Kubernetes Operators represent a powerful pattern for extending Kubernetes to manage complex applications. By encoding operational knowledge into software, operators automate tasks that would otherwise require manual intervention. Whether you're managing databases, message queues, or custom applications, the operator pattern provides a robust framework for cloud-native automation.
The combination of Custom Resource Definitions and controllers gives you the flexibility to define your own abstractions while leveraging Kubernetes' proven reconciliation loop pattern. As you build operators, remember that the goal is to make the complex simple—abstracting away operational complexity while providing a declarative API that fits naturally into the Kubernetes ecosystem.