跳转到内容

Operator开发基础

课程目标

通过本课程的学习,你将能够:

  • 理解Operator的概念、架构和工作原理
  • 掌握控制器模式和Reconcile循环
  • 使用Operator SDK创建Operator项目
  • 开发简单的Operator实现自定义资源管理
  • 部署和调试Operator
  • 应用Operator开发最佳实践

前置要求:已完成《云原生开发篇概述》课程,具备Go语言基础和K8S知识

一、Operator概述

1.1 为什么需要Operator

在Kubernetes上管理复杂有状态应用时,传统方式面临以下挑战:

挑战传统方式Operator解决方案
复杂配置手动编写大量YAML声明式API,简化配置
运维操作人工执行升级、备份自动化运维操作
故障恢复人工介入处理故障自动故障检测和恢复
领域知识依赖专家经验知识编码到软件中
生命周期难以管理完整生命周期全生命周期管理

1.2 什么是Operator

Operator = 自定义资源(CRD)+ 控制器(Controller)

Operator是一种特殊的Kubernetes控制器,它将运维专家的知识编码到软件中,实现复杂应用的自动化管理。

┌─────────────────────────────────────────────────────────────────┐
│                      Operator核心思想                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  声明式管理 + 控制循环 + 领域知识编码                              │
│                                                                 │
│  ┌─────────────┐         ┌─────────────┐         ┌───────────┐ │
│  │   用户声明    │────────▶│   自定义资源   │────────▶│  Operator │ │
│  │  (期望状态)   │         │   (CR)       │         │  (控制器)  │ │
│  └─────────────┘         └─────────────┘         └─────┬─────┘ │
│                                                        │       │
│                                                        │ 调谐   │
│                                                        ▼       │
│                                               ┌─────────────┐  │
│                                               │  Kubernetes │  │
│                                               │  原生资源    │  │
│                                               │ (Deployment │  │
│                                               │  Service等) │  │
│                                               └─────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

1.3 Operator的应用场景

典型应用场景

  1. 数据库管理

    • MySQL Operator:自动部署主从集群、自动故障切换
    • PostgreSQL Operator:备份恢复、连接池管理
    • MongoDB Operator:副本集管理、分片配置
  2. 消息队列管理

    • Kafka Operator:Topic管理、分区重平衡
    • RabbitMQ Operator:集群扩缩容、策略配置
  3. 监控系统

    • Prometheus Operator:自动发现监控目标、规则管理
    • Grafana Operator:Dashboard管理、数据源配置
  4. 其他复杂应用

    • Elasticsearch Operator:集群管理、索引策略
    • Vault Operator:密钥管理、自动轮换

二、Operator架构

2.1 核心组件

┌─────────────────────────────────────────────────────────────────┐
│                      Operator架构                                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                    Kubernetes Cluster                     │  │
│  │                                                          │  │
│  │  ┌──────────────┐      ┌──────────────┐                 │  │
│  │  │  Custom Resource │   │  CRD         │                 │  │
│  │  │  (CR实例)       │   │  (资源定义)    │                 │  │
│  │  └──────┬───────┘      └──────────────┘                 │  │
│  │         │                                                │  │
│  │         │ Watch                                          │  │
│  │         ▼                                                │  │
│  │  ┌──────────────────────────────────────────────────┐   │  │
│  │  │                  Operator                        │   │  │
│  │  │  ┌──────────┐  ┌──────────┐  ┌──────────┐      │   │  │
│  │  │  │ Controller│  │ Informer │  │ WorkQueue│      │   │  │
│  │  │  │ (控制逻辑) │  │ (监听缓存)│  │ (事件队列)│      │   │  │
│  │  │  └────┬─────┘  └──────────┘  └──────────┘      │   │  │
│  │  │       │                                         │   │  │
│  │  │       │ Reconcile                               │   │  │
│  │  │       ▼                                         │   │  │
│  │  │  ┌──────────────────────────────────────────┐   │   │  │
│  │  │  │     Create/Update/Delete Native Resources │   │   │  │
│  │  │  │     (Deployment, Service, PVC等)          │   │   │  │
│  │  │  └──────────────────────────────────────────┘   │   │  │
│  │  └──────────────────────────────────────────────────┘   │  │
│  │                                                          │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

2.2 控制器模式

控制器是Operator的核心,它实现了控制循环(Control Loop)

┌─────────────────────────────────────────────────────────────────┐
│                      控制循环(Reconcile Loop)                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│     ┌──────────┐                                               │
│     │   Start  │                                               │
│     └────┬─────┘                                               │
│          │                                                      │
│          ▼                                                      │
│  ┌──────────────┐                                               │
│  │  1. 获取当前CR  │◀──────────────────────────┐                │
│  │     (期望状态)  │                           │                │
│  └───────┬──────┘                           │                │
│          │                                   │                │
│          ▼                                   │                │
│  ┌──────────────┐                           │                │
│  │  2. 获取实际状态 │                           │                │
│  │  (查询K8s资源)  │                           │                │
│  └───────┬──────┘                           │                │
│          │                                   │                │
│          ▼                                   │                │
│  ┌──────────────┐                           │                │
│  │  3. 比较差异   │                           │                │
│  │  (期望vs实际)  │                           │                │
│  └───────┬──────┘                           │                │
│          │                                   │                │
│          ▼                                   │                │
│  ┌──────────────┐                           │                │
│  │  4. 执行调谐   │                           │                │
│  │  (创建/更新/删除)│                           │                │
│  └───────┬──────┘                           │                │
│          │                                   │                │
│          │ 返回结果                            │                │
│          │ (重新入队/成功)                      │                │
│          │                                   │                │
│          └───────────────────────────────────┘                │
│                     (循环继续)                                   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

2.3 Reconcile函数

Reconcile是控制器的核心函数,负责调谐实际状态以匹配期望状态:

go
// Reconcile 接收一个Request(包含Namespace和Name)
// 返回Result和error
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    log := log.FromContext(ctx)
    log.Info("Reconciling", "resource", req.NamespacedName)
    
    // 1. 获取CR实例
    cr := &myappv1.MyApp{}
    if err := r.Get(ctx, req.NamespacedName, cr); err != nil {
        if errors.IsNotFound(err) {
            // CR已被删除,清理资源
            return ctrl.Result{}, nil
        }
        return ctrl.Result{}, err
    }
    
    // 2. 调谐Deployment
    if err := r.reconcileDeployment(ctx, cr); err != nil {
        return ctrl.Result{}, err
    }
    
    // 3. 调谐Service
    if err := r.reconcileService(ctx, cr); err != nil {
        return ctrl.Result{}, err
    }
    
    // 4. 更新CR状态
    if err := r.updateStatus(ctx, cr); err != nil {
        return ctrl.Result{}, err
    }
    
    // 5. 返回结果
    // Requeue: true 表示需要重新调谐
    // RequeueAfter: 表示延迟多久后重新调谐
    return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}

三、使用Operator SDK开发

3.1 创建Operator项目

bash
# 创建项目目录
mkdir -p ~/operators && cd ~/operators

# 初始化Operator项目
operator-sdk init --domain example.com --repo github.com/example/myapp-operator

# 输出:
# Writing kustomize manifests for you to edit...
# Writing scaffold for you to edit...
# Get controller runtime:
# $ go get sigs.k8s.io/controller-runtime@v0.16.3
# Update dependencies:
# $ go mod tidy

项目结构

myapp-operator/
├── api/                      # API定义(CRD)
│   └── v1/
│       ├── myapp_types.go    # CRD Go类型
│       └── zz_generated.deepcopy.go  # 自动生成的深拷贝代码
├── controllers/              # 控制器实现
│   └── myapp_controller.go   # 控制器逻辑
├── config/                   # K8s资源配置
│   ├── crd/                  # CRD YAML
│   ├── manager/              # Operator部署配置
│   ├── rbac/                 # RBAC权限配置
│   └── samples/              # CR示例
├── hack/                     # 脚本工具
├── main.go                   # 程序入口
├── go.mod                    # Go依赖
├── go.sum
├── Makefile                  # 构建脚本
├── Dockerfile                # 容器镜像构建
└── PROJECT                   # 项目元数据

3.2 定义CRD

创建API和CRD:

bash
# 创建API(CRD)
operator-sdk create api --group apps --version v1 --kind MyApp --resource --controller

# 输出:
# Writing kustomize manifests for you to edit...
# Writing scaffold for you to edit...
# api/v1/myapp_types.go
# controllers/myapp_controller.go

编辑 api/v1/myapp_types.go

go
package v1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// MyAppSpec 定义期望状态
type MyAppSpec struct {
    // +kubebuilder:validation:Minimum=1
    // +kubebuilder:validation:Maximum=10
    // +kubebuilder:default=1
    Replicas int32 `json:"replicas,omitempty"`
    
    // +kubebuilder:validation:Required
    Image string `json:"image"`
    
    // +kubebuilder:validation:Enum=info;debug;warn;error
    // +kubebuilder:default=info
    LogLevel string `json:"logLevel,omitempty"`
    
    Resources corev1.ResourceRequirements `json:"resources,omitempty"`
    
    ServicePort int32 `json:"servicePort,omitempty"`
}

// MyAppStatus 定义实际状态
type MyAppStatus struct {
    // +optional
    Conditions []metav1.Condition `json:"conditions,omitempty"`
    
    // +optional
    AvailableReplicas int32 `json:"availableReplicas,omitempty"`
    
    // +optional
    Phase string `json:"phase,omitempty"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".spec.replicas"
// +kubebuilder:printcolumn:name="Image",type="string",JSONPath=".spec.image"
// +kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

type MyApp struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`
    
    Spec   MyAppSpec   `json:"spec,omitempty"`
    Status MyAppStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true
type MyAppList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`
    Items           []MyApp `json:"items"`
}

func init() {
    SchemeBuilder.Register(&MyApp{}, &MyAppList{})
}

3.3 生成代码和CRD

bash
# 生成深拷贝代码和CRD YAML
make generate
make manifests

# 输出:
# api/v1/zz_generated.deepcopy.go
# config/crd/bases/apps.example.com_myapps.yaml

3.4 实现控制器

编辑 controllers/myapp_controller.go

go
package controllers

import (
    "context"
    "fmt"
    "time"
    
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/log"
    
    myappv1 "github.com/example/myapp-operator/api/v1"
)

// MyAppReconciler 协调MyApp资源
type MyAppReconciler struct {
    client.Client
    Scheme *runtime.Scheme
}

// +kubebuilder:rbac:groups=apps.example.com,resources=myapps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps.example.com,resources=myapps/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps.example.com,resources=myapps/finalizers,verbs=update
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete

func (r *MyAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    log := log.FromContext(ctx)
    log.Info("Reconciling MyApp", "namespace", req.Namespace, "name", req.Name)
    
    // 获取MyApp实例
    myapp := &myappv1.MyApp{}
    if err := r.Get(ctx, req.NamespacedName, myapp); err != nil {
        if errors.IsNotFound(err) {
            log.Info("MyApp resource not found. Ignoring since object must be deleted")
            return ctrl.Result{}, nil
        }
        log.Error(err, "Failed to get MyApp")
        return ctrl.Result{}, err
    }
    
    // 调谐Deployment
    if err := r.reconcileDeployment(ctx, myapp); err != nil {
        return ctrl.Result{}, err
    }
    
    // 调谐Service
    if err := r.reconcileService(ctx, myapp); err != nil {
        return ctrl.Result{}, err
    }
    
    // 更新状态
    if err := r.updateStatus(ctx, myapp); err != nil {
        return ctrl.Result{}, err
    }
    
    return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}

// reconcileDeployment 调谐Deployment
func (r *MyAppReconciler) reconcileDeployment(ctx context.Context, myapp *myappv1.MyApp) error {
    log := log.FromContext(ctx)
    
    // 设置默认值
    replicas := myapp.Spec.Replicas
    if replicas == 0 {
        replicas = 1
    }
    
    // 定义期望的Deployment
    deployment := &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name:      myapp.Name,
            Namespace: myapp.Namespace,
            Labels:    r.labelsForMyApp(myapp),
        },
        Spec: appsv1.DeploymentSpec{
            Replicas: &replicas,
            Selector: &metav1.LabelSelector{
                MatchLabels: r.labelsForMyApp(myapp),
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: r.labelsForMyApp(myapp),
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{{
                        Name:  "app",
                        Image: myapp.Spec.Image,
                        Ports: []corev1.ContainerPort{{
                            ContainerPort: myapp.Spec.ServicePort,
                            Name:          "http",
                        }},
                        Env: []corev1.EnvVar{{
                            Name:  "LOG_LEVEL",
                            Value: myapp.Spec.LogLevel,
                        }},
                        Resources: myapp.Spec.Resources,
                    }},
                },
            },
        },
    }
    
    // 设置OwnerReference
    if err := ctrl.SetControllerReference(myapp, deployment, r.Scheme); err != nil {
        return err
    }
    
    // 创建或更新Deployment
    found := &appsv1.Deployment{}
    err := r.Get(ctx, types.NamespacedName{Name: deployment.Name, Namespace: deployment.Namespace}, found)
    if err != nil && errors.IsNotFound(err) {
        log.Info("Creating Deployment", "name", deployment.Name)
        return r.Create(ctx, deployment)
    } else if err != nil {
        return err
    }
    
    // 更新Deployment
    found.Spec = deployment.Spec
    log.Info("Updating Deployment", "name", deployment.Name)
    return r.Update(ctx, found)
}

// reconcileService 调谐Service
func (r *MyAppReconciler) reconcileService(ctx context.Context, myapp *myappv1.MyApp) error {
    log := log.FromContext(ctx)
    
    port := myapp.Spec.ServicePort
    if port == 0 {
        port = 80
    }
    
    service := &corev1.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name:      myapp.Name,
            Namespace: myapp.Namespace,
            Labels:    r.labelsForMyApp(myapp),
        },
        Spec: corev1.ServiceSpec{
            Selector: r.labelsForMyApp(myapp),
            Ports: []corev1.ServicePort{{
                Port:       port,
                TargetPort: intstr.FromInt(int(port)),
                Name:       "http",
            }},
            Type: corev1.ServiceTypeClusterIP,
        },
    }
    
    if err := ctrl.SetControllerReference(myapp, service, r.Scheme); err != nil {
        return err
    }
    
    found := &corev1.Service{}
    err := r.Get(ctx, types.NamespacedName{Name: service.Name, Namespace: service.Namespace}, found)
    if err != nil && errors.IsNotFound(err) {
        log.Info("Creating Service", "name", service.Name)
        return r.Create(ctx, service)
    } else if err != nil {
        return err
    }
    
    found.Spec = service.Spec
    log.Info("Updating Service", "name", service.Name)
    return r.Update(ctx, found)
}

// updateStatus 更新MyApp状态
func (r *MyAppReconciler) updateStatus(ctx context.Context, myapp *myappv1.MyApp) error {
    // 获取Deployment状态
    deployment := &appsv1.Deployment{}
    err := r.Get(ctx, types.NamespacedName{Name: myapp.Name, Namespace: myapp.Namespace}, deployment)
    if err != nil {
        return err
    }
    
    // 更新状态
    myapp.Status.AvailableReplicas = deployment.Status.AvailableReplicas
    
    if deployment.Status.AvailableReplicas == myapp.Spec.Replicas {
        myapp.Status.Phase = "Running"
    } else if deployment.Status.AvailableReplicas > 0 {
        myapp.Status.Phase = "Partial"
    } else {
        myapp.Status.Phase = "Pending"
    }
    
    return r.Status().Update(ctx, myapp)
}

// labelsForMyApp 返回MyApp的标签
func (r *MyAppReconciler) labelsForMyApp(myapp *myappv1.MyApp) map[string]string {
    return map[string]string{
        "app":     "myapp",
        "name":    myapp.Name,
        "managed-by": "myapp-operator",
    }
}

// SetupWithManager 设置控制器
func (r *MyAppReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&myappv1.MyApp{}).
        Owns(&appsv1.Deployment{}).
        Owns(&corev1.Service{}).
        Complete(r)
}

四、部署和调试

4.1 本地运行

bash
# 安装CRD到集群
make install

# 本地运行Operator
make run

# 输出:
# 2024-01-01T10:00:00.000Z	INFO	setup	starting manager
# 2024-01-01T10:00:00.001Z	INFO	starting server	{"path": "/metrics", "kind": "metrics"}

4.2 创建测试资源

bash
# 创建CR实例
cat > config/samples/myapp_v1_myapp.yaml <<EOF
apiVersion: apps.example.com/v1
kind: MyApp
metadata:
  name: myapp-sample
  namespace: default
spec:
  replicas: 3
  image: nginx:1.25
  logLevel: info
  servicePort: 80
  resources:
    requests:
      memory: "128Mi"
      cpu: "100m"
    limits:
      memory: "256Mi"
      cpu: "200m"
EOF

kubectl apply -f config/samples/myapp_v1_myapp.yaml

# 查看资源
kubectl get myapp
kubectl get deployment
kubectl get service

4.3 构建和部署Operator

bash
# 构建镜像
make docker-build IMG=myregistry/myapp-operator:v1.0.0

# 推送镜像
make docker-push IMG=myregistry/myapp-operator:v1.0.0

# 部署到集群
make deploy IMG=myregistry/myapp-operator:v1.0.0

# 查看Operator Pod
kubectl get pods -n myapp-operator-system

4.4 调试技巧

bash
# 查看Operator日志
kubectl logs -n myapp-operator-system -l control-plane=controller-manager -f

# 查看CR事件
kubectl describe myapp myapp-sample

# 查看生成的资源
kubectl get all -l app=myapp

# 删除CR(级联删除关联资源)
kubectl delete myapp myapp-sample

五、最佳实践

5.1 控制器设计原则

go
// ✅ 好的实践

// 1. 幂等性:多次执行结果相同
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // 获取当前状态
    // 计算期望状态
    // 应用变更(如果不匹配)
    // 返回结果
}

// 2. 优雅的错误处理
if err != nil {
    // 记录详细错误
    log.Error(err, "Failed to reconcile", "resource", req.NamespacedName)
    
    // 更新状态为失败
    instance.Status.Phase = "Failed"
    r.Status().Update(ctx, instance)
    
    // 返回错误,触发重试
    return ctrl.Result{}, err
}

// 3. 使用OwnerReference管理资源生命周期
ctrl.SetControllerReference(owner, object, r.Scheme)

// 4. 合理设置重试间隔
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil

5.2 状态管理

go
// 使用Conditions记录详细状态
type MyAppStatus struct {
    Conditions []metav1.Condition `json:"conditions,omitempty"`
    Phase      string             `json:"phase,omitempty"`
}

// 更新Condition
func (r *Reconciler) updateCondition(ctx context.Context, myapp *myappv1.MyApp, 
    conditionType string, status metav1.ConditionStatus, reason, message string) {
    
    condition := metav1.Condition{
        Type:               conditionType,
        Status:             status,
        LastTransitionTime: metav1.Now(),
        Reason:             reason,
        Message:            message,
    }
    
    meta.SetStatusCondition(&myapp.Status.Conditions, condition)
}

5.3 资源清理(Finalizer)

go
// 添加Finalizer
const myAppFinalizer = "myapp.example.com/finalizer"

func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    myapp := &myappv1.MyApp{}
    if err := r.Get(ctx, req.NamespacedName, myapp); err != nil {
        return ctrl.Result{}, err
    }
    
    // 检查是否被删除
    if !myapp.ObjectMeta.DeletionTimestamp.IsZero() {
        // 执行清理操作
        if err := r.cleanup(ctx, myapp); err != nil {
            return ctrl.Result{}, err
        }
        
        // 移除Finalizer
        controllerutil.RemoveFinalizer(myapp, myAppFinalizer)
        return ctrl.Result{}, r.Update(ctx, myapp)
    }
    
    // 添加Finalizer
    if !controllerutil.ContainsFinalizer(myapp, myAppFinalizer) {
        controllerutil.AddFinalizer(myapp, myAppFinalizer)
        return ctrl.Result{}, r.Update(ctx, myapp)
    }
    
    // 正常调谐逻辑
    // ...
}

六、总结

核心概念回顾

Operator开发
├── 核心概念
│   ├── Operator = CRD + Controller
│   ├── 声明式管理
│   └── 控制循环(Reconcile)

├── 开发流程
│   ├── 初始化项目(operator-sdk init)
│   ├── 定义API(create api)
│   ├── 实现控制器(Reconcile)
│   └── 部署运行

└── 最佳实践
    ├── 幂等性设计
    ├── OwnerReference
    ├── Finalizer清理
    └── 状态管理

下节预告

在《自定义CRD开发》中,我们将:

  • 深入学习CRD高级特性
  • 掌握OpenAPI验证
  • 实现多版本管理
  • 开发更复杂的自定义控制器

💡 学习建议

  1. 动手实践:跟随课程步骤创建自己的第一个Operator
  2. 阅读源码:研究开源Operator的实现(如Prometheus Operator)
  3. 理解原理:深入理解控制循环和Reconcile机制

评论区

专业的Linux技术学习平台,从入门到精通的完整学习路径