中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何分析SuperEdge 拓撲算法

發布時間:2022-01-17 10:53:49 來源:億速云 閱讀:128 作者:柒染 欄目:云計算

這篇文章主要為大家分析了如何分析SuperEdge 拓撲算法的相關知識點,內容詳細易懂,操作細節合理,具有一定參考價值。如果感興趣的話,不妨跟著跟隨小編一起來看看,下面跟著小編一起深入學習“如何分析SuperEdge 拓撲算法”的知識吧。


前言

 

SuperEdge 介紹

SuperEdge 是基于原生 Kubernetes 的邊緣容器管理系統。該系統把云原生能力擴展到邊緣側,很好的實現了云端對邊緣端的管理和控制。同時 superedge 自研了 service group 實現了基于邊緣計算的服務訪問控制,極大簡化了應用從云端部署到邊緣端的過程。  

SuperEdge service group拓撲感知特性

SuperEdge service group 利用 application-grid-wrapper 實現拓撲感知,完成了同一個 nodeunit 內服務的閉環訪問。

在深入分析 application-grid-wrapper 之前,這里先簡單介紹一下社區 Kubernetes 原生支持的拓撲感知特性[1]

Kubernetes service topology awareness 特性于v1.17發布 alpha 版本,用于實現路由拓撲以及就近訪問特性。用戶需要在service 中添加 topologyKeys 字段標示拓撲key類型,只有具有相同拓撲域的 endpoint 會被訪問到,目前有三種 topologyKeys 可供選擇:

  • "kubernetes.io/hostname":訪問本節點內(     kubernetes.io/hostname      label value相同)的endpoint,如果沒有則service訪問失敗
  • "topology.kubernetes.io/zone":訪問相同zone域內(     topology.kubernetes.io/zone      label value相同)的endpoint,如果沒有則service訪問失敗
  • "topology.kubernetes.io/region":訪問相同region域內(     topology.kubernetes.io/region      label value 相同)的 endpoint,如果沒有則 service 訪問失敗

除了單獨填寫如上某一個拓撲key之外,還可以將這些key構造成列表進行填寫,例如:["kubernetes.io/hostname", "topology.kubernetes.io/zone", "topology.kubernetes.io/region"],這表示:優先訪問本節點內的 endpoint;如果不存在,則訪問同一個 zone 內的 endpoint;如果再不存在,則訪問同一個 region 內的 endpoint,如果都不存在則訪問失敗。

另外,還可以在列表最后(只能最后一項)添加"*"表示:如果前面拓撲域都失敗,則訪問任何有效的 endpoint,也即沒有限制拓撲了,示例如下:  
# A Service that prefers node local, zonal, then regional endpoints but falls back to cluster wide endpoints.
apiVersion: v1
kind: Service
metadata:  
  name: my-service
spec:  
  selector:    
    app: my-app  
  ports:    
    - protocol: TCP      
      port: 80      
      targetPort: 9376  
  topologyKeys:    
    - "kubernetes.io/hostname"
    - "topology.kubernetes.io/zone" 
    - "topology.kubernetes.io/region"
    - "*"
 

而service group 實現的拓撲感知和社區對比,有如下區別:

  • service group 拓撲 key 可以自定義,也即為 gridUniqKey,使用起來更加靈活;而社區實現目前只有三種選擇:"kubernetes.io/hostname","topology.kubernetes.io/zone"以及"topology.kubernetes.io/region"。
  • service group 只能填寫一個拓撲 key,也即只能訪問本拓撲域內有效的 endpoint,無法訪問其它拓撲域的 endpoint;而社區可以通過 topologyKey 列表以及"*"實現其它備選拓撲域 endpoint 的訪問。

service group 實現的拓撲感知,service 配置如下:

# A Service that only prefers node zone1al endpoints.
apiVersion: v1
kind: Service
metadata:  
  annotations:    
    topologyKeys: '["zone1"]'  
  labels:    
    superedge.io/grid-selector: servicegrid-demo  
  name: servicegrid-demo-svc
spec:  
  ports:  
  - port: 80    
    protocol: TCP    
    targetPort: 8080  
  selector:    
    appGrid: echo
 

在介紹完 service group 實現的拓撲感知后,我們深入到源碼分析實現細節。同樣的,這里以一個使用示例開始分析:

# step1: labels edge nodes
$ kubectl  get nodes
NAME    STATUS   ROLES    AGE   VERSIO
Nnode0   Ready    <none>   16d   v1.16.7
node1    Ready    <none>   16d   v1.16.7
node2    Ready    <none>   16d   v1.16.7
# nodeunit1(nodegroup and servicegroup zone1)
$ kubectl --kubeconfig config label nodes node0 zone1=nodeunit1  
# nodeunit2(nodegroup and servicegroup zone1)
$ kubectl --kubeconfig config label nodes node1 zone1=nodeunit2
$ kubectl --kubeconfig config label nodes node2 zone1=nodeunit2

...

# step3: deploy echo ServiceGrid
$ cat <<EOF | kubectl --kubeconfig config apply -f -
apiVersion: superedge.io/v1
kind: ServiceGrid
metadata:  
  name: servicegrid-demo  
  namespace: default
spec:  
  gridUniqKey: zone1  
  template:    
    selector:      
      appGrid: echo    
    ports:    
    - protocol: TCP      
      port: 80      
      targetPort: 8080
EOF
servicegrid.superedge.io/servicegrid-demo created
# note that there is only one relevant service generated
$ kubectl  get svc
NAME                TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)   AGE
kubernetes          ClusterIP   192.168.0.1       <none>        443/TCP   16d
servicegrid-demo-svc   ClusterIP   192.168.6.139     <none>        80/TCP    10m
    
# step4: access servicegrid-demo-svc(service topology and closed-looped)
# execute on node0
$ curl 192.168.6.139|grep "node name"        node name:      node0
# execute on node1 and node2
$ curl 192.168.6.139|grep "node name" 
       node name:      node2
$ curl 192.168.6.139|grep "node name"
       node name:      node1
 

在創建完 ServiceGrid CR 后,ServiceGrid Controller 負責根據 ServiceGrid產生對應的 service (包含由 serviceGrid.Spec.GridUniqKey 構成的 topologyKeys annotations);而 application-grid-wrapper 根據 service 實現拓撲感知,下面依次分析。

 

ServiceGrid Controller 分析

ServiceGrid Controller 邏輯和 DeploymentGrid Controller 整體一致,如下:

  • 1、創建并維護 service group 需要的若干 CRDs(包括:ServiceGrid)
  • 2、監聽 ServiceGrid event,并填充 ServiceGrid 到工作隊列中;循環從隊列中取出 ServiceGrid 進行解析,創建并且維護對應的 service
  • 3、監聽 service event,并將相關的  ServiceGrid 塞到工作隊列中進行上述處理,協助上述邏輯達到整體 reconcile 邏輯

注意這里區別于 DeploymentGrid Controller:

  • 一個 ServiceGrid 對象只產生一個 service
  • 只需額外監聽 service event,無需監聽 node 事件。因為 node 的CRUD與 ServiceGrid 無關
  • ServiceGrid 對應產生的 service,命名為:     {ServiceGrid}-svc
func (sgc *ServiceGridController) syncServiceGrid(key string) error {    
    startTime := time.Now()    
    klog.V(4).Infof("Started syncing service grid %q (%v)", key, startTime)    
    defer func() {        
      klog.V(4).Infof("Finished syncing service grid %q (%v)", key, time.Since(startTime))    
    }()    
    
    namespace, name, err := cache.SplitMetaNamespaceKey(key)    
    if err != nil {        
      return err    
    }    
    
    sg, err := sgc.svcGridLister.ServiceGrids(namespace).Get(name)    
    if errors.IsNotFound(err) {              
        klog.V(2).Infof("service grid %v has been deleted", key)        
        return nil    
    }    
    if err != nil {        
        return err    
    }    
    
    if sg.Spec.GridUniqKey == "" {           
    sgc.eventRecorder.Eventf(sg, corev1.EventTypeWarning, "Empty", "This service grid has an empty grid key")        
        return nil    
    }    
    
    // get service workload list of this grid    
    svcList, err := sgc.getServiceForGrid(sg)    
    if err != nil {        
        return err    
    }    
    
    if sg.DeletionTimestamp != nil {        
        return nil    
    }    
    
    // sync service grid relevant services workload    
    return sgc.reconcile(sg, svcList)
    }
    
func (sgc *ServiceGridController) getServiceForGrid(sg *crdv1.ServiceGrid) ([]*corev1.Service, error) {    
  svcList, err := sgc.svcLister.Services(sg.Namespace).List(labels.Everything())    
  if err != nil {        
    return nil, err    
  }    
  
  labelSelector, err := common.GetDefaultSelector(sg.Name)    
  if err != nil {        
      return nil, err    
  }   
     
  canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error)
  {        
         fresh, err := 
  sgc.crdClient.SuperedgeV1().ServiceGrids(sg.Namespace).Get(context.TODO(), sg.Name, metav1.GetOptions{})        
      if err != nil {            
        return nil, err        
      }        
      if fresh.UID != sg.UID {           
           return nil, fmt.Errorf("orignal service grid %v/%v is gone: got uid %v, wanted %v", sg.Namespace,                  
              sg.Name, fresh.UID, sg.UID)        
      }        
      return fresh, nil    
    })    
      
    cm := controller.NewServiceControllerRefManager(sgc.svcClient, sg, labelSelector, util.ControllerKind, canAdoptFunc)    
    return cm.ClaimService(svcList)
}
    
func (sgc *ServiceGridController) reconcile(g *crdv1.ServiceGrid, svcList 
[]*corev1.Service) error {    
    var (        
        adds    []*corev1.Service     
        updates []*corev1.Service     
        deletes []*corev1.Service    
    )    
    
    sgTargetSvcName := util.GetServiceName(g)    
    isExistingSvc := false    
    for _, svc := range svcList {     
        if svc.Name == sgTargetSvcName {            
            isExistingSvc = true     
            template := util.KeepConsistence(g, svc)            
            if !apiequality.Semantic.DeepEqual(template, svc) {           
                updates = append(updates, template)            
            }        
        } else {            
            deletes = append(deletes, svc)        
        }    
    }    
    
    if !isExistingSvc {        
        adds = append(adds, util.CreateService(g))    
    }    
    
    return sgc.syncService(adds, updates, deletes)
}

func CreateService(sg *crdv1.ServiceGrid) *corev1.Service {    
    svc := &corev1.Service{        
        ObjectMeta: metav1.ObjectMeta{            
          Name:      GetServiceName(sg),            
          Namespace: sg.Namespace,   
          // Append existed ServiceGrid labels to service to be created  
          Labels: func() map[string]string {                
              if sg.Labels != nil { 
                  newLabels := sg.Labels                    
                  newLabels[common.GridSelectorName] = sg.Name             
                  newLabels[common.GridSelectorUniqKeyName] = sg.Spec.GridUniqKey                    
                  return newLabels              
             } else {               
                  return map[string]string{                        
                      common.GridSelectorName:        sg.Name,             
                      common.GridSelectorUniqKeyName: sg.Spec.GridUniqKey, 
              }               
          }            
      }(),            
      Annotations: make(map[string]string),       
    },        
    Spec: sg.Spec.Template,   
  }    
  
  keys := make([]string, 1)    
  keys[0] = sg.Spec.GridUniqKey    
  keyData, _ := json.Marshal(keys)     
  svc.Annotations[common.TopologyAnnotationsKey] = string(keyData)    
  
  return svc
}
 

由于邏輯與 DeploymentGrid 類似,這里不展開細節,重點關注 application-grid-wrapper 部分。

 

application-grid-wrapper 分析

在 ServiceGrid Controller 創建完 service 之后,application-grid-wrapper 的作用就開始啟動了:

apiVersion: v1
kind: Service
metadata:  
  annotations:    
    topologyKeys: '["zone1"]'  
  creationTimestamp: "2021-03-03T07:33:30Z"  
  labels:    
    superedge.io/grid-selector: servicegrid-demo  
    name: servicegrid-demo-svc  
    namespace: default  
    ownerReferences:  
    - apiVersion: superedge.io/v1    
      blockOwnerDeletion: true    
      controller: true    
      kind: ServiceGrid    
      name: servicegrid-demo    
      uid: 78c74d3c-72ac-4e68-8c79-f1396af5a581  
    resourceVersion: "127987090"  
    selfLink: /api/v1/namespaces/default/services/servicegrid-demo-svc  
    uid: 8130ba7b-c27e-4c3a-8ceb-4f6dd0178dfc
spec:  
    clusterIP: 192.168.161.1  
    ports:  
    - port: 80    
    protocol: TCP    
    targetPort: 8080  
  selector:    
    appGrid: echo  
  sessionAffinity: None  
  type: ClusterIP
status:  
  loadBalancer: {}
 

為了實現 Kubernetes 零侵入,需要在 kube-proxy 與 apiserver 通信之間添加一層 wrapper,架構如下:

如何分析SuperEdge 拓撲算法  

調用鏈路如下:

kube-proxy -> application-grid-wrapper -> lite-apiserver -> kube-apiserver
 

因此 application-grid-wrapper 會起服務,接受來自 kube-proxy 的請求,如下:

func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile string) error {    
    ...    
    klog.Infof("Start to run interceptor server")    
    /* filter     
    */    
    server := &http.Server{Addr: bindAddress, Handler: s.buildFilterChains(debug)}    
    
    if insecure {        
        return server.ListenAndServe()    
    }    
    ...    
    server.TLSConfig = tlsConfig    
    return server.ListenAndServeTLS("", "")
}

func (s *interceptorServer) buildFilterChains(debug bool) http.Handler {   
    handler := http.Handler(http.NewServeMux())    
    
    handler = s.interceptEndpointsRequest(handler)    
    handler = s.interceptServiceRequest(handler)    
    handler = s.interceptEventRequest(handler)    
    handler = s.interceptNodeRequest(handler)    
    handler = s.logger(handler)    
    
  if debug {        
      handler = s.debugger(handler)   
  }    
  
  return handler
}
 
這里會首先創建 interceptorServer,然后注冊處理函數,由外到內依次如下:  
 
  • debug:接受 debug 請求,返回 wrapper pprof 運行信息

  • logger:打印請求日志

  • node:接受 kube-proxy node GET(/api/v1/nodes/{node})請求,并返回 node信息

  • event:接受 kube-proxy events POST(/events)請求,并將請求轉發給 lite-apiserver

func (s *interceptorServer) interceptEventRequest(handler http.Handler) http.Handler {  
   return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {           if r.Method != http.MethodPost || !strings.HasSuffix(r.URL.Path, "/events") {          
         handler.ServeHTTP(w, r)     
         return      
     }      
     
     targetURL, _ := url.Parse(s.restConfig.Host)      
     reverseProxy := httputil.NewSingleHostReverseProxy(targetURL)      
     reverseProxy.Transport, _ = rest.TransportFor(s.restConfig)      
     reverseProxy.ServeHTTP(w, r) 
    })
  }
 
  • service:接受 kube-proxy service List&Watch(/api/v1/services)請求,并根據 storageCache 內容返回(GetServices)

  • endpoint:接受 kube-proxy endpoint List&Watch (/api/v1/endpoints)請求,并根據 storageCache 內容返回(GetEndpoints)

下面先重點分析 cache 部分的邏輯,然后再回過頭來分析具體的 http handler List&Watch 處理邏輯。

wrapper 為了實現拓撲感知,自己維護了一個 cache,包括:node,service,endpoint。可以看到在 setupInformers 中注冊了這三類資源的處理函數:

type storageCache struct {    
    // hostName is the nodeName of node which application-grid-wrapper deploys on    
    hostName         string    
    wrapperInCluster bool    
    
    // mu lock protect the following map structure    
    mu           sync.RWMutex    
    servicesMap  map[types.NamespacedName]*serviceContainer    
    endpointsMap map[types.NamespacedName]*endpointsContainer    
    nodesMap     map[types.NamespacedName]*nodeContainer    
    
    // service watch channel    
    serviceChan chan<- watch.Event   
    // endpoints watch channel    
    endpointsChan chan<- watch.Event
}
...
func NewStorageCache(hostName string, wrapperInCluster bool, serviceNotifier, endpointsNotifier chan watch.Event) *storageCache {    
   msc := &storageCache{        
       hostName:         hostName,   
       wrapperInCluster: wrapperInCluster,        
       servicesMap:      make(map[types.NamespacedName]*serviceContainer), 
       endpointsMap:     make(map[types.NamespacedName]*endpointsContainer),        
       nodesMap:         make(map[types.NamespacedName]*nodeContainer),  
       serviceChan:      serviceNotifier,        
       endpointsChan:    endpointsNotifier,    
    }    
    
    return msc
}
...
func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile string) error {    
    ...    
    if err := s.setupInformers(ctx.Done()); err != nil {        
        return err    
   }    
  
   klog.Infof("Start to run interceptor server")    
   /* filter     
   */    
  server := &http.Server{Addr: bindAddress, Handler: s.buildFilterChains(debug)}    
  ...    
  return server.ListenAndServeTLS("", "")
}

func (s *interceptorServer) setupInformers(stop <-chan struct{}) error {   
    klog.Infof("Start to run service and endpoints informers")    
    noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)    
    if err != nil {        
        klog.Errorf("can't parse proxy label, %v", err)        
        return err    
    }    
    
    noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)    
    if err != nil {        
        klog.Errorf("can't parse headless label, %v", err)        
        return err    
    }    
        
    labelSelector := labels.NewSelector()    
    labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints) 
    
    resyncPeriod := time.Minute * 5   
    client := kubernetes.NewForConfigOrDie(s.restConfig)    
    nodeInformerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)    
    informerFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,        
      informers.WithTweakListOptions(func(options *metav1.ListOptions) {          
          options.LabelSelector = labelSelector.String()        
       }))    
         
    nodeInformer := nodeInformerFactory.Core().V1().Nodes().Informer()    
    serviceInformer := informerFactory.Core().V1().Services().Informer()    
    endpointsInformer := informerFactory.Core().V1().Endpoints().Informer()   
    
    /*    
    */    
    nodeInformer.AddEventHandlerWithResyncPeriod(s.cache.NodeEventHandler(), resyncPeriod)    
    serviceInformer.AddEventHandlerWithResyncPeriod(s.cache.ServiceEventHandler(), resyncPeriod)    
   
    endpointsInformer.AddEventHandlerWithResyncPeriod(s.cache.EndpointsEventHandler(), resyncPeriod)    
    
    go nodeInformer.Run(stop)    
    go serviceInformer.Run(stop)    
    go endpointsInformer.Run(stop)    
    
    if !cache.WaitForNamedCacheSync("node", stop,        
        nodeInformer.HasSynced,        
        serviceInformer.HasSynced,     
        endpointsInformer.HasSynced) {      
        return fmt.Errorf("can't sync informers")    
    }    
    
    return nil
}

func (sc *storageCache) NodeEventHandler() cache.ResourceEventHandler {   
     return &nodeHandler{cache: sc}
}

func (sc *storageCache) ServiceEventHandler() cache.ResourceEventHandler {    
    return &serviceHandler{cache: sc}

}
func (sc *storageCache) EndpointsEventHandler() cache.ResourceEventHandler {    
    return &endpointsHandler{cache: sc}
}
 
這里依次分析 NodeEventHandler,ServiceEventHandler 以及 EndpointsEventHandler,如下:  
 

1、NodeEventHandler

NodeEventHandler 負責監聽 node 資源相關 event,并將 node 以及 node Labels 添加到 storageCache.nodesMap 中(key為nodeName,value為node以及node labels)。

func (nh *nodeHandler) add(node *v1.Node) {    
    sc := nh.cache    
    
    sc.mu.Lock()    
    
    nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}    
    klog.Infof("Adding node %v", nodeKey)    
    sc.nodesMap[nodeKey] = &nodeContainer{        
        node:   node,        
        labels: node.Labels,    
    }    
    // update endpoints    
    changedEps := sc.rebuildEndpointsMap()    
    
    sc.mu.Unlock()    
    
    for _, eps := range changedEps { 
        sc.endpointsChan <- eps    
        }
}

func (nh *nodeHandler) update(node *v1.Node) {    
    sc := nh.cache    
    
    sc.mu.Lock()    
        nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}    
    klog.Infof("Updating node %v", nodeKey)    
    nodeContainer, found := sc.nodesMap[nodeKey]    
    if !found {        
        sc.mu.Unlock()        
        klog.Errorf("Updating non-existed node %v", nodeKey)        
        return    
    }    
    
    nodeContainer.node = node    
    // return directly when labels of node stay unchanged    
    if reflect.DeepEqual(node.Labels, nodeContainer.labels) {        
        sc.mu.Unlock()        
        return    
    }    
    nodeContainer.labels = node.Labels    
    
    // update endpoints    
    changedEps := sc.rebuildEndpointsMap()    
    
    sc.mu.Unlock()    
    
    for _, eps := range changedEps {        
        sc.endpointsChan <- eps  
    }
}
...
 

同時由于 node 的改變會影響 endpoint,因此會調用 rebuildEndpointsMap 刷新 storageCache.endpointsMap。

// rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified events
func (sc *storageCache) rebuildEndpointsMap() []watch.Event {   
    evts := make([]watch.Event, 0)   
    for name, endpointsContainer := range sc.endpointsMap {       
        newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster)   
        if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) {            
            continue        
        }        
        sc.endpointsMap[name].modified = newEps        
        evts = append(evts, watch.Event{            
            Type:   watch.Modified,      
            Object: newEps,     
       })    
   }    
   return evts
}
 

rebuildEndpointsMap 是 cache 的核心函數,同時也是拓撲感知的算法實現:

// pruneEndpoints filters endpoints using serviceTopology rules combined by services topologyKeys and node labels
func pruneEndpoints(hostName string,
    nodes map[types.NamespacedName]*nodeContainer,    
    services map[types.NamespacedName]*serviceContainer,    
    eps *v1.Endpoints, wrapperInCluster bool) *v1.Endpoints {   
    
    epsKey := types.NamespacedName{Namespace: eps.Namespace, Name: eps.Name}  
    
    if wrapperInCluster {       
        eps = genLocalEndpoints(eps) 
    }   
    
    // dangling endpoints    
    svc, ok := services[epsKey] 
    if !ok {        
        klog.V(4).Infof("Dangling endpoints %s, %+#v", eps.Name, eps.Subsets)        
        return eps    
    }   
        
    // normal service    
    if len(svc.keys) == 0 {     
        klog.V(4).Infof("Normal endpoints %s, %+#v", eps.Name, eps.Subsets)    
        return eps   
    }    
    
    // topology endpoints    
    newEps := eps.DeepCopy()    
    for si := range newEps.Subsets { 
        subnet := &newEps.Subsets[si]
        subnet.Addresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.Addresses)        
        subnet.NotReadyAddresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.NotReadyAddresses)    
    }    
    klog.V(4).Infof("Topology endpoints %s: subnets from %+#v to %+#v", eps.Name, eps.Subsets, newEps.Subsets) 
    
    return newEps
}

// filterConcernedAddresses aims to filter out endpoints addresses within the same node unit
func filterConcernedAddresses(topologyKeys []string, hostName string, nodes 
map[types.NamespacedName]*nodeContainer,    
    addresses []v1.EndpointAddress) []v1.EndpointAddress {    
    hostNode, found := nodes[types.NamespacedName{Name: hostName}]    
    if !found {        
        return nil    
    }    
    
    filteredEndpointAddresses := make([]v1.EndpointAddress, 0)    
    for i := range addresses {       
        addr := addresses[i]        
        if nodeName := addr.NodeName; nodeName != nil {            
             epsNode, found := nodes[types.NamespacedName{Name: *nodeName}]           
             if !found {             
                 continue            
             }            
             if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) {                
                 filteredEndpointAddresses = append(filteredEndpointAddresses, addr)            
             }        
        }    
    }    
    
    return filteredEndpointAddresses 
}

func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool { 
    if n1 == nil || n2 == nil {      
        return false    
    }    
    
    for _, key := range keys {       
        val1, v1found := n1[key]     
        val2, v2found := n2[key]    
        
        if v1found && v2found && val1 == val2 {            
        return true        
        }    
    }    
    
    return false   
}
 

算法邏輯如下:

  • 判斷 endpoint 是否為 default kubernetes service,如果是,則將該 endpoint 轉化為 wrapper 所在邊緣節點的 lite-apiserver 地址(127.0.0.1)和端口(5100     3)。
apiVersion: v1
kind: Endpoints
metadata:  
  annotations:    
    superedge.io/local-endpoint: 127.0.0.1    
    superedge.io/local-port: "51003" 
  name: kubernetes  
  namespace: default
subsets:
- addresses: 
  - ip: 172.31.0.60  
  ports:  
  - name: https    
  port: xxx    
  protocol: TCP
 
func genLocalEndpoints(eps *v1.Endpoints) *v1.Endpoints {    
    if eps.Namespace != metav1.NamespaceDefault || eps.Name != MasterEndpointName {        
        return eps    
    }    
    
    klog.V(4).Infof("begin to gen local ep %v", eps)    
    ipAddress, e := eps.Annotations[EdgeLocalEndpoint]    
    if !e {        
        return eps    
    }    
        
    portStr, e := eps.Annotations[EdgeLocalPort]    
    if !e {        
        return eps    
    }    
    
    klog.V(4).Infof("get local endpoint %s:%s", ipAddress, portStr)    
    port, err := strconv.ParseInt(portStr, 10, 32)    
    if err != nil {        
        klog.Errorf("parse int %s err %v", portStr, err)        
        return eps    
    }  
    
    ip := net.ParseIP(ipAddress)   
    if ip == nil {        
        klog.Warningf("parse ip %s nil", ipAddress)        
        return eps    
    }    
    
    nep := eps.DeepCopy()    
    nep.Subsets = []v1.EndpointSubset{        
       {            
           Addresses: []v1.EndpointAddress{                
              {                    
                   IP: ipAddress,   
                },            
            },            
            Ports: []v1.EndpointPort{                
                 {                    
                     Protocol: v1.ProtocolTCP,                    
                     Port:     int32(port),                    
                    Name:     "https",                
                },         
             },      
         },   
    }  
      
    klog.V(4).Infof("gen new endpoint complete %v", nep)      
    return nep
}
 
這樣做的目的是使邊緣節點上的服務采用集群內 (InCluster) 方式訪問的 apiserver 為本地的 lite-apiserver,而不是云端的 apiserver。  
 
  • 從 storageCache.servicesMap cache 中根據 endpoint 名稱(namespace/name) 取出對應 service,如果該 service 沒有 topologyKeys 則無需做拓撲轉化(非 service group)。

func getTopologyKeys(objectMeta *metav1.ObjectMeta) []string {   
    if !hasTopologyKey(objectMeta) { 
        return nil    
    }   
    
    var keys []string   
    keyData := objectMeta.Annotations[TopologyAnnotationsKey]    
    if err := json.Unmarshal([]byte(keyData), &keys); err != nil {        
        klog.Errorf("can't parse topology keys %s, %v", keyData, err)       
        return nil    
    }    
    
    return keys
}
 
  • 調用 filterConcernedAddresses 過濾 endpoint.Subsets Addresses 以及 NotReadyAddresses,只保留同一個 service topologyKeys 中的 endpoint。

// filterConcernedAddresses aims to filter out endpoints addresses within the same node unit
func filterConcernedAddresses(topologyKeys []string, hostName string, nodes 
map[types.NamespacedName]*nodeContainer,    
    addresses []v1.EndpointAddress) []v1.EndpointAddress {    
    hostNode, found := nodes[types.NamespacedName{Name: hostName}]    
    if !found {        
        return nil   
    }
        
    filteredEndpointAddresses := make([]v1.EndpointAddress, 0)    
    for i := range addresses {       
        addr := addresses[i]        
        if nodeName := addr.NodeName; nodeName != nil {            
            epsNode, found := nodes[types.NamespacedName{Name: *nodeName}]   
            if !found {                
                continue            
            }           
            if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) {                
                filteredEndpointAddresses = append(filteredEndpointAddresses, addr)            
            }       
        }   
    }    
    
    return filteredEndpointAddresses
}
    func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool {   
        if n1 == nil || n2 == nil {       
            return false    
        }   
    
       for _, key := range keys {       
           val1, v1found := n1[key]       
           val2, v2found := n2[key]       
    
      if v1found && v2found && val1 == val2 {           
          return true        
       }    
   }    
  
   return false
}
 
注意:如果 wrapper 所在邊緣節點沒有 service topologyKeys 標簽,則也無法訪問該 service。  
回到 rebuildEndpointsMap,在調用 pruneEndpoints 刷新了同一個拓撲域內的 endpoint 后,會將修改后的 endpoints 賦值給 storageCache .endpointsMap [endpoint]. modified (該字段記錄了拓撲感知后修改的endpoints)。  
func (nh *nodeHandler) add(node *v1.Node) {    
    sc := nh.cache   
    
    sc.mu.Lock()    
    
    nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}    
    klog.Infof("Adding node %v", nodeKey)    
    sc.nodesMap[nodeKey] = &nodeContainer{        
        node:   node,        
        labels: node.Labels,    
    }    
    // update endpoints    
    changedEps := sc.rebuildEndpointsMap()    
    
    sc.mu.Unlock()    
    
    for _, eps := range changedEps { 
        sc.endpointsChan <- eps    
    }
}

// rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified events
func (sc *storageCache) rebuildEndpointsMap() []watch.Event {    
    evts := make([]watch.Event, 0)  
    for name, endpointsContainer := range sc.endpointsMap {        
        newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster)        
        if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) {            
            continue        
        }        
        sc.endpointsMap[name].modified = newEps        
        evts = append(evts, watch.Event{            
            Type:   watch.Modified,   
            Object: newEps,        
        })    
    }    
    return evts
}
 
另外,如果 endpoints (拓撲感知后修改的 endpoints)發生改變,會構建 watch event,傳遞給 endpoints handler (interceptEndpointsRequest)處理。  

2、ServiceEventHandler

storageCache.servicesMap 結構體 key 為 service 名稱(namespace/name),value 為 serviceContainer,包含如下數據:

  • svc:service對象
  • keys:service topologyKeys
對于 service 資源的改動,這里用 Update event 說明:  
func (sh *serviceHandler) update(service *v1.Service) {    
    sc := sh.cache   
    
    sc.mu.Lock()    
    serviceKey := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}    
    klog.Infof("Updating service %v", serviceKey)    
    newTopologyKeys := getTopologyKeys(&service.ObjectMeta)    
    serviceContainer, found := sc.servicesMap[serviceKey]    
    if !found {        
        sc.mu.Unlock()        
        klog.Errorf("update non-existed service, %v", serviceKey)        
        return    
    }   
   
    sc.serviceChan <- watch.Event{   
        Type:   watch.Modified,       
        Object: service,    
    }    
    
    serviceContainer.svc = service   
    // return directly when topologyKeys of service stay unchanged    
    if reflect.DeepEqual(serviceContainer.keys, newTopologyKeys) {        
        sc.mu.Unlock()        
        return    
    }    
    
    serviceContainer.keys = newTopologyKeys    
    
    // update endpoints    
    changedEps := sc.rebuildEndpointsMap()    
    sc.mu.Unlock()    
    
    for _, eps := range changedEps { 
        sc.endpointsChan <- eps    
    }
}
 
邏輯如下:  
 
  • 獲取 service topologyKeys
  • 構建 service event.Modified event
  • 比較 service topologyKeys 與已經存在的是否有差異
  • 如果有差異則更新 topologyKeys,且調用 rebuildEndpointsMap 刷新該 service 對應的endpoints,如果 endpoints 發生變化,則構建 endpoints watch event,傳遞給 endpoints handler (interceptEndpointsRequest)處理。

3、EndpointsEventHandler

storageCache.endpointsMap 結構體 key 為 endpoints 名稱(namespace/name),value 為 endpointsContainer,包含如下數據:

  • endpoints:拓撲修改前的 endpoints
  • modified:拓撲修改后的 endpoints
對于 endpoints 資源的改動,這里用 Update event 說   明:  
func (eh *endpointsHandler) update(endpoints *v1.Endpoints) {    
    sc := eh.cache   
     
    sc.mu.Lock()   
    endpointsKey := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}    
    klog.Infof("Updating endpoints %v", endpointsKey)    
    
    endpointsContainer, found := sc.endpointsMap[endpointsKey]    
    if !found {        
        sc.mu.Unlock()       
        klog.Errorf("Updating non-existed endpoints %v", endpointsKey)    
        return    
    }    
    endpointsContainer.endpoints = endpoints    
    newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpoints, sc.wrapperInCluster)    
    changed := !apiequality.Semantic.DeepEqual(endpointsContainer.modified, newEps)    
    if changed {        
        endpointsContainer.modified = newEps   
    }    
    sc.mu.Unlock()    
    
    if changed {       
        sc.endpointsChan <- watch.Event{            
            Type:   watch.Modified,   
            Object: newEps,       
        }   
    }
}
 
邏輯如下:  
 
  • 更新 endpointsContainer.endpoint 為新的 endpoints 對象
  • 調用 pruneEndpoints 獲取拓撲刷新后的 endpoints
  • 比較 endpointsContainer.modified 與新刷新后的 endpoints
  • 如果有差異則更新 endpointsContainer.modified,則構建 endpoints watch event,傳遞給 endpoints handler (interceptEndpointsRequest)處理。
在分析完 NodeEventHandler,ServiceEventHandler 以及 EndpointsEventHandler 之后,我們回到具體的 http handler List&Watch 處理邏輯上,這里以 endpoints 為例:      
func (s *interceptorServer) interceptEndpointsRequest(handler http.Handler) http.Handler {    
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 
        if r.Method != http.MethodGet || 
!strings.HasPrefix(r.URL.Path, "/api/v1/endpoints") {            
            handler.ServeHTTP(w, r)   
            return       
        }        
        
        queries := r.URL.Query()     
        acceptType := r.Header.Get("Accept")        
        info, found := s.parseAccept(acceptType, s.mediaSerializer)       
        if !found {            
            klog.Errorf("can't find %s serializer", acceptType)           
            w.WriteHeader(http.StatusBadRequest)            
            return        
        }        
        
        encoder := 
scheme.Codecs.EncoderForVersion(info.Serializer, v1.SchemeGroupVersion)   
        // list request        
        if queries.Get("watch") == "" {           
            w.Header().Set("Content-Type", info.MediaType)            
            allEndpoints := s.cache.GetEndpoints()           
            epsItems := make([]v1.Endpoints, 0, len(allEndpoints))        
            for _, eps := range allEndpoints {                
                epsItems = append(epsItems, *eps)            
            }            
            
            epsList := &v1.EndpointsList{                
                Items: epsItems,     
             }           
             
          err := encoder.Encode(epsList, w)            
          if err != nil {             
              klog.Errorf("can't marshal endpoints list, %v", err)        
             w.WriteHeader(http.StatusInternalServerError)               
              return            
          }    
            
              return       
      }       
         
      // watch request       
      timeoutSecondsStr := r.URL.Query().Get("timeoutSeconds")   
      timeout := time.Minute        
      if timeoutSecondsStr != "" {  
          timeout, _ = time.ParseDuration(fmt.Sprintf("%ss", timeoutSecondsStr))       
      }        
        
      timer := time.NewTimer(timeout)      
      defer timer.Stop()       
      
      flusher, ok := w.(http.Flusher)      
      if !ok {            
          klog.Errorf("unable to start watch - can't get http.Flusher: %#v", w)            
         w.WriteHeader(http.StatusMethodNotAllowed)            
          return        
        }    
        
        e := restclientwatch.NewEncoder(           
           streaming.NewEncoder(info.StreamSerializer.Framer.NewFrameWriter(w),       
                scheme.Codecs.EncoderForVersion(info.StreamSerializer, v1.SchemeGroupVersion)),            
          encoder)        
      if info.MediaType == runtime.ContentTypeProtobuf {         
          w.Header().Set("Content-Type", 
    runtime.ContentTypeProtobuf+";stream=watch")       
      } else {           
          w.Header().Set("Content-Type", runtime.ContentTypeJSON)        
      }        
      w.Header().Set("Transfer-Encoding", "chunked")        
      w.WriteHeader(http.StatusOK)   
      flusher.Flush()        
      for {            
          select {            
          case <-r.Context().Done(): 
              return            
          case <-timer.C:            
              return           
          case evt := <-s.endpointsWatchCh:               
              klog.V(4).Infof("Send endpoint watch event: %+#v", evt)     
              err := e.Encode(&evt)   
              if err != nil {         
              klog.Errorf("can't encode watch event, %v", err)             
              return                
          }               
          
          if len(s.endpointsWatchCh) == 0 {                    
              flusher.Flush()         
            }            
         }         
      }    
  })
}
 

邏輯如下:

  • 如果為 List請求,則調用 GetEndpoints 獲取拓撲修改后的 endpoints 列表,并返回

func (sc *storageCache) GetEndpoints() []*v1.Endpoints {    
    sc.mu.RLock()   
    defer sc.mu.RUnlock()   
    
    epList := make([]*v1.Endpoints, 0, 
len(sc.endpointsMap))    
    for _, v := range sc.endpointsMap {        
        epList = append(epList, v.modified)    
    }    
    return epList
}
 
  • 如果為 Watch 請求,則不斷從 storageCache.endpointsWatchCh 管道中接受 watch event,并返回 interceptServiceRequest 邏輯與 interceptEndpointsRequest 一致,這里不再贅述 。

 

總結

  • SuperEdge service group 利用 application-grid-wrapper 實現拓撲感知,完成了同一個 nodeunit 內服務的閉環訪問
  • service group 實現的拓撲感知和 Kubernetes 社區原生實現對比,有如下區別:
    • service group 拓撲 key 可以自定義,也即為 gridUniqKey,使用起來更加靈活;而社區實現目前只有三種選擇:"kubernetes.io/hostname","topology.kubernetes.io/zone"以及"topology.kubernetes.io/region"
    • service group 只能填寫一個拓撲 key,也即只能訪問本拓撲域內有效的 endpoint,無法訪問其它拓撲域的 endpoint;而社區可以通過 topologyKey 列表以及"*"實現其它備選拓撲域 endpoint的訪問
  • ServiceGrid Controller 負責根據 ServiceGrid 產生對應的 service(包含由serviceGrid.Spec.GridUniqKey 構成的 topologyKeys annotations),邏輯和 DeploymentGrid Controller 整體一致,如下:
    • 創建并維護 service group 需要的若干CRDs(包括:ServiceGrid)
    • 監聽 ServiceGrid event,并填充 ServiceGrid到工作隊列中;循環從隊列中取出 ServiceGrid 進行解析,創建并且維護對應的 service
    • 監聽 service event,并將相關的 ServiceGrid 塞到工作隊列中進行上述處理,協助上述邏輯達到整體 reconcile 邏輯
  • 為了實現 Kubernetes 零侵入,需要在 kube-proxy 與 apiserver 通信之間添加一層 wrapper,調用鏈路如下:     kube-proxy -> application-grid-wrapper -> lite-apiserver -> kube-apiserver
  • application-grid-wrapper 是一個 http server,接受來自 kube-proxy 的請求,同時維護一個資源緩存,處理函數由外到內依次如下:
    • debug:接受 debug 請求,返回 wrapper pprof 運行信息
    • logger:打印請求日志
    • node:接受 kube-proxy node GET (/api/v1/nodes/{node}) 請求,并返回 node 信息
    • event:接受 kube-proxy events POST (/events) 請求,并將請求轉發給 lite-apiserver
    • service:接受 kube-proxy service List&Watch (/api/v1/services) 請求,并根據 storageCache 內容返回 (GetServices)
    • endpoint:接受 kube-proxy endpoint List&Watch (/api/v1/endpoints) 請求,并根據 storageCache 內容返回(GetEndpoints)
  • wrapper 為了實現拓撲感知,維護了一個資源 cache,包括:node,service,endpoint,同時注冊了相關 event 處理函數。核心拓撲算法邏輯為:調用 filterConcernedAddresses 過濾 endpoint.Subsets Addresses 以及 NotReadyAddresses,只保留同一個 service topologyKeys 中的 endpoint。另外,如果 wrapper 所在邊緣節點沒有 service topologyKeys 標簽,則也無法訪問該service
  • wrapper 接受來自 kube-proxy 對 endpoints 以及 service 的 List&Watch 請求,以endpoints 為例:如果為List 請求,則調用 GetEndpoints 獲取拓撲修改后的 endpoints 列表,并返回;如果為 Watch 請求,則不斷從storageCache.endpointsWatchCh 管道中接受 watch event,并返回。service 邏輯與 endpoints 一致

關于“如何分析SuperEdge 拓撲算法”就介紹到這了,更多相關內容可以搜索億速云以前的文章,希望能夠幫助大家答疑解惑,請多多支持億速云網站!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

上饶县| 冷水江市| 德阳市| 普兰店市| 塘沽区| 嵩明县| 红安县| 巴林右旗| 文化| 湖南省| 富宁县| 阳高县| 韶山市| 潞西市| 南投市| 浦江县| 荣成市| 巨野县| 西华县| 大埔县| 苍梧县| 鲁山县| 南丰县| 南安市| 太仓市| 葵青区| 施甸县| 永安市| 尚义县| 弥渡县| 肇庆市| 延寿县| 黎平县| 余姚市| 宾川县| 喀喇沁旗| 灌阳县| 乌兰浩特市| 海口市| 武安市| 民县|