主题
K8s客户端开发
课程目标
通过本课程的学习,你将能够:
- 深入理解client-go架构和核心组件
- 掌握Informer机制和工作原理
- 使用WorkQueue实现异步任务处理
- 开发K8s管理工具和CLI应用
- 实现自定义控制器和Operator
- 应用client-go最佳实践
前置要求:已完成《自定义CRD开发》课程,具备Operator开发经验
一、client-go概述
1.1 什么是client-go
client-go是Kubernetes官方提供的Go语言客户端库,用于与Kubernetes API Server进行交互。
┌─────────────────────────────────────────────────────────────────┐
│ client-go架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 应用层 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Controller│ │ CLI Tool │ │ Operator │ │ │
│ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │
│ └───────┼─────────────┼─────────────┼──────────────────────┘ │
│ │ │ │ │
│ ┌───────┼─────────────┼─────────────┼──────────────────────┐ │
│ │ │ client-go 核心组件 │ │ │
│ │ ┌────┴─────────────┴─────────────┴────┐ │ │
│ │ │ ClientSet / Dynamic │ │ │
│ │ │ (类型化客户端) │ │ │
│ │ └───────────────┬─────────────────────┘ │ │
│ │ │ │ │
│ │ ┌───────────────┼─────────────────────┐ │ │
│ │ │ RESTClient │ Informer │ │ │
│ │ │ (底层HTTP) │ (缓存+监听) │ │ │
│ │ └───────────────┴─────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Kubernetes API Server │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘1.2 核心组件
| 组件 | 说明 | 用途 |
|---|---|---|
| RESTClient | 最底层的HTTP客户端 | 直接发起HTTP请求 |
| ClientSet | 类型化的客户端集合 | 操作内置K8s资源 |
| DynamicClient | 动态客户端 | 操作CRD等动态资源 |
| Informer | 资源监听和缓存机制 | 高效监听资源变化 |
| WorkQueue | 任务队列 | 异步处理事件 |
| Indexer | 本地索引 | 快速查询缓存 |
二、基础客户端使用
2.1 创建客户端
go
package main
import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"path/filepath"
)
func createClient() (*kubernetes.Clientset, error) {
// 方式1:使用kubeconfig文件
var kubeconfig string
if home := homedir.HomeDir(); home != "" {
kubeconfig = filepath.Join(home, ".kube", "config")
}
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, err
}
// 方式2:使用in-cluster配置(在Pod中运行)
// config, err := rest.InClusterConfig()
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
return clientset, nil
}2.2 操作资源
go
package main
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
// 获取Pod列表
func listPods(clientset *kubernetes.Clientset, namespace string) (*corev1.PodList, error) {
pods, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
return pods, err
}
// 创建Pod
func createPod(clientset *kubernetes.Clientset, namespace string, pod *corev1.Pod) (*corev1.Pod, error) {
return clientset.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
}
// 更新Pod
func updatePod(clientset *kubernetes.Clientset, namespace string, pod *corev1.Pod) (*corev1.Pod, error) {
return clientset.CoreV1().Pods(namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
}
// 删除Pod
func deletePod(clientset *kubernetes.Clientset, namespace, name string) error {
return clientset.CoreV1().Pods(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
}
// 获取Pod日志
func getPodLogs(clientset *kubernetes.Clientset, namespace, name string) (string, error) {
req := clientset.CoreV1().Pods(namespace).GetLogs(name, &corev1.PodLogOptions{})
logs, err := req.Do(context.TODO()).Raw()
return string(logs), err
}2.3 动态客户端
操作CRD等动态资源:
go
package main
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
)
func dynamicClientExample() {
// 创建动态客户端
config, _ := clientcmd.BuildConfigFromFlags("", kubeconfig)
dynamicClient, _ := dynamic.NewForConfig(config)
// 定义GVR(Group Version Resource)
gvr := schema.GroupVersionResource{
Group: "apps.example.com",
Version: "v1",
Resource: "myapps",
}
// 创建CR实例
myapp := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "apps.example.com/v1",
"kind": "MyApp",
"metadata": map[string]interface{}{
"name": "myapp-sample",
"namespace": "default",
},
"spec": map[string]interface{}{
"replicas": 3,
"image": "nginx:1.25",
},
},
}
// 创建资源
result, err := dynamicClient.Resource(gvr).Namespace("default").Create(
context.TODO(), myapp, metav1.CreateOptions{},
)
// 获取资源
obj, err := dynamicClient.Resource(gvr).Namespace("default").Get(
context.TODO(), "myapp-sample", metav1.GetOptions{},
)
// 获取spec字段
spec, found, err := unstructured.NestedMap(obj.Object, "spec")
if found {
replicas, _ := unstructured.NestedInt64(spec, "replicas")
fmt.Printf("Replicas: %d\n", replicas)
}
}三、Informer机制
3.1 Informer原理
┌─────────────────────────────────────────────────────────────────┐
│ Informer工作原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ │
│ │ API Server │ │
│ └──────┬───────┘ │
│ │ List & Watch │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Informer │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Reflector │─────▶│ DeltaFIFO │ │ │
│ │ │ (ListWatch) │ │ (事件队列) │ │ │
│ │ └──────────────┘ └──────┬───────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌──────────────────────────────────────────────────┐ │ │
│ │ │ Indexer (本地缓存) │ │ │
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │
│ │ │ │ Store │ │ Index │ │ ThreadSafe│ │ │ │
│ │ │ │ (对象存储)│ │ (索引) │ │ (并发安全)│ │ │ │
│ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │
│ │ └──────────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ │ 通知 │ │
│ │ ▼ │ │
│ │ ┌──────────────────────────────────────────────────┐ │ │
│ │ │ EventHandler │ │ │
│ │ │ ├─ OnAdd() │ │ │
│ │ │ ├─ OnUpdate() │ │ │
│ │ │ └─ OnDelete() │ │ │
│ │ └──────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘3.2 使用Informer
go
package main
import (
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
func informerExample(clientset *kubernetes.Clientset) {
// 停止信号
stopper := make(chan struct{})
defer close(stopper)
// 创建Informer工厂
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
// 获取Pod Informer
podInformer := factory.Core().V1().Pods().Informer()
// 添加事件处理器
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
fmt.Printf("Pod Added: %s/%s\n", pod.Namespace, pod.Name)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldPod := oldObj.(*corev1.Pod)
newPod := newObj.(*corev1.Pod)
fmt.Printf("Pod Updated: %s/%s (phase: %s -> %s)\n",
newPod.Namespace, newPod.Name,
oldPod.Status.Phase, newPod.Status.Phase)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
fmt.Printf("Pod Deleted: %s/%s\n", pod.Namespace, pod.Name)
},
})
// 启动Informer
factory.Start(stopper)
// 等待缓存同步
if !cache.WaitForCacheSync(stopper, podInformer.HasSynced) {
runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
// 从本地缓存获取Pod
podLister := factory.Core().V1().Pods().Lister()
pods, err := podLister.List(labels.Everything())
// 阻塞
<-stopper
}3.3 自定义Informer
go
package main
import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
func customInformerExample(clientset *kubernetes.Clientset) {
// 创建ListWatcher
listWatcher := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return clientset.CoreV1().Pods("").List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientset.CoreV1().Pods("").Watch(context.TODO(), options)
},
}
// 创建Indexer
indexer, informer := cache.NewIndexerInformer(
listWatcher,
&corev1.Pod{},
30*time.Second, // resync周期
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// 处理添加事件
},
UpdateFunc: func(oldObj, newObj interface{}) {
// 处理更新事件
},
DeleteFunc: func(obj interface{}) {
// 处理删除事件
},
},
cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
},
)
// 使用Indexer查询
pods, err := indexer.ByIndex(cache.NamespaceIndex, "default")
}四、WorkQueue
4.1 WorkQueue原理
┌─────────────────────────────────────────────────────────────────┐
│ WorkQueue工作原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ │
│ │ Informer │ │
│ │ (事件源) │ │
│ └──────┬───────┘ │
│ │ Add() │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ WorkQueue │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Queue │ │ Dirty Set │ │ │
│ │ │ (处理队列) │ │ (去重集合) │ │ │
│ │ └──────┬───────┘ └──────────────┘ │ │
│ │ │ │ │
│ │ │ Get() │ │
│ │ ▼ │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Processor │ │ InProgress │ │ │
│ │ │ (处理中) │ │ Set │ │ │
│ │ └──────┬───────┘ └──────────────┘ │ │
│ │ │ │ │
│ │ │ Done() │ │
│ │ ▼ │ │
│ │ ┌──────────────┐ │ │
│ │ │ Handler │ │ │
│ │ │ (业务处理) │ │ │
│ │ └──────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘4.2 使用WorkQueue
go
package main
import (
"fmt"
"time"
"k8s.io/client-go/util/workqueue"
)
type Controller struct {
workqueue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
}
func NewController(informer cache.SharedIndexInformer) *Controller {
c := &Controller{
workqueue: workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(),
"MyApp",
),
informer: informer,
}
// 添加事件处理器
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueue,
UpdateFunc: func(old, new interface{}) {
c.enqueue(new)
},
DeleteFunc: c.enqueue,
})
return c
}
func (c *Controller) enqueue(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
return
}
c.workqueue.Add(key)
}
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
defer c.workqueue.ShutDown()
// 启动Informer
go c.informer.Run(stopCh)
// 等待缓存同步
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
return
}
// 启动工作线程
for i := 0; i < workers; i++ {
go c.runWorker()
}
<-stopCh
}
func (c *Controller) runWorker() {
for c.processNextItem() {
}
}
func (c *Controller) processNextItem() bool {
// 从队列获取元素
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
// 处理完成后标记Done
defer c.workqueue.Done(obj)
key, ok := obj.(string)
if !ok {
c.workqueue.Forget(obj)
return true
}
// 处理业务逻辑
if err := c.syncHandler(key); err != nil {
// 失败时重新入队(带退避)
c.workqueue.AddRateLimited(key)
return true
}
// 成功,忘记该key
c.workqueue.Forget(obj)
return true
}
func (c *Controller) syncHandler(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
// 从缓存获取对象
obj, exists, err := c.informer.GetIndexer().GetByKey(key)
if err != nil {
return err
}
if !exists {
// 对象被删除
fmt.Printf("Object %s/%s deleted\n", namespace, name)
return nil
}
// 处理对象
fmt.Printf("Processing %s/%s\n", namespace, name)
return nil
}五、开发K8s CLI工具
5.1 完整CLI示例
go
package main
import (
"context"
"flag"
"fmt"
"os"
"path/filepath"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
type K8sCLI struct {
clientset *kubernetes.Clientset
}
func NewK8sCLI() (*K8sCLI, error) {
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "kubeconfig file")
}
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
return nil, err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
return &K8sCLI{clientset: clientset}, nil
}
func (cli *K8sCLI) ListPods(namespace string) error {
pods, err := cli.clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return err
}
fmt.Printf("%-30s %-15s %-10s\n", "NAME", "STATUS", "AGE")
for _, pod := range pods.Items {
age := time.Since(pod.CreationTimestamp.Time).Round(time.Second)
fmt.Printf("%-30s %-15s %-10s\n", pod.Name, pod.Status.Phase, age)
}
return nil
}
func (cli *K8sCLI) GetPodStatus(name, namespace string) error {
pod, err := cli.clientset.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return err
}
fmt.Printf("Name: %s\n", pod.Name)
fmt.Printf("Namespace: %s\n", pod.Namespace)
fmt.Printf("Status: %s\n", pod.Status.Phase)
fmt.Printf("Node: %s\n", pod.Spec.NodeName)
fmt.Printf("IP: %s\n", pod.Status.PodIP)
return nil
}
func main() {
cli, err := NewK8sCLI()
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
if len(os.Args) < 2 {
fmt.Println("Usage: k8s-cli <command> [args]")
fmt.Println("Commands:")
fmt.Println(" list-pods [namespace]")
fmt.Println(" pod-status <name> [namespace]")
os.Exit(1)
}
switch os.Args[1] {
case "list-pods":
namespace := "default"
if len(os.Args) > 2 {
namespace = os.Args[2]
}
if err := cli.ListPods(namespace); err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
case "pod-status":
if len(os.Args) < 3 {
fmt.Println("Usage: k8s-cli pod-status <name> [namespace]")
os.Exit(1)
}
name := os.Args[2]
namespace := "default"
if len(os.Args) > 3 {
namespace = os.Args[3]
}
if err := cli.GetPodStatus(name, namespace); err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
default:
fmt.Printf("Unknown command: %s\n", os.Args[1])
os.Exit(1)
}
}六、最佳实践
6.1 错误处理
go
// ✅ 好的实践
// 1. 使用context控制超时
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
// 2. 检查特定错误类型
if errors.IsNotFound(err) {
// 资源不存在
} else if errors.IsConflict(err) {
// 冲突错误,可能需要重试
} else if err != nil {
// 其他错误
}
// 3. 使用重试机制
retry.OnError(retry.DefaultRetry, errors.IsConflict, func() error {
return updateResource()
})6.2 性能优化
go
// 1. 使用SharedInformerFactory共享Informer
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
// 2. 使用ListOptions限制返回字段
pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
LabelSelector: "app=myapp",
FieldSelector: "status.phase=Running",
})
// 3. 使用缓存而非直接查询API Server
podLister := factory.Core().V1().Pods().Lister()
pod, err := podLister.Pods(namespace).Get(name)七、总结
核心概念回顾
client-go开发
├── 基础客户端
│ ├── RESTClient(底层HTTP)
│ ├── ClientSet(类型化客户端)
│ └── DynamicClient(动态客户端)
│
├── Informer机制
│ ├── Reflector(ListWatch)
│ ├── DeltaFIFO(事件队列)
│ ├── Indexer(本地缓存)
│ └── EventHandler(事件处理)
│
├── WorkQueue
│ ├── 任务队列
│ ├── 去重机制
│ └── 退避重试
│
└── 实战应用
├── 自定义控制器
├── CLI工具开发
└── 性能优化💡 学习建议:
- 深入理解Informer机制,它是client-go的核心
- 使用WorkQueue处理异步任务,保证可靠性
- 善用本地缓存,减少对API Server的压力