主题
Operator开发基础
课程目标
通过本课程的学习,你将能够:
- 理解Operator的概念、架构和工作原理
- 掌握控制器模式和Reconcile循环
- 使用Operator SDK创建Operator项目
- 开发简单的Operator实现自定义资源管理
- 部署和调试Operator
- 应用Operator开发最佳实践
前置要求:已完成《云原生开发篇概述》课程,具备Go语言基础和K8S知识
一、Operator概述
1.1 为什么需要Operator
在Kubernetes上管理复杂有状态应用时,传统方式面临以下挑战:
| 挑战 | 传统方式 | Operator解决方案 |
|---|---|---|
| 复杂配置 | 手动编写大量YAML | 声明式API,简化配置 |
| 运维操作 | 人工执行升级、备份 | 自动化运维操作 |
| 故障恢复 | 人工介入处理故障 | 自动故障检测和恢复 |
| 领域知识 | 依赖专家经验 | 知识编码到软件中 |
| 生命周期 | 难以管理完整生命周期 | 全生命周期管理 |
1.2 什么是Operator
Operator = 自定义资源(CRD)+ 控制器(Controller)
Operator是一种特殊的Kubernetes控制器,它将运维专家的知识编码到软件中,实现复杂应用的自动化管理。
┌─────────────────────────────────────────────────────────────────┐
│ Operator核心思想 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 声明式管理 + 控制循环 + 领域知识编码 │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌───────────┐ │
│ │ 用户声明 │────────▶│ 自定义资源 │────────▶│ Operator │ │
│ │ (期望状态) │ │ (CR) │ │ (控制器) │ │
│ └─────────────┘ └─────────────┘ └─────┬─────┘ │
│ │ │
│ │ 调谐 │
│ ▼ │
│ ┌─────────────┐ │
│ │ Kubernetes │ │
│ │ 原生资源 │ │
│ │ (Deployment │ │
│ │ Service等) │ │
│ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘1.3 Operator的应用场景
典型应用场景:
数据库管理
- MySQL Operator:自动部署主从集群、自动故障切换
- PostgreSQL Operator:备份恢复、连接池管理
- MongoDB Operator:副本集管理、分片配置
消息队列管理
- Kafka Operator:Topic管理、分区重平衡
- RabbitMQ Operator:集群扩缩容、策略配置
监控系统
- Prometheus Operator:自动发现监控目标、规则管理
- Grafana Operator:Dashboard管理、数据源配置
其他复杂应用
- Elasticsearch Operator:集群管理、索引策略
- Vault Operator:密钥管理、自动轮换
二、Operator架构
2.1 核心组件
┌─────────────────────────────────────────────────────────────────┐
│ Operator架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Kubernetes Cluster │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Custom Resource │ │ CRD │ │ │
│ │ │ (CR实例) │ │ (资源定义) │ │ │
│ │ └──────┬───────┘ └──────────────┘ │ │
│ │ │ │ │
│ │ │ Watch │ │
│ │ ▼ │ │
│ │ ┌──────────────────────────────────────────────────┐ │ │
│ │ │ Operator │ │ │
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │
│ │ │ │ Controller│ │ Informer │ │ WorkQueue│ │ │ │
│ │ │ │ (控制逻辑) │ │ (监听缓存)│ │ (事件队列)│ │ │ │
│ │ │ └────┬─────┘ └──────────┘ └──────────┘ │ │ │
│ │ │ │ │ │ │
│ │ │ │ Reconcile │ │ │
│ │ │ ▼ │ │ │
│ │ │ ┌──────────────────────────────────────────┐ │ │ │
│ │ │ │ Create/Update/Delete Native Resources │ │ │ │
│ │ │ │ (Deployment, Service, PVC等) │ │ │ │
│ │ │ └──────────────────────────────────────────┘ │ │ │
│ │ └──────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘2.2 控制器模式
控制器是Operator的核心,它实现了控制循环(Control Loop):
┌─────────────────────────────────────────────────────────────────┐
│ 控制循环(Reconcile Loop) │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ │
│ │ Start │ │
│ └────┬─────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 1. 获取当前CR │◀──────────────────────────┐ │
│ │ (期望状态) │ │ │
│ └───────┬──────┘ │ │
│ │ │ │
│ ▼ │ │
│ ┌──────────────┐ │ │
│ │ 2. 获取实际状态 │ │ │
│ │ (查询K8s资源) │ │ │
│ └───────┬──────┘ │ │
│ │ │ │
│ ▼ │ │
│ ┌──────────────┐ │ │
│ │ 3. 比较差异 │ │ │
│ │ (期望vs实际) │ │ │
│ └───────┬──────┘ │ │
│ │ │ │
│ ▼ │ │
│ ┌──────────────┐ │ │
│ │ 4. 执行调谐 │ │ │
│ │ (创建/更新/删除)│ │ │
│ └───────┬──────┘ │ │
│ │ │ │
│ │ 返回结果 │ │
│ │ (重新入队/成功) │ │
│ │ │ │
│ └───────────────────────────────────┘ │
│ (循环继续) │
│ │
└─────────────────────────────────────────────────────────────────┘2.3 Reconcile函数
Reconcile是控制器的核心函数,负责调谐实际状态以匹配期望状态:
go
// Reconcile 接收一个Request(包含Namespace和Name)
// 返回Result和error
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
log.Info("Reconciling", "resource", req.NamespacedName)
// 1. 获取CR实例
cr := &myappv1.MyApp{}
if err := r.Get(ctx, req.NamespacedName, cr); err != nil {
if errors.IsNotFound(err) {
// CR已被删除,清理资源
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
// 2. 调谐Deployment
if err := r.reconcileDeployment(ctx, cr); err != nil {
return ctrl.Result{}, err
}
// 3. 调谐Service
if err := r.reconcileService(ctx, cr); err != nil {
return ctrl.Result{}, err
}
// 4. 更新CR状态
if err := r.updateStatus(ctx, cr); err != nil {
return ctrl.Result{}, err
}
// 5. 返回结果
// Requeue: true 表示需要重新调谐
// RequeueAfter: 表示延迟多久后重新调谐
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}三、使用Operator SDK开发
3.1 创建Operator项目
bash
# 创建项目目录
mkdir -p ~/operators && cd ~/operators
# 初始化Operator项目
operator-sdk init --domain example.com --repo github.com/example/myapp-operator
# 输出:
# Writing kustomize manifests for you to edit...
# Writing scaffold for you to edit...
# Get controller runtime:
# $ go get sigs.k8s.io/controller-runtime@v0.16.3
# Update dependencies:
# $ go mod tidy项目结构:
myapp-operator/
├── api/ # API定义(CRD)
│ └── v1/
│ ├── myapp_types.go # CRD Go类型
│ └── zz_generated.deepcopy.go # 自动生成的深拷贝代码
├── controllers/ # 控制器实现
│ └── myapp_controller.go # 控制器逻辑
├── config/ # K8s资源配置
│ ├── crd/ # CRD YAML
│ ├── manager/ # Operator部署配置
│ ├── rbac/ # RBAC权限配置
│ └── samples/ # CR示例
├── hack/ # 脚本工具
├── main.go # 程序入口
├── go.mod # Go依赖
├── go.sum
├── Makefile # 构建脚本
├── Dockerfile # 容器镜像构建
└── PROJECT # 项目元数据3.2 定义CRD
创建API和CRD:
bash
# 创建API(CRD)
operator-sdk create api --group apps --version v1 --kind MyApp --resource --controller
# 输出:
# Writing kustomize manifests for you to edit...
# Writing scaffold for you to edit...
# api/v1/myapp_types.go
# controllers/myapp_controller.go编辑 api/v1/myapp_types.go:
go
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// MyAppSpec 定义期望状态
type MyAppSpec struct {
// +kubebuilder:validation:Minimum=1
// +kubebuilder:validation:Maximum=10
// +kubebuilder:default=1
Replicas int32 `json:"replicas,omitempty"`
// +kubebuilder:validation:Required
Image string `json:"image"`
// +kubebuilder:validation:Enum=info;debug;warn;error
// +kubebuilder:default=info
LogLevel string `json:"logLevel,omitempty"`
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
ServicePort int32 `json:"servicePort,omitempty"`
}
// MyAppStatus 定义实际状态
type MyAppStatus struct {
// +optional
Conditions []metav1.Condition `json:"conditions,omitempty"`
// +optional
AvailableReplicas int32 `json:"availableReplicas,omitempty"`
// +optional
Phase string `json:"phase,omitempty"`
}
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".spec.replicas"
// +kubebuilder:printcolumn:name="Image",type="string",JSONPath=".spec.image"
// +kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
type MyApp struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec MyAppSpec `json:"spec,omitempty"`
Status MyAppStatus `json:"status,omitempty"`
}
// +kubebuilder:object:root=true
type MyAppList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []MyApp `json:"items"`
}
func init() {
SchemeBuilder.Register(&MyApp{}, &MyAppList{})
}3.3 生成代码和CRD
bash
# 生成深拷贝代码和CRD YAML
make generate
make manifests
# 输出:
# api/v1/zz_generated.deepcopy.go
# config/crd/bases/apps.example.com_myapps.yaml3.4 实现控制器
编辑 controllers/myapp_controller.go:
go
package controllers
import (
"context"
"fmt"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
myappv1 "github.com/example/myapp-operator/api/v1"
)
// MyAppReconciler 协调MyApp资源
type MyAppReconciler struct {
client.Client
Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=apps.example.com,resources=myapps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps.example.com,resources=myapps/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps.example.com,resources=myapps/finalizers,verbs=update
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
func (r *MyAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
log.Info("Reconciling MyApp", "namespace", req.Namespace, "name", req.Name)
// 获取MyApp实例
myapp := &myappv1.MyApp{}
if err := r.Get(ctx, req.NamespacedName, myapp); err != nil {
if errors.IsNotFound(err) {
log.Info("MyApp resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
log.Error(err, "Failed to get MyApp")
return ctrl.Result{}, err
}
// 调谐Deployment
if err := r.reconcileDeployment(ctx, myapp); err != nil {
return ctrl.Result{}, err
}
// 调谐Service
if err := r.reconcileService(ctx, myapp); err != nil {
return ctrl.Result{}, err
}
// 更新状态
if err := r.updateStatus(ctx, myapp); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}
// reconcileDeployment 调谐Deployment
func (r *MyAppReconciler) reconcileDeployment(ctx context.Context, myapp *myappv1.MyApp) error {
log := log.FromContext(ctx)
// 设置默认值
replicas := myapp.Spec.Replicas
if replicas == 0 {
replicas = 1
}
// 定义期望的Deployment
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: myapp.Name,
Namespace: myapp.Namespace,
Labels: r.labelsForMyApp(myapp),
},
Spec: appsv1.DeploymentSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: r.labelsForMyApp(myapp),
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: r.labelsForMyApp(myapp),
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "app",
Image: myapp.Spec.Image,
Ports: []corev1.ContainerPort{{
ContainerPort: myapp.Spec.ServicePort,
Name: "http",
}},
Env: []corev1.EnvVar{{
Name: "LOG_LEVEL",
Value: myapp.Spec.LogLevel,
}},
Resources: myapp.Spec.Resources,
}},
},
},
},
}
// 设置OwnerReference
if err := ctrl.SetControllerReference(myapp, deployment, r.Scheme); err != nil {
return err
}
// 创建或更新Deployment
found := &appsv1.Deployment{}
err := r.Get(ctx, types.NamespacedName{Name: deployment.Name, Namespace: deployment.Namespace}, found)
if err != nil && errors.IsNotFound(err) {
log.Info("Creating Deployment", "name", deployment.Name)
return r.Create(ctx, deployment)
} else if err != nil {
return err
}
// 更新Deployment
found.Spec = deployment.Spec
log.Info("Updating Deployment", "name", deployment.Name)
return r.Update(ctx, found)
}
// reconcileService 调谐Service
func (r *MyAppReconciler) reconcileService(ctx context.Context, myapp *myappv1.MyApp) error {
log := log.FromContext(ctx)
port := myapp.Spec.ServicePort
if port == 0 {
port = 80
}
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: myapp.Name,
Namespace: myapp.Namespace,
Labels: r.labelsForMyApp(myapp),
},
Spec: corev1.ServiceSpec{
Selector: r.labelsForMyApp(myapp),
Ports: []corev1.ServicePort{{
Port: port,
TargetPort: intstr.FromInt(int(port)),
Name: "http",
}},
Type: corev1.ServiceTypeClusterIP,
},
}
if err := ctrl.SetControllerReference(myapp, service, r.Scheme); err != nil {
return err
}
found := &corev1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: service.Name, Namespace: service.Namespace}, found)
if err != nil && errors.IsNotFound(err) {
log.Info("Creating Service", "name", service.Name)
return r.Create(ctx, service)
} else if err != nil {
return err
}
found.Spec = service.Spec
log.Info("Updating Service", "name", service.Name)
return r.Update(ctx, found)
}
// updateStatus 更新MyApp状态
func (r *MyAppReconciler) updateStatus(ctx context.Context, myapp *myappv1.MyApp) error {
// 获取Deployment状态
deployment := &appsv1.Deployment{}
err := r.Get(ctx, types.NamespacedName{Name: myapp.Name, Namespace: myapp.Namespace}, deployment)
if err != nil {
return err
}
// 更新状态
myapp.Status.AvailableReplicas = deployment.Status.AvailableReplicas
if deployment.Status.AvailableReplicas == myapp.Spec.Replicas {
myapp.Status.Phase = "Running"
} else if deployment.Status.AvailableReplicas > 0 {
myapp.Status.Phase = "Partial"
} else {
myapp.Status.Phase = "Pending"
}
return r.Status().Update(ctx, myapp)
}
// labelsForMyApp 返回MyApp的标签
func (r *MyAppReconciler) labelsForMyApp(myapp *myappv1.MyApp) map[string]string {
return map[string]string{
"app": "myapp",
"name": myapp.Name,
"managed-by": "myapp-operator",
}
}
// SetupWithManager 设置控制器
func (r *MyAppReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&myappv1.MyApp{}).
Owns(&appsv1.Deployment{}).
Owns(&corev1.Service{}).
Complete(r)
}四、部署和调试
4.1 本地运行
bash
# 安装CRD到集群
make install
# 本地运行Operator
make run
# 输出:
# 2024-01-01T10:00:00.000Z INFO setup starting manager
# 2024-01-01T10:00:00.001Z INFO starting server {"path": "/metrics", "kind": "metrics"}4.2 创建测试资源
bash
# 创建CR实例
cat > config/samples/myapp_v1_myapp.yaml <<EOF
apiVersion: apps.example.com/v1
kind: MyApp
metadata:
name: myapp-sample
namespace: default
spec:
replicas: 3
image: nginx:1.25
logLevel: info
servicePort: 80
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "256Mi"
cpu: "200m"
EOF
kubectl apply -f config/samples/myapp_v1_myapp.yaml
# 查看资源
kubectl get myapp
kubectl get deployment
kubectl get service4.3 构建和部署Operator
bash
# 构建镜像
make docker-build IMG=myregistry/myapp-operator:v1.0.0
# 推送镜像
make docker-push IMG=myregistry/myapp-operator:v1.0.0
# 部署到集群
make deploy IMG=myregistry/myapp-operator:v1.0.0
# 查看Operator Pod
kubectl get pods -n myapp-operator-system4.4 调试技巧
bash
# 查看Operator日志
kubectl logs -n myapp-operator-system -l control-plane=controller-manager -f
# 查看CR事件
kubectl describe myapp myapp-sample
# 查看生成的资源
kubectl get all -l app=myapp
# 删除CR(级联删除关联资源)
kubectl delete myapp myapp-sample五、最佳实践
5.1 控制器设计原则
go
// ✅ 好的实践
// 1. 幂等性:多次执行结果相同
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// 获取当前状态
// 计算期望状态
// 应用变更(如果不匹配)
// 返回结果
}
// 2. 优雅的错误处理
if err != nil {
// 记录详细错误
log.Error(err, "Failed to reconcile", "resource", req.NamespacedName)
// 更新状态为失败
instance.Status.Phase = "Failed"
r.Status().Update(ctx, instance)
// 返回错误,触发重试
return ctrl.Result{}, err
}
// 3. 使用OwnerReference管理资源生命周期
ctrl.SetControllerReference(owner, object, r.Scheme)
// 4. 合理设置重试间隔
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil5.2 状态管理
go
// 使用Conditions记录详细状态
type MyAppStatus struct {
Conditions []metav1.Condition `json:"conditions,omitempty"`
Phase string `json:"phase,omitempty"`
}
// 更新Condition
func (r *Reconciler) updateCondition(ctx context.Context, myapp *myappv1.MyApp,
conditionType string, status metav1.ConditionStatus, reason, message string) {
condition := metav1.Condition{
Type: conditionType,
Status: status,
LastTransitionTime: metav1.Now(),
Reason: reason,
Message: message,
}
meta.SetStatusCondition(&myapp.Status.Conditions, condition)
}5.3 资源清理(Finalizer)
go
// 添加Finalizer
const myAppFinalizer = "myapp.example.com/finalizer"
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
myapp := &myappv1.MyApp{}
if err := r.Get(ctx, req.NamespacedName, myapp); err != nil {
return ctrl.Result{}, err
}
// 检查是否被删除
if !myapp.ObjectMeta.DeletionTimestamp.IsZero() {
// 执行清理操作
if err := r.cleanup(ctx, myapp); err != nil {
return ctrl.Result{}, err
}
// 移除Finalizer
controllerutil.RemoveFinalizer(myapp, myAppFinalizer)
return ctrl.Result{}, r.Update(ctx, myapp)
}
// 添加Finalizer
if !controllerutil.ContainsFinalizer(myapp, myAppFinalizer) {
controllerutil.AddFinalizer(myapp, myAppFinalizer)
return ctrl.Result{}, r.Update(ctx, myapp)
}
// 正常调谐逻辑
// ...
}六、总结
核心概念回顾
Operator开发
├── 核心概念
│ ├── Operator = CRD + Controller
│ ├── 声明式管理
│ └── 控制循环(Reconcile)
│
├── 开发流程
│ ├── 初始化项目(operator-sdk init)
│ ├── 定义API(create api)
│ ├── 实现控制器(Reconcile)
│ └── 部署运行
│
└── 最佳实践
├── 幂等性设计
├── OwnerReference
├── Finalizer清理
└── 状态管理下节预告
在《自定义CRD开发》中,我们将:
- 深入学习CRD高级特性
- 掌握OpenAPI验证
- 实现多版本管理
- 开发更复杂的自定义控制器
💡 学习建议:
- 动手实践:跟随课程步骤创建自己的第一个Operator
- 阅读源码:研究开源Operator的实现(如Prometheus Operator)
- 理解原理:深入理解控制循环和Reconcile机制