An In-Depth Analysis of kubelet (2): Creating a Pod

This post picks up where the previous one left off. The previous post covered the producer side; this one covers the consumer.

Background

PodManager

k8s.io\kubernetes\pkg\kubelet\pod\pod_manager.go

// Manager stores and manages access to pods, maintaining the mappings
// between static pods and mirror pods.
//
// The kubelet discovers pod updates from 3 sources: file, http, and
// apiserver. Pods from non-apiserver sources are called static pods, and API
// server is not aware of the existence of static pods. In order to monitor
// the status of such pods, the kubelet creates a mirror pod for each static
// pod via the API server.
//
// A mirror pod has the same pod full name (name and namespace) as its static
// counterpart (albeit different metadata such as UID, etc). By leveraging the
// fact that the kubelet reports the pod status using the pod full name, the
// status of the mirror pod always reflects the actual status of the static
// pod. When a static pod gets deleted, the associated orphaned mirror pod
// will also be removed.
type Manager interface {
    // GetPods returns the regular pods bound to the kubelet and their spec.
    GetPods() []*v1.Pod
    // GetPodByFullName returns the (non-mirror) pod that matches full name, as well as
    // whether the pod was found.
    GetPodByFullName(podFullName string) (*v1.Pod, bool)
    // GetPodByName provides the (non-mirror) pod that matches namespace and
    // name, as well as whether the pod was found.
    GetPodByName(namespace, name string) (*v1.Pod, bool)
    // GetPodByUID provides the (non-mirror) pod that matches pod UID, as well as
    // whether the pod is found.
    GetPodByUID(types.UID) (*v1.Pod, bool)
    // GetPodByMirrorPod returns the static pod for the given mirror pod and
    // whether it was known to the pod manager.
    GetPodByMirrorPod(*v1.Pod) (*v1.Pod, bool)
    // GetMirrorPodByPod returns the mirror pod for the given static pod and
    // whether it was known to the pod manager.
    GetMirrorPodByPod(*v1.Pod) (*v1.Pod, bool)
    // GetPodsAndMirrorPods returns both the regular and mirror pods.
    GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod)
    // SetPods replaces the internal pods with the new pods.
    // It is currently only used for testing.
    SetPods(pods []*v1.Pod)
    // AddPod adds the given pod to the manager.
    AddPod(pod *v1.Pod)
    // UpdatePod updates the given pod in the manager.
    UpdatePod(pod *v1.Pod)
    // DeletePod deletes the given pod from the manager.  For mirror pods,
    // this means deleting the mappings related to mirror pods.  For non-
    // mirror pods, this means deleting from indexes for all non-mirror pods.
    DeletePod(pod *v1.Pod)
    // DeleteOrphanedMirrorPods deletes all mirror pods which do not have
    // associated static pods. This method sends deletion requests to the API
    // server, but does NOT modify the internal pod storage in basicManager.
    DeleteOrphanedMirrorPods()
    // TranslatePodUID returns the actual UID of a pod. If the UID belongs to
    // a mirror pod, returns the UID of its static pod. Otherwise, returns the
    // original UID.
    //
    // All public-facing functions should perform this translation for UIDs
    // because user may provide a mirror pod UID, which is not recognized by
    // internal Kubelet functions.
    TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID
    // GetUIDTranslations returns the mappings of static pod UIDs to mirror pod
    // UIDs and mirror pod UIDs to static pod UIDs.
    GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID)
    // IsMirrorPodOf returns true if mirrorPod is a correct representation of
    // pod; false otherwise.
    IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool

    MirrorClient
}

The previous post introduced static pods; the concept that goes with them is the mirror pod.
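To make the static pod / mirror pod pairing concrete, here is a minimal sketch of a manager that links the two through the pod full name. The `pod` struct, `toyManager`, and its field names are hypothetical stand-ins, not the kubelet's types; the only detail taken from the interface comment above is that a mirror pod shares the full name (name and namespace) of its static pod while having a different UID.

```go
package main

import "fmt"

// pod is a hypothetical, stripped-down stand-in for v1.Pod.
type pod struct {
	UID       string
	Name      string
	Namespace string
	Mirror    bool // true if this object is the apiserver-side mirror pod
}

// fullName combines name and namespace; static and mirror pods share it.
func fullName(p pod) string { return p.Name + "_" + p.Namespace }

// toyManager indexes regular pods and mirror pods separately (assumption).
type toyManager struct {
	podByFullName       map[string]pod
	mirrorPodByFullName map[string]pod
}

func newToyManager() *toyManager {
	return &toyManager{
		podByFullName:       map[string]pod{},
		mirrorPodByFullName: map[string]pod{},
	}
}

// AddPod files the pod under the regular or the mirror index.
func (m *toyManager) AddPod(p pod) {
	if p.Mirror {
		m.mirrorPodByFullName[fullName(p)] = p
		return
	}
	m.podByFullName[fullName(p)] = p
}

// GetMirrorPodByPod looks up the mirror pod that shares p's full name.
func (m *toyManager) GetMirrorPodByPod(p pod) (pod, bool) {
	mp, ok := m.mirrorPodByFullName[fullName(p)]
	return mp, ok
}

// TranslatePodUID maps a mirror pod UID to its static pod UID and
// returns any other UID unchanged.
func (m *toyManager) TranslatePodUID(uid string) string {
	for name, mp := range m.mirrorPodByFullName {
		if mp.UID == uid {
			if sp, ok := m.podByFullName[name]; ok {
				return sp.UID
			}
		}
	}
	return uid
}

func main() {
	m := newToyManager()
	static := pod{UID: "static-1", Name: "etcd", Namespace: "kube-system"}
	m.AddPod(static)
	m.AddPod(pod{UID: "mirror-1", Name: "etcd", Namespace: "kube-system", Mirror: true})

	mp, ok := m.GetMirrorPodByPod(static)
	fmt.Println(mp.UID, ok)
	fmt.Println(m.TranslatePodUID("mirror-1"))
}
```

The linear scan in `TranslatePodUID` keeps the sketch short; a real index would more likely keep a direct UID-to-UID map.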

Pod status

See k8s.io\api\core\v1\types.go

  • Pending: the apiserver has accepted the request to create the pod, but at least one container has not started yet. Pending covers both the time before the pod is scheduled and the time spent pulling images. Source comment: PodPending means the pod has been accepted by the system, but one or more of the containers has not been started. This includes time before being bound to a node, as well as time spent pulling images onto the host.
  • Running: every container has been started, and at least one is still running or is being restarted. Source comment: PodRunning means the pod has been bound to a node and all of the containers have been started. At least one container is still running or is in the process of being restarted.
  • Succeeded: every container ran to completion successfully, and Kubernetes will not restart any of them. Source comment: PodSucceeded means that all containers in the pod have voluntarily terminated with a container exit code of 0, and the system is not going to restart any of these containers.
  • Failed: no container is running any more, and at least one of them exited abnormally. Source comment: PodFailed means that all containers in the pod have terminated, and at least one container has terminated in a failure (exited with a non-zero exit code or was stopped by the system).
  • Unknown: the pod's state cannot be determined for some reason, most commonly because the node is unreachable. Source comment: PodUnknown means that for some reason the state of the pod could not be obtained, typically due to an error in communicating with the host of the pod.
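The phase definitions above can be read as a small decision procedure over container states. The sketch below is a simplification under stated assumptions: `containerState` and `phaseOf` are hypothetical names, restart handling is reduced to a single `restarting` flag, and Unknown is left out because it comes from a communication failure rather than from container state. It is not the kubelet's own phase computation.

```go
package main

import "fmt"

// containerState is a hypothetical summary of one container.
type containerState struct {
	Started  bool // the container has been started at least once
	Running  bool // the container is running right now
	ExitCode int  // meaningful only when Started && !Running
}

// phaseOf derives a pod phase from container states, following the
// definitions quoted above. restarting says whether the system is
// about to restart a terminated container.
func phaseOf(cs []containerState, restarting bool) string {
	allStopped := true
	anyFailed := false
	for _, c := range cs {
		if !c.Started {
			return "Pending" // at least one container has not started
		}
		if c.Running {
			allStopped = false
		} else if c.ExitCode != 0 {
			anyFailed = true
		}
	}
	if !allStopped || restarting {
		return "Running"
	}
	if anyFailed {
		return "Failed"
	}
	return "Succeeded"
}

func main() {
	fmt.Println(phaseOf([]containerState{{Started: true, Running: true}, {}}, false))
	fmt.Println(phaseOf([]containerState{{Started: true, Running: true}}, false))
	fmt.Println(phaseOf([]containerState{{Started: true, ExitCode: 0}}, false))
	fmt.Println(phaseOf([]containerState{{Started: true, ExitCode: 1}}, false))
}
```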

PodWorkers

k8s.io\kubernetes\pkg\kubelet\pod_workers.go

// PodWorkers is an abstract interface for testability.
type PodWorkers interface {
    UpdatePod(options *UpdatePodOptions)
    ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
    ForgetWorker(uid types.UID)
}

type podWorkers struct {
    // Protects all per worker fields.
    podLock sync.Mutex

    // Tracks all running per-pod goroutines - per-pod goroutine will be
    // processing updates received through its corresponding channel.
    podUpdates map[types.UID]chan UpdatePodOptions
    // Track the current state of per-pod goroutines.
    // Currently all update request for a given pod coming when another
    // update of this pod is being processed are ignored.
    isWorking map[types.UID]bool
    // Tracks the last undelivered work item for this pod - a work item is
    // undelivered if it comes in while the worker is working.
    lastUndeliveredWorkUpdate map[types.UID]UpdatePodOptions

    workQueue queue.WorkQueue

    // This function is run to sync the desired state of pod.
    // NOTE: This function has to be thread-safe - it can be called for
    // different pods at the same time.
    syncPodFn syncPodFnType

    // The EventRecorder to use
    recorder record.EventRecorder

    // backOffPeriod is the duration to back off when there is a sync error.
    backOffPeriod time.Duration

    // resyncInterval is the duration to wait until the next sync.
    resyncInterval time.Duration

    // podCache stores kubecontainer.PodStatus for all pods.
    podCache kubecontainer.Cache
}

Code

k8s.io\kubernetes\pkg\kubelet\kubelet.go

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    kl.syncLoop(updates, kl)
}

// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    // The resyncTicker wakes up kubelet to check if there are any pod workers
    // that need to be sync'd. A one-second period is sufficient because the
    // sync interval is defaulted to 10s.
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    // housekeepingPeriod is 2s.
    housekeepingTicker := time.NewTicker(housekeepingPeriod)
    defer housekeepingTicker.Stop()
    plegCh := kl.pleg.Watch()
    for {
        kl.syncLoopMonitor.Store(kl.clock.Now())
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
        kl.syncLoopMonitor.Store(kl.clock.Now())
    }
}

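As you can see, updates runs through the whole process and is a key concept, which is why the entire previous post was spent tracing where updates comes from. Here we focus on syncLoopIteration.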

// syncLoopIteration reads from various channels and dispatches pods to the
// given handler.
//
// Arguments:
// 1.  configCh:       a channel to read config events from
// 2.  handler:        the SyncHandler to dispatch pods to
// 3.  syncCh:         a channel to read periodic sync events from
// 4.  houseKeepingCh: a channel to read housekeeping events from
// 5.  plegCh:         a channel to read PLEG updates from
//
// Events are also read from the kubelet liveness manager's update channel.
//
// The workflow is to read from one of the channels, handle that event, and
// update the timestamp in the sync loop monitor.
//
// With that in mind, in truly no particular order, the different channels
// are handled as follows:
//
// * configCh: dispatch the pods for the config change to the appropriate
//             handler callback for the event type
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * houseKeepingCh: trigger cleanup of pods
// * liveness manager: sync pods that have failed or in which one or more
//                     containers have failed liveness checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            glog.Errorf("Update channel is closed. Exiting the sync loop.")
            return false
        }

        switch u.Op {
        case kubetypes.ADD:
            // After restarting, kubelet will get all existing pods through
            // ADD as if they are new pods. These pods will then go through the
            // admission process and *may* be rejected. This can be resolved
            // once we have checkpointing.
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
            handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE:
            // DELETE is treated as a UPDATE because of graceful deletion.
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.RESTORE:
            // These are pods restored from the checkpoint. Treat them as new
            // pods.
            handler.HandlePodAdditions(u.Pods)
        }
    }        
    return true
}

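This post only looks at the configCh branch of syncLoopIteration; the other branches will be covered in future posts.

The configCh branch simply hands the pods to the handler that matches the operation.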
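The dispatch pattern in the configCh branch can be sketched in isolation. Everything here is a hypothetical reduction: `podUpdate`, the `op` constants, the `handler` interface, and the `recorder` type are made up for illustration, and pods are plain strings. The two points it tries to show are that a closed channel ends the loop, and that DELETE is routed to the same callback as UPDATE.

```go
package main

import "fmt"

type op int

const (
	opAdd op = iota
	opUpdate
	opRemove
	opDelete
)

// podUpdate is a hypothetical stand-in for kubetypes.PodUpdate.
type podUpdate struct {
	Op   op
	Pods []string
}

// handler is a reduced version of the SyncHandler idea.
type handler interface {
	HandlePodAdditions(pods []string)
	HandlePodUpdates(pods []string)
	HandlePodRemoves(pods []string)
}

// recorder remembers which callback received which pods.
type recorder struct{ calls []string }

func (r *recorder) HandlePodAdditions(p []string) { r.calls = append(r.calls, fmt.Sprint("add", p)) }
func (r *recorder) HandlePodUpdates(p []string)   { r.calls = append(r.calls, fmt.Sprint("update", p)) }
func (r *recorder) HandlePodRemoves(p []string)   { r.calls = append(r.calls, fmt.Sprint("remove", p)) }

// iterate handles one config event and reports whether to keep looping.
func iterate(configCh <-chan podUpdate, h handler) bool {
	u, open := <-configCh
	if !open {
		return false // channel closed: leave the sync loop
	}
	switch u.Op {
	case opAdd:
		h.HandlePodAdditions(u.Pods)
	case opUpdate, opDelete:
		// DELETE is handled as an update because deletion is graceful.
		h.HandlePodUpdates(u.Pods)
	case opRemove:
		h.HandlePodRemoves(u.Pods)
	}
	return true
}

func main() {
	ch := make(chan podUpdate, 3)
	ch <- podUpdate{Op: opAdd, Pods: []string{"web"}}
	ch <- podUpdate{Op: opDelete, Pods: []string{"web"}}
	ch <- podUpdate{Op: opRemove, Pods: []string{"web"}}
	close(ch)

	r := &recorder{}
	for iterate(ch, r) {
	}
	fmt.Println(r.calls)
}
```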

func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    // Record when handling began; dispatchWork receives it as the start time.
    start := kl.clock.Now()
    for _, pod := range pods {
        // Always add the pod to the pod manager. Kubelet relies on the pod
        // manager as the source of truth for the desired state. If a pod does
        // not exist in the pod manager, it means that it has been deleted in
        // the apiserver and no action (other than cleanup) is required.
        kl.podManager.AddPod(pod)

        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
        kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
    }
}

// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod is terminated, dispatchWork
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    // Run the sync in an async worker.
    kl.podWorkers.UpdatePod(&UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        OnCompleteFunc: func(err error) {},
    })
}

dispatchWork calls podWorkers.UpdatePod to do the actual work.

k8s.io\kubernetes\pkg\kubelet\pod_workers.go

// Apply the new setting to the specified pod.
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
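    pod := options.Pod
    uid := pod.UID
    var podUpdates chan UpdatePodOptions
    var exists bool

    // podLock protects the per-worker maps touched below.
    p.podLock.Lock()
    defer p.podLock.Unlock()
    if podUpdates, exists = p.podUpdates[uid]; !exists {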
        podUpdates = make(chan UpdatePodOptions, 1)
        p.podUpdates[uid] = podUpdates

        go func() {
            defer runtime.HandleCrash()
            p.managePodLoop(podUpdates)
        }()
    }
    
    if !p.isWorking[pod.UID] {
        p.isWorking[pod.UID] = true
        podUpdates <- *options
    } 
}

Each pod gets its own goroutine, along with a channel through which that goroutine is fed updates.
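The one-goroutine-per-pod pattern, together with the isWorking and lastUndeliveredWorkUpdate fields from the podWorkers struct, can be sketched as follows. This is a toy under stated assumptions: `update`, `workers`, `pending`, and `wrapUp` are illustrative names, and the sketch re-delivers a pending update immediately instead of going through a work queue with back-off.

```go
package main

import (
	"fmt"
	"sync"
)

// update is a hypothetical stand-in for UpdatePodOptions.
type update struct{ UID, Note string }

type workers struct {
	mu        sync.Mutex
	updates   map[string]chan update // one channel per pod worker
	isWorking map[string]bool        // whether the worker is busy
	pending   map[string]update      // last update that arrived while busy
	syncFn    func(update)
}

func newWorkers(syncFn func(update)) *workers {
	return &workers{
		updates:   map[string]chan update{},
		isWorking: map[string]bool{},
		pending:   map[string]update{},
		syncFn:    syncFn,
	}
}

// UpdatePod starts a worker for a new pod, then either delivers the
// update or parks it as the single pending update for that pod.
func (w *workers) UpdatePod(u update) {
	w.mu.Lock()
	defer w.mu.Unlock()
	ch, ok := w.updates[u.UID]
	if !ok {
		// Buffer of 1: the worker itself re-sends pending work in wrapUp.
		ch = make(chan update, 1)
		w.updates[u.UID] = ch
		go w.loop(ch)
	}
	if !w.isWorking[u.UID] {
		w.isWorking[u.UID] = true
		ch <- u
	} else {
		w.pending[u.UID] = u // newer updates overwrite older pending ones
	}
}

func (w *workers) loop(ch chan update) {
	for u := range ch {
		w.syncFn(u)
		w.wrapUp(u.UID)
	}
}

// wrapUp delivers the pending update, if any, or marks the worker idle.
func (w *workers) wrapUp(uid string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if next, ok := w.pending[uid]; ok {
		delete(w.pending, uid)
		w.updates[uid] <- next
	} else {
		w.isWorking[uid] = false
	}
}

func main() {
	gate := make(chan struct{})
	done := make(chan string, 8)
	w := newWorkers(func(u update) {
		<-gate // hold the worker so later updates arrive while it is busy
		done <- u.Note
	})
	w.UpdatePod(update{UID: "pod-a", Note: "v1"})
	w.UpdatePod(update{UID: "pod-a", Note: "v2"}) // parked as pending
	w.UpdatePod(update{UID: "pod-a", Note: "v3"}) // replaces v2
	close(gate)
	fmt.Println(<-done, <-done)
}
```

Because only the most recent undelivered update is kept, intermediate versions of a pod that arrive while the worker is busy are skipped; the worker always converges on the latest one.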

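func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
    var lastSyncTime time.Time
    for update := range podUpdates {
        err := func() error {
            // Block until the cache holds a status newer than the last sync.
            status, err := p.podCache.GetNewerThan(update.Pod.UID, lastSyncTime)
            if err != nil {
                return err
            }
            err = p.syncPodFn(syncPodOptions{
                mirrorPod:      update.MirrorPod,
                pod:            update.Pod,
                podStatus:      status,
                killPodOptions: update.KillPodOptions,
                updateType:     update.UpdateType,
            })
            lastSyncTime = time.Now()
            return err
        }()
        // Requeue the pod and pick up pending work (error logging elided).
        p.wrapUp(update.Pod.UID, err)
    }
}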

All this does is call back into podWorkers.syncPodFn.

k8s.io\kubernetes\pkg\kubelet\kubelet.go

func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration /* remaining parameters elided */) (*Kubelet, error) {
    // ... (elided)
    klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
    // ... (elided)
}

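NewMainKubelet is a function of more than 500 lines, and the answer we are after is inside it: klet.syncPod. syncPod is a complex function that handles every pod state, so we will take it apart step by step.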

// syncPod is the transaction script for the sync of a single pod.
//
// Arguments:
//
// o - the SyncPodOptions for this invocation
//
// The workflow is:
// * If the pod is being created, record pod worker start latency
// * Call generateAPIPodStatus to prepare an v1.PodStatus for the pod
// * If the pod is being seen as running for the first time, record pod
//   start latency
// * Update the status of the pod in the status manager
// * Kill the pod if it should not be running
// * Create a mirror pod if the pod is a static pod, and does not
//   already have a mirror pod
// * Create the data directories for the pod if they do not exist
// * Wait for volumes to attach/mount
// * Fetch the pull secrets for the pod
// * Call the container runtime's SyncPod callback
// * Update the traffic shaping for the pod's ingress and egress limits
//
// If any step of this workflow errors, the error is returned, and is repeated
// on the next syncPod call.
func (kl *Kubelet) syncPod(o syncPodOptions) error {
    pod := o.pod
    mirrorPod := o.mirrorPod
    podStatus := o.podStatus
    updateType := o.updateType
    
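    // if we want to kill a pod, do it now!
    if updateType == kubetypes.SyncPodKill {
        killPodOptions := o.killPodOptions
        if killPodOptions == nil {
            return fmt.Errorf("kill pod options are required if update type is kill")
        }
        return kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride)
    }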
    
    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
    
    // Create Cgroups for the pod and apply resource parameters
    // to them if cgroups-per-qos flag is enabled.
    pcm := kl.containerManager.NewPodContainerManager()
    // If pod has already been terminated then we need not create
    // or update the pod's cgroup
    if !kl.podIsTerminated(pod) {
        // Create and Update pod's Cgroups
        pcm.EnsureExists(pod)
    }
    
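    // Create Mirror Pod for Static Pod if it doesn't already exist
    if kubepod.IsStaticPod(pod) {
        // deleted reports whether an outdated mirror pod was just removed
        // (that check is elided in this excerpt); if so, it is recreated.
        deleted := false
        if mirrorPod == nil || deleted {
            kl.podManager.CreateMirrorPod(pod)
        }
    }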
    
    // Make data directories for the pod
    kl.makePodDataDirs(pod)
    
    // Volume manager will not mount volumes for terminated pods
    if !kl.podIsTerminated(pod) {
        // Wait for volumes to attach/mount
        kl.volumeManager.WaitForAttachAndMount(pod)
    }
    
    // Fetch the pull secrets for the pod
    pullSecrets := kl.getPullSecretsForPod(pod)
    
    // Call the container runtime's SyncPod callback
    result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
    // Surface any error so the workflow is repeated on the next syncPod call.
    return result.Error()
}

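The pod creation process:

  1. If the pod needs to be killed, kill it right away.

  2. Generate the PodStatus object for the pod.

  3. Create the pod's cgroups.

  4. If the pod is a static pod, create its mirror pod so that the static pod can be queried through the apiserver. The mirror pod is for querying only; the static pod cannot be controlled through it.

  5. Create the pod's data directories, such as the volume directories.

  6. Wait for the volumes to attach and mount.

  7. Fetch the ImagePullSecrets.

  8. Call the container runtime (through CRI) to create the pod.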

k8s.io\kubernetes\pkg\kubelet\kuberuntime\kuberuntime_manager.go

// SyncPod syncs the running pod into the desired pod by executing following steps:
//
//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create init containers.
//  6. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    // Step 1: Compute sandbox and container changes.
    podContainerChanges := m.computePodActions(pod, podStatus)
    
    // Step 2: Kill the pod if the sandbox has changed.
    if podContainerChanges.KillPod {
        m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
    } else {
        // Step 3: kill any running containers in this pod which are not to keep.
        for containerID, containerInfo := range podContainerChanges.ContainersToKill {
            m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil)
        }
    }
    
    // Step 4: Create a sandbox for the pod if necessary.
    podSandboxID := podContainerChanges.SandboxID
    if podContainerChanges.CreateSandbox {
        podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
    }
    
    // Step 5: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit)
    }
    
    // Step 6: start containers in podContainerChanges.ContainersToStart.
    for _, idx := range podContainerChanges.ContainersToStart {
        container := &pod.Spec.Containers[idx]
        m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular)
    }
    return
}

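The comments on this function are excellent. The process is:

  1. Compute what has changed in the sandbox (which holds the pod's network) and in the containers.
  2. If the sandbox has changed, kill the pod's existing containers.
  3. Create the sandbox, which sets up the pod's network.
  4. Start the init container.
  5. Start the regular containers.

Starting init containers follows its own rules, and all of that logic lives in computePodActions: the init containers must be run one at a time, in order, and only after all of them have finished are the regular containers started. Roughly:

  1. On the first sync only pod.Spec.InitContainers[0] is started, and changes.ContainersToStart is empty.
  2. Each later sync starts the next init container.
  3. Once the init containers are done, the regular containers are started.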
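The init container ordering can be sketched as a function that, on each sync, picks at most one init container to start and says whether the regular containers may start. The `initState` values and `nextAction` are hypothetical, and the sketch assumes a failed init container is simply retried; it does not reproduce computePodActions or its restart-policy handling.

```go
package main

import "fmt"

type initState int

const (
	notStarted initState = iota
	running
	succeeded
	failed
)

// nextAction inspects the init containers in order. It returns the index
// of the init container to start on this sync (-1 if none) and whether
// the regular containers are allowed to start.
func nextAction(states []initState) (next int, startRegular bool) {
	for i, s := range states {
		switch s {
		case succeeded:
			continue // this one is done, look at the following one
		case running:
			return -1, false // wait for it to finish
		default:
			// notStarted, or failed and assumed to be retried.
			return i, false
		}
	}
	// Every init container has succeeded (or there are none).
	return -1, true
}

func main() {
	fmt.Println(nextAction([]initState{notStarted, notStarted}))
	fmt.Println(nextAction([]initState{running, notStarted}))
	fmt.Println(nextAction([]initState{succeeded, notStarted}))
	fmt.Println(nextAction([]initState{succeeded, succeeded}))
}
```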

k8s.io\kubernetes\pkg\kubelet\kuberuntime\kuberuntime_container.go

// startContainer starts a container and returns a message indicates why it is failed on error.
// It starts the container through the following steps:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, containerType kubecontainer.ContainerType) (string, error) {
    // Step 1: pull the image.
    imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)   
    
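    // Step 2: create the container. containerConfig is built from the pod
    // and container spec (construction elided in this excerpt).
    containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)

    // Step 3: start the container.
    err = m.runtimeService.StartContainer(containerID)

    // Symlink the container log to the legacy location for cluster logging.
    containerMeta := containerConfig.GetMetadata()
    sandboxMeta := podSandboxConfig.GetMetadata()
    legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name, sandboxMeta.Namespace)
    containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
    m.osInterface.Symlink(containerLog, legacySymlink)

    // Step 4 (post-start lifecycle hooks) and error handling are elided.
    return "", nil
}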

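After peeling back the layers one at a time, we finally reach the actual work:

  1. Pull the image.
  2. Create the container. runtimeService is really just a client that calls the CRI over gRPC.
  3. Start the container.
  4. Create a soft link to the container log whose name carries Kubernetes metadata. This is very useful for log collection; see the post on K8S log collection with Fluentd and Mongo.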
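To illustrate why a metadata-carrying link name helps log collection, here is a sketch that builds such a name and parses it back. The layout `<pod>_<namespace>_<container>-<containerID>.log` is an assumption made for this example, and `legacyLogName`, `parseLegacyLogName`, and `logMeta` are hypothetical helpers, not functions from the kubelet.

```go
package main

import (
	"fmt"
	"strings"
)

// logMeta is the Kubernetes metadata recoverable from a link name.
type logMeta struct {
	Pod, Namespace, Container, ID string
}

// legacyLogName builds a link name under the assumed layout
// <pod>_<namespace>_<container>-<containerID>.log.
func legacyLogName(m logMeta) string {
	return fmt.Sprintf("%s_%s_%s-%s.log", m.Pod, m.Namespace, m.Container, m.ID)
}

// parseLegacyLogName reverses legacyLogName. It relies on pod, namespace,
// and container names never containing '_' and on the ID containing no '-'.
func parseLegacyLogName(name string) (logMeta, bool) {
	base := strings.TrimSuffix(name, ".log")
	if base == name {
		return logMeta{}, false // missing .log suffix
	}
	parts := strings.Split(base, "_")
	if len(parts) != 3 {
		return logMeta{}, false
	}
	i := strings.LastIndex(parts[2], "-")
	if i < 0 {
		return logMeta{}, false
	}
	return logMeta{
		Pod:       parts[0],
		Namespace: parts[1],
		Container: parts[2][:i],
		ID:        parts[2][i+1:],
	}, true
}

func main() {
	name := legacyLogName(logMeta{
		Pod: "web-0", Namespace: "default", Container: "nginx-proxy", ID: "abc123",
	})
	fmt.Println(name)
	meta, ok := parseLegacyLogName(name)
	fmt.Printf("%+v %v\n", meta, ok)
}
```

A log collector that tails these links can attach pod, namespace, and container labels to every record from the file name alone, without querying the apiserver.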