中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Kubernetes ResourceQuotaController內部實現原理及源碼分析是怎樣的

發布時間:2021-11-15 23:32:07 來源:億速云 閱讀:229 作者:柒染 欄目:云計算

Kubernetes ResourceQuotaController內部實現原理及源碼分析是怎樣的,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

ResoureQuota介紹

關于ResoureQuota和ResourceController的介紹和使用請參見如下官方文檔。這是你理解這篇博客的基礎。

  • https://kubernetes.io/docs/admin/resourcequota/

  • https://kubernetes.io/docs/admin/resourcequota/walkthrough/

  • https://kubernetes.io/docs/user-guide/compute-resources/

  • https://kubernetes.io/docs/admin/admission-controllers/

  • https://github.com/kubernetes/community/blob/master/contributors/design-proposals/admission_control_resource_quota.md

ResourceQuota Controller源碼目錄結構分析

ResourceQuota Controller作為Kubernetes Controller Manager管理的眾多Controller中的一員,其主要的源碼位于目錄k8s.io/kubernetes/pkg/quotak8s.io/kubernetes/pkg/controller/resourcequota,具體分析如下:

k8s.io/kubernetes/pkg/quota
.
├── evaluator    // 負責各種資源使用的統計
│   └── core
│       ├── configmap.go   // ConfigMapEvaluator的實現,負責ConfigMap資源的統計
│       ├── doc.go
│       ├── persistent_volume_claims.go    // PVCEvaluator的實現,負責PVC資源的統計
│       ├── persistent_volume_claims_test.go
│       ├── pods.go    //PodEvaluator的實現,負責Pod資源的統計
│       ├── pods_test.go
│       ├── registry.go    // 創建Registry時注冊所有的Evaluators
│       ├── replication_controllers.go    // RCEvaluator的實現,負責ReplicationController資源的統計
│       ├── resource_quotas.go    // ResourceQuotaEvaluator的實現,負責ResourceQuota資源的統計
│       ├── secrets.go    // SecretEvaluator的實現,負責Secret資源的統計
│       ├── services.go    // ServiceEvaluator的實現,負責Service資源的統計
│       └── services_test.go
├── generic    // genericEvaluator的定義和實現
│   ├── evaluator.go    // 實現了genericEvaluator的接口,包括最重要的CalculateUsageStats接口
│   └── registry.go    // 定義GenericRegistry
├── install
│   └── registry.go    // 定義了startResourceQuotaController時會調用創建ResourceQuota Registry的方法
├── interfaces.go    // 定義了Registry和Evaluator Interface
├── resources.go    // 定義Resources的集合操作以及CalculateUsage方法
└── resources_test.go
k8s.io/kubernetes/pkg/controller/resourcequota
.
├── doc.go
├── replenishment_controller.go    // 定義replenishmentControllerFactory,用來創建replenishmentController
├── replenishment_controller_test.go
├── resource_quota_controller.go    // 定義ResourceQuotaController及其Run方法,syncResourceQuota方法等,屬于核心文件。
└── resource_quota_controller_test.go

ResourceQuota Controller內部實現原理圖

請下載到本地放大查看。

Kubernetes ResourceQuotaController內部實現原理及源碼分析是怎樣的

具體各個模塊的功能和交互請看下面的源碼分析。

ResourceQuota Controller源碼分析

上面的內部實現原理圖顯示,ResourceQuotaController是Kubenetes Controller Manager啟動進行初始化眾多Controllers的時候,通過調用startResourceQuotaController來完成ResourceQuotaController的啟動。

###從kube-controller-manager的startResourceQuotaController開始

cmd/kube-controller-manager/app/core.go:76

func startResourceQuotaController(ctx ControllerContext) (bool, error) {
	resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")
	resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.InformerFactory)
	
	// 定義ReplenishmentController需要監控的資源對象
	groupKindsToReplenish := []schema.GroupKind{
		api.Kind("Pod"),
		api.Kind("Service"),
		api.Kind("ReplicationController"),
		api.Kind("PersistentVolumeClaim"),
		api.Kind("Secret"),
		api.Kind("ConfigMap"),
	}
	
	...
	
	go resourcequotacontroller.NewResourceQuotaController(
		resourceQuotaControllerOptions,
	).Run(int(ctx.Options.ConcurrentResourceQuotaSyncs), ctx.Stop)
	return true, nil
}

startResourceQuotaController啟動一個goroutine,通過NewResourceQuotaController創建一個ResourceQuotaController并執行其Run方法開始提供ResourceQuotaController。

下面是ResourceQuotaController和ResourceQuotaControllerOptions結構體的定義。ResourceQuotaController中定義了幾個關鍵Entity,分別是rqController、queue、missingUsageQueue、registry、replenishmentControllers,在上一節中的原理圖中也能看到它們的身影。

###ResourceQuotaController定義

pkg/controller/resourcequota/resource_quota_controller.go:40

// ResourceQuotaControllerOptions holds options for creating a quota controller
type ResourceQuotaControllerOptions struct {
	// Must have authority to list all quotas, and update quota status
	KubeClient clientset.Interface
	// Controls full recalculation of quota usage
	ResyncPeriod controller.ResyncPeriodFunc
	// Knows how to calculate usage
	Registry quota.Registry
	// Knows how to build controllers that notify replenishment events
	ControllerFactory ReplenishmentControllerFactory
	// Controls full resync of objects monitored for replenihsment.
	ReplenishmentResyncPeriod controller.ResyncPeriodFunc
	// List of GroupKind objects that should be monitored for replenishment at
	// a faster frequency than the quota controller recalculation interval
	GroupKindsToReplenish []schema.GroupKind
}

// ResourceQuotaController is responsible for tracking quota usage status in the system
type ResourceQuotaController struct {
	// Must have authority to list all resources in the system, and update quota status
	kubeClient clientset.Interface
	// An index of resource quota objects by namespace
	rqIndexer cache.Indexer
	// Watches changes to all resource quota
	rqController *cache.Controller
	// ResourceQuota objects that need to be synchronized
	queue workqueue.RateLimitingInterface
	// missingUsageQueue holds objects that are missing the initial usage informatino
	missingUsageQueue workqueue.RateLimitingInterface
	// To allow injection of syncUsage for testing.
	syncHandler func(key string) error
	// function that controls full recalculation of quota usage
	resyncPeriod controller.ResyncPeriodFunc
	// knows how to calculate usage
	registry quota.Registry
	// controllers monitoring to notify for replenishment
	replenishmentControllers []cache.ControllerInterface
}

NewRegistry

接下來,我們看看startResourceQuotaController調用的NewRegistry、NewResourceQuotaController以及ResourceQuotaController的Run方法。

pkg/quota/evaluator/core/registry.go:29

// NewRegistry returns a registry that knows how to deal with core kubernetes resources
// If an informer factory is provided, evaluators will use them.
func NewRegistry(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Registry {
	pod := NewPodEvaluator(kubeClient, f)
	service := NewServiceEvaluator(kubeClient)
	replicationController := NewReplicationControllerEvaluator(kubeClient)
	resourceQuota := NewResourceQuotaEvaluator(kubeClient)
	secret := NewSecretEvaluator(kubeClient)
	configMap := NewConfigMapEvaluator(kubeClient)
	persistentVolumeClaim := NewPersistentVolumeClaimEvaluator(kubeClient, f)
	return &generic.GenericRegistry{
		InternalEvaluators: map[schema.GroupKind]quota.Evaluator{
			pod.GroupKind():                   pod,
			service.GroupKind():               service,
			replicationController.GroupKind(): replicationController,
			secret.GroupKind():                secret,
			configMap.GroupKind():             configMap,
			resourceQuota.GroupKind():         resourceQuota,
			persistentVolumeClaim.GroupKind(): persistentVolumeClaim,
		},
	}
}

可見,NewRegistry負責這些資源對象(pod,service,rc,secret,configMap,resourceQuota,PVC)的的Evaluator的創建和注冊,供后面Worker中執行quota.CalculateUsage(...)對這些資源對象進行使用統計。

NewResourceQuotaController

NewRegistry執行完后,開始創建ResourceQuotaController,代碼如下。

pkg/controller/resourcequota/resource_quota_controller.go:78

func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController {
	// build the resource quota controller
	rq := &ResourceQuotaController{
		kubeClient:               options.KubeClient,
		queue:                    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_primary"),
		missingUsageQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"),
		resyncPeriod:             options.ResyncPeriod,
		registry:                 options.Registry,
		replenishmentControllers: []cache.ControllerInterface{},
	}
	
	...
	
	// set the synchronization handler
	rq.syncHandler = rq.syncResourceQuotaFromKey

	// build the controller that observes quota
	rq.rqIndexer, rq.rqController = cache.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
				return rq.kubeClient.Core().ResourceQuotas(v1.NamespaceAll).List(options)
			},
			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
				return rq.kubeClient.Core().ResourceQuotas(v1.NamespaceAll).Watch(options)
			},
		},
		&v1.ResourceQuota{},
		rq.resyncPeriod(),
		cache.ResourceEventHandlerFuncs{
			AddFunc: rq.addQuota,
			UpdateFunc: func(old, cur interface{}) {
				oldResourceQuota := old.(*v1.ResourceQuota)
				curResourceQuota := cur.(*v1.ResourceQuota)
				if quota.V1Equals(oldResourceQuota.Spec.Hard, curResourceQuota.Spec.Hard) {
					return
				}
				rq.addQuota(curResourceQuota)
			},
			DeleteFunc: rq.enqueueResourceQuota,
		},
		cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
	)

	for _, groupKindToReplenish := range options.GroupKindsToReplenish {
		controllerOptions := &ReplenishmentControllerOptions{
			GroupKind:         groupKindToReplenish,
			ResyncPeriod:      options.ReplenishmentResyncPeriod,
			ReplenishmentFunc: rq.replenishQuota,
		}
		replenishmentController, err := options.ControllerFactory.NewController(controllerOptions)
		if err != nil {
			glog.Warningf("quota controller unable to replenish %s due to %v, changes only accounted during full resync", groupKindToReplenish, err)
		} else {
			rq.replenishmentControllers = append(rq.replenishmentControllers, replenishmentController)
		}
	}
	return rq
}

NewResourceQuotaController負責創建ResourceQuotaController,包括queue, missingUsageQueue, syncHandler,rqIndexer, rqController,replenishmentControllers的Entity填充。重點關注 rq.rqIndexer, rq.rqController = cache.NewIndexerInformer(...) 進行了rqController中注冊ResourceEventHandlerFuncs:addQuota和enqueueResourceQuota。另外, replenishmentController, err := options.ControllerFactory.NewController(controllerOptions) 負責replenishmentController的創建,NewRegistry中注冊了6種replenishmentSource,所以這里replenishmentControllers會添加對應的6中replenishmentController。

###ResourceQuotaController.Run

創建完ResourceQuotaController之后,就執行Run方法開始進行任務處理了。

pkg/controller/resourcequota/resource_quota_controller.go:227

// Run begins quota controller using the specified number of workers
func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
	...
	// 啟動rqController和rq.replenishmentControllers中的6中replenishmentController,開始watch對應的ResourceQuota加入到queue和missingUsageQueue。
	go rq.rqController.Run(stopCh)
	// the controllers that replenish other resources to respond rapidly to state changes
	for _, replenishmentController := range rq.replenishmentControllers {
		go replenishmentController.Run(stopCh)
	}
	
	// 啟動workers數量的worker協程,分別對queue和missingUsageQueue中的Item。
	for i := 0; i < workers; i++ {
		go wait.Until(rq.worker(rq.queue), time.Second, stopCh)
		go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh)
	}
	
	// 定期的進行全量的quotas計算。
	go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh)
	<-stopCh
	glog.Infof("Shutting down ResourceQuotaController")
	rq.queue.ShutDown()
}

Worker

接下來的主要處理都交給了workers進行處理了,默認配置是有5個worker對queue進行處理,有5個worker對missingUsageQuota進行處理。下面來看看worker是怎么對Queue中的Item進行處理的。

pkg/controller/resourcequota/resource_quota_controller.go:199

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface) func() {
	workFunc := func() bool {
		// 從queue中獲取Key
		key, quit := queue.Get()
		if quit {
			return true
		}
		defer queue.Done(key)
		
		// 執行NewResourceQuotaController時注冊的syncHandler(流程跳轉到syncResourceQuotaFromKey) 
		err := rq.syncHandler(key.(string))
		...
	}

	return func() {
		for {
			if quit := workFunc(); quit {
				glog.Infof("resource quota controller worker shutting down")
				return
			}
		}
	}
}

流程進入到syncResourceQuotaFromKey,下面看看它的實現:

pkg/controller/resourcequota/resource_quota_controller.go:247

// syncResourceQuotaFromKey syncs a quota key
func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err error) {
	...
	obj, exists, err := rq.rqIndexer.GetByKey(key)
	...
	quota := *obj.(*v1.ResourceQuota)
	return rq.syncResourceQuota(quota)
}

syncResourceQuotaFromKey根據從queue中獲得的key,從rqIndexer中得到該Object,然后執行rq.syncResourceQuota(quota)。

pkg/controller/resourcequota/resource_quota_controller.go:268

// syncResourceQuota runs a complete sync of resource quota status across all known kinds
func (rq *ResourceQuotaController) syncResourceQuota(v1ResourceQuota v1.ResourceQuota) (err error) {
	...
	newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry)
	...

	// ensure set of used values match those that have hard constraints
	hardResources := quota.ResourceNames(hardLimits)
	used = quota.Mask(used, hardResources)
	usage := api.ResourceQuota{
		ObjectMeta: api.ObjectMeta{
			Name:            resourceQuota.Name,
			Namespace:       resourceQuota.Namespace,
			ResourceVersion: resourceQuota.ResourceVersion,
			Labels:          resourceQuota.Labels,
			Annotations:     resourceQuota.Annotations},
		Status: api.ResourceQuotaStatus{
			Hard: hardLimits,
			Used: used,
		},
	}

	dirty = dirty || !quota.Equals(usage.Status.Used, resourceQuota.Status.Used)

	// there was a change observed by this controller that requires we update quota
	if dirty {
		v1Usage := &v1.ResourceQuota{}
		if err := v1.Convert_api_ResourceQuota_To_v1_ResourceQuota(&usage, v1Usage, nil); err != nil {
			return err
		}
		_, err = rq.kubeClient.Core().ResourceQuotas(usage.Namespace).UpdateStatus(v1Usage)
		return err
	}
	return nil
}

syncResourceQuota中最關鍵的操作是: newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry) quota.CalculateUsage根據namespace, quota的Scope,hardLimits,registry對該Item(resourceQuota)進行CalculateUsage。

pkg/quota/resources.go:217

// CalculateUsage calculates and returns the requested ResourceList usage
func CalculateUsage(namespaceName string, scopes []api.ResourceQuotaScope, hardLimits api.ResourceList, registry Registry) (api.ResourceList, error) {
	// find the intersection between the hard resources on the quota
	// and the resources this controller can track to know what we can
	// look to measure updated usage stats for
	hardResources := ResourceNames(hardLimits)
	potentialResources := []api.ResourceName{}
	evaluators := registry.Evaluators()
	for _, evaluator := range evaluators {
		potentialResources = append(potentialResources, evaluator.MatchingResources(hardResources)...)
	}
	// NOTE: the intersection just removes duplicates since the evaluator match intersects wtih hard
	matchedResources := Intersection(hardResources, potentialResources)

	// sum the observed usage from each evaluator
	newUsage := api.ResourceList{}
	for _, evaluator := range evaluators {
		// only trigger the evaluator if it matches a resource in the quota, otherwise, skip calculating anything
		intersection := evaluator.MatchingResources(matchedResources)
		if len(intersection) == 0 {
			continue
		}

		usageStatsOptions := UsageStatsOptions{Namespace: namespaceName, Scopes: scopes, Resources: intersection}
		stats, err := evaluator.UsageStats(usageStatsOptions)
		if err != nil {
			return nil, err
		}
		newUsage = Add(newUsage, stats.Used)
	}

	// mask the observed usage to only the set of resources tracked by this quota
	// merge our observed usage with the quota usage status
	// if the new usage is different than the last usage, we will need to do an update
	newUsage = Mask(newUsage, matchedResources)
	return newUsage, nil
}

CalculateUsage中最重要的一步是循環registry中注冊的所有Evaluators,執行對應Evaluator的UsageStats方法進資源使用統計。看到這里,你也許懵逼了,Evaluators又是個什么東西?

我們先來看看Registry和Evaluator的關系,以及Evaluator的定義。

pkg/quota/interfaces.go:62

// Registry holds the list of evaluators associated to a particular group kind
type Registry interface {
	// Evaluators returns the set Evaluator objects registered to a groupKind
	Evaluators() map[schema.GroupKind]Evaluator
}


pkg/quota/interfaces.go:43

// Evaluator knows how to evaluate quota usage for a particular group kind
type Evaluator interface {
	// Constraints ensures that each required resource is present on item
	Constraints(required []api.ResourceName, item runtime.Object) error
	// GroupKind returns the groupKind that this object knows how to evaluate
	GroupKind() schema.GroupKind
	// Handles determines if quota could be impacted by the specified operation.
	// If true, admission control must perform quota processing for the operation, otherwise it is safe to ignore quota.
	Handles(operation admission.Operation) bool
	// Matches returns true if the specified quota matches the input item
	Matches(resourceQuota *api.ResourceQuota, item runtime.Object) (bool, error)
	// MatchingResources takes the input specified list of resources and returns the set of resources evaluator matches.
	MatchingResources(input []api.ResourceName) []api.ResourceName
	// Usage returns the resource usage for the specified object
	Usage(item runtime.Object) (api.ResourceList, error)
	// UsageStats calculates latest observed usage stats for all objects
	UsageStats(options UsageStatsOptions) (UsageStats, error)
}

可見Evaluator就是一系列操作的集合,是一個Interface,而Registry就是資源類型到Evaluator的一個Map。

Kubernetes中定義了7種資源的Evaluator,都在pkg/quota/evaluator/core/*目錄下,比如pods.go就是PodEvaluator的實現,里面實現了關鍵的UsageStats方法。除了PodEvaluator之外,其他的Evaluator的UsageStats實現,都是genericEvaluator來完成的,其代碼在pkg/quota/generic/evaluator.go:177

具體Evaluator的代碼分析,請讀者自行完成。

下面我給出Worker的內部流程圖,供大家參考: Kubernetes ResourceQuotaController內部實現原理及源碼分析是怎樣的

###ReplenishmentController

rqController負責watch待sync的ResourceQuota,并將其加入到queue和missingUsageQueue中,而上面分析NewResourceQuotaController代碼時提到: replenishmentController, err := options.ControllerFactory.NewController(controllerOptions) 負責replenishmentController的創建,那replenishmentController又是啥呢?我們有必要來看看replenishmentController的創建。

pkg/controller/resourcequota/replenishment_controller.go:131

func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (result cache.ControllerInterface, err error) {
	...
	switch options.GroupKind {
	case api.Kind("Pod"):
		if r.sharedInformerFactory != nil {
			result, err = controllerFor(api.Resource("pods"), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{
				UpdateFunc: PodReplenishmentUpdateFunc(options),
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			})
			break
		}
		result = informers.NewPodInformer(r.kubeClient, options.ResyncPeriod())
	case api.Kind("Service"):
		// TODO move to informer when defined
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().Services(v1.NamespaceAll).List(options)
				},
				WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().Services(v1.NamespaceAll).Watch(options)
				},
			},
			&v1.Service{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				UpdateFunc: ServiceReplenishmentUpdateFunc(options),
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	case api.Kind("ReplicationController"):
		// TODO move to informer when defined
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).List(options)
				},
				WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).Watch(options)
				},
			},
			&v1.ReplicationController{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	case api.Kind("PersistentVolumeClaim"):
		if r.sharedInformerFactory != nil {
			result, err = controllerFor(api.Resource("persistentvolumeclaims"), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			})
			break
		}
		// TODO (derekwaynecarr) remove me when we can require a sharedInformerFactory in all code paths...
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().PersistentVolumeClaims(v1.NamespaceAll).List(options)
				},
				WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().PersistentVolumeClaims(v1.NamespaceAll).Watch(options)
				},
			},
			&v1.PersistentVolumeClaim{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	case api.Kind("Secret"):
		// TODO move to informer when defined
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().Secrets(v1.NamespaceAll).List(options)
				},
				WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().Secrets(v1.NamespaceAll).Watch(options)
				},
			},
			&v1.Secret{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	case api.Kind("ConfigMap"):
		// TODO move to informer when defined
		_, result = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
					return r.kubeClient.Core().ConfigMaps(v1.NamespaceAll).List(options)
				},
				WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
					return r.kubeClient.Core().ConfigMaps(v1.NamespaceAll).Watch(options)
				},
			},
			&v1.ConfigMap{},
			options.ResyncPeriod(),
			cache.ResourceEventHandlerFuncs{
				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
			},
		)
	default:
		return nil, NewUnhandledGroupKindError(options.GroupKind)
	}
	return result, err
}

整個代碼結構非常清晰,就是根據不同的資源類型,返回對應的Controller。而每種資源的Controller的定義都是通過創建一個對應的Informer完成。Informer中注冊對應的ResourceEventHandlerFuncs:UpdateFunc和DeleteFunc用來出watch的對象發生對應的change時需要調用的方法。

以Pod為例,看看Pod注冊的UpdateFunc:PodReplenishmentUpdateFunc和DeleteFunc:ObjectReplenishmentDeleteFunc,你就知道replenishmentController是用來干啥的了。

pkg/controller/resourcequota/replenishment_controller.go:56

// PodReplenishmentUpdateFunc will replenish if the old pod was quota tracked but the new is not
func PodReplenishmentUpdateFunc(options *ReplenishmentControllerOptions) func(oldObj, newObj interface{}) {
	return func(oldObj, newObj interface{}) {
		oldPod := oldObj.(*v1.Pod)
		newPod := newObj.(*v1.Pod)
		if core.QuotaV1Pod(oldPod) && !core.QuotaV1Pod(newPod) {
			options.ReplenishmentFunc(options.GroupKind, newPod.Namespace, oldPod)
		}
	}
}

// ObjectReplenenishmentDeleteFunc will replenish on every delete
func ObjectReplenishmentDeleteFunc(options *ReplenishmentControllerOptions) func(obj interface{}) {
	return func(obj interface{}) {
		metaObject, err := meta.Accessor(obj)
		if err != nil {
			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
			if !ok {
				glog.Errorf("replenishment controller could not get object from tombstone %+v, could take up to %v before quota is replenished", obj, options.ResyncPeriod())
				utilruntime.HandleError(err)
				return
			}
			metaObject, err = meta.Accessor(tombstone.Obj)
			if err != nil {
				glog.Errorf("replenishment controller tombstone contained object that is not a meta %+v, could take up to %v before quota is replenished", tombstone.Obj, options.ResyncPeriod())
				utilruntime.HandleError(err)
				return
			}
		}
		options.ReplenishmentFunc(options.GroupKind, metaObject.GetNamespace(), nil)
	}
}

在NewResourceQuotaController中創建replenishmentController時,已經指定了對應的ReplenishmentFunc為rq.replenishQuota,PodReplenishmentUpdateFunc和ObjectReplenishmentDeleteFunc最終都是調用ReplenishmentFunc(rq.replenishQuota)來進行quota recalculated。

pkg/controller/resourcequota/resource_quota_controller.go:330

// replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
func (rq *ResourceQuotaController) replenishQuota(groupKind schema.GroupKind, namespace string, object runtime.Object) {
	...
	for i := range resourceQuotas {
		resourceQuota := resourceQuotas[i].(*v1.ResourceQuota)
		internalResourceQuota := &api.ResourceQuota{}
		if err := v1.Convert_v1_ResourceQuota_To_api_ResourceQuota(resourceQuota, internalResourceQuota, nil); err != nil {
			glog.Error(err)
			continue
		}
		resourceQuotaResources := quota.ResourceNames(internalResourceQuota.Status.Hard)
		if intersection := evaluator.MatchingResources(resourceQuotaResources); len(intersection) > 0 {
			// 將該resourceQuota加入到隊列queue
			rq.enqueueResourceQuota(resourceQuota)
		}
	}
}

因此replenishmentController就是用來捕獲對應資源的Update/Delete事件,將其對應的ResourceQuota加入到queue中,然后worker再對其進行重新計算Usage。

總結

  • Kubernetes Controller Manager在初始化Controllers時執行startResourceQuotaController啟動創建ResourceQuotaController并執行其Run方法來啟動ResourceQuotaController。

  • ResourceQuotaController中包括兩個隊列:

    • queue:用來存放待sync和recalculate的ResourceQuota

    • missingUsageQueue:用來存放那些丟失Usage信息的ResourceQuota

  • ResourceQuotaController中有兩種Controller:

    • rqController:通過List/Watch對應的資源及變化,根據情況,將ResourceQuota加入到queue和missingUsageQueue。

    • replenishmentControllers:通過監控資源的Update/Delete操作,將ResourceQuota加入到queue。

  • ResourceQuotaController中存在一個Registry對象,用來存放各種資源的Evaluator,包括:

    • PodEvaluator

    • ConfigMapEvaluator

    • PersistentVolumeClaimEvaluator

    • ResourceQuotaEvaluator

    • ReplicationControllerEvaluator

    • ServiceEvaluator

    • SecretEvaluator

  • ResourceQuotaController中的replenishmentControllers包含以下replenishmentController:

    • PodReplenishController

    • ConfigMapReplenishController

    • PersistentVolumeClaimReplenishController

    • ReplicationControllerReplenishController

    • ServiceReplenishController

    • SecretReplenishController

  • ResourceQuotaController中默認存在5個worker對queue中的ResourceQuota Item進行處理。可通過kube-controller-manager的--concurrent-resource-quota-syncs配置。

  • ResourceQuotaController中默認存在5個worker對missingUsageQueue中的ResourceQuota Item進行處理。可通過kube-controller-manager的--concurrent-resource-quota-syncs配置。

  • ResourceQuotaController默認5min會做一次全量的quota usage同步。可通過kube-controller-manager的--resource-quota-sync-period

關于Kubernetes ResourceQuotaController內部實現原理及源碼分析是怎樣的問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

新巴尔虎右旗| 浮山县| 高安市| 五华县| 广州市| 石台县| 佛学| 射洪县| 永兴县| 松江区| 保德县| 喜德县| 巴彦淖尔市| 延安市| 讷河市| 达州市| 沙湾县| 蛟河市| 濮阳县| 镇远县| 景宁| 齐齐哈尔市| 天等县| 屏山县| 广州市| 宜宾县| 聊城市| 揭阳市| 闽侯县| 新泰市| 宁夏| 驻马店市| 乐平市| 怀来县| 阿坝县| 仙居县| 于田县| 增城市| 额济纳旗| 定安县| 伊春市|