有效的网络推广/乐山网站seo
参考网址:
Introduction - The Kubebuilder Book
深入解析 Kubebuilder:让编写 CRD 变得更简单 - 知乎
导读
推荐朋友们多看上面官方提供的文档。
前面我转发一篇博客 基于Kubebuilder开发Operator(入门使用)_chenxy02的博客-CSDN博客 记录了对kubebuilder的入门使用,本文旨在加深对kubebuilder深圳,进一步熟悉kubebuilder的项目代码。
核心概念
GVKs & GVRs
GVK = GroupVersionKind,GVR = GroupVersionResource
- 在编码过程中,资源数据的存储都是以结构体存储(称为Go type)
- 由于多版本version的存在(alpha1, beta1, v1等),不同版本中存储结构体的存在着差异,但是我们都会给其相同的Kind名字(比如Deployment)
- 因此,我们编码中只用kind名(如Deployment), 并不能准确获取到其使用哪个版本结构体
- 所以,采用GVK获取到一个具体的存储结构体,也就是GVK的三个信息(group/version/kind)确定一个Go type(结构体)
如何获取呢? —— 通过Scheme,Scheme存储了GVK和Go Type的映射关系
- 在创建资源过程中,我们编写yaml,提交请求:
- 编写yaml过程中,我们会写apiversion和kind,其实就是GVK
- 而客户端(也就是我们)与apiserver通信是http形式,就是将请求发送到某一http path
发送到哪个http path呢?——这个http path其实就是GVR
- /apis/batch/v1/nampspaces/default/job 这个就是表示default 命名空间的job资源
- 我们 kubectl get po 时 也是请求的路径,也可以称为GVR
- 其实 GVR 是由 GVK 转化而来 —— 通过REST映射的RESTMappers实现
Scheme
每一组Controllers都需要一个Scheme,提供了Kinds与对应的Go types的映射,也就是说给定Go type就知道他的GVK,给定GVK就知道他的Go type
Manager
Kubebuilder 的核心组件,具有 3 个职责:
-
负责运行所有的Controllers;
-
初始化共享cashes,包含listAndWath功能
-
初始化clients用于与Api Server通信
Cache
Kubebuilder的核心组件,负责在Controller进程里面根据Scheme同步Api Server中所有该Controller关心GVKs的GVRs,其核心是GVK->Informer的映射,Informer会负责监听对应GVK的GVRs的创建/删除/更新操作,以触发Controller的Reconcile逻辑。
Controller
Kubebuilder为我们生成的脚手架文件,我们只需要实现Reconcile方法即可。
Clients
在实现Controller的时候不可避免地需要对某些资源类型进行创建/删除/更新,就是通过该Clients实现的,其中查询功能实际查询是本地的Cache,写操作直接访问Api Server
Index
由于Controller经常要对Cache进行查询,Kubebuilder提供Index utility给Cache加索引提升查询效率。
Finalizer
在一般情况下,如果资源被删除之后,我们虽然能够触发删除事件,但是这个时候从Cache里面无法读取任何被删除对象的信息,这样一来,导致很多垃圾清理工作因为信息不足无法进行。
K8s的Finalizer字段用于处理这种情况。在K8s中,只要对象ObjectMeta里面的Finalizers不为空,对该对象的delete操作就会转变为update操作,具体说就是update deletionTimestamp字段,其意义就是告诉K8s的GC“在deletionTimestamp 这个时刻之后,只要Finalizer为空,就立马删除掉该对象“。
所以一般的使用姿势就是在创建对象时把 Finalizers 设置好(任意 string),然后处理 DeletionTimestamp 不为空的 update 操作(实际是 delete),根据 Finalizers 的值执行完所有的 pre-delete hook(此时可以在 Cache 里面读取到被删除对象的任何信息)之后将 Finalizers 置为空即可。
OwnerReference
k8s GC在删除一个对象时,任何ownerReference是该对象的对象都会被清除,与此同时,kubebuilder支持所有对象的变更 都会触发Owner对象controller的Reconcile方法。
所有概念集合在一起如下图所示:
源码阅读
下面代码来源于 基于Kubebuilder开发Operator(入门使用)_chenxy02的博客-CSDN博客
从main.go开始
Kubebuilder 创建的 main.go 是整个项目的入口,逻辑十分简单:
var (scheme = runtime.NewScheme()setupLog = ctrl.Log.WithName("setup")
)func init() {utilruntime.Must(clientgoscheme.AddToScheme(scheme))utilruntime.Must(webappv1.AddToScheme(scheme))//+kubebuilder:scaffold:scheme
}func main() {... // 1、Managermgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme,MetricsBindAddress: metricsAddr,Port: 9443,HealthProbeBindAddress: probeAddr,LeaderElection: enableLeaderElection,LeaderElectionID: "ecaf1259.my.domain",})...// 2、init Reconciler(Controller)if err = (&controllers.GuestbookReconciler{Client: mgr.GetClient(),Scheme: mgr.GetScheme(),}).SetupWithManager(mgr); err != nil {setupLog.Error(err, "unable to create controller", "controller", "Guestbook")os.Exit(1)}... // 3、start ManagersetupLog.Info("starting manager")if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {setupLog.Error(err, "problem running manager")os.Exit(1)}
}
可以看到在 init方法里面我们将webappv1注册到Scheme里面去了,这样一来Cache就知道watch谁了,main方法里面的逻辑基本都是Manager的:
- 初始化了一个Manager;
- 将Manager 的 Client 传给 Controller,并且调用 SetupWithManager 方法传入 Manager 进行 Controller 的初始化;
- 启动Manager
Manager初始化
Manager初始化代码如下:
// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {// Set default values for options fieldsoptions = setOptionsDefaults(options)cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {clusterOptions.Scheme = options.SchemeclusterOptions.MapperProvider = options.MapperProviderclusterOptions.Logger = options.LoggerclusterOptions.SyncPeriod = options.SyncPeriodclusterOptions.Namespace = options.NamespaceclusterOptions.NewCache = options.NewCacheclusterOptions.ClientBuilder = options.ClientBuilderclusterOptions.ClientDisableCacheFor = options.ClientDisableCacheForclusterOptions.DryRunClient = options.DryRunClientclusterOptions.EventBroadcaster = options.EventBroadcaster})... return &controllerManager{cluster: cluster,recorderProvider: recorderProvider,resourceLock: resourceLock,metricsListener: metricsListener,metricsExtraHandlers: metricsExtraHandlers,logger: options.Logger,elected: make(chan struct{}),port: options.Port,host: options.Host,certDir: options.CertDir,leaseDuration: *options.LeaseDuration,renewDeadline: *options.RenewDeadline,retryPeriod: *options.RetryPeriod,healthProbeListener: healthProbeListener,readinessEndpointName: options.ReadinessEndpointName,livenessEndpointName: options.LivenessEndpointName,gracefulShutdownTimeout: *options.GracefulShutdownTimeout,internalProceduresStop: make(chan struct{}),leaderElectionStopped: make(chan struct{}),}, nil
}
可以看到 主要是创建Cache与Clients等等等实例:
创建Cache
Cache 初始化代码如下:
// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {opts, err := defaultOpts(config, opts)if err != nil {return nil, err}im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)return &informerCache{InformersMap: im}, nil
}// NewInformersMap creates a new InformersMap that can create informers for
// both structured and unstructured objects.
func NewInformersMap(config *rest.Config,scheme *runtime.Scheme,mapper meta.RESTMapper,resync time.Duration,namespace string) *InformersMap {return &InformersMap{structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace),Scheme: scheme,}
}
可以看到Cache主要就是创建了InformersMap, Scheme 里面的每个 GVK 都创建了对应的 Informer,通过 informersByGVK 这个 map 做 GVK 到 Informer 的映射,每个Informer会根据ListWatch函数对对应的GVK进行List和Watch。
创建 Clients
创建 Clients 很简单:
// defaultNewClient creates the default caching client
func defaultNewClient(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {// Create the Client for Write operations.c, err := client.New(config, options)if err != nil {return nil, err}return &client.DelegatingClient{Reader: &client.DelegatingReader{CacheReader: cache,ClientReader: c,},Writer: c,StatusClient: c,}, nil
}
读操作使用上面创建的 Cache,写操作使用 K8s go-client 直连。
Controller初始化
下面看看Controller的启动:
// SetupWithManager sets up the controller with the Manager.
func (r *GuestbookReconciler) SetupWithManager(mgr ctrl.Manager) error {return ctrl.NewControllerManagedBy(mgr).For(&webappv1.Guestbook{}).Complete(r)
}
使用的中Builder模式,NewControllerManagerBy和For方法都给Builder传参,最重要的是最后一个方法Complete,其逻辑是:
func (blder *Builder) Build(r reconcile.Reconciler) (manager.Manager, error) {
...// Set the Managerif err := blder.doManager(); err != nil {return nil, err}// Set the ControllerManagedByif err := blder.doController(r); err != nil {return nil, err}// Set the Watchif err := blder.doWatch(); err != nil {return nil, err}
...return blder.mgr, nil
}
主要是看看doController和doWatch方法:
doController方法
func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {...// Inject dependencies into Reconcilerif err := mgr.SetFields(options.Reconciler); err != nil {return nil, err}// Create controller with dependencies setreturn &controller.Controller{Do: options.Reconciler,MakeQueue: func() workqueue.RateLimitingInterface {return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)},MaxConcurrentReconciles: options.MaxConcurrentReconciles,CacheSyncTimeout: options.CacheSyncTimeout,SetFields: mgr.SetFields,Name: name,Log: options.Log.WithName("controller").WithName(name),}, nil
}
该方法初始化了一个Controller,传入了一些很重要的参数:
- Do: Reconcile 逻辑;
- Cache:找Informer注册Watch
- Queue:Watch资源的CUD事件缓存
doWatch方法
func (blder *Builder) doWatch() error {// Reconcile typetypeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)if err != nil {return err}src := &source.Kind{Type: typeForSrc}hdler := &handler.EnqueueRequestForObject{}allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {return err}// Watches the managed typesfor _, own := range blder.ownsInput {typeForSrc, err := blder.project(own.object, own.objectProjection)if err != nil {return err}src := &source.Kind{Type: typeForSrc}hdler := &handler.EnqueueRequestForOwner{OwnerType: blder.forInput.object,IsController: true,}allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)allPredicates = append(allPredicates, own.predicates...)if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {return err}}// Do the watch requestsfor _, w := range blder.watchesInput {allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)allPredicates = append(allPredicates, w.predicates...)// If the source of this watch is of type *source.Kind, project it.if srckind, ok := w.src.(*source.Kind); ok {typeForSrc, err := blder.project(srckind.Type, w.objectProjection)if err != nil {return err}srckind.Type = typeForSrc}if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {return err}}return nil
}
可以看到该方法对本Controller负责的CRD进行了watch,同时底下还会watch本CRD管理的其它资源,这个managedObjects可以通过Controller初始化Builder的Owns方法传入,说到Watch我们关心两个逻辑:
1、注册的handler
type EnqueueRequestForObject struct{}
// Create implements EventHandler
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {...q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: evt.Meta.GetName(),Namespace: evt.Meta.GetNamespace(),}})
}
// Update implements EventHandler
func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {if evt.MetaOld != nil {q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: evt.MetaOld.GetName(),Namespace: evt.MetaOld.GetNamespace(),}})} else {enqueueLog.Error(nil, "UpdateEvent received with no old metadata", "event", evt)}if evt.MetaNew != nil {q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: evt.MetaNew.GetName(),Namespace: evt.MetaNew.GetNamespace(),}})} else {enqueueLog.Error(nil, "UpdateEvent received with no new metadata", "event", evt)}
}
// Delete implements EventHandler
func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {...q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: evt.Meta.GetName(),Namespace: evt.Meta.GetNamespace(),}})
}
可以看到Kubebuilder为注册的Handler就是将发生变更的对象的NamespacedName入队列,如果在Reconcile逻辑需要判断创建/更新/删除,需要有自己的判断逻辑。
2、注册的流程
// Watch implements controller.Controller
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {...log.Info("Starting EventSource", "controller", c.Name, "source", src)return src.Start(evthdler, c.Queue, prct...)
}
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (is *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,...is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})return nil
}
我们的Handler实际注册到Informer上面,这样整个逻辑就串起来了,通过Cache我们创建了所有Scheme里面GVKs的Informers,然后对应GVK的Controller注册了Watch Handler到对应的Informer,这样一来对应的GVK里面的资源有变更都会触发Handler,将变更事件写到Controller的事件队列中,之后触发我们的Reconcile方法。