Merge pull request #4867 from ixdy/rollbacks
Revert "Adding sync pod latency metric." and "Thread-per-pod model in Kubelet"
This commit is contained in:
		| @@ -61,10 +61,7 @@ const podOomScoreAdj = -100 | ||||
|  | ||||
| // SyncHandler is an interface implemented by Kubelet, for testability | ||||
| type SyncHandler interface { | ||||
| 	// Syncs current state to match the specified pods. SyncPodType specified what | ||||
| 	// type of sync is occuring per pod. StartTime specifies the time at which | ||||
| 	// syncing began (for use in monitoring). | ||||
| 	SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, startTime time.Time) error | ||||
| 	SyncPods([]api.BoundPod) error | ||||
| } | ||||
|  | ||||
| type SourceReadyFn func(source string) bool | ||||
| @@ -114,6 +111,7 @@ func NewMainKubelet( | ||||
| 		rootDirectory:                  rootDirectory, | ||||
| 		resyncInterval:                 resyncInterval, | ||||
| 		podInfraContainerImage:         podInfraContainerImage, | ||||
| 		podWorkers:                     newPodWorkers(), | ||||
| 		dockerIDToRef:                  map[dockertools.DockerID]*api.ObjectReference{}, | ||||
| 		runner:                         dockertools.NewDockerContainerCommandRunner(dockerClient), | ||||
| 		httpClient:                     &http.Client{}, | ||||
| @@ -136,7 +134,6 @@ func NewMainKubelet( | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	klet.dockerCache = dockerCache | ||||
| 	klet.podWorkers = newPodWorkers(dockerCache, klet.syncPod) | ||||
|  | ||||
| 	metrics.Register(dockerCache) | ||||
|  | ||||
| @@ -456,6 +453,43 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) { | ||||
| 	kl.syncLoop(updates, kl) | ||||
| } | ||||
|  | ||||
| // Per-pod workers. | ||||
| type podWorkers struct { | ||||
| 	lock sync.Mutex | ||||
|  | ||||
| 	// Set of pods with existing workers. | ||||
| 	workers util.StringSet | ||||
| } | ||||
|  | ||||
| func newPodWorkers() *podWorkers { | ||||
| 	return &podWorkers{ | ||||
| 		workers: util.NewStringSet(), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Runs a worker for "podFullName" asynchronously with the specified "action". | ||||
| // If the worker for the "podFullName" is already running, functions as a no-op. | ||||
| func (self *podWorkers) Run(podFullName string, action func()) { | ||||
| 	self.lock.Lock() | ||||
| 	defer self.lock.Unlock() | ||||
|  | ||||
| 	// This worker is already running, let it finish. | ||||
| 	if self.workers.Has(podFullName) { | ||||
| 		return | ||||
| 	} | ||||
| 	self.workers.Insert(podFullName) | ||||
|  | ||||
| 	// Run worker async. | ||||
| 	go func() { | ||||
| 		defer util.HandleCrash() | ||||
| 		action() | ||||
|  | ||||
| 		self.lock.Lock() | ||||
| 		defer self.lock.Unlock() | ||||
| 		self.workers.Delete(podFullName) | ||||
| 	}() | ||||
| } | ||||
|  | ||||
| func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string { | ||||
| 	binds := []string{} | ||||
| 	for _, mount := range container.VolumeMounts { | ||||
| @@ -945,7 +979,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.BoundPod) (dockertools.Docke | ||||
| func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error { | ||||
| 	start := time.Now() | ||||
| 	defer func() { | ||||
| 		metrics.ImagePullLatency.Observe(metrics.SinceInMicroseconds(start)) | ||||
| 		metrics.ImagePullLatency.Observe(float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())) | ||||
| 	}() | ||||
|  | ||||
| 	if err := kl.dockerPuller.Pull(img); err != nil { | ||||
| @@ -1273,7 +1307,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.BoundPod, running []*docker | ||||
| } | ||||
|  | ||||
| // SyncPods synchronizes the configured list of pods (desired state) with the host current state. | ||||
| func (kl *Kubelet) SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error { | ||||
| func (kl *Kubelet) SyncPods(pods []api.BoundPod) error { | ||||
| 	glog.V(4).Infof("Desired: %#v", pods) | ||||
| 	var err error | ||||
| 	desiredContainers := make(map[podContainer]empty) | ||||
| @@ -1299,14 +1333,13 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metr | ||||
| 		} | ||||
|  | ||||
| 		// Run the sync in an async manifest worker. | ||||
| 		kl.podWorkers.UpdatePod(pod, func() { | ||||
| 			metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start)) | ||||
| 		kl.podWorkers.Run(podFullName, func() { | ||||
| 			if err := kl.syncPod(pod, dockerContainers); err != nil { | ||||
| 				glog.Errorf("Error syncing pod, skipping: %v", err) | ||||
| 				record.Eventf(pod, "failedSync", "Error syncing pod, skipping: %v", err) | ||||
| 			} | ||||
| 		}) | ||||
| 	} | ||||
|  | ||||
| 	// Stop the workers for no-longer existing pods. | ||||
| 	kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods) | ||||
|  | ||||
| 	// Kill any containers we don't need. | ||||
| 	killed := []string{} | ||||
| 	for ix := range dockerContainers { | ||||
| @@ -1421,21 +1454,19 @@ func (kl *Kubelet) handleUpdate(u PodUpdate) { | ||||
| func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { | ||||
| 	for { | ||||
| 		unsyncedPod := false | ||||
| 		podSyncTypes := make(map[types.UID]metrics.SyncPodType) | ||||
| 		select { | ||||
| 		case u := <-updates: | ||||
| 			kl.updatePods(u, podSyncTypes) | ||||
| 			kl.updatePods(u) | ||||
| 			unsyncedPod = true | ||||
| 		case <-time.After(kl.resyncInterval): | ||||
| 			glog.V(4).Infof("Periodic sync") | ||||
| 		} | ||||
| 		start := time.Now() | ||||
| 		// If we already caught some update, try to wait for some short time | ||||
| 		// to possibly batch it with other incoming updates. | ||||
| 		for unsyncedPod { | ||||
| 			select { | ||||
| 			case u := <-updates: | ||||
| 				kl.updatePods(u, podSyncTypes) | ||||
| 				kl.updatePods(u) | ||||
| 			case <-time.After(5 * time.Millisecond): | ||||
| 				// Break the for loop. | ||||
| 				unsyncedPod = false | ||||
| @@ -1447,54 +1478,25 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { | ||||
| 			glog.Errorf("Failed to get bound pods.") | ||||
| 			return | ||||
| 		} | ||||
| 		if err := handler.SyncPods(pods, podSyncTypes, start); err != nil { | ||||
| 		if err := handler.SyncPods(pods); err != nil { | ||||
| 			glog.Errorf("Couldn't sync containers: %v", err) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Updated the Kubelet's internal pods with those provided by the update. | ||||
| // Records new and updated pods in newPods and updatedPods. | ||||
| func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) { | ||||
| func (kl *Kubelet) updatePods(u PodUpdate) { | ||||
| 	switch u.Op { | ||||
| 	case SET: | ||||
| 		glog.V(3).Infof("SET: Containers changed") | ||||
|  | ||||
| 		// Store the new pods. Don't worry about filtering host ports since those | ||||
| 		// pods will never be looked up. | ||||
| 		existingPods := make(map[types.UID]struct{}) | ||||
| 		for i := range kl.pods { | ||||
| 			existingPods[kl.pods[i].UID] = struct{}{} | ||||
| 		} | ||||
| 		for i := range u.Pods { | ||||
| 			if _, ok := existingPods[u.Pods[i].UID]; !ok { | ||||
| 				podSyncTypes[u.Pods[i].UID] = metrics.SyncPodCreate | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		kl.pods = u.Pods | ||||
| 		kl.pods = filterHostPortConflicts(kl.pods) | ||||
| 	case UPDATE: | ||||
| 		glog.V(3).Infof("Update: Containers changed") | ||||
|  | ||||
| 		// Store the updated pods. Don't worry about filtering host ports since those | ||||
| 		// pods will never be looked up. | ||||
| 		for i := range u.Pods { | ||||
| 			podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate | ||||
| 		} | ||||
|  | ||||
| 		kl.pods = updateBoundPods(u.Pods, kl.pods) | ||||
| 		kl.pods = filterHostPortConflicts(kl.pods) | ||||
| 	default: | ||||
| 		panic("syncLoop does not support incremental changes") | ||||
| 	} | ||||
|  | ||||
| 	// Mark all remaining pods as sync. | ||||
| 	for i := range kl.pods { | ||||
| 		if _, ok := podSyncTypes[kl.pods[i].UID]; !ok { | ||||
| 			podSyncTypes[u.Pods[i].UID] = metrics.SyncPodSync | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Returns Docker version for this Kubelet. | ||||
|   | ||||
| @@ -34,7 +34,6 @@ import ( | ||||
|  | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume" | ||||
| 	_ "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/host_path" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/types" | ||||
| @@ -49,15 +48,14 @@ func init() { | ||||
| 	util.ReallyCrash = true | ||||
| } | ||||
|  | ||||
| func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *sync.WaitGroup) { | ||||
| func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient) { | ||||
| 	fakeDocker := &dockertools.FakeDockerClient{ | ||||
| 		RemovedImages: util.StringSet{}, | ||||
| 	} | ||||
| 	fakeDockerCache := dockertools.NewFakeDockerCache(fakeDocker) | ||||
|  | ||||
| 	kubelet := &Kubelet{} | ||||
| 	kubelet.dockerClient = fakeDocker | ||||
| 	kubelet.dockerCache = fakeDockerCache | ||||
| 	kubelet.dockerCache = dockertools.NewFakeDockerCache(fakeDocker) | ||||
| 	kubelet.dockerPuller = &dockertools.FakeDockerPuller{} | ||||
| 	if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil { | ||||
| 		t.Fatalf("can't make a temp rootdir: %v", err) | ||||
| @@ -67,14 +65,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn | ||||
| 	if err := os.MkdirAll(kubelet.rootDirectory, 0750); err != nil { | ||||
| 		t.Fatalf("can't mkdir(%q): %v", kubelet.rootDirectory, err) | ||||
| 	} | ||||
| 	waitGroup := new(sync.WaitGroup) | ||||
| 	kubelet.podWorkers = newPodWorkers( | ||||
| 		fakeDockerCache, | ||||
| 		func(pod *api.BoundPod, containers dockertools.DockerContainers) error { | ||||
| 			err := kubelet.syncPod(pod, containers) | ||||
| 			waitGroup.Done() | ||||
| 			return err | ||||
| 		}) | ||||
| 	kubelet.podWorkers = newPodWorkers() | ||||
| 	kubelet.sourceReady = func(source string) bool { return true } | ||||
| 	kubelet.masterServiceNamespace = api.NamespaceDefault | ||||
| 	kubelet.serviceLister = testServiceLister{} | ||||
| @@ -83,7 +74,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient, *syn | ||||
| 		t.Fatalf("can't initialize kubelet data dirs: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	return kubelet, fakeDocker, waitGroup | ||||
| 	return kubelet, fakeDocker | ||||
| } | ||||
|  | ||||
| func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) { | ||||
| @@ -135,7 +126,7 @@ func verifyBoolean(t *testing.T, expected, value bool) { | ||||
| } | ||||
|  | ||||
| func TestKubeletDirs(t *testing.T) { | ||||
| 	kubelet, _, _ := newTestKubelet(t) | ||||
| 	kubelet, _ := newTestKubelet(t) | ||||
| 	root := kubelet.rootDirectory | ||||
|  | ||||
| 	var exp, got string | ||||
| @@ -196,7 +187,7 @@ func TestKubeletDirs(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func TestKubeletDirsCompat(t *testing.T) { | ||||
| 	kubelet, _, _ := newTestKubelet(t) | ||||
| 	kubelet, _ := newTestKubelet(t) | ||||
| 	root := kubelet.rootDirectory | ||||
| 	if err := os.MkdirAll(root, 0750); err != nil { | ||||
| 		t.Fatalf("can't mkdir(%q): %s", root, err) | ||||
| @@ -302,7 +293,7 @@ func TestKillContainerWithError(t *testing.T) { | ||||
| 		Err:           fmt.Errorf("sample error"), | ||||
| 		ContainerList: append([]docker.APIContainers{}, containers...), | ||||
| 	} | ||||
| 	kubelet, _, _ := newTestKubelet(t) | ||||
| 	kubelet, _ := newTestKubelet(t) | ||||
| 	for _, c := range fakeDocker.ContainerList { | ||||
| 		kubelet.readiness.set(c.ID, true) | ||||
| 	} | ||||
| @@ -333,7 +324,7 @@ func TestKillContainer(t *testing.T) { | ||||
| 			Names: []string{"/k8s_bar_qux_5678_42"}, | ||||
| 		}, | ||||
| 	} | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	fakeDocker.ContainerList = append([]docker.APIContainers{}, containers...) | ||||
| 	fakeDocker.Container = &docker.Container{ | ||||
| 		Name: "foobar", | ||||
| @@ -383,10 +374,8 @@ func (cr *channelReader) GetList() [][]api.BoundPod { | ||||
| 	return cr.list | ||||
| } | ||||
|  | ||||
| var emptyPodUIDs map[types.UID]metrics.SyncPodType | ||||
|  | ||||
| func TestSyncPodsDoesNothing(t *testing.T) { | ||||
| 	kubelet, fakeDocker, waitGroup := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	container := api.Container{Name: "bar"} | ||||
| 	fakeDocker.ContainerList = []docker.APIContainers{ | ||||
| 		{ | ||||
| @@ -415,17 +404,16 @@ func TestSyncPodsDoesNothing(t *testing.T) { | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	waitGroup.Add(1) | ||||
| 	err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) | ||||
| 	err := kubelet.SyncPods(kubelet.pods) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	waitGroup.Wait() | ||||
| 	verifyCalls(t, fakeDocker, []string{"list", "list", "list", "inspect_container", "inspect_container"}) | ||||
| 	kubelet.drainWorkers() | ||||
| 	verifyCalls(t, fakeDocker, []string{"list", "list", "inspect_container", "inspect_container"}) | ||||
| } | ||||
|  | ||||
| func TestSyncPodsWithTerminationLog(t *testing.T) { | ||||
| 	kubelet, fakeDocker, waitGroup := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	container := api.Container{ | ||||
| 		Name: "bar", | ||||
| 		TerminationMessagePath: "/dev/somepath", | ||||
| @@ -446,14 +434,13 @@ func TestSyncPodsWithTerminationLog(t *testing.T) { | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	waitGroup.Add(1) | ||||
| 	err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) | ||||
| 	err := kubelet.SyncPods(kubelet.pods) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	waitGroup.Wait() | ||||
| 	kubelet.drainWorkers() | ||||
| 	verifyCalls(t, fakeDocker, []string{ | ||||
| 		"list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) | ||||
| 		"list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) | ||||
|  | ||||
| 	fakeDocker.Lock() | ||||
| 	parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":") | ||||
| @@ -466,6 +453,19 @@ func TestSyncPodsWithTerminationLog(t *testing.T) { | ||||
| 	fakeDocker.Unlock() | ||||
| } | ||||
|  | ||||
| // drainWorkers waits until all workers are done.  Should only used for testing. | ||||
| func (kl *Kubelet) drainWorkers() { | ||||
| 	for { | ||||
| 		kl.podWorkers.lock.Lock() | ||||
| 		length := len(kl.podWorkers.workers) | ||||
| 		kl.podWorkers.lock.Unlock() | ||||
| 		if length == 0 { | ||||
| 			return | ||||
| 		} | ||||
| 		time.Sleep(time.Millisecond * 100) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func matchString(t *testing.T, pattern, str string) bool { | ||||
| 	match, err := regexp.MatchString(pattern, str) | ||||
| 	if err != nil { | ||||
| @@ -475,7 +475,7 @@ func matchString(t *testing.T, pattern, str string) bool { | ||||
| } | ||||
|  | ||||
| func TestSyncPodsCreatesNetAndContainer(t *testing.T) { | ||||
| 	kubelet, fakeDocker, waitGroup := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	kubelet.podInfraContainerImage = "custom_image_name" | ||||
| 	fakeDocker.ContainerList = []docker.APIContainers{} | ||||
| 	kubelet.pods = []api.BoundPod{ | ||||
| @@ -493,15 +493,14 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	waitGroup.Add(1) | ||||
| 	err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) | ||||
| 	err := kubelet.SyncPods(kubelet.pods) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	waitGroup.Wait() | ||||
| 	kubelet.drainWorkers() | ||||
|  | ||||
| 	verifyCalls(t, fakeDocker, []string{ | ||||
| 		"list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) | ||||
| 		"list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) | ||||
|  | ||||
| 	fakeDocker.Lock() | ||||
|  | ||||
| @@ -524,7 +523,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { | ||||
| 	kubelet, fakeDocker, waitGroup := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller) | ||||
| 	puller.HasImages = []string{} | ||||
| 	kubelet.podInfraContainerImage = "custom_image_name" | ||||
| @@ -544,15 +543,14 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	waitGroup.Add(1) | ||||
| 	err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) | ||||
| 	err := kubelet.SyncPods(kubelet.pods) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	waitGroup.Wait() | ||||
| 	kubelet.drainWorkers() | ||||
|  | ||||
| 	verifyCalls(t, fakeDocker, []string{ | ||||
| 		"list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) | ||||
| 		"list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"}) | ||||
|  | ||||
| 	fakeDocker.Lock() | ||||
|  | ||||
| @@ -569,7 +567,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { | ||||
| 	kubelet, fakeDocker, waitGroup := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	fakeDocker.ContainerList = []docker.APIContainers{ | ||||
| 		{ | ||||
| 			// pod infra container | ||||
| @@ -592,15 +590,14 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	waitGroup.Add(1) | ||||
| 	err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) | ||||
| 	err := kubelet.SyncPods(kubelet.pods) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	waitGroup.Wait() | ||||
| 	kubelet.drainWorkers() | ||||
|  | ||||
| 	verifyCalls(t, fakeDocker, []string{ | ||||
| 		"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) | ||||
| 		"list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) | ||||
|  | ||||
| 	fakeDocker.Lock() | ||||
| 	if len(fakeDocker.Created) != 1 || | ||||
| @@ -611,7 +608,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { | ||||
| 	kubelet, fakeDocker, waitGroup := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	fakeHttp := fakeHTTP{} | ||||
| 	kubelet.httpClient = &fakeHttp | ||||
| 	fakeDocker.ContainerList = []docker.APIContainers{ | ||||
| @@ -647,15 +644,14 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	waitGroup.Add(1) | ||||
| 	err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) | ||||
| 	err := kubelet.SyncPods(kubelet.pods) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	waitGroup.Wait() | ||||
| 	kubelet.drainWorkers() | ||||
|  | ||||
| 	verifyCalls(t, fakeDocker, []string{ | ||||
| 		"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) | ||||
| 		"list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) | ||||
|  | ||||
| 	fakeDocker.Lock() | ||||
| 	if len(fakeDocker.Created) != 1 || | ||||
| @@ -669,7 +665,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { | ||||
| 	kubelet, fakeDocker, waitGroup := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	fakeDocker.ContainerList = []docker.APIContainers{ | ||||
| 		{ | ||||
| 			// format is // k8s_<container-id>_<pod-fullname>_<pod-uid> | ||||
| @@ -692,15 +688,14 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	waitGroup.Add(1) | ||||
| 	err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now()) | ||||
| 	err := kubelet.SyncPods(kubelet.pods) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	waitGroup.Wait() | ||||
| 	kubelet.drainWorkers() | ||||
|  | ||||
| 	verifyCalls(t, fakeDocker, []string{ | ||||
| 		"list", "list", "stop", "create", "start", "inspect_container", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) | ||||
| 		"list", "stop", "create", "start", "inspect_container", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) | ||||
|  | ||||
| 	// A map iteration is used to delete containers, so must not depend on | ||||
| 	// order here. | ||||
| @@ -716,7 +711,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { | ||||
|  | ||||
| func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { | ||||
| 	ready := false | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	kubelet.sourceReady = func(source string) bool { return ready } | ||||
|  | ||||
| 	fakeDocker.ContainerList = []docker.APIContainers{ | ||||
| @@ -731,7 +726,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { | ||||
| 			ID:    "9876", | ||||
| 		}, | ||||
| 	} | ||||
| 	if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil { | ||||
| 	if err := kubelet.SyncPods([]api.BoundPod{}); err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	// Validate nothing happened. | ||||
| @@ -739,7 +734,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { | ||||
| 	fakeDocker.ClearCalls() | ||||
|  | ||||
| 	ready = true | ||||
| 	if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil { | ||||
| 	if err := kubelet.SyncPods([]api.BoundPod{}); err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"}) | ||||
| @@ -759,7 +754,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { | ||||
|  | ||||
| func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) { | ||||
| 	ready := false | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	kubelet.sourceReady = func(source string) bool { | ||||
| 		if source == "testSource" { | ||||
| 			return ready | ||||
| @@ -790,7 +785,7 @@ func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) { | ||||
| 			ID:    "9876", | ||||
| 		}, | ||||
| 	} | ||||
| 	if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil { | ||||
| 	if err := kubelet.SyncPods([]api.BoundPod{}); err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	// Validate nothing happened. | ||||
| @@ -798,7 +793,7 @@ func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) { | ||||
| 	fakeDocker.ClearCalls() | ||||
|  | ||||
| 	ready = true | ||||
| 	if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil { | ||||
| 	if err := kubelet.SyncPods([]api.BoundPod{}); err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"}) | ||||
| @@ -819,7 +814,7 @@ func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func TestSyncPodsDeletes(t *testing.T) { | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	fakeDocker.ContainerList = []docker.APIContainers{ | ||||
| 		{ | ||||
| 			// the k8s prefix is required for the kubelet to manage the container | ||||
| @@ -836,7 +831,7 @@ func TestSyncPodsDeletes(t *testing.T) { | ||||
| 			ID:    "4567", | ||||
| 		}, | ||||
| 	} | ||||
| 	err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()) | ||||
| 	err := kubelet.SyncPods([]api.BoundPod{}) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| 	} | ||||
| @@ -857,7 +852,7 @@ func TestSyncPodsDeletes(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func TestSyncPodDeletesDuplicate(t *testing.T) { | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	dockerContainers := dockertools.DockerContainers{ | ||||
| 		"1234": &docker.APIContainers{ | ||||
| 			// the k8s prefix is required for the kubelet to manage the container | ||||
| @@ -907,7 +902,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func TestSyncPodBadHash(t *testing.T) { | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	dockerContainers := dockertools.DockerContainers{ | ||||
| 		"1234": &docker.APIContainers{ | ||||
| 			// the k8s prefix is required for the kubelet to manage the container | ||||
| @@ -956,7 +951,7 @@ func TestSyncPodBadHash(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func TestSyncPodUnhealthy(t *testing.T) { | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	dockerContainers := dockertools.DockerContainers{ | ||||
| 		"1234": &docker.APIContainers{ | ||||
| 			// the k8s prefix is required for the kubelet to manage the container | ||||
| @@ -1006,7 +1001,7 @@ func TestSyncPodUnhealthy(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func TestMountExternalVolumes(t *testing.T) { | ||||
| 	kubelet, _, _ := newTestKubelet(t) | ||||
| 	kubelet, _ := newTestKubelet(t) | ||||
| 	kubelet.volumePluginMgr.InitPlugins([]volume.Plugin{&volume.FakePlugin{"fake", nil}}, &volumeHost{kubelet}) | ||||
|  | ||||
| 	pod := api.BoundPod{ | ||||
| @@ -1040,7 +1035,7 @@ func TestMountExternalVolumes(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func TestGetPodVolumesFromDisk(t *testing.T) { | ||||
| 	kubelet, _, _ := newTestKubelet(t) | ||||
| 	kubelet, _ := newTestKubelet(t) | ||||
| 	plug := &volume.FakePlugin{"fake", nil} | ||||
| 	kubelet.volumePluginMgr.InitPlugins([]volume.Plugin{plug}, &volumeHost{kubelet}) | ||||
|  | ||||
| @@ -1315,7 +1310,7 @@ func TestGetContainerInfo(t *testing.T) { | ||||
| 	cadvisorReq := &info.ContainerInfoRequest{} | ||||
| 	mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil) | ||||
|  | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	kubelet.cadvisorClient = mockCadvisor | ||||
| 	fakeDocker.ContainerList = []docker.APIContainers{ | ||||
| 		{ | ||||
| @@ -1353,6 +1348,7 @@ func TestGetRootInfo(t *testing.T) { | ||||
| 		dockerClient:   &fakeDocker, | ||||
| 		dockerPuller:   &dockertools.FakeDockerPuller{}, | ||||
| 		cadvisorClient: mockCadvisor, | ||||
| 		podWorkers:     newPodWorkers(), | ||||
| 	} | ||||
|  | ||||
| 	// If the container name is an empty string, then it means the root container. | ||||
| @@ -1364,7 +1360,7 @@ func TestGetRootInfo(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func TestGetContainerInfoWithoutCadvisor(t *testing.T) { | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	fakeDocker.ContainerList = []docker.APIContainers{ | ||||
| 		{ | ||||
| 			ID: "foobar", | ||||
| @@ -1389,7 +1385,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) { | ||||
| 	cadvisorReq := &info.ContainerInfoRequest{} | ||||
| 	mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, ErrCadvisorApiFailure) | ||||
|  | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	kubelet.cadvisorClient = mockCadvisor | ||||
| 	fakeDocker.ContainerList = []docker.APIContainers{ | ||||
| 		{ | ||||
| @@ -1417,7 +1413,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) { | ||||
| func TestGetContainerInfoOnNonExistContainer(t *testing.T) { | ||||
| 	mockCadvisor := &mockCadvisorClient{} | ||||
|  | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	kubelet.cadvisorClient = mockCadvisor | ||||
| 	fakeDocker.ContainerList = []docker.APIContainers{} | ||||
|  | ||||
| @@ -1431,7 +1427,7 @@ func TestGetContainerInfoOnNonExistContainer(t *testing.T) { | ||||
| func TestGetContainerInfoWhenDockerToolsFailed(t *testing.T) { | ||||
| 	mockCadvisor := &mockCadvisorClient{} | ||||
|  | ||||
| 	kubelet, _, _ := newTestKubelet(t) | ||||
| 	kubelet, _ := newTestKubelet(t) | ||||
| 	kubelet.cadvisorClient = mockCadvisor | ||||
| 	expectedErr := fmt.Errorf("List containers error") | ||||
| 	kubelet.dockerClient = &errorTestingDockerClient{listContainersError: expectedErr} | ||||
| @@ -1451,7 +1447,7 @@ func TestGetContainerInfoWhenDockerToolsFailed(t *testing.T) { | ||||
| func TestGetContainerInfoWithNoContainers(t *testing.T) { | ||||
| 	mockCadvisor := &mockCadvisorClient{} | ||||
|  | ||||
| 	kubelet, _, _ := newTestKubelet(t) | ||||
| 	kubelet, _ := newTestKubelet(t) | ||||
| 	kubelet.cadvisorClient = mockCadvisor | ||||
|  | ||||
| 	kubelet.dockerClient = &errorTestingDockerClient{listContainersError: nil} | ||||
| @@ -1470,7 +1466,7 @@ func TestGetContainerInfoWithNoContainers(t *testing.T) { | ||||
| func TestGetContainerInfoWithNoMatchingContainers(t *testing.T) { | ||||
| 	mockCadvisor := &mockCadvisorClient{} | ||||
|  | ||||
| 	kubelet, _, _ := newTestKubelet(t) | ||||
| 	kubelet, _ := newTestKubelet(t) | ||||
| 	kubelet.cadvisorClient = mockCadvisor | ||||
|  | ||||
| 	containerList := []docker.APIContainers{ | ||||
| @@ -1534,7 +1530,7 @@ func (f *fakeContainerCommandRunner) PortForward(podInfraContainerID string, por | ||||
|  | ||||
| func TestRunInContainerNoSuchPod(t *testing.T) { | ||||
| 	fakeCommandRunner := fakeContainerCommandRunner{} | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	fakeDocker.ContainerList = []docker.APIContainers{} | ||||
| 	kubelet.runner = &fakeCommandRunner | ||||
|  | ||||
| @@ -1556,7 +1552,7 @@ func TestRunInContainerNoSuchPod(t *testing.T) { | ||||
|  | ||||
| func TestRunInContainer(t *testing.T) { | ||||
| 	fakeCommandRunner := fakeContainerCommandRunner{} | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	kubelet.runner = &fakeCommandRunner | ||||
|  | ||||
| 	containerID := "abc1234" | ||||
| @@ -1597,7 +1593,7 @@ func TestRunInContainer(t *testing.T) { | ||||
|  | ||||
| func TestRunHandlerExec(t *testing.T) { | ||||
| 	fakeCommandRunner := fakeContainerCommandRunner{} | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	kubelet.runner = &fakeCommandRunner | ||||
|  | ||||
| 	containerID := "abc1234" | ||||
| @@ -1645,7 +1641,7 @@ func (f *fakeHTTP) Get(url string) (*http.Response, error) { | ||||
| func TestRunHandlerHttp(t *testing.T) { | ||||
| 	fakeHttp := fakeHTTP{} | ||||
|  | ||||
| 	kubelet, _, _ := newTestKubelet(t) | ||||
| 	kubelet, _ := newTestKubelet(t) | ||||
| 	kubelet.httpClient = &fakeHttp | ||||
|  | ||||
| 	podName := "podFoo" | ||||
| @@ -1674,7 +1670,7 @@ func TestRunHandlerHttp(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func TestNewHandler(t *testing.T) { | ||||
| 	kubelet, _, _ := newTestKubelet(t) | ||||
| 	kubelet, _ := newTestKubelet(t) | ||||
| 	handler := &api.Handler{ | ||||
| 		HTTPGet: &api.HTTPGetAction{ | ||||
| 			Host: "foo", | ||||
| @@ -1705,7 +1701,7 @@ func TestNewHandler(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func TestSyncPodEventHandlerFails(t *testing.T) { | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	kubelet.httpClient = &fakeHTTP{ | ||||
| 		err: fmt.Errorf("test error"), | ||||
| 	} | ||||
| @@ -1894,7 +1890,7 @@ func TestKubeletGarbageCollection(t *testing.T) { | ||||
| 		}, | ||||
| 	} | ||||
| 	for _, test := range tests { | ||||
| 		kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 		kubelet, fakeDocker := newTestKubelet(t) | ||||
| 		kubelet.maxContainerCount = 2 | ||||
| 		fakeDocker.ContainerList = test.containers | ||||
| 		fakeDocker.ContainerMap = test.containerDetails | ||||
| @@ -2059,7 +2055,7 @@ func TestPurgeOldest(t *testing.T) { | ||||
| 		}, | ||||
| 	} | ||||
| 	for _, test := range tests { | ||||
| 		kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 		kubelet, fakeDocker := newTestKubelet(t) | ||||
| 		kubelet.maxContainerCount = 5 | ||||
| 		fakeDocker.ContainerMap = test.containerDetails | ||||
| 		kubelet.purgeOldest(test.ids) | ||||
| @@ -2070,12 +2066,11 @@ func TestPurgeOldest(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func TestSyncPodsWithPullPolicy(t *testing.T) { | ||||
| 	kubelet, fakeDocker, waitGroup := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller) | ||||
| 	puller.HasImages = []string{"existing_one", "want:latest"} | ||||
| 	kubelet.podInfraContainerImage = "custom_image_name" | ||||
| 	fakeDocker.ContainerList = []docker.APIContainers{} | ||||
| 	waitGroup.Add(1) | ||||
| 	err := kubelet.SyncPods([]api.BoundPod{ | ||||
| 		{ | ||||
| 			ObjectMeta: api.ObjectMeta{ | ||||
| @@ -2094,11 +2089,11 @@ func TestSyncPodsWithPullPolicy(t *testing.T) { | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	}, emptyPodUIDs, time.Now()) | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	waitGroup.Wait() | ||||
| 	kubelet.drainWorkers() | ||||
|  | ||||
| 	fakeDocker.Lock() | ||||
|  | ||||
| @@ -2404,7 +2399,7 @@ func TestMakeEnvironmentVariables(t *testing.T) { | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range testCases { | ||||
| 		kl, _, _ := newTestKubelet(t) | ||||
| 		kl, _ := newTestKubelet(t) | ||||
| 		kl.masterServiceNamespace = tc.masterServiceNamespace | ||||
| 		if tc.nilLister { | ||||
| 			kl.serviceLister = nil | ||||
| @@ -2841,7 +2836,7 @@ func TestGetPodReadyCondition(t *testing.T) { | ||||
|  | ||||
| func TestExecInContainerNoSuchPod(t *testing.T) { | ||||
| 	fakeCommandRunner := fakeContainerCommandRunner{} | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	fakeDocker.ContainerList = []docker.APIContainers{} | ||||
| 	kubelet.runner = &fakeCommandRunner | ||||
|  | ||||
| @@ -2868,7 +2863,7 @@ func TestExecInContainerNoSuchPod(t *testing.T) { | ||||
|  | ||||
| func TestExecInContainerNoSuchContainer(t *testing.T) { | ||||
| 	fakeCommandRunner := fakeContainerCommandRunner{} | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	kubelet.runner = &fakeCommandRunner | ||||
|  | ||||
| 	podName := "podFoo" | ||||
| @@ -2921,7 +2916,7 @@ func (f *fakeReadWriteCloser) Close() error { | ||||
|  | ||||
| func TestExecInContainer(t *testing.T) { | ||||
| 	fakeCommandRunner := fakeContainerCommandRunner{} | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	kubelet.runner = &fakeCommandRunner | ||||
|  | ||||
| 	podName := "podFoo" | ||||
| @@ -2980,7 +2975,7 @@ func TestExecInContainer(t *testing.T) { | ||||
|  | ||||
| func TestPortForwardNoSuchPod(t *testing.T) { | ||||
| 	fakeCommandRunner := fakeContainerCommandRunner{} | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	fakeDocker.ContainerList = []docker.APIContainers{} | ||||
| 	kubelet.runner = &fakeCommandRunner | ||||
|  | ||||
| @@ -3004,7 +2999,7 @@ func TestPortForwardNoSuchPod(t *testing.T) { | ||||
|  | ||||
| func TestPortForwardNoSuchContainer(t *testing.T) { | ||||
| 	fakeCommandRunner := fakeContainerCommandRunner{} | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	kubelet.runner = &fakeCommandRunner | ||||
|  | ||||
| 	podName := "podFoo" | ||||
| @@ -3039,7 +3034,7 @@ func TestPortForwardNoSuchContainer(t *testing.T) { | ||||
|  | ||||
| func TestPortForward(t *testing.T) { | ||||
| 	fakeCommandRunner := fakeContainerCommandRunner{} | ||||
| 	kubelet, fakeDocker, _ := newTestKubelet(t) | ||||
| 	kubelet, fakeDocker := newTestKubelet(t) | ||||
| 	kubelet.runner = &fakeCommandRunner | ||||
|  | ||||
| 	podName := "podFoo" | ||||
|   | ||||
| @@ -18,7 +18,6 @@ package metrics | ||||
|  | ||||
| import ( | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/types" | ||||
| @@ -36,16 +35,8 @@ var ( | ||||
| 			Help:      "Image pull latency in microseconds.", | ||||
| 		}, | ||||
| 	) | ||||
| 	// TODO(vmarmol): Break down by number of containers in pod? | ||||
| 	SyncPodLatency = prometheus.NewSummaryVec( | ||||
| 		prometheus.SummaryOpts{ | ||||
| 			Subsystem: kubeletSubsystem, | ||||
| 			Name:      "sync_pod_latency_microseconds", | ||||
| 			Help:      "Latency in microseconds to sync a single pod. Broken down by operation type: create, update, or sync", | ||||
| 		}, | ||||
| 		[]string{"operation_type"}, | ||||
| 	) | ||||
| 	// TODO(vmarmol): Containers per pod | ||||
| 	// TODO(vmarmol): Latency of pod startup | ||||
| 	// TODO(vmarmol): Latency of SyncPods | ||||
| ) | ||||
|  | ||||
| @@ -56,37 +47,10 @@ func Register(containerCache dockertools.DockerCache) { | ||||
| 	// Register the metrics. | ||||
| 	registerMetrics.Do(func() { | ||||
| 		prometheus.MustRegister(ImagePullLatency) | ||||
| 		prometheus.MustRegister(SyncPodLatency) | ||||
| 		prometheus.MustRegister(newPodAndContainerCollector(containerCache)) | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| type SyncPodType int | ||||
|  | ||||
| const ( | ||||
| 	SyncPodCreate SyncPodType = iota | ||||
| 	SyncPodUpdate | ||||
| 	SyncPodSync | ||||
| ) | ||||
|  | ||||
| func (self SyncPodType) String() string { | ||||
| 	switch self { | ||||
| 	case SyncPodCreate: | ||||
| 		return "create" | ||||
| 	case SyncPodUpdate: | ||||
| 		return "update" | ||||
| 	case SyncPodSync: | ||||
| 		return "sync" | ||||
| 	default: | ||||
| 		return "unknown" | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Gets the time since the specified start in microseconds. | ||||
| func SinceInMicroseconds(start time.Time) float64 { | ||||
| 	return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) | ||||
| } | ||||
|  | ||||
| func newPodAndContainerCollector(containerCache dockertools.DockerCache) *podAndContainerCollector { | ||||
| 	return &podAndContainerCollector{ | ||||
| 		containerCache: containerCache, | ||||
|   | ||||
| @@ -1,113 +0,0 @@ | ||||
| /* | ||||
| Copyright 2014 Google Inc. All rights reserved. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package kubelet | ||||
|  | ||||
| import ( | ||||
| 	"sync" | ||||
|  | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/types" | ||||
| 	"github.com/golang/glog" | ||||
| ) | ||||
|  | ||||
| type syncPodFunType func(*api.BoundPod, dockertools.DockerContainers) error | ||||
|  | ||||
| // TODO(wojtek-t) Add unit tests for this type. | ||||
| type podWorkers struct { | ||||
| 	// Protects podUpdates field. | ||||
| 	podLock sync.Mutex | ||||
|  | ||||
| 	// Tracks all running per-pod goroutines - per-pod goroutine will be | ||||
| 	// processing updates received through its corresponding channel. | ||||
| 	podUpdates map[types.UID]chan workUpdate | ||||
| 	// DockerCache is used for listing running containers. | ||||
| 	dockerCache dockertools.DockerCache | ||||
|  | ||||
| 	// This function is run to sync the desired stated of pod. | ||||
| 	// NOTE: This function has to be thread-safe - it can be called for | ||||
| 	// different pods at the same time. | ||||
| 	syncPodFun syncPodFunType | ||||
| } | ||||
|  | ||||
| type workUpdate struct { | ||||
| 	// The pod state to reflect. | ||||
| 	pod *api.BoundPod | ||||
|  | ||||
| 	// Function to call when the update is complete. | ||||
| 	updateCompleteFun func() | ||||
| } | ||||
|  | ||||
| func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFun syncPodFunType) *podWorkers { | ||||
| 	return &podWorkers{ | ||||
| 		podUpdates:  map[types.UID]chan workUpdate{}, | ||||
| 		dockerCache: dockerCache, | ||||
| 		syncPodFun:  syncPodFun, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) { | ||||
| 	for newWork := range podUpdates { | ||||
| 		// Since we use docker cache, getting current state shouldn't cause | ||||
| 		// performance overhead on Docker. Moreover, as long as we run syncPod | ||||
| 		// no matter if it changes anything, having an old version of "containers" | ||||
| 		// can cause starting eunended containers. | ||||
| 		containers, err := p.dockerCache.RunningContainers() | ||||
| 		if err != nil { | ||||
| 			glog.Errorf("Error listing containers while syncing pod: %v", err) | ||||
| 			continue | ||||
| 		} | ||||
| 		err = p.syncPodFun(newWork.pod, containers) | ||||
| 		if err != nil { | ||||
| 			glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err) | ||||
| 			record.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err) | ||||
| 			continue | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Apply the new setting to the specified pod. updateComplete is called when the update is completed. | ||||
| func (p *podWorkers) UpdatePod(pod *api.BoundPod, updateComplete func()) { | ||||
| 	uid := pod.UID | ||||
| 	var podUpdates chan workUpdate | ||||
| 	var exists bool | ||||
|  | ||||
| 	p.podLock.Lock() | ||||
| 	defer p.podLock.Unlock() | ||||
| 	if podUpdates, exists = p.podUpdates[uid]; !exists { | ||||
| 		// TODO(wojtek-t): Adjust the size of the buffer in this channel | ||||
| 		podUpdates = make(chan workUpdate, 5) | ||||
| 		p.podUpdates[uid] = podUpdates | ||||
| 		go p.managePodLoop(podUpdates) | ||||
| 	} | ||||
| 	podUpdates <- workUpdate{ | ||||
| 		pod:               pod, | ||||
| 		updateCompleteFun: updateComplete, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) { | ||||
| 	p.podLock.Lock() | ||||
| 	defer p.podLock.Unlock() | ||||
| 	for key, channel := range p.podUpdates { | ||||
| 		if _, exists := desiredPods[key]; !exists { | ||||
| 			close(channel) | ||||
| 			delete(p.podUpdates, key) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
		Reference in New Issue
	
	Block a user
	 Victor Marmol
					Victor Marmol