跳转到内容

K8s客户端开发

课程目标

通过本课程的学习,你将能够:

  • 深入理解client-go架构和核心组件
  • 掌握Informer机制和工作原理
  • 使用WorkQueue实现异步任务处理
  • 开发K8s管理工具和CLI应用
  • 实现自定义控制器和Operator
  • 应用client-go最佳实践

前置要求:已完成《自定义CRD开发》课程,具备Operator开发经验

一、client-go概述

1.1 什么是client-go

client-go是Kubernetes官方提供的Go语言客户端库,用于与Kubernetes API Server进行交互。

┌─────────────────────────────────────────────────────────────────┐
│                      client-go架构                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                      应用层                               │  │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐              │  │
│  │  │ Controller│  │  CLI Tool │  │ Operator  │              │  │
│  │  └────┬─────┘  └────┬─────┘  └────┬─────┘              │  │
│  └───────┼─────────────┼─────────────┼──────────────────────┘  │
│          │             │             │                         │
│  ┌───────┼─────────────┼─────────────┼──────────────────────┐  │
│  │       │   client-go 核心组件      │                      │  │
│  │  ┌────┴─────────────┴─────────────┴────┐               │  │
│  │  │           ClientSet / Dynamic       │               │  │
│  │  │              (类型化客户端)          │               │  │
│  │  └───────────────┬─────────────────────┘               │  │
│  │                  │                                      │  │
│  │  ┌───────────────┼─────────────────────┐               │  │
│  │  │  RESTClient   │   Informer          │               │  │
│  │  │  (底层HTTP)   │   (缓存+监听)        │               │  │
│  │  └───────────────┴─────────────────────┘               │  │
│  └──────────────────────────────────────────────────────────┘  │
│                              │                                  │
│                              ▼                                  │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │              Kubernetes API Server                        │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

1.2 核心组件

组件说明用途
RESTClient最底层的HTTP客户端直接发起HTTP请求
ClientSet类型化的客户端集合操作内置K8s资源
DynamicClient动态客户端操作CRD等动态资源
Informer资源监听和缓存机制高效监听资源变化
WorkQueue任务队列异步处理事件
Indexer本地索引快速查询缓存

二、基础客户端使用

2.1 创建客户端

go
package main

import (
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
    "path/filepath"
)

func createClient() (*kubernetes.Clientset, error) {
    // 方式1:使用kubeconfig文件
    var kubeconfig string
    if home := homedir.HomeDir(); home != "" {
        kubeconfig = filepath.Join(home, ".kube", "config")
    }
    
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        return nil, err
    }
    
    // 方式2:使用in-cluster配置(在Pod中运行)
    // config, err := rest.InClusterConfig()
    
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        return nil, err
    }
    
    return clientset, nil
}

2.2 操作资源

go
package main

import (
    "context"
    "fmt"
    
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// 获取Pod列表
func listPods(clientset *kubernetes.Clientset, namespace string) (*corev1.PodList, error) {
    pods, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
    return pods, err
}

// 创建Pod
func createPod(clientset *kubernetes.Clientset, namespace string, pod *corev1.Pod) (*corev1.Pod, error) {
    return clientset.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
}

// 更新Pod
func updatePod(clientset *kubernetes.Clientset, namespace string, pod *corev1.Pod) (*corev1.Pod, error) {
    return clientset.CoreV1().Pods(namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
}

// 删除Pod
func deletePod(clientset *kubernetes.Clientset, namespace, name string) error {
    return clientset.CoreV1().Pods(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
}

// 获取Pod日志
func getPodLogs(clientset *kubernetes.Clientset, namespace, name string) (string, error) {
    req := clientset.CoreV1().Pods(namespace).GetLogs(name, &corev1.PodLogOptions{})
    logs, err := req.Do(context.TODO()).Raw()
    return string(logs), err
}

2.3 动态客户端

操作CRD等动态资源:

go
package main

import (
    "context"
    "fmt"
    
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
)

func dynamicClientExample() {
    // 创建动态客户端
    config, _ := clientcmd.BuildConfigFromFlags("", kubeconfig)
    dynamicClient, _ := dynamic.NewForConfig(config)
    
    // 定义GVR(Group Version Resource)
    gvr := schema.GroupVersionResource{
        Group:    "apps.example.com",
        Version:  "v1",
        Resource: "myapps",
    }
    
    // 创建CR实例
    myapp := &unstructured.Unstructured{
        Object: map[string]interface{}{
            "apiVersion": "apps.example.com/v1",
            "kind":       "MyApp",
            "metadata": map[string]interface{}{
                "name":      "myapp-sample",
                "namespace": "default",
            },
            "spec": map[string]interface{}{
                "replicas": 3,
                "image":    "nginx:1.25",
            },
        },
    }
    
    // 创建资源
    result, err := dynamicClient.Resource(gvr).Namespace("default").Create(
        context.TODO(), myapp, metav1.CreateOptions{},
    )
    
    // 获取资源
    obj, err := dynamicClient.Resource(gvr).Namespace("default").Get(
        context.TODO(), "myapp-sample", metav1.GetOptions{},
    )
    
    // 获取spec字段
    spec, found, err := unstructured.NestedMap(obj.Object, "spec")
    if found {
        replicas, _ := unstructured.NestedInt64(spec, "replicas")
        fmt.Printf("Replicas: %d\n", replicas)
    }
}

三、Informer机制

3.1 Informer原理

┌─────────────────────────────────────────────────────────────────┐
│                      Informer工作原理                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────┐                                               │
│  │  API Server  │                                               │
│  └──────┬───────┘                                               │
│         │ List & Watch                                          │
│         ▼                                                       │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                      Informer                             │  │
│  │                                                          │  │
│  │  ┌──────────────┐      ┌──────────────┐                 │  │
│  │  │  Reflector   │─────▶│   DeltaFIFO  │                 │  │
│  │  │  (ListWatch) │      │   (事件队列)  │                 │  │
│  │  └──────────────┘      └──────┬───────┘                 │  │
│  │                               │                         │  │
│  │                               ▼                         │  │
│  │  ┌──────────────────────────────────────────────────┐   │  │
│  │  │              Indexer (本地缓存)                    │   │  │
│  │  │  ┌──────────┐  ┌──────────┐  ┌──────────┐       │   │  │
│  │  │  │ Store    │  │ Index    │  │ ThreadSafe│       │   │  │
│  │  │  │ (对象存储)│  │ (索引)   │  │ (并发安全)│       │   │  │
│  │  │  └──────────┘  └──────────┘  └──────────┘       │   │  │
│  │  └──────────────────────────────────────────────────┘   │  │
│  │                               │                         │  │
│  │                               │ 通知                    │  │
│  │                               ▼                         │  │
│  │  ┌──────────────────────────────────────────────────┐   │  │
│  │  │              EventHandler                         │   │  │
│  │  │  ├─ OnAdd()                                      │   │  │
│  │  │  ├─ OnUpdate()                                   │   │  │
│  │  │  └─ OnDelete()                                   │   │  │
│  │  └──────────────────────────────────────────────────┘   │  │
│  │                                                          │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

3.2 使用Informer

go
package main

import (
    "fmt"
    "time"
    
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
)

func informerExample(clientset *kubernetes.Clientset) {
    // 停止信号
    stopper := make(chan struct{})
    defer close(stopper)
    
    // 创建Informer工厂
    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
    
    // 获取Pod Informer
    podInformer := factory.Core().V1().Pods().Informer()
    
    // 添加事件处理器
    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod := obj.(*corev1.Pod)
            fmt.Printf("Pod Added: %s/%s\n", pod.Namespace, pod.Name)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            oldPod := oldObj.(*corev1.Pod)
            newPod := newObj.(*corev1.Pod)
            fmt.Printf("Pod Updated: %s/%s (phase: %s -> %s)\n", 
                newPod.Namespace, newPod.Name, 
                oldPod.Status.Phase, newPod.Status.Phase)
        },
        DeleteFunc: func(obj interface{}) {
            pod := obj.(*corev1.Pod)
            fmt.Printf("Pod Deleted: %s/%s\n", pod.Namespace, pod.Name)
        },
    })
    
    // 启动Informer
    factory.Start(stopper)
    
    // 等待缓存同步
    if !cache.WaitForCacheSync(stopper, podInformer.HasSynced) {
        runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
        return
    }
    
    // 从本地缓存获取Pod
    podLister := factory.Core().V1().Pods().Lister()
    pods, err := podLister.List(labels.Everything())
    
    // 阻塞
    <-stopper
}

3.3 自定义Informer

go
package main

import (
    "time"
    
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/tools/cache"
)

func customInformerExample(clientset *kubernetes.Clientset) {
    // 创建ListWatcher
    listWatcher := &cache.ListWatch{
        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
            return clientset.CoreV1().Pods("").List(context.TODO(), options)
        },
        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
            return clientset.CoreV1().Pods("").Watch(context.TODO(), options)
        },
    }
    
    // 创建Indexer
    indexer, informer := cache.NewIndexerInformer(
        listWatcher,
        &corev1.Pod{},
        30*time.Second,  // resync周期
        cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                // 处理添加事件
            },
            UpdateFunc: func(oldObj, newObj interface{}) {
                // 处理更新事件
            },
            DeleteFunc: func(obj interface{}) {
                // 处理删除事件
            },
        },
        cache.Indexers{
            cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
        },
    )
    
    // 使用Indexer查询
    pods, err := indexer.ByIndex(cache.NamespaceIndex, "default")
}

四、WorkQueue

4.1 WorkQueue原理

┌─────────────────────────────────────────────────────────────────┐
│                      WorkQueue工作原理                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────┐                                               │
│  │  Informer    │                                               │
│  │  (事件源)     │                                               │
│  └──────┬───────┘                                               │
│         │ Add()                                                 │
│         ▼                                                       │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                      WorkQueue                            │  │
│  │                                                          │  │
│  │  ┌──────────────┐      ┌──────────────┐                 │  │
│  │  │   Queue      │      │   Dirty Set  │                 │  │
│  │  │  (处理队列)   │      │  (去重集合)   │                 │  │
│  │  └──────┬───────┘      └──────────────┘                 │  │
│  │         │                                                │  │
│  │         │ Get()                                          │  │
│  │         ▼                                                │  │
│  │  ┌──────────────┐      ┌──────────────┐                 │  │
│  │  │   Processor  │      │  InProgress  │                 │  │
│  │  │  (处理中)     │      │   Set        │                 │  │
│  │  └──────┬───────┘      └──────────────┘                 │  │
│  │         │                                                │  │
│  │         │ Done()                                         │  │
│  │         ▼                                                │  │
│  │  ┌──────────────┐                                        │  │
│  │  │   Handler    │                                        │  │
│  │  │  (业务处理)   │                                        │  │
│  │  └──────────────┘                                        │  │
│  │                                                          │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

4.2 使用WorkQueue

go
package main

import (
    "fmt"
    "time"
    
    "k8s.io/client-go/util/workqueue"
)

type Controller struct {
    workqueue workqueue.RateLimitingInterface
    informer  cache.SharedIndexInformer
}

func NewController(informer cache.SharedIndexInformer) *Controller {
    c := &Controller{
        workqueue: workqueue.NewNamedRateLimitingQueue(
            workqueue.DefaultControllerRateLimiter(),
            "MyApp",
        ),
        informer: informer,
    }
    
    // 添加事件处理器
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: c.enqueue,
        UpdateFunc: func(old, new interface{}) {
            c.enqueue(new)
        },
        DeleteFunc: c.enqueue,
    })
    
    return c
}

func (c *Controller) enqueue(obj interface{}) {
    key, err := cache.MetaNamespaceKeyFunc(obj)
    if err != nil {
        return
    }
    c.workqueue.Add(key)
}

func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
    defer c.workqueue.ShutDown()
    
    // 启动Informer
    go c.informer.Run(stopCh)
    
    // 等待缓存同步
    if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
        return
}
    
    // 启动工作线程
    for i := 0; i < workers; i++ {
        go c.runWorker()
    }
    
    <-stopCh
}

func (c *Controller) runWorker() {
    for c.processNextItem() {
    }
}

func (c *Controller) processNextItem() bool {
    // 从队列获取元素
    obj, shutdown := c.workqueue.Get()
    if shutdown {
        return false
    }
    
    // 处理完成后标记Done
    defer c.workqueue.Done(obj)
    
    key, ok := obj.(string)
    if !ok {
        c.workqueue.Forget(obj)
        return true
    }
    
    // 处理业务逻辑
    if err := c.syncHandler(key); err != nil {
        // 失败时重新入队(带退避)
        c.workqueue.AddRateLimited(key)
        return true
    }
    
    // 成功,忘记该key
    c.workqueue.Forget(obj)
    return true
}

func (c *Controller) syncHandler(key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    
    // 从缓存获取对象
    obj, exists, err := c.informer.GetIndexer().GetByKey(key)
    if err != nil {
        return err
    }
    
    if !exists {
        // 对象被删除
        fmt.Printf("Object %s/%s deleted\n", namespace, name)
        return nil
    }
    
    // 处理对象
    fmt.Printf("Processing %s/%s\n", namespace, name)
    
    return nil
}

五、开发K8s CLI工具

5.1 完整CLI示例

go
package main

import (
    "context"
    "flag"
    "fmt"
    "os"
    "path/filepath"
    
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
)

type K8sCLI struct {
    clientset *kubernetes.Clientset
}

func NewK8sCLI() (*K8sCLI, error) {
    var kubeconfig *string
    if home := homedir.HomeDir(); home != "" {
        kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "kubeconfig file")
    } else {
        kubeconfig = flag.String("kubeconfig", "", "kubeconfig file")
    }
    flag.Parse()
    
    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        return nil, err
    }
    
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        return nil, err
    }
    
    return &K8sCLI{clientset: clientset}, nil
}

func (cli *K8sCLI) ListPods(namespace string) error {
    pods, err := cli.clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        return err
    }
    
    fmt.Printf("%-30s %-15s %-10s\n", "NAME", "STATUS", "AGE")
    for _, pod := range pods.Items {
        age := time.Since(pod.CreationTimestamp.Time).Round(time.Second)
        fmt.Printf("%-30s %-15s %-10s\n", pod.Name, pod.Status.Phase, age)
    }
    
    return nil
}

func (cli *K8sCLI) GetPodStatus(name, namespace string) error {
    pod, err := cli.clientset.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
    if err != nil {
        return err
    }
    
    fmt.Printf("Name:        %s\n", pod.Name)
    fmt.Printf("Namespace:   %s\n", pod.Namespace)
    fmt.Printf("Status:      %s\n", pod.Status.Phase)
    fmt.Printf("Node:        %s\n", pod.Spec.NodeName)
    fmt.Printf("IP:          %s\n", pod.Status.PodIP)
    
    return nil
}

func main() {
    cli, err := NewK8sCLI()
    if err != nil {
        fmt.Fprintf(os.Stderr, "Error: %v\n", err)
        os.Exit(1)
    }
    
    if len(os.Args) < 2 {
        fmt.Println("Usage: k8s-cli <command> [args]")
        fmt.Println("Commands:")
        fmt.Println("  list-pods [namespace]")
        fmt.Println("  pod-status <name> [namespace]")
        os.Exit(1)
    }
    
    switch os.Args[1] {
    case "list-pods":
        namespace := "default"
        if len(os.Args) > 2 {
            namespace = os.Args[2]
        }
        if err := cli.ListPods(namespace); err != nil {
            fmt.Fprintf(os.Stderr, "Error: %v\n", err)
            os.Exit(1)
        }
    case "pod-status":
        if len(os.Args) < 3 {
            fmt.Println("Usage: k8s-cli pod-status <name> [namespace]")
            os.Exit(1)
        }
        name := os.Args[2]
        namespace := "default"
        if len(os.Args) > 3 {
            namespace = os.Args[3]
        }
        if err := cli.GetPodStatus(name, namespace); err != nil {
            fmt.Fprintf(os.Stderr, "Error: %v\n", err)
            os.Exit(1)
        }
    default:
        fmt.Printf("Unknown command: %s\n", os.Args[1])
        os.Exit(1)
    }
}

六、最佳实践

6.1 错误处理

go
// ✅ 好的实践

// 1. 使用context控制超时
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})

// 2. 检查特定错误类型
if errors.IsNotFound(err) {
    // 资源不存在
} else if errors.IsConflict(err) {
    // 冲突错误,可能需要重试
} else if err != nil {
    // 其他错误
}

// 3. 使用重试机制
retry.OnError(retry.DefaultRetry, errors.IsConflict, func() error {
    return updateResource()
})

6.2 性能优化

go
// 1. 使用SharedInformerFactory共享Informer
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)

// 2. 使用ListOptions限制返回字段
pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
    LabelSelector: "app=myapp",
    FieldSelector: "status.phase=Running",
})

// 3. 使用缓存而非直接查询API Server
podLister := factory.Core().V1().Pods().Lister()
pod, err := podLister.Pods(namespace).Get(name)

七、总结

核心概念回顾

client-go开发
├── 基础客户端
│   ├── RESTClient(底层HTTP)
│   ├── ClientSet(类型化客户端)
│   └── DynamicClient(动态客户端)

├── Informer机制
│   ├── Reflector(ListWatch)
│   ├── DeltaFIFO(事件队列)
│   ├── Indexer(本地缓存)
│   └── EventHandler(事件处理)

├── WorkQueue
│   ├── 任务队列
│   ├── 去重机制
│   └── 退避重试

└── 实战应用
    ├── 自定义控制器
    ├── CLI工具开发
    └── 性能优化

💡 学习建议

  1. 深入理解Informer机制,它是client-go的核心
  2. 使用WorkQueue处理异步任务,保证可靠性
  3. 善用本地缓存,减少对API Server的压力

评论区

专业的Linux技术学习平台,从入门到精通的完整学习路径