Kubernetes Operators: Extending Kubernetes with Custom Resources and Controllers

Kubernetes has revolutionized container orchestration, but managing complex stateful applications requires more than just deployments and services. Enter Kubernetes Operators: a method of packaging, deploying, and managing applications that extends Kubernetes' capabilities through custom resources and intelligent controllers. This guide will teach you how to build operators that encode operational knowledge into software.

Understanding Kubernetes Operators

An Operator is a software extension to Kubernetes that uses custom resources to manage applications and their components. Operators follow Kubernetes principles, notably the control loop pattern, to maintain the desired state of your applications.

The Operator Pattern

┌─────────────────────────────────────────────────────┐
│                   Kubernetes API                    │
└────────────────┬───────────────┬────────────────────┘
                 │               │
         ┌───────▼───────┐       │
         │    Custom     │       │
         │   Resource    │       │
         │  Definition   │       │
         └───────┬───────┘       │
                 │               │
         ┌───────▼───────────────▼────────┐
         │           Controller           │
         │  ┌─────────────────────────┐   │
         │  │   Reconciliation Loop   │   │
         │  │  1. Observe             │   │
         │  │  2. Analyze             │   │
         │  │  3. Act                 │   │
         │  └─────────────────────────┘   │
         └──────────────┬─────────────────┘
                        │
               ┌────────▼────────┐
               │   Application   │
               │    Resources    │
               └─────────────────┘
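
In code, this loop maps onto a single Reconcile function that the controller runtime calls whenever the custom resource, or anything it owns, changes. A bare-bones sketch of that shape (examplev1 is a placeholder API package; the full controller is developed step by step later in this guide):

func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // 1. Observe: fetch the current state of the custom resource
    db := &examplev1.Database{}
    if err := r.Get(ctx, req.NamespacedName, db); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }
    
    // 2. Analyze: compare the desired state (spec) with what exists in the cluster
    // 3. Act: create, update, or delete owned resources to converge, then update status
    
    return ctrl.Result{}, nil
}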

Creating Custom Resource Definitions (CRDs)

CRDs extend the Kubernetes API with new resource types, letting you store and retrieve structured objects just like built-in resources. Let's create a CRD for a database cluster (in the Kubebuilder section below, an equivalent CRD is generated for us under the apps.example.com group):

# database-crd.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: databases.example.com
spec:
  group: example.com
  versions:
  - name: v1
    served: true
    storage: true
    subresources:
      status: {}
    schema:
      openAPIV3Schema:
        type: object
        properties:
          spec:
            type: object
            required: ["engine", "version", "replicas"]
            properties:
              engine:
                type: string
                enum: ["postgres", "mysql", "mongodb"]
              version:
                type: string
              replicas:
                type: integer
                minimum: 1
                maximum: 9
              storage:
                type: object
                properties:
                  size:
                    type: string
                    pattern: "^[0-9]+Gi$"
                  class:
                    type: string
              backup:
                type: object
                properties:
                  enabled:
                    type: boolean
                  schedule:
                    type: string
                  retention:
                    type: integer
          status:
            type: object
            properties:
              phase:
                type: string
                enum: ["Creating", "Running", "Failed", "Deleting"]
              ready:
                type: boolean
              replicas:
                type: integer
              endpoint:
                type: string
              conditions:
                type: array
                items:
                  type: object
                  properties:
                    type:
                      type: string
                    status:
                      type: string
                    reason:
                      type: string
                    message:
                      type: string
                    lastTransitionTime:
                      type: string
    additionalPrinterColumns:
    - name: Engine
      type: string
      jsonPath: .spec.engine
    - name: Version
      type: string
      jsonPath: .spec.version
    - name: Replicas
      type: integer
      jsonPath: .spec.replicas
    - name: Status
      type: string
      jsonPath: .status.phase
    - name: Age
      type: date
      jsonPath: .metadata.creationTimestamp
  scope: Namespaced
  names:
    plural: databases
    singular: database
    kind: Database
    shortNames:
    - db

Building a Controller with Kubebuilder

Let's build a controller using Kubebuilder, a framework for building Kubernetes APIs:

1. Initialize the Project

# Install kubebuilder
curl -L -o kubebuilder https://go.kubebuilder.io/dl/latest/$(go env GOOS)/$(go env GOARCH)
chmod +x kubebuilder && mv kubebuilder /usr/local/bin/

# Create project
mkdir database-operator && cd database-operator
kubebuilder init --domain example.com --repo github.com/example/database-operator

# Create API
kubebuilder create api --group apps --version v1 --kind Database

2. Define the API Types

// api/v1/database_types.go
package v1

import (
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// DatabaseSpec defines the desired state of Database
type DatabaseSpec struct {
    // Engine is the database engine to use
    // +kubebuilder:validation:Enum=postgres;mysql;mongodb
    Engine string `json:"engine"`
    
    // Version is the database version
    Version string `json:"version"`
    
    // Replicas is the number of database instances
    // +kubebuilder:validation:Minimum=1
    // +kubebuilder:validation:Maximum=9
    Replicas int32 `json:"replicas"`
    
    // Storage configuration
    Storage StorageSpec `json:"storage,omitempty"`
    
    // Backup configuration
    Backup BackupSpec `json:"backup,omitempty"`
    
    // Resources for the database pods
    Resources corev1.ResourceRequirements `json:"resources,omitempty"`
}

type StorageSpec struct {
    // Size of the persistent volume
    // +kubebuilder:validation:Pattern="^[0-9]+Gi$"
    Size string `json:"size,omitempty"`
    
    // StorageClass to use
    Class string `json:"class,omitempty"`
}

type BackupSpec struct {
    // Enabled determines if backups are enabled
    Enabled bool `json:"enabled,omitempty"`
    
    // Schedule in cron format
    Schedule string `json:"schedule,omitempty"`
    
    // Retention days
    Retention int32 `json:"retention,omitempty"`
}

// DatabaseStatus defines the observed state of Database
type DatabaseStatus struct {
    // Phase represents the current phase of database
    Phase DatabasePhase `json:"phase,omitempty"`
    
    // Ready indicates if the database is ready
    Ready bool `json:"ready,omitempty"`
    
    // Replicas is the number of ready replicas
    Replicas int32 `json:"replicas,omitempty"`
    
    // Endpoint for database connection
    Endpoint string `json:"endpoint,omitempty"`
    
    // Conditions represent the latest available observations
    Conditions []metav1.Condition `json:"conditions,omitempty"`
}

// +kubebuilder:validation:Enum=Creating;Running;Failed;Deleting
type DatabasePhase string

const (
    DatabasePhaseCreating DatabasePhase = "Creating"
    DatabasePhaseRunning  DatabasePhase = "Running"
    DatabasePhaseFailed   DatabasePhase = "Failed"
    DatabasePhaseDeleting DatabasePhase = "Deleting"
)

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Engine",type=string,JSONPath=`.spec.engine`
// +kubebuilder:printcolumn:name="Version",type=string,JSONPath=`.spec.version`
// +kubebuilder:printcolumn:name="Replicas",type=integer,JSONPath=`.spec.replicas`
// +kubebuilder:printcolumn:name="Status",type=string,JSONPath=`.status.phase`
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=`.metadata.creationTimestamp`

// Database is the Schema for the databases API
type Database struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`
    
    Spec   DatabaseSpec   `json:"spec,omitempty"`
    Status DatabaseStatus `json:"status,omitempty"`
}
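
Kubebuilder also scaffolds a DatabaseList type and registers both types with the scheme (SchemeBuilder comes from the generated groupversion_info.go); that boilerplate looks like this:

// +kubebuilder:object:root=true

// DatabaseList contains a list of Database
type DatabaseList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`
    Items           []Database `json:"items"`
}

func init() {
    SchemeBuilder.Register(&Database{}, &DatabaseList{})
}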

3. Implement the Controller

// controllers/database_controller.go
package controllers

import (
    "context"
    "fmt"
    
    "github.com/go-logr/logr"
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    
    dbv1 "github.com/example/database-operator/api/v1"
)

// DatabaseReconciler reconciles a Database object
type DatabaseReconciler struct {
    client.Client
    Log    logr.Logger
    Scheme *runtime.Scheme
}

// +kubebuilder:rbac:groups=apps.example.com,resources=databases,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps.example.com,resources=databases/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps.example.com,resources=databases/finalizers,verbs=update
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;patch;delete

func (r *DatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    log := r.Log.WithValues("database", req.NamespacedName)
    
    // Fetch the Database instance
    database := &dbv1.Database{}
    if err := r.Get(ctx, req.NamespacedName, database); err != nil {
        if errors.IsNotFound(err) {
            log.Info("Database resource not found. Ignoring since object must be deleted")
            return ctrl.Result{}, nil
        }
        log.Error(err, "Failed to get Database")
        return ctrl.Result{}, err
    }
    
    // Add finalizer for graceful cleanup
    finalizerName := "database.example.com/finalizer"
    if database.ObjectMeta.DeletionTimestamp.IsZero() {
        if !controllerutil.ContainsFinalizer(database, finalizerName) {
            controllerutil.AddFinalizer(database, finalizerName)
            if err := r.Update(ctx, database); err != nil {
                return ctrl.Result{}, err
            }
        }
    } else {
        // Handle deletion
        if controllerutil.ContainsFinalizer(database, finalizerName) {
            // deleteExternalResources is a helper (not shown) that cleans up
            // anything not garbage-collected through owner references
            if err := r.deleteExternalResources(database); err != nil {
                return ctrl.Result{}, err
            }
            
            controllerutil.RemoveFinalizer(database, finalizerName)
            if err := r.Update(ctx, database); err != nil {
                return ctrl.Result{}, err
            }
        }
        return ctrl.Result{}, nil
    }
    
    // Set the initial status phase if it has not been set yet
    if database.Status.Phase == "" {
        database.Status.Phase = dbv1.DatabasePhaseCreating
        if err := r.Status().Update(ctx, database); err != nil {
            log.Error(err, "Failed to update Database status")
            return ctrl.Result{}, err
        }
    }
    
    // Create or update ConfigMap
    configMap := r.configMapForDatabase(database)
    if err := ctrl.SetControllerReference(database, configMap, r.Scheme); err != nil {
        return ctrl.Result{}, err
    }
    
    foundConfigMap := &corev1.ConfigMap{}
    err := r.Get(ctx, types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace}, foundConfigMap)
    if err != nil && errors.IsNotFound(err) {
        log.Info("Creating a new ConfigMap", "ConfigMap.Namespace", configMap.Namespace, "ConfigMap.Name", configMap.Name)
        err = r.Create(ctx, configMap)
        if err != nil {
            return ctrl.Result{}, err
        }
    } else if err != nil {
        return ctrl.Result{}, err
    }
    
    // Create or update Service
    service := r.serviceForDatabase(database)
    if err := ctrl.SetControllerReference(database, service, r.Scheme); err != nil {
        return ctrl.Result{}, err
    }
    
    foundService := &corev1.Service{}
    err = r.Get(ctx, types.NamespacedName{Name: service.Name, Namespace: service.Namespace}, foundService)
    if err != nil && errors.IsNotFound(err) {
        log.Info("Creating a new Service", "Service.Namespace", service.Namespace, "Service.Name", service.Name)
        err = r.Create(ctx, service)
        if err != nil {
            return ctrl.Result{}, err
        }
    } else if err != nil {
        return ctrl.Result{}, err
    }
    
    // Create or update StatefulSet
    statefulSet := r.statefulSetForDatabase(database)
    if err := ctrl.SetControllerReference(database, statefulSet, r.Scheme); err != nil {
        return ctrl.Result{}, err
    }
    
    foundStatefulSet := &appsv1.StatefulSet{}
    err = r.Get(ctx, types.NamespacedName{Name: statefulSet.Name, Namespace: statefulSet.Namespace}, foundStatefulSet)
    if err != nil && errors.IsNotFound(err) {
        log.Info("Creating a new StatefulSet", "StatefulSet.Namespace", statefulSet.Namespace, "StatefulSet.Name", statefulSet.Name)
        err = r.Create(ctx, statefulSet)
        if err != nil {
            return ctrl.Result{}, err
        }
        return ctrl.Result{Requeue: true}, nil
    } else if err != nil {
        return ctrl.Result{}, err
    }
    
    // Update StatefulSet if needed
    if foundStatefulSet.Spec.Replicas == nil || *foundStatefulSet.Spec.Replicas != database.Spec.Replicas {
        foundStatefulSet.Spec.Replicas = &database.Spec.Replicas
        err = r.Update(ctx, foundStatefulSet)
        if err != nil {
            return ctrl.Result{}, err
        }
        return ctrl.Result{Requeue: true}, nil
    }
    
    // Update status
    database.Status.Replicas = foundStatefulSet.Status.ReadyReplicas
    database.Status.Ready = foundStatefulSet.Status.ReadyReplicas == database.Spec.Replicas
    if database.Status.Ready {
        database.Status.Phase = dbv1.DatabasePhaseRunning
        database.Status.Endpoint = fmt.Sprintf("%s.%s.svc.cluster.local:%d", 
            service.Name, database.Namespace, r.getPort(database))
    }
    
    // Update conditions via updateConditions, a helper (not shown) that sets
    // metav1.Conditions from the observed StatefulSet state
    r.updateConditions(database, foundStatefulSet)
    
    if err := r.Status().Update(ctx, database); err != nil {
        log.Error(err, "Failed to update Database status")
        return ctrl.Result{}, err
    }
    
    return ctrl.Result{}, nil
}

func (r *DatabaseReconciler) statefulSetForDatabase(db *dbv1.Database) *appsv1.StatefulSet {
    labels := map[string]string{
        "app":      "database",
        "database": db.Name,
        "engine":   db.Spec.Engine,
    }
    
    replicas := db.Spec.Replicas
    
    statefulSet := &appsv1.StatefulSet{
        ObjectMeta: metav1.ObjectMeta{
            Name:      db.Name + "-" + db.Spec.Engine,
            Namespace: db.Namespace,
            Labels:    labels,
        },
        Spec: appsv1.StatefulSetSpec{
            Replicas: &replicas,
            Selector: &metav1.LabelSelector{
                MatchLabels: labels,
            },
            ServiceName: db.Name + "-" + db.Spec.Engine,
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: labels,
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{{
                        Name:  db.Spec.Engine,
                        Image: r.getImage(db),
                        Ports: []corev1.ContainerPort{{
                            ContainerPort: r.getPort(db),
                            Name:          db.Spec.Engine,
                        }},
                        VolumeMounts: []corev1.VolumeMount{{
                            Name:      "data",
                            MountPath: r.getDataPath(db),
                        }},
                        Env: r.getEnvVars(db),
                        Resources: db.Spec.Resources,
                    }},
                },
            },
            VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{
                ObjectMeta: metav1.ObjectMeta{
                    Name: "data",
                },
                Spec: corev1.PersistentVolumeClaimSpec{
                    AccessModes: []corev1.PersistentVolumeAccessMode{
                        corev1.ReadWriteOnce,
                    },
                    StorageClassName: &db.Spec.Storage.Class,
                    Resources: corev1.ResourceRequirements{
                        Requests: corev1.ResourceList{
                            corev1.ResourceStorage: resource.MustParse(db.Spec.Storage.Size),
                        },
                    },
                },
            }},
        },
    }
    
    return statefulSet
}

func (r *DatabaseReconciler) getImage(db *dbv1.Database) string {
    switch db.Spec.Engine {
    case "postgres":
        return fmt.Sprintf("postgres:%s", db.Spec.Version)
    case "mysql":
        return fmt.Sprintf("mysql:%s", db.Spec.Version)
    case "mongodb":
        return fmt.Sprintf("mongo:%s", db.Spec.Version)
    default:
        return ""
    }
}

func (r *DatabaseReconciler) getPort(db *dbv1.Database) int32 {
    switch db.Spec.Engine {
    case "postgres":
        return 5432
    case "mysql":
        return 3306
    case "mongodb":
        return 27017
    default:
        return 0
    }
}

func (r *DatabaseReconciler) getDataPath(db *dbv1.Database) string {
    switch db.Spec.Engine {
    case "postgres":
        return "/var/lib/postgresql/data"
    case "mysql":
        return "/var/lib/mysql"
    case "mongodb":
        return "/data/db"
    default:
        return "/data"
    }
}
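
// Note: Reconcile above also calls configMapForDatabase, serviceForDatabase,
// and getEnvVars, which are omitted from this guide for brevity. The two
// sketches below are illustrative assumptions that follow the same label
// scheme; adapt them to your engines and credential handling.
func (r *DatabaseReconciler) serviceForDatabase(db *dbv1.Database) *corev1.Service {
    labels := map[string]string{
        "app":      "database",
        "database": db.Name,
        "engine":   db.Spec.Engine,
    }
    
    return &corev1.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name:      db.Name + "-" + db.Spec.Engine,
            Namespace: db.Namespace,
            Labels:    labels,
        },
        Spec: corev1.ServiceSpec{
            ClusterIP: corev1.ClusterIPNone, // headless Service for the StatefulSet
            Selector:  labels,
            Ports: []corev1.ServicePort{{
                Name: db.Spec.Engine,
                Port: r.getPort(db),
            }},
        },
    }
}

func (r *DatabaseReconciler) getEnvVars(db *dbv1.Database) []corev1.EnvVar {
    // Illustrative defaults only; real credentials should come from a Secret.
    switch db.Spec.Engine {
    case "postgres":
        return []corev1.EnvVar{{Name: "PGDATA", Value: "/var/lib/postgresql/data/pgdata"}}
    case "mysql":
        return []corev1.EnvVar{{Name: "MYSQL_ALLOW_EMPTY_PASSWORD", Value: "yes"}}
    default:
        return nil
    }
}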

func (r *DatabaseReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&dbv1.Database{}).
        Owns(&appsv1.StatefulSet{}).
        Owns(&corev1.Service{}).
        Owns(&corev1.ConfigMap{}).
        Complete(r)
}
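
Kubebuilder wires this reconciler into a controller manager in the scaffolded main.go. A condensed sketch of that wiring, with error handling trimmed:

// main.go (condensed sketch)
package main

import (
    "os"

    "k8s.io/apimachinery/pkg/runtime"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    ctrl "sigs.k8s.io/controller-runtime"

    dbv1 "github.com/example/database-operator/api/v1"
    "github.com/example/database-operator/controllers"
)

func main() {
    scheme := runtime.NewScheme()
    _ = clientgoscheme.AddToScheme(scheme) // core, apps, batch, ...
    _ = dbv1.AddToScheme(scheme)           // the Database API types

    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
    if err != nil {
        os.Exit(1)
    }

    if err := (&controllers.DatabaseReconciler{
        Client: mgr.GetClient(),
        Log:    ctrl.Log.WithName("controllers").WithName("Database"),
        Scheme: mgr.GetScheme(),
    }).SetupWithManager(mgr); err != nil {
        os.Exit(1)
    }

    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        os.Exit(1)
    }
}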

Advanced Operator Patterns

1. Backup Controller

// controllers/backup_controller.go
package controllers

import (
    "context"
    "fmt"
    
    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    ctrl "sigs.k8s.io/controller-runtime"
    
    dbv1 "github.com/example/database-operator/api/v1"
)

func (r *DatabaseReconciler) ensureBackupCronJob(ctx context.Context, db *dbv1.Database) error {
    if !db.Spec.Backup.Enabled {
        // Delete CronJob if backup is disabled
        cronJob := &batchv1.CronJob{}
        err := r.Get(ctx, types.NamespacedName{
            Name:      db.Name + "-backup",
            Namespace: db.Namespace,
        }, cronJob)
        
        if err == nil {
            return r.Delete(ctx, cronJob)
        }
        return nil
    }
    
    cronJob := &batchv1.CronJob{
        ObjectMeta: metav1.ObjectMeta{
            Name:      db.Name + "-backup",
            Namespace: db.Namespace,
        },
        Spec: batchv1.CronJobSpec{
            Schedule: db.Spec.Backup.Schedule,
            JobTemplate: batchv1.JobTemplateSpec{
                Spec: batchv1.JobSpec{
                    Template: corev1.PodTemplateSpec{
                        Spec: corev1.PodSpec{
                            RestartPolicy: corev1.RestartPolicyOnFailure,
                            Containers: []corev1.Container{{
                                Name:  "backup",
                                Image: r.getBackupImage(db),
                                Command: r.getBackupCommand(db),
                                Env: []corev1.EnvVar{
                                    {
                                        Name:  "DATABASE_HOST",
                                        Value: db.Status.Endpoint,
                                    },
                                    {
                                        Name: "DATABASE_PASSWORD",
                                        ValueFrom: &corev1.EnvVarSource{
                                            SecretKeyRef: &corev1.SecretKeySelector{
                                                LocalObjectReference: corev1.LocalObjectReference{
                                                    Name: db.Name + "-secret",
                                                },
                                                Key: "password",
                                            },
                                        },
                                    },
                                    {
                                        Name:  "S3_BUCKET",
                                        Value: "database-backups",
                                    },
                                    {
                                        Name:  "RETENTION_DAYS",
                                        Value: fmt.Sprintf("%d", db.Spec.Backup.Retention),
                                    },
                                },
                            }},
                        },
                    },
                },
            },
        },
    }
    
    if err := ctrl.SetControllerReference(db, cronJob, r.Scheme); err != nil {
        return err
    }
    
    found := &batchv1.CronJob{}
    err := r.Get(ctx, types.NamespacedName{Name: cronJob.Name, Namespace: cronJob.Namespace}, found)
    if err != nil && errors.IsNotFound(err) {
        return r.Create(ctx, cronJob)
    }
    
    return err
}

func (r *DatabaseReconciler) getBackupCommand(db *dbv1.Database) []string {
    // The timestamp is computed inside the job's shell so that every scheduled
    // run produces a new backup object rather than reusing the name that was
    // current when the CronJob was created.
    switch db.Spec.Engine {
    case "postgres":
        return []string{
            "/bin/bash",
            "-c",
            fmt.Sprintf(`
                pg_dump -h $DATABASE_HOST -U postgres -d postgres | \
                gzip | \
                aws s3 cp - s3://$S3_BUCKET/%s/backup-$(date +%%Y%%m%%d-%%H%%M%%S).sql.gz
            `, db.Name),
        }
    case "mysql":
        return []string{
            "/bin/bash",
            "-c",
            fmt.Sprintf(`
                mysqldump -h $DATABASE_HOST -u root -p$DATABASE_PASSWORD --all-databases | \
                gzip | \
                aws s3 cp - s3://$S3_BUCKET/%s/backup-$(date +%%Y%%m%%d-%%H%%M%%S).sql.gz
            `, db.Name),
        }
    case "mongodb":
        return []string{
            "/bin/bash",
            "-c",
            fmt.Sprintf(`
                mongodump --uri="mongodb://$DATABASE_HOST" --archive | \
                gzip | \
                aws s3 cp - s3://$S3_BUCKET/%s/backup-$(date +%%Y%%m%%d-%%H%%M%%S).archive.gz
            `, db.Name),
        }
    default:
        return []string{}
    }
}
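
ensureBackupCronJob also references getBackupImage, which is not shown above. A minimal sketch; the image names are assumptions for a custom backup image that bundles the database client tools and the AWS CLI:

func (r *DatabaseReconciler) getBackupImage(db *dbv1.Database) string {
    // Hypothetical images; substitute whatever backup tooling you actually build.
    switch db.Spec.Engine {
    case "postgres":
        return "example.com/backup-tools/postgres:" + db.Spec.Version
    case "mysql":
        return "example.com/backup-tools/mysql:" + db.Spec.Version
    case "mongodb":
        return "example.com/backup-tools/mongodb:" + db.Spec.Version
    default:
        return ""
    }
}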

2. Monitoring Integration

// controllers/monitoring.go
package controllers

import (
    "context"
    "fmt"
    
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/intstr"
    ctrl "sigs.k8s.io/controller-runtime"
    
    monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
    dbv1 "github.com/example/database-operator/api/v1"
)

func (r *DatabaseReconciler) ensureServiceMonitor(ctx context.Context, db *dbv1.Database) error {
    serviceMonitor := &monitoringv1.ServiceMonitor{
        ObjectMeta: metav1.ObjectMeta{
            Name:      db.Name + "-monitor",
            Namespace: db.Namespace,
            Labels: map[string]string{
                "app":      "database",
                "database": db.Name,
            },
        },
        Spec: monitoringv1.ServiceMonitorSpec{
            Selector: metav1.LabelSelector{
                MatchLabels: map[string]string{
                    "app":      "database",
                    "database": db.Name,
                },
            },
            Endpoints: []monitoringv1.Endpoint{
                {
                    Port:     "metrics",
                    Interval: "30s",
                    Path:     "/metrics",
                },
            },
        },
    }
    
    if err := ctrl.SetControllerReference(db, serviceMonitor, r.Scheme); err != nil {
        return err
    }
    
    found := &monitoringv1.ServiceMonitor{}
    err := r.Get(ctx, types.NamespacedName{
        Name:      serviceMonitor.Name,
        Namespace: serviceMonitor.Namespace,
    }, found)
    
    if err != nil && errors.IsNotFound(err) {
        return r.Create(ctx, serviceMonitor)
    }
    
    return err
}

// Prometheus rules for alerting
func (r *DatabaseReconciler) ensurePrometheusRule(ctx context.Context, db *dbv1.Database) error {
    rule := &monitoringv1.PrometheusRule{
        ObjectMeta: metav1.ObjectMeta{
            Name:      db.Name + "-rules",
            Namespace: db.Namespace,
        },
        Spec: monitoringv1.PrometheusRuleSpec{
            Groups: []monitoringv1.RuleGroup{
                {
                    Name: db.Name + "-alerts",
                    Rules: []monitoringv1.Rule{
                        {
                            Alert: "DatabaseDown",
                            Expr:  intstr.FromString(fmt.Sprintf(`up{job="%s-monitor"} == 0`, db.Name)),
                            For:   "5m",
                            Labels: map[string]string{
                                "severity": "critical",
                                "database": db.Name,
                            },
                            Annotations: map[string]string{
                                "summary":     "Database {{ $labels.database }} is down",
                                "description": "Database {{ $labels.database }} has been down for more than 5 minutes.",
                            },
                        },
                        {
                            Alert: "DatabaseHighConnections",
                            Expr:  intstr.FromString(fmt.Sprintf(`database_connections{database="%s"} > 80`, db.Name)),
                            For:   "10m",
                            Labels: map[string]string{
                                "severity": "warning",
                                "database": db.Name,
                            },
                            Annotations: map[string]string{
                                "summary":     "High connection count on {{ $labels.database }}",
                                "description": "Database {{ $labels.database }} has more than 80 active connections.",
                            },
                        },
                    },
                },
            },
        },
    }
    
    if err := ctrl.SetControllerReference(db, rule, r.Scheme); err != nil {
        return err
    }
    
    // Create the rule only if it does not already exist, so repeated
    // reconciles do not fail with AlreadyExists
    found := &monitoringv1.PrometheusRule{}
    err := r.Get(ctx, types.NamespacedName{Name: rule.Name, Namespace: rule.Namespace}, found)
    if err != nil && errors.IsNotFound(err) {
        return r.Create(ctx, rule)
    }
    
    return err
}
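
ServiceMonitor and PrometheusRule are themselves custom resources, so their Go types must be registered with the manager's scheme before the controller-runtime client can read or write them. A small excerpt of what that looks like in the scaffolded main.go (the scheme variable comes from that file):

import (
    monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
)

func init() {
    // Without this registration the client returns "no kind is registered"
    // errors for ServiceMonitor and PrometheusRule objects.
    _ = monitoringv1.AddToScheme(scheme)
}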

3. Webhook Validation

// api/v1/database_webhook.go
package v1

import (
    "fmt"
    
    "k8s.io/apimachinery/pkg/api/resource"
    "k8s.io/apimachinery/pkg/runtime"
    ctrl "sigs.k8s.io/controller-runtime"
    logf "sigs.k8s.io/controller-runtime/pkg/log"
    "sigs.k8s.io/controller-runtime/pkg/webhook"
)

var databaselog = logf.Log.WithName("database-resource")

func (r *Database) SetupWebhookWithManager(mgr ctrl.Manager) error {
    return ctrl.NewWebhookManagedBy(mgr).
        For(r).
        Complete()
}

// +kubebuilder:webhook:path=/mutate-apps-example-com-v1-database,mutating=true,failurePolicy=fail,sideEffects=None,groups=apps.example.com,resources=databases,verbs=create;update,versions=v1,name=mdatabase.kb.io,admissionReviewVersions={v1,v1beta1}

var _ webhook.Defaulter = &Database{}

// Default implements webhook.Defaulter
func (r *Database) Default() {
    databaselog.Info("default", "name", r.Name)
    
    // Set default values
    if r.Spec.Storage.Size == "" {
        r.Spec.Storage.Size = "10Gi"
    }
    
    if r.Spec.Storage.Class == "" {
        r.Spec.Storage.Class = "standard"
    }
    
    if r.Spec.Backup.Schedule == "" && r.Spec.Backup.Enabled {
        r.Spec.Backup.Schedule = "0 2 * * *" // 2 AM daily
    }
    
    if r.Spec.Backup.Retention == 0 && r.Spec.Backup.Enabled {
        r.Spec.Backup.Retention = 7
    }
}

// +kubebuilder:webhook:path=/validate-apps-example-com-v1-database,mutating=false,failurePolicy=fail,sideEffects=None,groups=apps.example.com,resources=databases,verbs=create;update,versions=v1,name=vdatabase.kb.io,admissionReviewVersions={v1,v1beta1}

var _ webhook.Validator = &Database{}

// ValidateCreate implements webhook.Validator
func (r *Database) ValidateCreate() error {
    databaselog.Info("validate create", "name", r.Name)
    
    // Validate engine-specific constraints.
    // Note: plain string comparison is only a rough check; a production
    // webhook should parse versions with a semver library.
    switch r.Spec.Engine {
    case "postgres":
        if r.Spec.Version < "12" {
            return fmt.Errorf("PostgreSQL version must be 12 or higher")
        }
    case "mysql":
        if r.Spec.Version < "8.0" {
            return fmt.Errorf("MySQL version must be 8.0 or higher")
        }
    case "mongodb":
        if r.Spec.Version < "4.4" {
            return fmt.Errorf("MongoDB version must be 4.4 or higher")
        }
    }
    
    // Validate storage size
    size, err := resource.ParseQuantity(r.Spec.Storage.Size)
    if err != nil {
        return fmt.Errorf("invalid storage size: %v", err)
    }
    
    minSize := resource.MustParse("1Gi")
    if size.Cmp(minSize) < 0 {
        return fmt.Errorf("storage size must be at least 1Gi")
    }
    
    return nil
}

// ValidateUpdate implements webhook.Validator
func (r *Database) ValidateUpdate(old runtime.Object) error {
    databaselog.Info("validate update", "name", r.Name)
    
    oldDB := old.(*Database)
    
    // Prevent engine changes
    if r.Spec.Engine != oldDB.Spec.Engine {
        return fmt.Errorf("database engine cannot be changed")
    }
    
    // Prevent downgrades (again using the simple string comparison noted above)
    if r.Spec.Version < oldDB.Spec.Version {
        return fmt.Errorf("database version cannot be downgraded")
    }
    
    // Prevent storage shrinking
    newSize, _ := resource.ParseQuantity(r.Spec.Storage.Size)
    oldSize, _ := resource.ParseQuantity(oldDB.Spec.Storage.Size)
    
    if newSize.Cmp(oldSize) < 0 {
        return fmt.Errorf("storage size cannot be reduced")
    }
    
    return nil
}

// ValidateDelete implements webhook.Validator
func (r *Database) ValidateDelete() error {
    databaselog.Info("validate delete", "name", r.Name)
    
    // Could add logic to prevent deletion of databases with active connections
    // or require confirmation annotation
    
    if r.Annotations["confirm-delete"] != "true" {
        return fmt.Errorf("database deletion requires 'confirm-delete: true' annotation")
    }
    
    return nil
}
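
For the webhooks to take effect, they are registered with the same manager in the scaffolded main.go (setupLog below is the logger kubebuilder generates there):

// main.go (excerpt)
if err := (&dbv1.Database{}).SetupWebhookWithManager(mgr); err != nil {
    setupLog.Error(err, "unable to create webhook", "webhook", "Database")
    os.Exit(1)
}

Serving admission webhooks also requires TLS certificates; kubebuilder's default kustomize configuration uses cert-manager to provision them.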

Testing Your Operator

1. Unit Tests

// controllers/database_controller_test.go
package controllers

import (
    "context"
    "time"
    
    . "github.com/onsi/ginkgo"
    . "github.com/onsi/gomega"
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    
    dbv1 "github.com/example/database-operator/api/v1"
)

var _ = Describe("Database Controller", func() {
    const (
        DatabaseName      = "test-database"
        DatabaseNamespace = "default"
        
        timeout  = time.Second * 10
        duration = time.Second * 10
        interval = time.Millisecond * 250
    )
    
    Context("When creating Database", func() {
        It("Should create StatefulSet and Service", func() {
            ctx := context.Background()
            database := &dbv1.Database{
                ObjectMeta: metav1.ObjectMeta{
                    Name:      DatabaseName,
                    Namespace: DatabaseNamespace,
                },
                Spec: dbv1.DatabaseSpec{
                    Engine:   "postgres",
                    Version:  "14",
                    Replicas: 3,
                    Storage: dbv1.StorageSpec{
                        Size:  "10Gi",
                        Class: "fast-ssd",
                    },
                },
            }
            
            Expect(k8sClient.Create(ctx, database)).Should(Succeed())
            
            databaseLookupKey := types.NamespacedName{Name: DatabaseName, Namespace: DatabaseNamespace}
            createdDatabase := &dbv1.Database{}
            
            Eventually(func() bool {
                err := k8sClient.Get(ctx, databaseLookupKey, createdDatabase)
                return err == nil
            }, timeout, interval).Should(BeTrue())
            
            // Check StatefulSet creation
            Eventually(func() bool {
                statefulSet := &appsv1.StatefulSet{}
                err := k8sClient.Get(ctx, types.NamespacedName{
                    Name:      DatabaseName + "-postgres",
                    Namespace: DatabaseNamespace,
                }, statefulSet)
                
                if err != nil {
                    return false
                }
                
                return *statefulSet.Spec.Replicas == 3
            }, timeout, interval).Should(BeTrue())
            
            // Check Service creation
            Eventually(func() bool {
                service := &corev1.Service{}
                err := k8sClient.Get(ctx, types.NamespacedName{
                    Name:      DatabaseName + "-postgres",
                    Namespace: DatabaseNamespace,
                }, service)
                
                return err == nil
            }, timeout, interval).Should(BeTrue())
            
            // Check status update
            Eventually(func() string {
                err := k8sClient.Get(ctx, databaseLookupKey, createdDatabase)
                if err != nil {
                    return ""
                }
                return string(createdDatabase.Status.Phase)
            }, timeout, interval).Should(Equal(string(dbv1.DatabasePhaseCreating)))
        })
    })
})
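
The k8sClient used above is created in the envtest suite that kubebuilder scaffolds as controllers/suite_test.go, which runs a local etcd and kube-apiserver with the CRDs installed. A condensed sketch (the generated file also registers and starts the reconciler against this API server):

// controllers/suite_test.go (condensed sketch)
package controllers

import (
    "path/filepath"
    "testing"
    
    . "github.com/onsi/ginkgo"
    . "github.com/onsi/gomega"
    "k8s.io/client-go/kubernetes/scheme"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/envtest"
    
    dbv1 "github.com/example/database-operator/api/v1"
)

var (
    k8sClient client.Client
    testEnv   *envtest.Environment
)

func TestAPIs(t *testing.T) {
    RegisterFailHandler(Fail)
    RunSpecs(t, "Controller Suite")
}

var _ = BeforeSuite(func() {
    // Start a local control plane (etcd + kube-apiserver) with the CRDs installed
    testEnv = &envtest.Environment{
        CRDDirectoryPaths: []string{filepath.Join("..", "config", "crd", "bases")},
    }
    cfg, err := testEnv.Start()
    Expect(err).NotTo(HaveOccurred())
    
    Expect(dbv1.AddToScheme(scheme.Scheme)).To(Succeed())
    
    k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
    Expect(err).NotTo(HaveOccurred())
})

var _ = AfterSuite(func() {
    Expect(testEnv.Stop()).To(Succeed())
})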

2. Integration Tests

// test/e2e/database_test.go
package e2e

import (
    "testing"
    "time"
    
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    
    dbv1 "github.com/example/database-operator/api/v1"
)

// kubeconfig, dbClient, and the wait/connection helpers used below are
// provided by the surrounding e2e test package.

func TestDatabaseOperator(t *testing.T) {
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        t.Fatal(err)
    }
    
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        t.Fatal(err)
    }
    
    // Create test database
    database := &dbv1.Database{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "e2e-test-db",
            Namespace: "default",
        },
        Spec: dbv1.DatabaseSpec{
            Engine:   "postgres",
            Version:  "14",
            Replicas: 1,
            Storage: dbv1.StorageSpec{
                Size: "5Gi",
            },
            Backup: dbv1.BackupSpec{
                Enabled:   true,
                Schedule:  "*/5 * * * *", // Every 5 minutes for testing
                Retention: 1,
            },
        },
    }
    
    // Create database
    _, err = dbClient.Create(database)
    if err != nil {
        t.Fatal(err)
    }
    
    // Wait for database to be ready
    err = waitForDatabaseReady(dbClient, database.Name, 5*time.Minute)
    if err != nil {
        t.Fatal(err)
    }
    
    // Test database connection
    endpoint, err := getDatabaseEndpoint(dbClient, database.Name)
    if err != nil {
        t.Fatal(err)
    }
    
    err = testDatabaseConnection(endpoint)
    if err != nil {
        t.Fatal(err)
    }
    
    // Test backup creation
    err = waitForBackupJob(clientset, database.Name, 10*time.Minute)
    if err != nil {
        t.Fatal(err)
    }
    
    // Clean up
    err = dbClient.Delete(database.Name, &metav1.DeleteOptions{})
    if err != nil {
        t.Fatal(err)
    }
}

Deployment and Operations

1. Building and Publishing

# Dockerfile
FROM golang:1.19 as builder

WORKDIR /workspace
COPY go.mod go.sum ./
RUN go mod download

COPY main.go main.go
COPY api/ api/
COPY controllers/ controllers/

RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o manager main.go

FROM gcr.io/distroless/static:nonroot
WORKDIR /
COPY --from=builder /workspace/manager .
USER 65532:65532

ENTRYPOINT ["/manager"]

2. Deployment Manifest

# config/manager/manager.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: database-operator-system
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: database-operator-controller-manager
  namespace: database-operator-system
spec:
  selector:
    matchLabels:
      control-plane: controller-manager
  replicas: 1
  template:
    metadata:
      labels:
        control-plane: controller-manager
    spec:
      securityContext:
        runAsNonRoot: true
      containers:
      - command:
        - /manager
        args:
        - --leader-elect
        image: database-operator:latest
        name: manager
        securityContext:
          allowPrivilegeEscalation: false
        livenessProbe:
          httpGet:
            path: /healthz
            port: 8081
          initialDelaySeconds: 15
          periodSeconds: 20
        readinessProbe:
          httpGet:
            path: /readyz
            port: 8081
          initialDelaySeconds: 5
          periodSeconds: 10
        resources:
          limits:
            cpu: 500m
            memory: 128Mi
          requests:
            cpu: 10m
            memory: 64Mi
      serviceAccountName: database-operator-controller-manager
      terminationGracePeriodSeconds: 10

3. Monitoring with Prometheus

# Prometheus ServiceMonitor for operator metrics
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: database-operator-metrics
  namespace: database-operator-system
spec:
  endpoints:
  - path: /metrics
    port: https
    scheme: https
    tlsConfig:
      insecureSkipVerify: true
  selector:
    matchLabels:
      control-plane: controller-manager

Best Practices

  1. Resource Ownership: Always set owner references to ensure proper garbage collection
  2. Status Updates: Keep status separate from spec and update it regularly
  3. Idempotency: Ensure reconciliation logic is idempotent
  4. Error Handling: Rely on the controller's rate limiter for exponential backoff on retries (see the sketch after this list)
  5. Observability: Export metrics and use structured logging
  6. Testing: Write comprehensive unit and integration tests
  7. Security: Run with minimal privileges and use RBAC properly
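
On point 4: controller-runtime's workqueue already applies exponential backoff whenever Reconcile returns an error, so the usual pattern is to return transient errors and reserve RequeueAfter for planned re-checks. A minimal sketch (doSomethingTransient stands in for any call that can fail temporarily):

func (r *DatabaseReconciler) reconcileStep(ctx context.Context) (ctrl.Result, error) {
    if err := r.doSomethingTransient(ctx); err != nil {
        // Returning the error requeues the request with exponential backoff
        // from the controller's default rate limiter.
        return ctrl.Result{}, err
    }
    
    // RequeueAfter schedules a deliberate re-check without counting the
    // reconcile as a failure (useful for polling external state).
    return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}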

Conclusion

Kubernetes Operators represent a powerful pattern for extending Kubernetes to manage complex applications. By encoding operational knowledge into software, operators automate tasks that would otherwise require manual intervention. Whether you're managing databases, message queues, or custom applications, the operator pattern provides a robust framework for cloud-native automation.

The combination of Custom Resource Definitions and controllers gives you the flexibility to define your own abstractions while leveraging Kubernetes' proven reconciliation loop pattern. As you build operators, remember that the goal is to make the complex simple—abstracting away operational complexity while providing a declarative API that fits naturally into the Kubernetes ecosystem.