中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

# IT明星不是夢 # 圖解kubernetes容器狀態同步

發布時間:2020-02-14 11:07:52 來源:網絡 閱讀:819 作者:sdxin 欄目:云計算

在K8s中將Pod調度到某一臺Node節點之后,后續的狀態維護信息則是由對應機器上的kubelet進行維護,如何實時反饋本地運行狀態,并通知apiserver則是設計的難點, 本節主要是通過感知Pod狀態變化和探測狀態改變兩個流程來實際分析其核心數據結構,來了解內部設計

1. 狀態管理

1.1 靜態Pod

# IT明星不是夢 # 圖解kubernetes容器狀態同步
靜態Pod主要是指的那些不是通過感知apiserver創建的pod, 因為apiserver上并不包含,但是同時也需要維護和獲取這類Pod的狀態, k8s中就設計了一個鏡像Pod的概念,其實就是為靜態Pod鏡像出來一個Pod該Pod的主要信息與靜態Pod一致,并且在apiserver中進行創建,通過apiserver可以感知的這個鏡像Pod來反映真實的靜態Pod的狀態,?

1.2 狀態數據源

# IT明星不是夢 # 圖解kubernetes容器狀態同步
statusManager是進行狀態同步的關鍵組件其需要綜合當前Pod運行中的數據和apiserver存儲的數據,從而決定最終的狀態轉換, 這里先關注圖上畫出來的,更多的狀態等后續會一一介紹

2. 版本一致性

type versionedPodStatus struct {
    status v1.PodStatus
    // 單調遞增的版本號(每個pod)
    version uint64
    // Pod name & namespace, for sending updates to API server.
    podName      string
    podNamespace string
}

在Kubelet中為保證與apiserver端信息的同步,在本地保存了一個Pod狀態版本信息,其里面除了保存當前Pod的狀態數據還有一個版本版本號,通過單調遞增的版本號的對比來確定是否進行狀態的同步

3. 核心源碼實現

statusManager的流程其實還是蠻復雜的,今天我們就只講一個場景,即kubelet通過apiserver感知到一個Pod更新,然后順著該功能的數據流來進行梳理statusMangaer里面的數據流轉

3.1 核心數據結構

manager中的核心狀態相關的數據結構可以主要分為兩大類:映射數據維護(podManager、podStatuses、apiStatusVersions)數據通信管道(podStatusChannel), 剩余的則是對與apiserver通信的kublet和進行pod刪除檢查的?podDeletionSafety

type manager struct {
    kubeClient clientset.Interface
        // 管理緩存Pod,包含鏡像pod和靜態pod的映射
    podManager kubepod.Manager
    // 從pod UID映射到相應pod的版本狀態信息 。
    podStatuses      map[types.UID]versionedPodStatus
    podStatusesLock  sync.RWMutex
    podStatusChannel chan podStatusSyncRequest
    // 存儲鏡像pod的版本
    apiStatusVersions map[kubetypes.MirrorPodUID]uint64
    podDeletionSafety PodDeletionSafetyProvider
}

3.2 設置Pod狀態

設置Pod狀態主要是位于kubelet中的syncPod中,在接收到pod事件變更之后,會與apiserver進行 Pod最新數據的同步從而獲取當前pod在apiserver端的最新狀態

func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
    m.podStatusesLock.Lock()
    defer m.podStatusesLock.Unlock()

    for _, c := range pod.Status.Conditions {
        if !kubetypes.PodConditionByKubelet(c.Type) {
            klog.Errorf("Kubelet is trying to update pod condition %q for pod %q. "+
                "But it is not owned by kubelet.", string(c.Type), format.Pod(pod))
        }
    }
    // Make sure we're caching a deep copy.
    status = *status.DeepCopy()

    // 如果Pod被刪除了則需要強制與apiserver進行信息的同步
    m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil)
}

3.3 更新內部緩存狀態產生同步事件

# IT明星不是夢 # 圖解kubernetes容器狀態同步

3.3.1 獲取緩存狀態

    var oldStatus v1.PodStatus
    // 檢測之前的本地緩存數據
    cachedStatus, isCached := m.podStatuses[pod.UID]
    if isCached {
        oldStatus = cachedStatus.status
    } else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
        oldStatus = mirrorPod.Status
    } else {
        oldStatus = pod.Status
    }

3.3.2 檢測容器狀態

檢測容器狀態主要是針對容器終止狀態轉發的合法性進行檢測,其實就是根據設定的Pod的RestartPolicy來檢測針對一個終止的容器是否可以進行重啟

    if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil {
        klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
        return false
    }
    if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil {
        klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
        return false
    }

3.3.3 更新PodCondition最后轉換時間

通過最新的status里面的condition設定對應PodCondition的LastTransitionTime更新時間未當前時間

    // Set ContainersReadyCondition.LastTransitionTime.
    updateLastTransitionTime(&status, &oldStatus, v1.ContainersReady)

    // Set ReadyCondition.LastTransitionTime.
    updateLastTransitionTime(&status, &oldStatus, v1.PodReady)

    // Set InitializedCondition.LastTransitionTime.
    updateLastTransitionTime(&status, &oldStatus, v1.PodInitialized)

    // Set PodScheduledCondition.LastTransitionTime.
    updateLastTransitionTime(&status, &oldStatus, v1.PodScheduled)

3.3.4 校對時間截斷過長信息

首先會根據當前容器的個數,從而決定每個容器最大的字節數大小,然后對容器里面的終止狀態里面的Message信息,進行截斷,同時進行時間的校對

    normalizeStatus(pod, &status)

3.3.5 狀態更新條件檢測

如果之前已經緩存了對應的數據,并且緩存的數據與當前的狀態未發生改變,也不需要強制更新,就直接返回

    if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
        // 如果不強制更新 ,默認是true此處不會成立
        klog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
        return false // No new status.
    }

3.3.6 生成同步事件更新緩存

生成最新的狀態緩存數據,并且遞增本地的版本信息

    // 構建新的狀態
    newStatus := versionedPodStatus{
        status:       status,
        version:      cachedStatus.version + 1, // 更新器緩存
        podName:      pod.Name,
        podNamespace: pod.Namespace,
    }
    // 更新新的緩存狀態
    m.podStatuses[pod.UID] = newStatus

    select {
    case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}: // 構建一個新的同步請求
        klog.V(5).Infof("Status Manager: adding pod: %q, with status: (%d, %v) to podStatusChannel",
            pod.UID, newStatus.version, newStatus.status)

        return true
    default:
        // Let the periodic syncBatch handle the update if the channel is full.
        // We can't block, since we hold the mutex lock.
        klog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
            format.Pod(pod), status)
        return false
    }

3.4 探測狀態更新

# IT明星不是夢 # 圖解kubernetes容器狀態同步
探測狀態其實就是Pod內容器的運行狀態,比如如果設置了Readiness探測,當某個容器探測失敗的時候,就會通知對應的service從后端的enpoint中移除該Pod, 讓我們一起看看Kubelet是如何將運行狀態通知到apiserver端的

3.4.1 獲取當前狀態

func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
    m.podStatusesLock.Lock()
    defer m.podStatusesLock.Unlock()

    // 獲取本地的容器
    pod, ok := m.podManager.GetPodByUID(podUID)
    if !ok {
        klog.V(4).Infof("Pod %q has been deleted, no need to update readiness", string(podUID))
        return
    }

    // 獲取當前的狀態
    oldStatus, found := m.podStatuses[pod.UID]
    if !found {
        klog.Warningf("Container readiness changed before pod has synced: %q - %q",
            format.Pod(pod), containerID.String())
        return
    }

    // 獲取當前的容器狀態
    containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
    if !ok {
        klog.Warningf("Container readiness changed for unknown container: %q - %q",
            format.Pod(pod), containerID.String())
        return
    }

3.4.2 檢測狀態是否發生改變

    // 檢測前后的就緒狀態是否發生改變
    if containerStatus.Ready == ready {
        klog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready,
            format.Pod(pod), containerID.String())
        return
    }

3.4.3 修改容器的就緒狀態

獲取容器的狀態,修改就緒為當前的狀態

    status := *oldStatus.status.DeepCopy()
    containerStatus, _, _ = findContainerStatus(&status, containerID.String())
    containerStatus.Ready = ready

3.4.4 根據最新的容器狀態修改

會根據當前運行時的容器探測的狀態,來修改對應PodCondition里面的狀態,最后調用內部的更新邏輯

    updateConditionFunc := func(conditionType v1.PodConditionType, condition v1.PodCondition) {
        conditionIndex := -1
        // 獲取Pod對應的PodCondition狀態
        for i, condition := range status.Conditions {
            if condition.Type == conditionType {
                conditionIndex = i
                break
            }
        }
        // 修改或追加Pod對應的PodCondition狀態
        if conditionIndex != -1 {
            status.Conditions[conditionIndex] = condition
        } else {
            klog.Warningf("PodStatus missing %s type condition: %+v", conditionType, status)
            status.Conditions = append(status.Conditions, condition)
        }
    }
    // 計算Ready狀態
    updateConditionFunc(v1.PodReady, GeneratePodReadyCondition(&pod.Spec, status.Conditions, status.ContainerStatuses, status.Phase))
    // 計算容器Ready狀態
    updateConditionFunc(v1.ContainersReady, GenerateContainersReadyCondition(&pod.Spec, status.ContainerStatuses, status.Phase))
    m.updateStatusInternal(pod, status, false)

3.5 啟動后臺同步任務

statusManager會啟動一個后臺的線程來進行更新管道里面同步請求的消費

func (m *manager) Start() {
    // 省略非核心代碼
    go wait.Forever(func() {
        select {
        case syncRequest := <-m.podStatusChannel:
            // 獲取最新的狀態信息,更新apiserver
            klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
                syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
            m.syncPod(syncRequest.podUID, syncRequest.status)
        case <-syncTicker:
            m.syncBatch()
        }
    }, 0)
}

3.6 同步Pod狀態

# IT明星不是夢 # 圖解kubernetes容器狀態同步

3.6.1 同步條件檢測

同步條件檢測主要是檢測鏡像Pod的版本是否發送變化、Pod當前是否被刪除,如果pod沒有被刪除則返回false,即對一個沒有刪除的Pod我們還是需要繼續更新其狀態的

    if !m.needsUpdate(uid, status) {
        klog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)
        return
    }

3.6.2 通過apiserver獲取最新Pod數據

如果沒有獲取到Pod信息,則直接進行退出即可

    pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(status.podName, metav1.GetOptions{})
    if errors.IsNotFound(err) {
        klog.V(3).Infof("Pod %q does not exist on the server", format.PodDesc(status.podName, status.podNamespace, uid))
        // 如果Pod已經被刪除了,就直接退出就行
        return
    }
    if err != nil {
        klog.Warningf("Failed to get status for pod %q: %v", format.PodDesc(status.podName, status.podNamespace, uid), err)
        return
    }

3.6.3 調用Patch接口進行更新

這里面會通過將最小的狀態與之前的狀態來進行merge合并,然后調用kubeClient進行apiserver端狀態的修改

    oldStatus := pod.Status.DeepCopy()
    // 更新服務端的狀態
    newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status))
    klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes)
    if err != nil {
        klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
        return
    }

3.6.4 更新本地的Apiserver端的版本信息

    // 當前是最新的狀態
    pod = newPod

    klog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
    m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version

3.6.5 檢測刪除Pod

這里主要是最后階段,即Pod對應的資源都已經釋放了,則才最終刪除apiserver端的Pod

// 如果pod的DeletionTimestamp被設置,則對應的Pod需要被刪除
if m.canBeDeleted(pod, status.status) {
        deleteOptions := metav1.NewDeleteOptions(0)

        deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID))
        //  調用apiserver對Pod進行刪除
        err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
        if err != nil {
            klog.Warningf("Failed to delete status for pod %q: %v", format.Pod(pod), err)
            return
        }
        klog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))
        m.deletePodStatus(uid)
    }

探活整體的設計大概就是這樣,希望大佬們多多關注,一起交流。
k8s源碼閱讀電子書地址: https://www.yuque.com/baxiaoshi/tyado3

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

万盛区| 金堂县| 株洲市| 洪洞县| 文登市| 武夷山市| 梨树县| 长岭县| 漳浦县| 河北省| 昭觉县| 洞头县| 芒康县| 扶余县| 客服| 济南市| 庆城县| 甘南县| 义马市| 泸水县| 阿克陶县| 怀宁县| 玉溪市| 武清区| 富裕县| 团风县| 星子县| 呼玛县| 昌图县| 抚顺市| 青田县| 兴安盟| 宣化县| 望谟县| 灵璧县| 靖宇县| 东乡| 松潘县| 三门县| 临夏市| 社旗县|