kubelet: add a generic pod lifecycle event generator
This change introduces pod lifecycle event generator (PLEG), and adds a generic PLEG. The generic PLEG relies on relisting to discover container events, and is container-runtime-agnostic. Both docker and rkt are changed to use generic PLEG.
This commit is contained in:
		@@ -33,6 +33,7 @@ type FakeRuntime struct {
 | 
				
			|||||||
	sync.Mutex
 | 
						sync.Mutex
 | 
				
			||||||
	CalledFunctions   []string
 | 
						CalledFunctions   []string
 | 
				
			||||||
	PodList           []*Pod
 | 
						PodList           []*Pod
 | 
				
			||||||
 | 
						AllPodList        []*Pod
 | 
				
			||||||
	ImageList         []Image
 | 
						ImageList         []Image
 | 
				
			||||||
	PodStatus         api.PodStatus
 | 
						PodStatus         api.PodStatus
 | 
				
			||||||
	StartedPods       []string
 | 
						StartedPods       []string
 | 
				
			||||||
@@ -89,6 +90,7 @@ func (f *FakeRuntime) ClearCalls() {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	f.CalledFunctions = []string{}
 | 
						f.CalledFunctions = []string{}
 | 
				
			||||||
	f.PodList = []*Pod{}
 | 
						f.PodList = []*Pod{}
 | 
				
			||||||
 | 
						f.AllPodList = []*Pod{}
 | 
				
			||||||
	f.PodStatus = api.PodStatus{}
 | 
						f.PodStatus = api.PodStatus{}
 | 
				
			||||||
	f.StartedPods = []string{}
 | 
						f.StartedPods = []string{}
 | 
				
			||||||
	f.KilledPods = []string{}
 | 
						f.KilledPods = []string{}
 | 
				
			||||||
@@ -155,6 +157,9 @@ func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) {
 | 
				
			|||||||
	defer f.Unlock()
 | 
						defer f.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	f.CalledFunctions = append(f.CalledFunctions, "GetPods")
 | 
						f.CalledFunctions = append(f.CalledFunctions, "GetPods")
 | 
				
			||||||
 | 
						if all {
 | 
				
			||||||
 | 
							return f.AllPodList, f.Err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	return f.PodList, f.Err
 | 
						return f.PodList, f.Err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -84,7 +84,7 @@ func TestDetectImagesInitialDetect(t *testing.T) {
 | 
				
			|||||||
		makeImage(0, 1024),
 | 
							makeImage(0, 1024),
 | 
				
			||||||
		makeImage(1, 2048),
 | 
							makeImage(1, 2048),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	fakeRuntime.PodList = []*container.Pod{
 | 
						fakeRuntime.AllPodList = []*container.Pod{
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			Containers: []*container.Container{
 | 
								Containers: []*container.Container{
 | 
				
			||||||
				makeContainer(1),
 | 
									makeContainer(1),
 | 
				
			||||||
@@ -114,7 +114,7 @@ func TestDetectImagesWithNewImage(t *testing.T) {
 | 
				
			|||||||
		makeImage(0, 1024),
 | 
							makeImage(0, 1024),
 | 
				
			||||||
		makeImage(1, 2048),
 | 
							makeImage(1, 2048),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	fakeRuntime.PodList = []*container.Pod{
 | 
						fakeRuntime.AllPodList = []*container.Pod{
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			Containers: []*container.Container{
 | 
								Containers: []*container.Container{
 | 
				
			||||||
				makeContainer(1),
 | 
									makeContainer(1),
 | 
				
			||||||
@@ -159,7 +159,7 @@ func TestDetectImagesContainerStopped(t *testing.T) {
 | 
				
			|||||||
		makeImage(0, 1024),
 | 
							makeImage(0, 1024),
 | 
				
			||||||
		makeImage(1, 2048),
 | 
							makeImage(1, 2048),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	fakeRuntime.PodList = []*container.Pod{
 | 
						fakeRuntime.AllPodList = []*container.Pod{
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			Containers: []*container.Container{
 | 
								Containers: []*container.Container{
 | 
				
			||||||
				makeContainer(1),
 | 
									makeContainer(1),
 | 
				
			||||||
@@ -175,7 +175,7 @@ func TestDetectImagesContainerStopped(t *testing.T) {
 | 
				
			|||||||
	require.True(t, ok)
 | 
						require.True(t, ok)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Simulate container being stopped.
 | 
						// Simulate container being stopped.
 | 
				
			||||||
	fakeRuntime.PodList = []*container.Pod{}
 | 
						fakeRuntime.AllPodList = []*container.Pod{}
 | 
				
			||||||
	err = manager.detectImages(time.Now())
 | 
						err = manager.detectImages(time.Now())
 | 
				
			||||||
	require.NoError(t, err)
 | 
						require.NoError(t, err)
 | 
				
			||||||
	assert.Equal(manager.imageRecordsLen(), 2)
 | 
						assert.Equal(manager.imageRecordsLen(), 2)
 | 
				
			||||||
@@ -195,7 +195,7 @@ func TestDetectImagesWithRemovedImages(t *testing.T) {
 | 
				
			|||||||
		makeImage(0, 1024),
 | 
							makeImage(0, 1024),
 | 
				
			||||||
		makeImage(1, 2048),
 | 
							makeImage(1, 2048),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	fakeRuntime.PodList = []*container.Pod{
 | 
						fakeRuntime.AllPodList = []*container.Pod{
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			Containers: []*container.Container{
 | 
								Containers: []*container.Container{
 | 
				
			||||||
				makeContainer(1),
 | 
									makeContainer(1),
 | 
				
			||||||
@@ -221,7 +221,7 @@ func TestFreeSpaceImagesInUseContainersAreIgnored(t *testing.T) {
 | 
				
			|||||||
		makeImage(0, 1024),
 | 
							makeImage(0, 1024),
 | 
				
			||||||
		makeImage(1, 2048),
 | 
							makeImage(1, 2048),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	fakeRuntime.PodList = []*container.Pod{
 | 
						fakeRuntime.AllPodList = []*container.Pod{
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			Containers: []*container.Container{
 | 
								Containers: []*container.Container{
 | 
				
			||||||
				makeContainer(1),
 | 
									makeContainer(1),
 | 
				
			||||||
@@ -242,7 +242,7 @@ func TestFreeSpaceRemoveByLeastRecentlyUsed(t *testing.T) {
 | 
				
			|||||||
		makeImage(0, 1024),
 | 
							makeImage(0, 1024),
 | 
				
			||||||
		makeImage(1, 2048),
 | 
							makeImage(1, 2048),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	fakeRuntime.PodList = []*container.Pod{
 | 
						fakeRuntime.AllPodList = []*container.Pod{
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			Containers: []*container.Container{
 | 
								Containers: []*container.Container{
 | 
				
			||||||
				makeContainer(0),
 | 
									makeContainer(0),
 | 
				
			||||||
@@ -253,7 +253,7 @@ func TestFreeSpaceRemoveByLeastRecentlyUsed(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// Make 1 be more recently used than 0.
 | 
						// Make 1 be more recently used than 0.
 | 
				
			||||||
	require.NoError(t, manager.detectImages(zero))
 | 
						require.NoError(t, manager.detectImages(zero))
 | 
				
			||||||
	fakeRuntime.PodList = []*container.Pod{
 | 
						fakeRuntime.AllPodList = []*container.Pod{
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			Containers: []*container.Container{
 | 
								Containers: []*container.Container{
 | 
				
			||||||
				makeContainer(1),
 | 
									makeContainer(1),
 | 
				
			||||||
@@ -261,7 +261,7 @@ func TestFreeSpaceRemoveByLeastRecentlyUsed(t *testing.T) {
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	require.NoError(t, manager.detectImages(time.Now()))
 | 
						require.NoError(t, manager.detectImages(time.Now()))
 | 
				
			||||||
	fakeRuntime.PodList = []*container.Pod{
 | 
						fakeRuntime.AllPodList = []*container.Pod{
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			Containers: []*container.Container{},
 | 
								Containers: []*container.Container{},
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
@@ -281,7 +281,7 @@ func TestFreeSpaceTiesBrokenByDetectedTime(t *testing.T) {
 | 
				
			|||||||
	fakeRuntime.ImageList = []container.Image{
 | 
						fakeRuntime.ImageList = []container.Image{
 | 
				
			||||||
		makeImage(0, 1024),
 | 
							makeImage(0, 1024),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	fakeRuntime.PodList = []*container.Pod{
 | 
						fakeRuntime.AllPodList = []*container.Pod{
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			Containers: []*container.Container{
 | 
								Containers: []*container.Container{
 | 
				
			||||||
				makeContainer(0),
 | 
									makeContainer(0),
 | 
				
			||||||
@@ -296,7 +296,7 @@ func TestFreeSpaceTiesBrokenByDetectedTime(t *testing.T) {
 | 
				
			|||||||
		makeImage(1, 2048),
 | 
							makeImage(1, 2048),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	require.NoError(t, manager.detectImages(time.Now()))
 | 
						require.NoError(t, manager.detectImages(time.Now()))
 | 
				
			||||||
	fakeRuntime.PodList = []*container.Pod{}
 | 
						fakeRuntime.AllPodList = []*container.Pod{}
 | 
				
			||||||
	require.NoError(t, manager.detectImages(time.Now()))
 | 
						require.NoError(t, manager.detectImages(time.Now()))
 | 
				
			||||||
	require.Equal(t, manager.imageRecordsLen(), 2)
 | 
						require.Equal(t, manager.imageRecordsLen(), 2)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -317,7 +317,7 @@ func TestFreeSpaceImagesAlsoDoesLookupByRepoTags(t *testing.T) {
 | 
				
			|||||||
			Size: 2048,
 | 
								Size: 2048,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	fakeRuntime.PodList = []*container.Pod{
 | 
						fakeRuntime.AllPodList = []*container.Pod{
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			Containers: []*container.Container{
 | 
								Containers: []*container.Container{
 | 
				
			||||||
				{
 | 
									{
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -55,6 +55,7 @@ import (
 | 
				
			|||||||
	"k8s.io/kubernetes/pkg/kubelet/envvars"
 | 
						"k8s.io/kubernetes/pkg/kubelet/envvars"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/kubelet/metrics"
 | 
						"k8s.io/kubernetes/pkg/kubelet/metrics"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/kubelet/network"
 | 
						"k8s.io/kubernetes/pkg/kubelet/network"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/kubelet/pleg"
 | 
				
			||||||
	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
 | 
						kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/kubelet/prober"
 | 
						"k8s.io/kubernetes/pkg/kubelet/prober"
 | 
				
			||||||
	proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
 | 
						proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
 | 
				
			||||||
@@ -111,6 +112,15 @@ const (
 | 
				
			|||||||
	housekeepingPeriod = time.Second * 2
 | 
						housekeepingPeriod = time.Second * 2
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	etcHostsPath = "/etc/hosts"
 | 
						etcHostsPath = "/etc/hosts"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Capacity of the channel for recieving pod lifecycle events. This number
 | 
				
			||||||
 | 
						// is a bit arbitrary and may be adjusted in the future.
 | 
				
			||||||
 | 
						plegChannelCapacity = 1000
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Relisting is used to discover missing container events.
 | 
				
			||||||
 | 
						// Use a shorter period because generic PLEG relies on relisting for
 | 
				
			||||||
 | 
						// container events.
 | 
				
			||||||
 | 
						plegRelistPeriod = time.Second * 3
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var (
 | 
					var (
 | 
				
			||||||
@@ -351,6 +361,7 @@ func NewMainKubelet(
 | 
				
			|||||||
			serializeImagePulls,
 | 
								serializeImagePulls,
 | 
				
			||||||
		)
 | 
							)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod)
 | 
				
			||||||
	case "rkt":
 | 
						case "rkt":
 | 
				
			||||||
		conf := &rkt.Config{
 | 
							conf := &rkt.Config{
 | 
				
			||||||
			Path:               rktPath,
 | 
								Path:               rktPath,
 | 
				
			||||||
@@ -372,6 +383,7 @@ func NewMainKubelet(
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
		klet.containerRuntime = rktRuntime
 | 
							klet.containerRuntime = rktRuntime
 | 
				
			||||||
		klet.imageManager = rkt.NewImageManager(rktRuntime)
 | 
							klet.imageManager = rkt.NewImageManager(rktRuntime)
 | 
				
			||||||
 | 
							klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// No Docker daemon to put in a container.
 | 
							// No Docker daemon to put in a container.
 | 
				
			||||||
		dockerDaemonContainer = ""
 | 
							dockerDaemonContainer = ""
 | 
				
			||||||
@@ -558,6 +570,9 @@ type Kubelet struct {
 | 
				
			|||||||
	//    as it takes time to gather all necessary node information.
 | 
						//    as it takes time to gather all necessary node information.
 | 
				
			||||||
	nodeStatusUpdateFrequency time.Duration
 | 
						nodeStatusUpdateFrequency time.Duration
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Generates pod events.
 | 
				
			||||||
 | 
						pleg pleg.PodLifecycleEventGenerator
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// The name of the resource-only container to run the Kubelet in (empty for no container).
 | 
						// The name of the resource-only container to run the Kubelet in (empty for no container).
 | 
				
			||||||
	// Name must be absolute.
 | 
						// Name must be absolute.
 | 
				
			||||||
	resourceContainer string
 | 
						resourceContainer string
 | 
				
			||||||
@@ -871,6 +886,8 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// Run the system oom watcher forever.
 | 
						// Run the system oom watcher forever.
 | 
				
			||||||
	kl.statusManager.Start()
 | 
						kl.statusManager.Start()
 | 
				
			||||||
 | 
						// Start the pod lifecycle event generator.
 | 
				
			||||||
 | 
						kl.pleg.Start()
 | 
				
			||||||
	kl.syncLoop(updates, kl)
 | 
						kl.syncLoop(updates, kl)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -2124,20 +2141,21 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand
 | 
				
			|||||||
	// sync interval is defaulted to 10s.
 | 
						// sync interval is defaulted to 10s.
 | 
				
			||||||
	syncTicker := time.NewTicker(time.Second)
 | 
						syncTicker := time.NewTicker(time.Second)
 | 
				
			||||||
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
 | 
						housekeepingTicker := time.NewTicker(housekeepingPeriod)
 | 
				
			||||||
 | 
						plegCh := kl.pleg.Watch()
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		if rs := kl.runtimeState.errors(); len(rs) != 0 {
 | 
							if rs := kl.runtimeState.errors(); len(rs) != 0 {
 | 
				
			||||||
			glog.Infof("skipping pod synchronization - %v", rs)
 | 
								glog.Infof("skipping pod synchronization - %v", rs)
 | 
				
			||||||
			time.Sleep(5 * time.Second)
 | 
								time.Sleep(5 * time.Second)
 | 
				
			||||||
			continue
 | 
								continue
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C) {
 | 
							if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
 | 
				
			||||||
			break
 | 
								break
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler SyncHandler,
 | 
					func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler SyncHandler,
 | 
				
			||||||
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time) bool {
 | 
						syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
 | 
				
			||||||
	kl.syncLoopMonitor.Store(time.Now())
 | 
						kl.syncLoopMonitor.Store(time.Now())
 | 
				
			||||||
	select {
 | 
						select {
 | 
				
			||||||
	case u, open := <-updates:
 | 
						case u, open := <-updates:
 | 
				
			||||||
@@ -2146,6 +2164,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 | 
				
			|||||||
			return false
 | 
								return false
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		kl.addSource(u.Source)
 | 
							kl.addSource(u.Source)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		switch u.Op {
 | 
							switch u.Op {
 | 
				
			||||||
		case kubetypes.ADD:
 | 
							case kubetypes.ADD:
 | 
				
			||||||
			glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, kubeletutil.FormatPodNames(u.Pods))
 | 
								glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, kubeletutil.FormatPodNames(u.Pods))
 | 
				
			||||||
@@ -2160,6 +2179,25 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
 | 
				
			|||||||
			// TODO: Do we want to support this?
 | 
								// TODO: Do we want to support this?
 | 
				
			||||||
			glog.Errorf("Kubelet does not support snapshot update")
 | 
								glog.Errorf("Kubelet does not support snapshot update")
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
						case e := <-plegCh:
 | 
				
			||||||
 | 
							// Filter out started events since we don't use them now.
 | 
				
			||||||
 | 
							if e.Type == pleg.ContainerStarted {
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							pod, ok := kl.podManager.GetPodByUID(e.ID)
 | 
				
			||||||
 | 
							if !ok {
 | 
				
			||||||
 | 
								// If the pod no longer exists, ignore the event.
 | 
				
			||||||
 | 
								glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", kubeletutil.FormatPodName(pod), e)
 | 
				
			||||||
 | 
							// Force the container runtime cache to update.
 | 
				
			||||||
 | 
							if err := kl.runtimeCache.ForceUpdateIfOlder(time.Now()); err != nil {
 | 
				
			||||||
 | 
								glog.Errorf("SyncLoop: unable to update runtime cache")
 | 
				
			||||||
 | 
								// TODO (yujuhong): should we delay the sync until container
 | 
				
			||||||
 | 
								// runtime can be updated?
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							handler.HandlePodSyncs([]*api.Pod{pod})
 | 
				
			||||||
	case <-syncCh:
 | 
						case <-syncCh:
 | 
				
			||||||
		podsToSync := kl.getPodsToSync()
 | 
							podsToSync := kl.getPodsToSync()
 | 
				
			||||||
		if len(podsToSync) == 0 {
 | 
							if len(podsToSync) == 0 {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -46,6 +46,7 @@ import (
 | 
				
			|||||||
	"k8s.io/kubernetes/pkg/kubelet/container"
 | 
						"k8s.io/kubernetes/pkg/kubelet/container"
 | 
				
			||||||
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 | 
						kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/kubelet/network"
 | 
						"k8s.io/kubernetes/pkg/kubelet/network"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/kubelet/pleg"
 | 
				
			||||||
	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
 | 
						kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/kubelet/prober"
 | 
						"k8s.io/kubernetes/pkg/kubelet/prober"
 | 
				
			||||||
	proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
 | 
						proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
 | 
				
			||||||
@@ -147,6 +148,8 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 | 
				
			|||||||
	kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20)
 | 
						kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20)
 | 
				
			||||||
	kubelet.resyncInterval = 10 * time.Second
 | 
						kubelet.resyncInterval = 10 * time.Second
 | 
				
			||||||
	kubelet.workQueue = queue.NewBasicWorkQueue()
 | 
						kubelet.workQueue = queue.NewBasicWorkQueue()
 | 
				
			||||||
 | 
						// Relist period does not affect the tests.
 | 
				
			||||||
 | 
						kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour)
 | 
				
			||||||
	return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient}
 | 
						return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -338,15 +341,16 @@ func TestSyncLoopTimeUpdate(t *testing.T) {
 | 
				
			|||||||
	// Start sync ticker.
 | 
						// Start sync ticker.
 | 
				
			||||||
	syncCh := make(chan time.Time, 1)
 | 
						syncCh := make(chan time.Time, 1)
 | 
				
			||||||
	housekeepingCh := make(chan time.Time, 1)
 | 
						housekeepingCh := make(chan time.Time, 1)
 | 
				
			||||||
 | 
						plegCh := make(chan *pleg.PodLifecycleEvent)
 | 
				
			||||||
	syncCh <- time.Now()
 | 
						syncCh <- time.Now()
 | 
				
			||||||
	kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh)
 | 
						kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh, plegCh)
 | 
				
			||||||
	loopTime2 := kubelet.LatestLoopEntryTime()
 | 
						loopTime2 := kubelet.LatestLoopEntryTime()
 | 
				
			||||||
	if loopTime2.IsZero() {
 | 
						if loopTime2.IsZero() {
 | 
				
			||||||
		t.Errorf("Unexpected sync loop time: 0, expected non-zero value.")
 | 
							t.Errorf("Unexpected sync loop time: 0, expected non-zero value.")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	syncCh <- time.Now()
 | 
						syncCh <- time.Now()
 | 
				
			||||||
	kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh)
 | 
						kubelet.syncLoopIteration(make(chan kubetypes.PodUpdate), kubelet, syncCh, housekeepingCh, plegCh)
 | 
				
			||||||
	loopTime3 := kubelet.LatestLoopEntryTime()
 | 
						loopTime3 := kubelet.LatestLoopEntryTime()
 | 
				
			||||||
	if !loopTime3.After(loopTime1) {
 | 
						if !loopTime3.After(loopTime1) {
 | 
				
			||||||
		t.Errorf("Sync Loop Time was not updated correctly. Second update timestamp should be greater than first update timestamp")
 | 
							t.Errorf("Sync Loop Time was not updated correctly. Second update timestamp should be greater than first update timestamp")
 | 
				
			||||||
@@ -366,7 +370,7 @@ func TestSyncLoopAbort(t *testing.T) {
 | 
				
			|||||||
	close(ch)
 | 
						close(ch)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// sanity check (also prevent this test from hanging in the next step)
 | 
						// sanity check (also prevent this test from hanging in the next step)
 | 
				
			||||||
	ok := kubelet.syncLoopIteration(ch, kubelet, make(chan time.Time), make(chan time.Time))
 | 
						ok := kubelet.syncLoopIteration(ch, kubelet, make(chan time.Time), make(chan time.Time), make(chan *pleg.PodLifecycleEvent, 1))
 | 
				
			||||||
	if ok {
 | 
						if ok {
 | 
				
			||||||
		t.Fatalf("expected syncLoopIteration to return !ok since update chan was closed")
 | 
							t.Fatalf("expected syncLoopIteration to return !ok since update chan was closed")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										19
									
								
								pkg/kubelet/pleg/doc.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										19
									
								
								pkg/kubelet/pleg/doc.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,19 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2015 The Kubernetes Authors All rights reserved.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Package pleg contains types and a generic implementation of the pod
 | 
				
			||||||
 | 
					// lifecycle event generator.
 | 
				
			||||||
 | 
					package pleg
 | 
				
			||||||
							
								
								
									
										141
									
								
								pkg/kubelet/pleg/generic.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										141
									
								
								pkg/kubelet/pleg/generic.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,141 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2015 The Kubernetes Authors All rights reserved.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package pleg
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/golang/glog"
 | 
				
			||||||
 | 
						kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/types"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/util"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// GenericPLEG is an extremely simple generic PLEG that relies solely on
 | 
				
			||||||
 | 
					// periodic listing to discover container changes. It should be be used
 | 
				
			||||||
 | 
					// as temporary replacement for container runtimes do not support a proper
 | 
				
			||||||
 | 
					// event generator yet.
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// Note that GenericPLEG assumes that a container would not be created,
 | 
				
			||||||
 | 
					// terminated, and garbage collected within one relist period. If such an
 | 
				
			||||||
 | 
					// incident happens, GenenricPLEG would miss all events regarding this
 | 
				
			||||||
 | 
					// container. In the case of relisting failure, the window may become longer.
 | 
				
			||||||
 | 
					// Note that this assumption is not unique -- many kubelet internal components
 | 
				
			||||||
 | 
					// rely on terminated containers as tombstones for bookkeeping purposes. The
 | 
				
			||||||
 | 
					// garbage collector is implemented to work with such situtations. However, to
 | 
				
			||||||
 | 
					// guarantee that kubelet can handle missing container events, it is
 | 
				
			||||||
 | 
					// recommended to set the relist period short and have an auxiliary, longer
 | 
				
			||||||
 | 
					// periodic sync in kubelet as the safety net.
 | 
				
			||||||
 | 
					type GenericPLEG struct {
 | 
				
			||||||
 | 
						// The period for relisting.
 | 
				
			||||||
 | 
						relistPeriod time.Duration
 | 
				
			||||||
 | 
						// The container runtime.
 | 
				
			||||||
 | 
						runtime kubecontainer.Runtime
 | 
				
			||||||
 | 
						// The channel from which the subscriber listens events.
 | 
				
			||||||
 | 
						eventChannel chan *PodLifecycleEvent
 | 
				
			||||||
 | 
						// The internal cache for container information.
 | 
				
			||||||
 | 
						containers map[string]containerInfo
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type containerInfo struct {
 | 
				
			||||||
 | 
						podID  types.UID
 | 
				
			||||||
 | 
						status kubecontainer.ContainerStatus
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
 | 
				
			||||||
 | 
						relistPeriod time.Duration) PodLifecycleEventGenerator {
 | 
				
			||||||
 | 
						return &GenericPLEG{
 | 
				
			||||||
 | 
							relistPeriod: relistPeriod,
 | 
				
			||||||
 | 
							runtime:      runtime,
 | 
				
			||||||
 | 
							eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
 | 
				
			||||||
 | 
							containers:   make(map[string]containerInfo),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Returns a channel from which the subscriber can recieve PodLifecycleEvent
 | 
				
			||||||
 | 
					// events.
 | 
				
			||||||
 | 
					// TODO: support multiple subscribers.
 | 
				
			||||||
 | 
					func (g *GenericPLEG) Watch() chan *PodLifecycleEvent {
 | 
				
			||||||
 | 
						return g.eventChannel
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Start spawns a goroutine to relist periodically.
 | 
				
			||||||
 | 
					func (g *GenericPLEG) Start() {
 | 
				
			||||||
 | 
						go util.Until(g.relist, g.relistPeriod, util.NeverStop)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func generateEvent(podID types.UID, cid string, oldStatus, newStatus kubecontainer.ContainerStatus) *PodLifecycleEvent {
 | 
				
			||||||
 | 
						if newStatus == oldStatus {
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						switch newStatus {
 | 
				
			||||||
 | 
						case kubecontainer.ContainerStatusRunning:
 | 
				
			||||||
 | 
							return &PodLifecycleEvent{ID: podID, Type: ContainerStarted, Data: cid}
 | 
				
			||||||
 | 
						case kubecontainer.ContainerStatusExited:
 | 
				
			||||||
 | 
							return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid}
 | 
				
			||||||
 | 
						case kubecontainer.ContainerStatusUnknown:
 | 
				
			||||||
 | 
							// Don't generate any event if the status is unknown.
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
							panic(fmt.Sprintf("unrecognized container status: %v", newStatus))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// relist queries the container runtime for list of pods/containers, compare
 | 
				
			||||||
 | 
					// with the internal pods/containers, and generats events accordingly.
 | 
				
			||||||
 | 
					func (g *GenericPLEG) relist() {
 | 
				
			||||||
 | 
						glog.V(5).Infof("GenericPLEG: Relisting")
 | 
				
			||||||
 | 
						// Get all the pods.
 | 
				
			||||||
 | 
						pods, err := g.runtime.GetPods(true)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							glog.Errorf("GenericPLEG: Unable to retrieve pods: %v", err)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						events := []*PodLifecycleEvent{}
 | 
				
			||||||
 | 
						containers := make(map[string]containerInfo, len(g.containers))
 | 
				
			||||||
 | 
						// Create a new containers map, compares container statuses, and generates
 | 
				
			||||||
 | 
						// correspoinding events.
 | 
				
			||||||
 | 
						for _, p := range pods {
 | 
				
			||||||
 | 
							for _, c := range p.Containers {
 | 
				
			||||||
 | 
								cid := c.ID.ID
 | 
				
			||||||
 | 
								// Get the of existing container info. Defaults to status unknown.
 | 
				
			||||||
 | 
								oldStatus := kubecontainer.ContainerStatusUnknown
 | 
				
			||||||
 | 
								if info, ok := g.containers[cid]; ok {
 | 
				
			||||||
 | 
									oldStatus = info.status
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								// Generate an event if required.
 | 
				
			||||||
 | 
								glog.V(7).Infof("GenericPLEG: %v/%v: %v -> %v", p.ID, cid, oldStatus, c.Status)
 | 
				
			||||||
 | 
								if e := generateEvent(p.ID, cid, oldStatus, c.Status); e != nil {
 | 
				
			||||||
 | 
									events = append(events, e)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								// Write to the new cache.
 | 
				
			||||||
 | 
								containers[cid] = containerInfo{podID: p.ID, status: c.Status}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Swap the container info cache. This is purely to avoid the need of
 | 
				
			||||||
 | 
						// garbage collection.
 | 
				
			||||||
 | 
						g.containers = containers
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Send out the events.
 | 
				
			||||||
 | 
						for i := range events {
 | 
				
			||||||
 | 
							g.eventChannel <- events[i]
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										148
									
								
								pkg/kubelet/pleg/generic_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										148
									
								
								pkg/kubelet/pleg/generic_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,148 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2015 The Kubernetes Authors All rights reserved.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package pleg
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"reflect"
 | 
				
			||||||
 | 
						"sort"
 | 
				
			||||||
 | 
						"testing"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/util"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const (
 | 
				
			||||||
 | 
						testContainerRuntimeType = "fooRuntime"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type TestGenericPLEG struct {
 | 
				
			||||||
 | 
						pleg    *GenericPLEG
 | 
				
			||||||
 | 
						runtime *kubecontainer.FakeRuntime
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func newTestGenericPLEG() *TestGenericPLEG {
 | 
				
			||||||
 | 
						fakeRuntime := &kubecontainer.FakeRuntime{}
 | 
				
			||||||
 | 
						// The channel capacity should be large enough to hold all events in a
 | 
				
			||||||
 | 
						// single test.
 | 
				
			||||||
 | 
						pleg := &GenericPLEG{
 | 
				
			||||||
 | 
							relistPeriod: time.Hour,
 | 
				
			||||||
 | 
							runtime:      fakeRuntime,
 | 
				
			||||||
 | 
							eventChannel: make(chan *PodLifecycleEvent, 100),
 | 
				
			||||||
 | 
							containers:   make(map[string]containerInfo),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return &TestGenericPLEG{pleg: pleg, runtime: fakeRuntime}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func getEventsFromChannel(ch <-chan *PodLifecycleEvent) []*PodLifecycleEvent {
 | 
				
			||||||
 | 
						events := []*PodLifecycleEvent{}
 | 
				
			||||||
 | 
						for len(ch) > 0 {
 | 
				
			||||||
 | 
							e := <-ch
 | 
				
			||||||
 | 
							events = append(events, e)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return events
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func createTestContainer(ID string, status kubecontainer.ContainerStatus) *kubecontainer.Container {
 | 
				
			||||||
 | 
						return &kubecontainer.Container{
 | 
				
			||||||
 | 
							ID:     kubecontainer.ContainerID{Type: testContainerRuntimeType, ID: ID},
 | 
				
			||||||
 | 
							Status: status,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type sortableEvents []*PodLifecycleEvent
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (a sortableEvents) Len() int      { return len(a) }
 | 
				
			||||||
 | 
					func (a sortableEvents) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
 | 
				
			||||||
 | 
					func (a sortableEvents) Less(i, j int) bool {
 | 
				
			||||||
 | 
						if a[i].ID != a[j].ID {
 | 
				
			||||||
 | 
							return a[i].ID < a[j].ID
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return a[i].Data.(string) < a[j].Data.(string)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func verifyEvents(t *testing.T, expected, actual []*PodLifecycleEvent) {
 | 
				
			||||||
 | 
						sort.Sort(sortableEvents(expected))
 | 
				
			||||||
 | 
						sort.Sort(sortableEvents(actual))
 | 
				
			||||||
 | 
						if !reflect.DeepEqual(expected, actual) {
 | 
				
			||||||
 | 
							t.Errorf("Actual events differ from the expected; diff: %v", util.ObjectDiff(expected, actual))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestRelisting(t *testing.T) {
 | 
				
			||||||
 | 
						testPleg := newTestGenericPLEG()
 | 
				
			||||||
 | 
						pleg, runtime := testPleg.pleg, testPleg.runtime
 | 
				
			||||||
 | 
						ch := pleg.Watch()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// The first relist should send a PodSync event to each pod.
 | 
				
			||||||
 | 
						runtime.AllPodList = []*kubecontainer.Pod{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								ID: "1234",
 | 
				
			||||||
 | 
								Containers: []*kubecontainer.Container{
 | 
				
			||||||
 | 
									createTestContainer("c1", kubecontainer.ContainerStatusExited),
 | 
				
			||||||
 | 
									createTestContainer("c2", kubecontainer.ContainerStatusRunning),
 | 
				
			||||||
 | 
									createTestContainer("c3", kubecontainer.ContainerStatusUnknown),
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								ID: "4567",
 | 
				
			||||||
 | 
								Containers: []*kubecontainer.Container{
 | 
				
			||||||
 | 
									createTestContainer("c1", kubecontainer.ContainerStatusExited),
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						pleg.relist()
 | 
				
			||||||
 | 
						// Report every running/exited container if we see them for the first time.
 | 
				
			||||||
 | 
						expected := []*PodLifecycleEvent{
 | 
				
			||||||
 | 
							{ID: "1234", Type: ContainerStarted, Data: "c2"},
 | 
				
			||||||
 | 
							{ID: "4567", Type: ContainerDied, Data: "c1"},
 | 
				
			||||||
 | 
							{ID: "1234", Type: ContainerDied, Data: "c1"},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						actual := getEventsFromChannel(ch)
 | 
				
			||||||
 | 
						verifyEvents(t, expected, actual)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// The second relist should not send out any event because no container
 | 
				
			||||||
 | 
						// changed.
 | 
				
			||||||
 | 
						pleg.relist()
 | 
				
			||||||
 | 
						verifyEvents(t, expected, actual)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						runtime.AllPodList = []*kubecontainer.Pod{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								ID: "1234",
 | 
				
			||||||
 | 
								Containers: []*kubecontainer.Container{
 | 
				
			||||||
 | 
									createTestContainer("c2", kubecontainer.ContainerStatusExited),
 | 
				
			||||||
 | 
									createTestContainer("c3", kubecontainer.ContainerStatusRunning),
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								ID: "4567",
 | 
				
			||||||
 | 
								Containers: []*kubecontainer.Container{
 | 
				
			||||||
 | 
									createTestContainer("c4", kubecontainer.ContainerStatusRunning),
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						pleg.relist()
 | 
				
			||||||
 | 
						// Only report containers that transitioned to running or exited status.
 | 
				
			||||||
 | 
						expected = []*PodLifecycleEvent{
 | 
				
			||||||
 | 
							{ID: "1234", Type: ContainerDied, Data: "c2"},
 | 
				
			||||||
 | 
							{ID: "1234", Type: ContainerStarted, Data: "c3"},
 | 
				
			||||||
 | 
							{ID: "4567", Type: ContainerStarted, Data: "c4"},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						actual = getEventsFromChannel(ch)
 | 
				
			||||||
 | 
						verifyEvents(t, expected, actual)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										50
									
								
								pkg/kubelet/pleg/pleg.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										50
									
								
								pkg/kubelet/pleg/pleg.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,50 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2015 The Kubernetes Authors All rights reserved.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package pleg
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/types"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type PodLifeCycleEventType string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const (
 | 
				
			||||||
 | 
						ContainerStarted      PodLifeCycleEventType = "ContainerStarted"
 | 
				
			||||||
 | 
						ContainerDied         PodLifeCycleEventType = "ContainerDied"
 | 
				
			||||||
 | 
						NetworkSetupCompleted PodLifeCycleEventType = "NetworkSetupCompleted"
 | 
				
			||||||
 | 
						NetworkFailed         PodLifeCycleEventType = "NetworkFailed"
 | 
				
			||||||
 | 
						// PodSync is used to trigger syncing of a pod when the observed change of
 | 
				
			||||||
 | 
						// the state of the pod cannot be captured by any single event above.
 | 
				
			||||||
 | 
						PodSync PodLifeCycleEventType = "PodSync"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// PodLifecycleEvent is an event that reflects the change of the pod state.
 | 
				
			||||||
 | 
					type PodLifecycleEvent struct {
 | 
				
			||||||
 | 
						// The pod ID.
 | 
				
			||||||
 | 
						ID types.UID
 | 
				
			||||||
 | 
						// The type of the event.
 | 
				
			||||||
 | 
						Type PodLifeCycleEventType
 | 
				
			||||||
 | 
						// The accompanied data which varies based on the event type.
 | 
				
			||||||
 | 
						//   - ContainerStarted/ContainerStopped: the container name (string).
 | 
				
			||||||
 | 
						//   - All other event types: unused.
 | 
				
			||||||
 | 
						Data interface{}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type PodLifecycleEventGenerator interface {
 | 
				
			||||||
 | 
						Start()
 | 
				
			||||||
 | 
						Watch() chan *PodLifecycleEvent
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -171,15 +171,8 @@ func (pm *basicManager) getAllPods() []*api.Pod {
 | 
				
			|||||||
	return append(podsMapToPods(pm.podByUID), podsMapToPods(pm.mirrorPodByUID)...)
 | 
						return append(podsMapToPods(pm.podByUID), podsMapToPods(pm.mirrorPodByUID)...)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// GetPodByName provides the (non-mirror) pod that matches namespace and name,
 | 
					// GetPodByUID provides the (non-mirror) pod that matches pod UID, as well as
 | 
				
			||||||
// as well as whether the pod was found.
 | 
					// whether the pod is found.
 | 
				
			||||||
func (pm *basicManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
 | 
					 | 
				
			||||||
	podFullName := kubecontainer.BuildPodFullName(name, namespace)
 | 
					 | 
				
			||||||
	return pm.GetPodByFullName(podFullName)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// GetPodByUID provides the (non-mirror) pod that matches pod UID as well as
 | 
					 | 
				
			||||||
// whether the pod was found.
 | 
					 | 
				
			||||||
func (pm *basicManager) GetPodByUID(uid types.UID) (*api.Pod, bool) {
 | 
					func (pm *basicManager) GetPodByUID(uid types.UID) (*api.Pod, bool) {
 | 
				
			||||||
	pm.lock.RLock()
 | 
						pm.lock.RLock()
 | 
				
			||||||
	defer pm.lock.RUnlock()
 | 
						defer pm.lock.RUnlock()
 | 
				
			||||||
@@ -187,6 +180,13 @@ func (pm *basicManager) GetPodByUID(uid types.UID) (*api.Pod, bool) {
 | 
				
			|||||||
	return pod, ok
 | 
						return pod, ok
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// GetPodByName provides the (non-mirror) pod that matches namespace and name,
 | 
				
			||||||
 | 
					// as well as whether the pod was found.
 | 
				
			||||||
 | 
					func (pm *basicManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
 | 
				
			||||||
 | 
						podFullName := kubecontainer.BuildPodFullName(name, namespace)
 | 
				
			||||||
 | 
						return pm.GetPodByFullName(podFullName)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// GetPodByName returns the (non-mirror) pod that matches full name, as well as
 | 
					// GetPodByName returns the (non-mirror) pod that matches full name, as well as
 | 
				
			||||||
// whether the pod was found.
 | 
					// whether the pod was found.
 | 
				
			||||||
func (pm *basicManager) GetPodByFullName(podFullName string) (*api.Pod, bool) {
 | 
					func (pm *basicManager) GetPodByFullName(podFullName string) (*api.Pod, bool) {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user