How does the Kubernetes ResourceQuotaController work internally, and what does its source code look like? This article walks through the question in detail, in the hope of giving readers who want to understand it a simpler path to the answer.
For an introduction to ResourceQuota and the ResourceQuotaController and how to use them, see the official documents below. They are the prerequisites for understanding this post.
https://kubernetes.io/docs/admin/resourcequota/
https://kubernetes.io/docs/admin/resourcequota/walkthrough/
https://kubernetes.io/docs/user-guide/compute-resources/
https://kubernetes.io/docs/admin/admission-controllers/
https://github.com/kubernetes/community/blob/master/contributors/design-proposals/admission_control_resource_quota.md
The ResourceQuota Controller is one of the many controllers managed by the Kubernetes Controller Manager. Its main source code lives in the directories k8s.io/kubernetes/pkg/quota and k8s.io/kubernetes/pkg/controller/resourcequota, laid out as follows:
k8s.io/kubernetes/pkg/quota

```
.
├── evaluator                            // usage accounting for each resource kind
│   └── core
│       ├── configmap.go                 // ConfigMapEvaluator: accounts ConfigMap usage
│       ├── doc.go
│       ├── persistent_volume_claims.go  // PVCEvaluator: accounts PVC usage
│       ├── persistent_volume_claims_test.go
│       ├── pods.go                      // PodEvaluator: accounts Pod usage
│       ├── pods_test.go
│       ├── registry.go                  // registers all evaluators when the Registry is created
│       ├── replication_controllers.go   // RCEvaluator: accounts ReplicationController usage
│       ├── resource_quotas.go           // ResourceQuotaEvaluator: accounts ResourceQuota usage
│       ├── secrets.go                   // SecretEvaluator: accounts Secret usage
│       ├── services.go                  // ServiceEvaluator: accounts Service usage
│       └── services_test.go
├── generic                              // definition and implementation of genericEvaluator
│   ├── evaluator.go                     // implements the genericEvaluator interface, including the key CalculateUsageStats logic
│   └── registry.go                      // defines GenericRegistry
├── install
│   └── registry.go                      // defines the Registry constructor invoked by startResourceQuotaController
├── interfaces.go                        // defines the Registry and Evaluator interfaces
├── resources.go                         // set operations on Resources plus the CalculateUsage method
└── resources_test.go
```
k8s.io/kubernetes/pkg/controller/resourcequota

```
.
├── doc.go
├── replenishment_controller.go       // defines replenishmentControllerFactory, which builds replenishmentControllers
├── replenishment_controller_test.go
├── resource_quota_controller.go      // core file: defines ResourceQuotaController, its Run method, syncResourceQuota, etc.
└── resource_quota_controller_test.go
```
The function of each module and how the modules interact are covered in the source analysis below.
When the Kubernetes Controller Manager initializes its many controllers at startup, it calls startResourceQuotaController to bring up the ResourceQuotaController.
###Starting from startResourceQuotaController in kube-controller-manager
cmd/kube-controller-manager/app/core.go:76

```go
func startResourceQuotaController(ctx ControllerContext) (bool, error) {
	resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")
	resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.InformerFactory)
	// GroupKinds that the ReplenishmentController should monitor
	groupKindsToReplenish := []schema.GroupKind{
		api.Kind("Pod"),
		api.Kind("Service"),
		api.Kind("ReplicationController"),
		api.Kind("PersistentVolumeClaim"),
		api.Kind("Secret"),
		api.Kind("ConfigMap"),
	}
	...
	go resourcequotacontroller.NewResourceQuotaController(
		resourceQuotaControllerOptions,
	).Run(int(ctx.Options.ConcurrentResourceQuotaSyncs), ctx.Stop)
	return true, nil
}
```
startResourceQuotaController launches a goroutine that creates a ResourceQuotaController via NewResourceQuotaController and calls its Run method to start the controller.
Below are the definitions of the ResourceQuotaController and ResourceQuotaControllerOptions structs. ResourceQuotaController defines several key entities: rqController, queue, missingUsageQueue, registry, and replenishmentControllers, each of which plays a role in the flow analyzed below.
###ResourceQuotaController definition
pkg/controller/resourcequota/resource_quota_controller.go:40

```go
// ResourceQuotaControllerOptions holds options for creating a quota controller
type ResourceQuotaControllerOptions struct {
	// Must have authority to list all quotas, and update quota status
	KubeClient clientset.Interface
	// Controls full recalculation of quota usage
	ResyncPeriod controller.ResyncPeriodFunc
	// Knows how to calculate usage
	Registry quota.Registry
	// Knows how to build controllers that notify replenishment events
	ControllerFactory ReplenishmentControllerFactory
	// Controls full resync of objects monitored for replenishment.
	ReplenishmentResyncPeriod controller.ResyncPeriodFunc
	// List of GroupKind objects that should be monitored for replenishment at
	// a faster frequency than the quota controller recalculation interval
	GroupKindsToReplenish []schema.GroupKind
}

// ResourceQuotaController is responsible for tracking quota usage status in the system
type ResourceQuotaController struct {
	// Must have authority to list all resources in the system, and update quota status
	kubeClient clientset.Interface
	// An index of resource quota objects by namespace
	rqIndexer cache.Indexer
	// Watches changes to all resource quota
	rqController *cache.Controller
	// ResourceQuota objects that need to be synchronized
	queue workqueue.RateLimitingInterface
	// missingUsageQueue holds objects that are missing the initial usage information
	missingUsageQueue workqueue.RateLimitingInterface
	// To allow injection of syncUsage for testing.
	syncHandler func(key string) error
	// function that controls full recalculation of quota usage
	resyncPeriod controller.ResyncPeriodFunc
	// knows how to calculate usage
	registry quota.Registry
	// controllers monitoring to notify for replenishment
	replenishmentControllers []cache.ControllerInterface
}
```
Next, let's look at NewRegistry and NewResourceQuotaController, which startResourceQuotaController calls, and then at ResourceQuotaController's Run method.
pkg/quota/evaluator/core/registry.go:29

```go
// NewRegistry returns a registry that knows how to deal with core kubernetes resources
// If an informer factory is provided, evaluators will use them.
func NewRegistry(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Registry {
	pod := NewPodEvaluator(kubeClient, f)
	service := NewServiceEvaluator(kubeClient)
	replicationController := NewReplicationControllerEvaluator(kubeClient)
	resourceQuota := NewResourceQuotaEvaluator(kubeClient)
	secret := NewSecretEvaluator(kubeClient)
	configMap := NewConfigMapEvaluator(kubeClient)
	persistentVolumeClaim := NewPersistentVolumeClaimEvaluator(kubeClient, f)
	return &generic.GenericRegistry{
		InternalEvaluators: map[schema.GroupKind]quota.Evaluator{
			pod.GroupKind():                   pod,
			service.GroupKind():               service,
			replicationController.GroupKind(): replicationController,
			secret.GroupKind():                secret,
			configMap.GroupKind():             configMap,
			resourceQuota.GroupKind():         resourceQuota,
			persistentVolumeClaim.GroupKind(): persistentVolumeClaim,
		},
	}
}
```
As shown, NewRegistry creates and registers the Evaluators for these resource objects (Pod, Service, ReplicationController, Secret, ConfigMap, ResourceQuota, PVC). The workers later rely on them when calling quota.CalculateUsage(...) to account for each resource's usage.
Once NewRegistry has run, the ResourceQuotaController itself is created, as follows.
pkg/controller/resourcequota/resource_quota_controller.go:78

```go
func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController {
	// build the resource quota controller
	rq := &ResourceQuotaController{
		kubeClient:               options.KubeClient,
		queue:                    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_primary"),
		missingUsageQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"),
		resyncPeriod:             options.ResyncPeriod,
		registry:                 options.Registry,
		replenishmentControllers: []cache.ControllerInterface{},
	}
	...
	// set the synchronization handler
	rq.syncHandler = rq.syncResourceQuotaFromKey

	// build the controller that observes quota
	rq.rqIndexer, rq.rqController = cache.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
				return rq.kubeClient.Core().ResourceQuotas(v1.NamespaceAll).List(options)
			},
			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
				return rq.kubeClient.Core().ResourceQuotas(v1.NamespaceAll).Watch(options)
			},
		},
		&v1.ResourceQuota{},
		rq.resyncPeriod(),
		cache.ResourceEventHandlerFuncs{
			AddFunc: rq.addQuota,
			UpdateFunc: func(old, cur interface{}) {
				oldResourceQuota := old.(*v1.ResourceQuota)
				curResourceQuota := cur.(*v1.ResourceQuota)
				if quota.V1Equals(oldResourceQuota.Spec.Hard, curResourceQuota.Spec.Hard) {
					return
				}
				rq.addQuota(curResourceQuota)
			},
			DeleteFunc: rq.enqueueResourceQuota,
		},
		cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
	)

	for _, groupKindToReplenish := range options.GroupKindsToReplenish {
		controllerOptions := &ReplenishmentControllerOptions{
			GroupKind:         groupKindToReplenish,
			ResyncPeriod:      options.ReplenishmentResyncPeriod,
			ReplenishmentFunc: rq.replenishQuota,
		}
		replenishmentController, err := options.ControllerFactory.NewController(controllerOptions)
		if err != nil {
			glog.Warningf("quota controller unable to replenish %s due to %v, changes only accounted during full resync", groupKindToReplenish, err)
		} else {
			rq.replenishmentControllers = append(rq.replenishmentControllers, replenishmentController)
		}
	}
	return rq
}
```
NewResourceQuotaController builds the ResourceQuotaController, populating queue, missingUsageQueue, syncHandler, rqIndexer, rqController, and replenishmentControllers. Pay particular attention to rq.rqIndexer, rq.rqController = cache.NewIndexerInformer(...), which registers the ResourceEventHandlerFuncs addQuota and enqueueResourceQuota on rqController. In addition, replenishmentController, err := options.ControllerFactory.NewController(controllerOptions) creates the replenishmentControllers: startResourceQuotaController listed six GroupKinds to replenish, so six corresponding replenishmentControllers are appended to replenishmentControllers.
###ResourceQuotaController.Run
With the ResourceQuotaController created, its Run method is executed and processing begins.
pkg/controller/resourcequota/resource_quota_controller.go:227

```go
// Run begins quota controller using the specified number of workers
func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
	...
	// Start rqController and the six replenishmentControllers held in
	// rq.replenishmentControllers; they watch the relevant objects and
	// enqueue the matching ResourceQuotas into queue and missingUsageQueue.
	go rq.rqController.Run(stopCh)
	// the controllers that replenish other resources to respond rapidly to state changes
	for _, replenishmentController := range rq.replenishmentControllers {
		go replenishmentController.Run(stopCh)
	}
	// Start `workers` worker goroutines each for queue and missingUsageQueue.
	for i := 0; i < workers; i++ {
		go wait.Until(rq.worker(rq.queue), time.Second, stopCh)
		go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh)
	}
	// Periodically run a full recalculation of all quotas.
	go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh)
	<-stopCh
	glog.Infof("Shutting down ResourceQuotaController")
	rq.queue.ShutDown()
}
```
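The `go wait.Until(...)` calls are what keep each worker goroutine looping until shutdown. As a minimal, self-contained illustration of that helper (a sketch using the current apimachinery import path; the printed message and intervals are made up for demonstration):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})

	// Run the function every second until stopCh is closed,
	// mirroring how Run starts each quota worker goroutine.
	go wait.Until(func() {
		fmt.Println("worker tick") // stands in for rq.worker(rq.queue)
	}, time.Second, stopCh)

	// Simulate the controller manager shutting down after 3 seconds.
	time.Sleep(3 * time.Second)
	close(stopCh)
}
```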
From here on the main processing is handed to the workers: by default five workers service queue and another five service missingUsageQueue. Let's see how a worker handles the items in a queue.
pkg/controller/resourcequota/resource_quota_controller.go:199

```go
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface) func() {
	workFunc := func() bool {
		// take a key from the queue
		key, quit := queue.Get()
		if quit {
			return true
		}
		defer queue.Done(key)
		// invoke the syncHandler registered in NewResourceQuotaController
		// (control flow continues in syncResourceQuotaFromKey)
		err := rq.syncHandler(key.(string))
		...
	}
	return func() {
		for {
			if quit := workFunc(); quit {
				glog.Infof("resource quota controller worker shutting down")
				return
			}
		}
	}
}
```
Control now reaches syncResourceQuotaFromKey; here is its implementation:
pkg/controller/resourcequota/resource_quota_controller.go:247

```go
// syncResourceQuotaFromKey syncs a quota key
func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err error) {
	...
	obj, exists, err := rq.rqIndexer.GetByKey(key)
	...
	quota := *obj.(*v1.ResourceQuota)
	return rq.syncResourceQuota(quota)
}
```
syncResourceQuotaFromKey takes the key obtained from the queue, fetches the corresponding object from rqIndexer, and then calls rq.syncResourceQuota(quota).
pkg/controller/resourcequota/resource_quota_controller.go:268

```go
// syncResourceQuota runs a complete sync of resource quota status across all known kinds
func (rq *ResourceQuotaController) syncResourceQuota(v1ResourceQuota v1.ResourceQuota) (err error) {
	...
	newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry)
	...
	// ensure set of used values match those that have hard constraints
	hardResources := quota.ResourceNames(hardLimits)
	used = quota.Mask(used, hardResources)
	usage := api.ResourceQuota{
		ObjectMeta: api.ObjectMeta{
			Name:            resourceQuota.Name,
			Namespace:       resourceQuota.Namespace,
			ResourceVersion: resourceQuota.ResourceVersion,
			Labels:          resourceQuota.Labels,
			Annotations:     resourceQuota.Annotations,
		},
		Status: api.ResourceQuotaStatus{
			Hard: hardLimits,
			Used: used,
		},
	}
	dirty = dirty || !quota.Equals(usage.Status.Used, resourceQuota.Status.Used)

	// there was a change observed by this controller that requires we update quota
	if dirty {
		v1Usage := &v1.ResourceQuota{}
		if err := v1.Convert_api_ResourceQuota_To_v1_ResourceQuota(&usage, v1Usage, nil); err != nil {
			return err
		}
		_, err = rq.kubeClient.Core().ResourceQuotas(usage.Namespace).UpdateStatus(v1Usage)
		return err
	}
	return nil
}
```
The key operation in syncResourceQuota is: newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry). From the namespace, the quota's scopes, the hard limits, and the registry, quota.CalculateUsage computes the usage of this ResourceQuota item.
pkg/quota/resources.go:217

```go
// CalculateUsage calculates and returns the requested ResourceList usage
func CalculateUsage(namespaceName string, scopes []api.ResourceQuotaScope, hardLimits api.ResourceList, registry Registry) (api.ResourceList, error) {
	// find the intersection between the hard resources on the quota
	// and the resources this controller can track to know what we can
	// look to measure updated usage stats for
	hardResources := ResourceNames(hardLimits)
	potentialResources := []api.ResourceName{}
	evaluators := registry.Evaluators()
	for _, evaluator := range evaluators {
		potentialResources = append(potentialResources, evaluator.MatchingResources(hardResources)...)
	}
	// NOTE: the intersection just removes duplicates since the evaluator match intersects with hard
	matchedResources := Intersection(hardResources, potentialResources)

	// sum the observed usage from each evaluator
	newUsage := api.ResourceList{}
	for _, evaluator := range evaluators {
		// only trigger the evaluator if it matches a resource in the quota, otherwise, skip calculating anything
		intersection := evaluator.MatchingResources(matchedResources)
		if len(intersection) == 0 {
			continue
		}
		usageStatsOptions := UsageStatsOptions{Namespace: namespaceName, Scopes: scopes, Resources: intersection}
		stats, err := evaluator.UsageStats(usageStatsOptions)
		if err != nil {
			return nil, err
		}
		newUsage = Add(newUsage, stats.Used)
	}

	// mask the observed usage to only the set of resources tracked by this quota
	// merge our observed usage with the quota usage status
	// if the new usage is different than the last usage, we will need to do an update
	newUsage = Mask(newUsage, matchedResources)
	return newUsage, nil
}
```
The most important step in CalculateUsage is the loop over all Evaluators registered in the registry, calling each one's UsageStats method to account for resource usage. At this point you may be wondering: what exactly is an Evaluator?
Let's first look at the relationship between Registry and Evaluator, and at the Evaluator definition.
pkg/quota/interfaces.go:62

```go
// Registry holds the list of evaluators associated to a particular group kind
type Registry interface {
	// Evaluators returns the set Evaluator objects registered to a groupKind
	Evaluators() map[schema.GroupKind]Evaluator
}
```

pkg/quota/interfaces.go:43

```go
// Evaluator knows how to evaluate quota usage for a particular group kind
type Evaluator interface {
	// Constraints ensures that each required resource is present on item
	Constraints(required []api.ResourceName, item runtime.Object) error
	// GroupKind returns the groupKind that this object knows how to evaluate
	GroupKind() schema.GroupKind
	// Handles determines if quota could be impacted by the specified operation.
	// If true, admission control must perform quota processing for the operation, otherwise it is safe to ignore quota.
	Handles(operation admission.Operation) bool
	// Matches returns true if the specified quota matches the input item
	Matches(resourceQuota *api.ResourceQuota, item runtime.Object) (bool, error)
	// MatchingResources takes the input specified list of resources and returns the set of resources evaluator matches.
	MatchingResources(input []api.ResourceName) []api.ResourceName
	// Usage returns the resource usage for the specified object
	Usage(item runtime.Object) (api.ResourceList, error)
	// UsageStats calculates latest observed usage stats for all objects
	UsageStats(options UsageStatsOptions) (UsageStats, error)
}
```
So an Evaluator is simply an interface, a bundle of related operations, and a Registry is a map from resource group kind to Evaluator.
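To make the relationship concrete, here is a deliberately simplified, self-contained model of the Registry/Evaluator pattern. The types below are toy stand-ins, not the real pkg/quota types, but the calculateUsage loop mirrors the shape of quota.CalculateUsage:

```go
package main

import "fmt"

// Evaluator knows how to measure usage for one resource kind
// (a stripped-down stand-in for quota.Evaluator).
type Evaluator interface {
	GroupKind() string
	UsageStats(namespace string) (map[string]int64, error)
}

// Registry maps a group kind to its Evaluator, like quota.Registry.
type Registry map[string]Evaluator

// podEvaluator is a toy counterpart of PodEvaluator.
type podEvaluator struct{}

func (podEvaluator) GroupKind() string { return "Pod" }
func (podEvaluator) UsageStats(namespace string) (map[string]int64, error) {
	// The real code lists pods in the namespace and sums their requests;
	// here we return fixed numbers purely for illustration.
	return map[string]int64{"pods": 3, "requests.cpu.millis": 1500}, nil
}

// calculateUsage mirrors the shape of quota.CalculateUsage: loop over
// every registered evaluator and merge the observed usage.
func calculateUsage(namespace string, registry Registry) (map[string]int64, error) {
	total := map[string]int64{}
	for _, evaluator := range registry {
		used, err := evaluator.UsageStats(namespace)
		if err != nil {
			return nil, err
		}
		for name, value := range used {
			total[name] += value
		}
	}
	return total, nil
}

func main() {
	registry := Registry{"Pod": podEvaluator{}}
	usage, _ := calculateUsage("default", registry)
	fmt.Println(usage) // map[pods:3 requests.cpu.millis:1500]
}
```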
Kubernetes defines Evaluators for seven resource kinds, all under the pkg/quota/evaluator/core/* directory. For example, pods.go implements PodEvaluator, including the key UsageStats method. For every Evaluator other than PodEvaluator, UsageStats is provided by genericEvaluator, whose code is at pkg/quota/generic/evaluator.go:177.
A detailed walkthrough of each Evaluator's code is left to the reader.
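As a starting hint for that exercise: the essence of any UsageStats implementation is to list every object of the kind in the namespace, compute each object's usage, and sum the results. A self-contained sketch of that summation, using real resource.Quantity arithmetic but fabricated pod data:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// fakePod carries just enough data to demonstrate the aggregation;
// the real PodEvaluator walks v1.Pod container requests instead.
type fakePod struct {
	name string
	cpu  string // CPU request, e.g. "500m"
}

func main() {
	// Stand-in for "list all pods in the namespace".
	pods := []fakePod{
		{name: "web-1", cpu: "500m"},
		{name: "web-2", cpu: "250m"},
	}

	// Sum per-object usage, as genericEvaluator's UsageStats does
	// by calling Usage(item) for each listed object.
	totalCPU := resource.MustParse("0")
	for _, p := range pods {
		q := resource.MustParse(p.cpu)
		totalCPU.Add(q)
	}

	fmt.Printf("pods: %d, requests.cpu: %s\n", len(pods), totalCPU.String())
	// Output: pods: 2, requests.cpu: 750m
}
```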
###ReplenishmentController
rqController watches for ResourceQuotas that need syncing and adds them to queue and missingUsageQueue, while, as noted in the NewResourceQuotaController analysis above, replenishmentController, err := options.ControllerFactory.NewController(controllerOptions) is what creates each replenishmentController. So what is a replenishmentController? Let's look at how one is built.
pkg/controller/resourcequota/replenishment_controller.go:131

```go
func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (result cache.ControllerInterface, err error) {
	...
	switch options.GroupKind {
	case api.Kind("Pod"):
		if r.sharedInformerFactory != nil {
			result, err = controllerFor(api.Resource("pods"), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{
				UpdateFunc: PodReplenishmentUpdateFunc(options),
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			})
			break
		}
		result = informers.NewPodInformer(r.kubeClient, options.ResyncPeriod())
	case api.Kind("Service"):
		// TODO move to informer when defined
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().Services(v1.NamespaceAll).List(options)
				},
				WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().Services(v1.NamespaceAll).Watch(options)
				},
			},
			&v1.Service{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				UpdateFunc: ServiceReplenishmentUpdateFunc(options),
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	case api.Kind("ReplicationController"):
		// TODO move to informer when defined
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).List(options)
				},
				WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).Watch(options)
				},
			},
			&v1.ReplicationController{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	case api.Kind("PersistentVolumeClaim"):
		if r.sharedInformerFactory != nil {
			result, err = controllerFor(api.Resource("persistentvolumeclaims"), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			})
			break
		}
		// TODO (derekwaynecarr) remove me when we can require a sharedInformerFactory in all code paths...
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().PersistentVolumeClaims(v1.NamespaceAll).List(options)
				},
				WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().PersistentVolumeClaims(v1.NamespaceAll).Watch(options)
				},
			},
			&v1.PersistentVolumeClaim{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	case api.Kind("Secret"):
		// TODO move to informer when defined
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().Secrets(v1.NamespaceAll).List(options)
				},
				WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().Secrets(v1.NamespaceAll).Watch(options)
				},
			},
			&v1.Secret{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	case api.Kind("ConfigMap"):
		// TODO move to informer when defined
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().ConfigMaps(v1.NamespaceAll).List(options)
				},
				WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().ConfigMaps(v1.NamespaceAll).Watch(options)
				},
			},
			&v1.ConfigMap{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	default:
		return nil, NewUnhandledGroupKindError(options.GroupKind)
	}
	return result, err
}
```
The code structure is very clear: for each resource kind, return the corresponding controller, where each kind's controller is defined by creating a matching informer. The informer registers ResourceEventHandlerFuncs (UpdateFunc and DeleteFunc) to be invoked when a watched object changes.
Take Pod as an example: once you look at its registered UpdateFunc (PodReplenishmentUpdateFunc) and DeleteFunc (ObjectReplenishmentDeleteFunc), you will know what a replenishmentController is for.
pkg/controller/resourcequota/replenishment_controller.go:56

```go
// PodReplenishmentUpdateFunc will replenish if the old pod was quota tracked but the new is not
func PodReplenishmentUpdateFunc(options *ReplenishmentControllerOptions) func(oldObj, newObj interface{}) {
	return func(oldObj, newObj interface{}) {
		oldPod := oldObj.(*v1.Pod)
		newPod := newObj.(*v1.Pod)
		if core.QuotaV1Pod(oldPod) && !core.QuotaV1Pod(newPod) {
			options.ReplenishmentFunc(options.GroupKind, newPod.Namespace, oldPod)
		}
	}
}

// ObjectReplenishmentDeleteFunc will replenish on every delete
func ObjectReplenishmentDeleteFunc(options *ReplenishmentControllerOptions) func(obj interface{}) {
	return func(obj interface{}) {
		metaObject, err := meta.Accessor(obj)
		if err != nil {
			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
			if !ok {
				glog.Errorf("replenishment controller could not get object from tombstone %+v, could take up to %v before quota is replenished", obj, options.ResyncPeriod())
				utilruntime.HandleError(err)
				return
			}
			metaObject, err = meta.Accessor(tombstone.Obj)
			if err != nil {
				glog.Errorf("replenishment controller tombstone contained object that is not a meta %+v, could take up to %v before quota is replenished", tombstone.Obj, options.ResyncPeriod())
				utilruntime.HandleError(err)
				return
			}
		}
		options.ReplenishmentFunc(options.GroupKind, metaObject.GetNamespace(), nil)
	}
}
```
When NewResourceQuotaController creates the replenishmentControllers, it already sets the ReplenishmentFunc to rq.replenishQuota, so PodReplenishmentUpdateFunc and ObjectReplenishmentDeleteFunc both ultimately call the ReplenishmentFunc (rq.replenishQuota) to have the quota recalculated.
pkg/controller/resourcequota/resource_quota_controller.go:330

```go
// replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
func (rq *ResourceQuotaController) replenishQuota(groupKind schema.GroupKind, namespace string, object runtime.Object) {
	...
	for i := range resourceQuotas {
		resourceQuota := resourceQuotas[i].(*v1.ResourceQuota)
		internalResourceQuota := &api.ResourceQuota{}
		if err := v1.Convert_v1_ResourceQuota_To_api_ResourceQuota(resourceQuota, internalResourceQuota, nil); err != nil {
			glog.Error(err)
			continue
		}
		resourceQuotaResources := quota.ResourceNames(internalResourceQuota.Status.Hard)
		if intersection := evaluator.MatchingResources(resourceQuotaResources); len(intersection) > 0 {
			// enqueue this resourceQuota into the primary queue
			rq.enqueueResourceQuota(resourceQuota)
		}
	}
}
```
A replenishmentController, then, captures Update/Delete events on its resource kind and adds the affected ResourceQuotas to queue, where a worker recalculates their usage.
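In other words, the replenishment path boils down to "event in, quota key enqueued". Below is a minimal self-contained sketch of that pattern built from client-go primitives; note that for brevity it enqueues the key of the deleted object itself, whereas the real controller looks up and enqueues the matching ResourceQuota:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.NewNamedRateLimitingQueue(
		workqueue.DefaultControllerRateLimiter(), "resourcequota_demo")

	// DeleteFunc mirrors ObjectReplenishmentDeleteFunc: on any delete,
	// derive a key and hand it to the queue for a worker to pick up.
	handler := cache.ResourceEventHandlerFuncs{
		DeleteFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err != nil {
				return
			}
			queue.Add(key)
		},
	}

	// Simulate an informer delivering a pod deletion.
	handler.OnDelete(&v1.Pod{ObjectMeta: metav1.ObjectMeta{
		Namespace: "default", Name: "web-1",
	}})

	key, _ := queue.Get()
	fmt.Println("worker picked up:", key) // default/web-1
	queue.Done(key)
}
```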
To summarize: when the Kubernetes Controller Manager initializes its controllers, it executes startResourceQuotaController, which creates a ResourceQuotaController and calls its Run method to start it.
ResourceQuotaController contains two queues (a sketch of the worker/queue contract follows this list):
- queue: holds the ResourceQuotas waiting to be synced and recalculated
- missingUsageQueue: holds ResourceQuotas that are missing their initial usage information
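Both queues are client-go rate-limiting workqueues. The self-contained sketch below shows the dequeue/retry contract a worker follows; syncStub is a stand-in for the real syncResourceQuotaFromKey handler:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// syncStub stands in for rq.syncHandler (syncResourceQuotaFromKey).
func syncStub(key string) error {
	fmt.Println("recalculating usage for", key)
	return nil
}

func main() {
	queue := workqueue.NewNamedRateLimitingQueue(
		workqueue.DefaultControllerRateLimiter(), "resourcequota_demo")

	queue.Add("default/compute-quota")
	// ShutDown lets already-queued items drain; afterwards Get reports quit.
	queue.ShutDown()

	// The worker contract: Get an item, sync it, then Forget on success
	// or AddRateLimited on failure (retry with backoff), and always Done.
	for {
		item, quit := queue.Get()
		if quit {
			fmt.Println("worker shutting down")
			return
		}
		key := item.(string)
		if err := syncStub(key); err != nil {
			queue.AddRateLimited(key)
		} else {
			queue.Forget(key)
		}
		queue.Done(item)
	}
}
```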
ResourceQuotaController runs two kinds of controllers:
- rqController: lists/watches the relevant resources and their changes and, as appropriate, adds ResourceQuotas to queue and missingUsageQueue.
- replenishmentControllers: watch Update/Delete operations on the tracked resources and add the affected ResourceQuotas to queue.
ResourceQuotaController holds a Registry object that stores the Evaluators for the various resources, namely:
- PodEvaluator
- ConfigMapEvaluator
- PersistentVolumeClaimEvaluator
- ResourceQuotaEvaluator
- ReplicationControllerEvaluator
- ServiceEvaluator
- SecretEvaluator
The replenishmentControllers in ResourceQuotaController comprise the following:
- PodReplenishController
- ConfigMapReplenishController
- PersistentVolumeClaimReplenishController
- ReplicationControllerReplenishController
- ServiceReplenishController
- SecretReplenishController
By default, ResourceQuotaController runs five workers that process the ResourceQuota items in queue, and another five workers that process the items in missingUsageQueue. Both counts are configurable via the kube-controller-manager flag --concurrent-resource-quota-syncs.
ResourceQuotaController performs a full quota usage sync every 5 minutes by default, configurable via the kube-controller-manager flag --resource-quota-sync-period.