The website uses cookies to optimize your user experience. Using this website grants us the permission to collect certain information essential to the provision of our services to you, but you may change the cookie settings within your browser any time you wish. Learn more
I agree
Text direction?





  1. 创建Pod
  2. 管理Pod
  3. 上报Node信息
  4. GC



总的来说,kubelet可以通过三种方式创建Pod。分别是StaticPodPathStaticPodURLapiserver,前两个都是用来创建static pod,已这三个为切入点,分析kubelet。


简单来说,不是通过apiserver创建的Pod,都是static pod。应用场景就是kubeadm,除了kubelet以外的组件,都是static pod,因为当时apiserver还没起,所以只能用static pod创建。

创建static pod有两种方式,配置文件和HTTP。详见static pod

staticPodPath is the path to the directory containing local (static) pods to run, or the path to a single static pod file.
staticPodURL is the URL for accessing static pods to run



// PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners
// in order.
type PodConfig struct {
    pods *podStorage
    mux  *config.Mux

    // the channel of denormalized changes passed to listeners
    updates chan kubetypes.PodUpdate

    // contains the list of all configured sources
    sourcesLock       sync.Mutex
    sources           sets.String
    checkpointManager checkpointmanager.CheckpointManager

// PodUpdate defines an operation sent on the channel. You can add or remove single services by
// sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required).
// For setting the state of the system to a given state for this source configuration, set
// Pods as desired and Op to SET, which will reset the system state to that specified in this
// operation for this source channel. To remove all pods, set Pods to empty object and Op to SET.
type PodUpdate struct {
    Pods   []*v1.Pod
    Op     PodOperation
    Source string



// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
    cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

    // define file config source
    if kubeCfg.StaticPodPath != "" {
        config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))

    // define url config source
    if kubeCfg.StaticPodURL != "" {
        config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
    var updatechannel chan<- interface{}
    if kubeDeps.KubeClient != nil {
        if updatechannel == nil {
            updatechannel = cfg.Channel(kubetypes.ApiserverSource)
        config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
    return cfg, nil



// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
    lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
    newSourceApiserverFromLW(lw, updates)

// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
    send := func(objs []interface{}) {
        var pods []*v1.Pod
        for _, o := range objs {
            pods = append(pods, o.(*v1.Pod))
        updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
    r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
    go r.Run(wait.NeverStop)



func (u *UndeltaStore) Add(obj interface{}) error {
    if err := u.Store.Add(obj); err != nil {
        return err
    return nil

func (u *UndeltaStore) Update(obj interface{}) error {
    if err := u.Store.Update(obj); err != nil {
        return err
    return nil

func (u *UndeltaStore) Delete(obj interface{}) error {
    if err := u.Store.Delete(obj); err != nil {
        return err
    return nil



// Channel creates or returns a config source channel.  The channel
// only accepts PodUpdates
func (c *PodConfig) Channel(source string) chan<- interface{} {
    return c.mux.Channel(source)

func (m *Mux) Channel(source string) chan interface{} {
    go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
    return newChannel

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
    for update := range listenChannel {
        m.merger.Merge(source, update)

// Merge normalizes a set of incoming changes from different sources into a map of all Pods
// and ensures that redundant changes are filtered out, and then pushes zero or more minimal
// updates onto the update channel.  Ensures that updates are delivered in order.
func (s *podStorage) Merge(source string, change interface{}) error {
    adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
    // deliver update notifications
    switch s.mode {
    case PodConfigNotificationIncremental:
        if len(removes.Pods) > 0 {
            s.updates <- *removes
        if len(adds.Pods) > 0 {
            s.updates <- *adds
        if len(updates.Pods) > 0 {
            s.updates <- *updates
        if len(deletes.Pods) > 0 {
            s.updates <- *deletes
        if len(restores.Pods) > 0 {
            s.updates <- *restores

    return nil


// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
    updates := make(chan kubetypes.PodUpdate, 50)
    storage := newPodStorage(updates, mode, recorder)
    podConfig := &PodConfig{
        pods:    storage,
        mux:     config.NewMux(storage),
        updates: updates,
        sources: sets.String{},
    return podConfig


Related Notes
Get a free MyMarkup account to save this article and view it later on any device.
Create account

End User License Agreement

Summary | 3 Annotations
2020/06/21 08:41
2020/06/21 08:42
2020/06/21 08:42