您好,登錄后才能下訂單哦!
本篇內容主要講解“apiserver的list-watch怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“apiserver的list-watch怎么使用”吧!
上圖是一個典型的Pod創建過程,在這個過程中,每次當kubectl創建了ReplicaSet對象后,controller-manager都是通過list-watch這種方式得到了最新的ReplicaSet對象,并執行自己的邏輯來創建Pod對象。其他的幾個組件,Scheduler/Kubelet也是一樣,通過list-watch得知變化并進行處理。這是組件的處理端代碼:
c.NodeLister.Store, c.nodePopulator = framework.NewInformer( c.createNodeLW(), ...(1) &api.Node{}, ...(2) 0, ...(3) framework.ResourceEventHandlerFuncs{ ...(4) AddFunc: c.addNodeToCache, ...(5) UpdateFunc: c.updateNodeInCache, DeleteFunc: c.deleteNodeFromCache, }, )
其中(1)是list-watch函數,(4)(5)則是相應事件觸發操作的入口。
list-watch操作需要做這么幾件事:
由組件向apiserver而不是etcd發起watch請求,在組件啟動時就進行訂閱,告訴apiserver需要知道什么數據發生變化。Watch是一個典型的發布-訂閱模式。
組件向apiserver發起的watch請求是可以帶條件的,例如,scheduler想要watch的是所有未被調度的Pod,也就是滿足Pod.destNode=""的Pod來進行調度操作;而kubelet只關心自己節點上的Pod列表。apiserver向etcd發起的watch是沒有條件的,只能知道某個數據發生了變化或創建、刪除,但不能過濾具體的值。也就是說對象數據的條件過濾必須在apiserver端而不是etcd端完成。
list是watch失敗,數據太過陳舊后的彌補手段,這方面詳見 基于list-watch的Kubernetes異步事件處理框架詳解-客戶端部分。list本身是一個簡單的列表操作,和其它apiserver的增刪改操作一樣,不再多描述細節。
既然watch本身是一個apiserver提供的http restful的API,那么就按照API的方式去閱讀它的代碼,按照apiserver的基礎功能實現一文所描述,我們來看它的代碼,
關鍵的處理API注冊代碼pkg/apiserver/api_installer.go
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage,... ... lister, isLister := storage.(rest.Lister) watcher, isWatcher := storage.(rest.Watcher) ...(1) ... case "LIST": // List all resources of a kind. ...(2) doc := "list objects of kind " + kind if hasSubresource { doc = "list " + subresource + " of objects of kind " + kind } handler := metrics.InstrumentRouteFunc(action.Verb, resource, ListResource(lister, watcher, reqScope, false, a.minRequestTimeout)) ...(3)
一個rest.Storage
對象會被轉換為watcher
和lister
對象
提供list和watch服務的入口是同一個,在API接口中是通過 GET /pods?watch=true
這種方式來區分是list還是watch
API處理函數是由lister
和watcher
經過ListResource()
合體后完成的。
那么就看看ListResource()
的具體實現吧,/pkg/apiserver/resthandler.go
func ListResource(r rest.Lister, rw rest.Watcher,... { ... if (opts.Watch || forceWatch) && rw != nil { watcher, err := rw.Watch(ctx, &opts) ...(1) .... serveWatch(watcher, scope, req, res, timeout) return } result, err := r.List(ctx, &opts) ...(2) write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)
每次有一個watch的url請求過來,都會調用rw.Watch()
創建一個watcher
,好吧這里的名字和上面那一層的名字重復了,但我們可以區分開,然后使用serveWatch()
來處理這個請求。watcher的生命周期是每個http請求的,這一點非常重要。
list在這里是另外一個分支,和watch分別處理,可以忽略。
響應http請求的過程serveWatch()
的代碼在/pkg/apiserver/watch.go
里面
func serveWatch(watcher watch.Interface... { server.ServeHTTP(res.ResponseWriter, req.Request) } func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { for { select { case event, ok := <-s.watching.ResultChan(): obj := event.Object if err := s.embeddedEncoder.EncodeToStream(obj, buf); ... }
這段的操作基本毫無技術含量,就是從watcher
的結果channel中讀取一個event對象,然后持續不斷的編碼寫入到http response的流當中。
這是整個過程的圖形化描述:
所以,我們的問題就回到了
watcher
這個對象,嚴格來說是watch.Interface
的對象,位置在pkg/watch/watch.go
中,是怎么被創建出來的?
這個watcher
對象是怎么從etcd中獲得變化的數據的?又是怎么過濾條件的?
回到上面的代碼追蹤過程來看,watcher(watch.Interface)對象是被Rest.Storage對象創建出來的。從上一篇apiserver的基礎功能實現 可以知道,所有的Rest.Storage分兩層,一層是每個對象自己的邏輯,另一層則是通過通用的操作來搞定,像watch這樣的操作應該是通用的,所以我們看這個源代碼
/pkg/registry/generic/registry/store.go
func (e *Store) Watch(ctx api.Context, options *api.ListOptions) (watch.Interface, error) { ... return e.WatchPredicate(ctx, e.PredicateFunc(label, field), resourceVersion) } func (e *Store) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) { return e.Storage.Watch(ctx, key, resourceVersion, filterFunc) ...(1) return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filterFunc) }
果然,我們在(1)這里找到了生成Watch的函數,但這個工作是由e.Storage來完成的,所以我們需要找一個具體的Storage的生成過程,以Pod為例子
/pkg/registry/pod/etcd/etcd.go
func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage { prefix := "/pods" storageInterface := opts.Decorator( opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Pods), &api.Pod{}, prefix, pod.Strategy, newListFunc) ...(1) store := ®istry.Store{ ... Storage: storageInterface, ...(2) } return PodStorage{ Pod: &REST{store, proxyTransport}, ...(3)
這(1)就是Storage的生成現場,傳入的參數包括了一個緩存Pod的數量。(2)(3)是和上面代碼的連接點。那么現在問題就轉化為追尋Decorator
這個東西具體是怎么生成的,需要重復剛才的過程,往上搜索opts是怎么搞進來的。
/pkg/master/master.go - GetRESTOptionsOrDie()
/pkg/genericapiserver/genericapiserver.go - StorageDecorator()
/pkg/registry/generic/registry/storage_factory.go - StorageWithCacher()
/pkg/storage/cacher.go
OK,這樣我們就來到正題,一個具體的watch緩存的實現了!
把上面這個過程用一幅圖表示:
看代碼,首要看的是數據結構,以及考慮這個數據結構和需要解決的問題之間的關系。
對于cacher這結構來說,我們從外看需求,可以知道這是一個Storage,用于提供某個類型的數據,例如Pod的增刪改查請求,同時它又用于watch,用于在client端需要對某個key的變化感興趣時,創建一個watcher來源源不斷的提供新的數據給客戶端。
那么cacher是怎么滿足這些需求的呢?答案就在它的結構里面:
type Cacher struct { // Underlying storage.Interface. storage Interface // "sliding window" of recent changes of objects and the current state. watchCache *watchCache reflector *cache.Reflector // Registered watchers. watcherIdx int watchers map[int]*cacheWatcher }
略去里面的鎖(在看代碼的時候一開始要忽略鎖的存在,鎖是后期為了避免破壞數據再加上去的,不影響數據流),略去里面的一些非關鍵的成員,現在我們剩下這3段重要的成員,其中
storage
是連接etcd的,也就是背后的裸存儲
watchCache
并不僅僅是和注釋里面說的那樣,是個滑動窗口,里面存儲了所有數據+滑動窗口
watchers
這是為每個請求創建的struct,每個watch的client上來后都會被創建一個,所以這里有個map
當然,這3個成員的作用是我看了所有代碼后,總結出來的,一開始讀代碼時不妨先在腦子里面有個定位,然后在看下面的方法時不斷修正這個定位。那么,接下來就看看具體的方法是怎么讓數據在這些結構里面流動的吧!
初始化方法
func NewCacherFromConfig(config CacherConfig) *Cacher { ... cacher.startCaching(stopCh) } func (c *Cacher) startCaching(stopChannel <-chan struct{}) { ... if err := c.reflector.ListAndWatch(stopChannel); err != nil { glog.Errorf("unexpected ListAndWatch error: %v", err) } }
其他的部分都是陳詞濫調,只有startCaching()
這段有點意思,這里啟動一個go協程,最后啟動了c.reflector.ListAndWatch()
這個方法,如果對k8s的基本有了解的話,這個其實就是一個把遠端數據源源不斷的同步到本地的方法,那么數據落在什么地方呢?往上看可以看到
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
也就是說從創建cacher的實例開始,就會從etcd中把所有Pod的數據同步到watchCache里面來。這也就印證了watchCache是數據從etcd過來的第一站。
增刪改方法
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { return c.storage.Create(ctx, key, obj, out, ttl) }
大部分方法都很無聊,就是短路到底層的storage直接執行。
Watch方法
// Implements storage.Interface. func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) { initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV) watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx)) c.watchers[c.watcherIdx] = watcher c.watcherIdx++ return watcher, nil }
這里的邏輯就比較清晰,首先從watchCache中拿到從某個resourceVersion以來的所有數據——initEvents,然后用這個數據創建了一個watcher返回出去為某個客戶端提供服務。
List方法
// Implements storage.Interface. func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error { filterFunc := filterFunction(key, c.keyFunc, filter) objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV) if err != nil { return fmt.Errorf("failed to wait for fresh list: %v", err) } for _, obj := range objs { if filterFunc(object) { listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem())) } } }
從這段代碼中我們可以看出2件事,一是list的數據都是從watchCache中獲取的,二是獲取后通過filterFunc過濾了一遍然后返回出去。
這個結構應該是緩存的核心結構,從上一層的代碼分析中我們已經知道了對這個結構的需求,包括存儲所有這個類型的數據,包括當有新的數據過來時把數據扔到cacheWatcher
里面去,總之,提供List和Watch兩大輸出。
type watchCache struct { // cache is used a cyclic buffer - its first element (with the smallest // resourceVersion) is defined by startIndex, its last element is defined // by endIndex (if cache is full it will be startIndex + capacity). // Both startIndex and endIndex can be greater than buffer capacity - // you should always apply modulo capacity to get an index in cache array. cache []watchCacheElement startIndex int endIndex int // store will effectively support LIST operation from the "end of cache // history" i.e. from the moment just after the newest cached watched event. // It is necessary to effectively allow clients to start watching at now. store cache.Store }
這里的關鍵數據結構依然是2個
cache
環形隊列,存儲有限個數的最新數據
store
底層實際上是個線程安全的hashMap,存儲全量數據
那么繼續看看方法是怎么運轉的吧~
增刪改方法
func (w *watchCache) Update(obj interface{}) error { event := watch.Event{Type: watch.Modified, Object: object} f := func(obj runtime.Object) error { return w.store.Update(obj) } return w.processEvent(event, resourceVersion, f) } func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error { previous, exists, err := w.store.Get(event.Object) watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject, resourceVersion} w.onEvent(watchCacheEvent) w.updateCache(resourceVersion, watchCacheEvent) } // Assumes that lock is already held for write. func (w *watchCache) updateCache(resourceVersion uint64, event watchCacheEvent) { w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event} w.endIndex++ }
所有的增刪改方法做的事情都差不多,就是在store
里面存具體的數據,然后調用processEvent()
去增加環形隊列里面的數據,如果詳細看一下onEvent
的操作,就會發現這個操作的本質是落在cacher.go里面:
func (c *Cacher) processEvent(event watchCacheEvent) { for _, watcher := range c.watchers { watcher.add(event) } }
往所有的watcher里面挨個添加數據。總體來說,我們可以從上面的代碼中得出一個結論:cache
里面存儲的是Event,也就是有prevObject
的,對于所有操作都會在cache
里面保存,但對于store來說,只存儲當下的數據,刪了就刪了,改了就改了。
WaitUntilFreshAndList()
這里本來應該討論List()方法的,但在cacher
里面的List()
實際上使用的是這個,所以我們看這個方法。
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64, error) { startTime := w.clock.Now() go func() { w.cond.Broadcast() }() for w.resourceVersion < resourceVersion { w.cond.Wait() } return w.store.List(), w.resourceVersion, nil }
這個方法比較繞,前面使用了一堆cond
通知來和其他協程通信,最后還是調用了store.List()
把數據返回出去。后面來具體分析這里的協調機制。
GetAllEventsSinceThreadUnsafe()
這個方法在cacher
的創建cacheWatcher
里面使用,把當前store
里面的所有數據都搞出來,然后把store
里面的數據都轉換為AddEvent
,配上cache
里面的Event,全部返回出去。
這個結構是每個watch的client都會擁有一個的,從上面的分析中我們也能得出這個結構的需求,就是從watchCache
里面搞一些數據,然后寫到客戶端那邊。
// cacherWatch implements watch.Interface type cacheWatcher struct { sync.Mutex input chan watchCacheEvent result chan watch.Event filter FilterFunc stopped bool forget func(bool) }
這段代碼比較簡單,就不去分析方法了,簡單說就是數據在增加的時候放到input
這個channel里面去,通過filter
然后輸出到result
這個channel里面去。
到此,相信大家對“apiserver的list-watch怎么使用”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。