Deep Dive into kubelet (1): Receiving Pod Creation Requests

The kubelet is an enormously complex component, so this analysis is split into several parts.

Presumed responsibilities

The kubelet mainly does the following:

  1. Create Pods
  2. Manage Pods
  3. Report Node status
  4. Garbage collection (GC)

Receiving requests

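Before a Pod can be created, the kubelet first has to receive the request to create it; that module is the subject of this article. Unlike other Kubernetes components, the kubelet has an idiosyncratic code style and is several levels more complex. It took me a full day of tracing before I found the entry point.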

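Broadly speaking, the kubelet can create Pods from three sources: StaticPodPath, StaticPodURL, and the apiserver. The first two are both used to create static pods. These three sources serve as the entry points for the analysis below.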

StaticPod

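Simply put, any Pod that is not created through the apiserver is a static pod. The typical use case is kubeadm: the control-plane components (etcd, kube-apiserver, kube-controller-manager, kube-scheduler) run as static pods, because at bootstrap time the apiserver is not up yet, so static pods are the only way to start them.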

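A static pod can be created in two ways: from manifest files on disk or over HTTP. See the Kubernetes documentation on static pods for details.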

staticPodPath is the path to the directory containing local (static) pods to run, or the path to a single static pod file.
staticPodURL is the URL for accessing static pods to run

code

k8s.io\kubernetes\pkg\kubelet\config\config.go

// PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners
// in order.
type PodConfig struct {
    pods *podStorage
    mux  *config.Mux

    // the channel of denormalized changes passed to listeners
    updates chan kubetypes.PodUpdate

    // contains the list of all configured sources
    sourcesLock       sync.Mutex
    sources           sets.String
    checkpointManager checkpointmanager.CheckpointManager
}

// PodUpdate defines an operation sent on the channel. You can add or remove single services by
// sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required).
// For setting the state of the system to a given state for this source configuration, set
// Pods as desired and Op to SET, which will reset the system state to that specified in this
// operation for this source channel. To remove all pods, set Pods to empty object and Op to SET.
type PodUpdate struct {
    Pods   []*v1.Pod
    Op     PodOperation
    Source string
}

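In PodConfig, the fields to focus on are pods and updates. updates is a producer-consumer channel, and every change must be dispatched through it. So a bold guess: every Pod request is sent to updates, and some workers continuously consume updates.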
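The producer-consumer guess can be illustrated with a minimal, self-contained sketch. PodOperation, PodUpdate, applyUpdate, and consume below are simplified hypothetical stand-ins: a pod is reduced to its name, and only a single source is assumed, so SET simply replaces the whole state. The kubelet's real types carry full *v1.Pod objects and more operations.

```go
package main

import "fmt"

// PodOperation and PodUpdate are hypothetical, simplified stand-ins for the
// kubelet types of the same name; a pod is reduced to its name.
type PodOperation int

const (
	SET PodOperation = iota
	ADD
	REMOVE
)

type PodUpdate struct {
	Pods   []string
	Op     PodOperation
	Source string
}

// applyUpdate folds one PodUpdate into the consumer's view of the pods.
// A single source is assumed, so SET replaces the entire state.
func applyUpdate(state map[string]bool, u PodUpdate) {
	switch u.Op {
	case ADD:
		for _, p := range u.Pods {
			state[p] = true
		}
	case REMOVE:
		for _, p := range u.Pods {
			delete(state, p)
		}
	case SET:
		for p := range state {
			delete(state, p)
		}
		for _, p := range u.Pods {
			state[p] = true
		}
	}
}

// consume drains updates until the channel is closed and returns the final state.
func consume(updates <-chan PodUpdate) map[string]bool {
	state := map[string]bool{}
	for u := range updates {
		applyUpdate(state, u)
	}
	return state
}

func main() {
	// Buffered like the channel created in NewPodConfig (capacity 50).
	updates := make(chan PodUpdate, 50)
	go func() { // a single producer; it closes the channel when done
		defer close(updates)
		updates <- PodUpdate{Pods: []string{"a", "b"}, Op: ADD, Source: "api"}
		updates <- PodUpdate{Pods: []string{"a"}, Op: REMOVE, Source: "api"}
	}()
	fmt.Println(consume(updates)) // map[b:true]
}
```

Because there is one channel and one consumer, updates are applied strictly in the order they were sent, which is the ordering guarantee the PodConfig comment promises.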

k8s.io\kubernetes\pkg\kubelet\kubelet.go

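// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
    // Extra HTTP headers sent when fetching StaticPodURL.
    manifestURLHeader := make(http.Header)
    for k, v := range kubeCfg.StaticPodURLHeader {
        for i := range v {
            manifestURLHeader.Add(k, v[i])
        }
    }

    // source of all configuration
    cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

    // define file config source
    if kubeCfg.StaticPodPath != "" {
        config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
    }

    // define url config source
    if kubeCfg.StaticPodURL != "" {
        config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
    }

    var updatechannel chan<- interface{}
    // With a bootstrap checkpoint, the apiserver channel is created early so
    // that checkpointed pods can be restored into it (hence the nil check below).
    if bootstrapCheckpointPath != "" {
        updatechannel = cfg.Channel(kubetypes.ApiserverSource)
        if err := cfg.Restore(bootstrapCheckpointPath, updatechannel); err != nil {
            return nil, err
        }
    }

    // define api config source
    if kubeDeps.KubeClient != nil {
        if updatechannel == nil {
            updatechannel = cfg.Channel(kubetypes.ApiserverSource)
        }
        config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
    }
    return cfg, nil
}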

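makePodSourceConfig clearly shows three producers: the file source, the URL source, and the apiserver source, each writing to its own channel obtained from cfg.Channel. Here we only analyze NewSourceApiserver; the other two follow the same pattern.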
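The conditional wiring of the three producers can be reduced to a small sketch. The constants below use the source names as I understand them from the kubelet's kubetypes package ("file", "http", "api"); sourceConfig and enabledSources are hypothetical helpers that only mirror the three if-branches of makePodSourceConfig, not real kubelet code.

```go
package main

import "fmt"

// Source names (assumed to match the kubetypes constants of the same names).
const (
	FileSource      = "file"
	HTTPSource      = "http"
	ApiserverSource = "api"
)

// sourceConfig is a hypothetical reduction of the inputs makePodSourceConfig
// inspects: the two static-pod settings and whether a kube client exists.
type sourceConfig struct {
	StaticPodPath string
	StaticPodURL  string
	HasKubeClient bool
}

// enabledSources mirrors the three if-branches: a producer is only created
// when its setting is present.
func enabledSources(c sourceConfig) []string {
	var sources []string
	if c.StaticPodPath != "" {
		sources = append(sources, FileSource)
	}
	if c.StaticPodURL != "" {
		sources = append(sources, HTTPSource)
	}
	if c.HasKubeClient {
		sources = append(sources, ApiserverSource)
	}
	return sources
}

func main() {
	// A kubeadm-style node: manifest directory plus apiserver, no URL source.
	fmt.Println(enabledSources(sourceConfig{
		StaticPodPath: "/etc/kubernetes/manifests",
		HasKubeClient: true,
	})) // [file api]
}
```

A kubelet started with neither static-pod setting nor a kube client would end up with no producers at all, which is why each branch is independent.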

k8s.io\kubernetes\pkg\kubelet\config\apiserver.go

// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
    lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
    newSourceApiserverFromLW(lw, updates)
}

// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
    send := func(objs []interface{}) {
        var pods []*v1.Pod
        for _, o := range objs {
            pods = append(pods, o.(*v1.Pod))
        }
        updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
    }
    r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
    go r.Run(wait.NeverStop)
}

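This part is fairly simple: it list-watches Pods, filtered by nodeName through the field selector on spec.nodeName. That is exactly the field the scheduler sets when it binds a Pod to a node, which ties this back to the earlier scheduler analysis. The send function then pushes the Pods to updates. If list-watch is unfamiliar, see the earlier ListAndWatch article.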
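To make the filtering concrete, here is a hedged client-side imitation of what the field selector spec.nodeName=<node> does on the apiserver side (the same filter as `kubectl get pods --all-namespaces --field-selector spec.nodeName=<node>`). The pod type and selectForNode are hypothetical; the real kubelet never receives other nodes' pods, because the apiserver filters them before sending.

```go
package main

import "fmt"

// pod is a hypothetical minimal pod: a name and its spec.nodeName.
type pod struct {
	Name     string
	NodeName string
}

// selectForNode keeps only the pods bound to the given node. Pods the
// scheduler has not bound yet have an empty NodeName and are never selected,
// so a kubelet only learns about a Pod after scheduling.
func selectForNode(all []pod, node string) []pod {
	var out []pod
	for _, p := range all {
		if p.NodeName == node {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	all := []pod{
		{Name: "web-1", NodeName: "node-a"},
		{Name: "web-2", NodeName: "node-b"},
		{Name: "pending", NodeName: ""}, // not scheduled yet
	}
	for _, p := range selectForNode(all, "node-a") {
		fmt.Println(p.Name) // web-1
	}
}
```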

k8s.io\client-go\tools\cache\undelta_store.go

func (u *UndeltaStore) Add(obj interface{}) error {
    if err := u.Store.Add(obj); err != nil {
        return err
    }
    u.PushFunc(u.Store.List())
    return nil
}

func (u *UndeltaStore) Update(obj interface{}) error {
    if err := u.Store.Update(obj); err != nil {
        return err
    }
    u.PushFunc(u.Store.List())
    return nil
}

func (u *UndeltaStore) Delete(obj interface{}) error {
    if err := u.Store.Delete(obj); err != nil {
        return err
    }
    u.PushFunc(u.Store.List())
    return nil
}

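UndeltaStore is interesting: on every change it pushes the complete current contents rather than the delta. I am not yet sure why it is used here; a plausible reason is that send always emits Op: SET, which by definition carries the full set of pods for the source, leaving the diffing to the merger.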
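The "push the full state on every change" idea can be shown with a small sketch. undeltaStore here is a hypothetical re-implementation of the concept, not client-go's type: objects are plain strings, methods do not return errors, and list() sorts by key only to make the pushes deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// undeltaStore applies each change and then pushes the complete current
// contents (not the delta) to pushFunc.
type undeltaStore struct {
	items    map[string]string // key -> object (a string for brevity)
	pushFunc func([]string)
}

func newUndeltaStore(push func([]string)) *undeltaStore {
	return &undeltaStore{items: map[string]string{}, pushFunc: push}
}

// list returns all objects in key order.
func (u *undeltaStore) list() []string {
	keys := make([]string, 0, len(u.items))
	for k := range u.items {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	out := make([]string, 0, len(keys))
	for _, k := range keys {
		out = append(out, u.items[k])
	}
	return out
}

func (u *undeltaStore) Add(key, obj string) {
	u.items[key] = obj
	u.pushFunc(u.list())
}

func (u *undeltaStore) Update(key, obj string) { u.Add(key, obj) }

func (u *undeltaStore) Delete(key string) {
	delete(u.items, key)
	u.pushFunc(u.list())
}

func main() {
	var pushes [][]string
	s := newUndeltaStore(func(all []string) { pushes = append(pushes, all) })
	s.Add("default/a", "a")
	s.Add("default/b", "b")
	s.Delete("default/a")
	for _, p := range pushes {
		fmt.Println(p) // [a], then [a b], then [b]
	}
}
```

Each push is a complete snapshot, which is exactly the shape a SET update needs.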

The source above shows that all Pods are pushed into updates, but this updates is not the channel in PodConfig, so it is worth a closer look.

// Channel creates or returns a config source channel.  The channel
// only accepts PodUpdates
func (c *PodConfig) Channel(source string) chan<- interface{} {
    c.sources.Insert(source)
    return c.mux.Channel(source)
}

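func (m *Mux) Channel(source string) chan interface{} {
    m.sourceLock.Lock()
    defer m.sourceLock.Unlock()
    // Reuse the existing channel if this source was registered before.
    if channel, exists := m.sources[source]; exists {
        return channel
    }
    newChannel := make(chan interface{})
    m.sources[source] = newChannel
    // One listener goroutine per source forwards every update to the merger.
    go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
    return newChannel
}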

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
    for update := range listenChannel {
        m.merger.Merge(source, update)
    }
}

// Merge normalizes a set of incoming changes from different sources into a map of all Pods
// and ensures that redundant changes are filtered out, and then pushes zero or more minimal
// updates onto the update channel.  Ensures that updates are delivered in order.
func (s *podStorage) Merge(source string, change interface{}) error {
    s.updateLock.Lock()
    defer s.updateLock.Unlock()

    adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)

    // deliver update notifications
    switch s.mode {
    case PodConfigNotificationIncremental:
        if len(removes.Pods) > 0 {
            s.updates <- *removes
        }
        if len(adds.Pods) > 0 {
            s.updates <- *adds
        }
        if len(updates.Pods) > 0 {
            s.updates <- *updates
        }
        if len(deletes.Pods) > 0 {
            s.updates <- *deletes
        }
        if len(restores.Pods) > 0 {
            s.updates <- *restores
        }
        // Status-only changes are delivered as a RECONCILE update.
        if len(reconciles.Pods) > 0 {
            s.updates <- *reconciles
        }
        // (the empty "source ready" update and the other notification modes are elided)
    }

    return nil
}

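The seemingly simple Channel method does a lot of work: for each source it starts a goroutine that hands every incoming update to podStorage.Merge, which compares the full list from list-watch with what podStorage already holds and derives the set of Pods for each operation (remove, add, update, delete, restore, reconcile).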
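The comparison step can be sketched as a plain map diff. diffSet is a hypothetical, simplified version of what podStorage.merge does for a SET update from one source: pods are reduced to name -> spec strings, and only the add, update, and remove classifications are shown. The real code additionally produces DELETE (graceful deletion), RECONCILE (status-only change), and RESTORE sets.

```go
package main

import (
	"fmt"
	"sort"
)

// diffSet compares the full new list (name -> spec) for one source against
// what was stored before and classifies each pod.
func diffSet(old, next map[string]string) (adds, updates, removes []string) {
	for name, spec := range next {
		prev, ok := old[name]
		switch {
		case !ok:
			adds = append(adds, name) // new pod for this source
		case prev != spec:
			updates = append(updates, name) // known pod, changed spec
		}
	}
	for name := range old {
		if _, ok := next[name]; !ok {
			removes = append(removes, name) // pod vanished from the source
		}
	}
	// Map iteration order is random; sort for deterministic output.
	sort.Strings(adds)
	sort.Strings(updates)
	sort.Strings(removes)
	return adds, updates, removes
}

func main() {
	old := map[string]string{"a": "v1", "b": "v1", "c": "v1"}
	next := map[string]string{"a": "v1", "b": "v2", "d": "v1"}
	adds, updates, removes := diffSet(old, next)
	fmt.Println(adds, updates, removes) // [d] [b] [c]
}
```

An unchanged snapshot yields three empty sets, so nothing is sent downstream; this is how repeated full pushes from UndeltaStore turn into minimal incremental updates.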

// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
    updates := make(chan kubetypes.PodUpdate, 50)
    storage := newPodStorage(updates, mode, recorder)
    podConfig := &PodConfig{
        pods:    storage,
        mux:     config.NewMux(storage),
        updates: updates,
        sources: sets.String{},
    }
    return podConfig
}

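The updates channel in podStorage is the same channel as PodConfig's updates (both are the channel created in NewPodConfig). With that, the whole producer side is finally clear.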
