Move ComputePodChanges to DockerManager.
This logic is specific to the Docker runtime. This move is the first step towards making syncPod() runtime-agnostic.
This commit is contained in:
		| @@ -19,7 +19,6 @@ package dockertools | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"hash/adler32" | ||||
| 	"io" | ||||
| 	"math/rand" | ||||
| 	"os" | ||||
| 	"strconv" | ||||
| @@ -27,7 +26,6 @@ import ( | ||||
|  | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider" | ||||
| 	kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/leaky" | ||||
| 	kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/types" | ||||
| @@ -275,13 +273,6 @@ func ConnectToDockerOrDie(dockerEndpoint string) DockerInterface { | ||||
| 	return client | ||||
| } | ||||
|  | ||||
| // TODO(yifan): Move this to container.Runtime. | ||||
| type ContainerCommandRunner interface { | ||||
| 	RunInContainer(containerID string, cmd []string) ([]byte, error) | ||||
| 	ExecInContainer(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error | ||||
| 	PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error | ||||
| } | ||||
|  | ||||
| func milliCPUToShares(milliCPU int64) int64 { | ||||
| 	if milliCPU == 0 { | ||||
| 		// zero milliCPU means unset. Use kernel default. | ||||
|   | ||||
| @@ -28,6 +28,7 @@ import ( | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider" | ||||
| 	kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" | ||||
| 	kubeletProber "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/types" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util" | ||||
| 	docker "github.com/fsouza/go-dockerclient" | ||||
| @@ -394,7 +395,7 @@ func TestGetRunningContainers(t *testing.T) { | ||||
| 	fakeDocker := &FakeDockerClient{Errors: make(map[string]error)} | ||||
| 	fakeRecorder := &record.FakeRecorder{} | ||||
| 	np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) | ||||
| 	containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np) | ||||
| 	containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, &kubeletProber.FakeProber{}) | ||||
| 	tests := []struct { | ||||
| 		containers  map[string]*docker.Container | ||||
| 		inputIDs    []string | ||||
| @@ -660,7 +661,7 @@ func TestFindContainersByPod(t *testing.T) { | ||||
| 	} | ||||
| 	fakeClient := &FakeDockerClient{} | ||||
| 	np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) | ||||
| 	containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np) | ||||
| 	containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, &kubeletProber.FakeProber{}) | ||||
| 	for i, test := range tests { | ||||
| 		fakeClient.ContainerList = test.containerList | ||||
| 		fakeClient.ExitedContainerList = test.exitedContainerList | ||||
|   | ||||
| @@ -35,7 +35,9 @@ import ( | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" | ||||
| 	kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober" | ||||
| 	kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/probe" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/types" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util" | ||||
| 	"github.com/fsouza/go-dockerclient" | ||||
| @@ -86,6 +88,11 @@ type DockerManager struct { | ||||
|  | ||||
| 	// Network plugin. | ||||
| 	networkPlugin network.NetworkPlugin | ||||
|  | ||||
| 	// TODO(vmarmol): Make this non-public when we remove the circular dependency | ||||
| 	// with prober. | ||||
| 	// Health check prober. | ||||
| 	Prober prober.Prober | ||||
| } | ||||
|  | ||||
| func NewDockerManager( | ||||
| @@ -98,7 +105,8 @@ func NewDockerManager( | ||||
| 	burst int, | ||||
| 	containerLogsDir string, | ||||
| 	osInterface kubecontainer.OSInterface, | ||||
| 	networkPlugin network.NetworkPlugin) *DockerManager { | ||||
| 	networkPlugin network.NetworkPlugin, | ||||
| 	prober prober.Prober) *DockerManager { | ||||
| 	// Work out the location of the Docker runtime, defaulting to /var/lib/docker | ||||
| 	// if there are any problems. | ||||
| 	dockerRoot := "/var/lib/docker" | ||||
| @@ -142,6 +150,7 @@ func NewDockerManager( | ||||
| 		dockerRoot:             dockerRoot, | ||||
| 		containerLogsDir:       containerLogsDir, | ||||
| 		networkPlugin:          networkPlugin, | ||||
| 		Prober:                 prober, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -692,8 +701,8 @@ func (dm *DockerManager) IsImagePresent(image string) (bool, error) { | ||||
| 	return dm.Puller.IsImagePresent(image) | ||||
| } | ||||
|  | ||||
| // PodInfraContainer returns true if the pod infra container has changed. | ||||
| func (dm *DockerManager) PodInfraContainerChanged(pod *api.Pod, podInfraContainer *kubecontainer.Container) (bool, error) { | ||||
| // podInfraContainerChanged returns true if the pod infra container has changed. | ||||
| func (dm *DockerManager) podInfraContainerChanged(pod *api.Pod, podInfraContainer *kubecontainer.Container) (bool, error) { | ||||
| 	networkMode := "" | ||||
| 	var ports []api.ContainerPort | ||||
|  | ||||
| @@ -1112,3 +1121,164 @@ func (dm *DockerManager) CreatePodInfraContainer(pod *api.Pod, generator kubecon | ||||
| 	} | ||||
| 	return id, util.ApplyOomScoreAdj(containerInfo.State.Pid, podOomScoreAdj) | ||||
| } | ||||
|  | ||||
| // TODO(vmarmol): This will soon be made non-public when its only use is internal. | ||||
| // Structure keeping information on changes that need to happen for a pod. The semantics is as follows: | ||||
| // - startInfraContainer is true if new Infra Containers have to be started and old one (if running) killed. | ||||
| //   Additionally if it is true then containersToKeep have to be empty | ||||
| // - infraContainerId have to be set iff startInfraContainer is false. It stores dockerID of running Infra Container | ||||
| // - containersToStart keeps indices of Specs of containers that have to be started. | ||||
| // - containersToKeep stores mapping from dockerIDs of running containers to indices of their Specs for containers that | ||||
| //   should be kept running. If startInfraContainer is false then it contains an entry for infraContainerId (mapped to -1). | ||||
| //   It shouldn't be the case where containersToStart is empty and containersToKeep contains only infraContainerId. In such case | ||||
| //   Infra Container should be killed, hence it's removed from this map. | ||||
| // - all running containers which are NOT contained in containersToKeep should be killed. | ||||
| type empty struct{} | ||||
| type PodContainerChangesSpec struct { | ||||
| 	StartInfraContainer bool | ||||
| 	InfraContainerId    kubeletTypes.DockerID | ||||
| 	ContainersToStart   map[int]empty | ||||
| 	ContainersToKeep    map[kubeletTypes.DockerID]int | ||||
| } | ||||
|  | ||||
| // TODO(vmarmol): This will soon be made non-public when its only use is internal. | ||||
| func (dm *DockerManager) ComputePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) (PodContainerChangesSpec, error) { | ||||
| 	podFullName := kubecontainer.GetPodFullName(pod) | ||||
| 	uid := pod.UID | ||||
| 	glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid) | ||||
|  | ||||
| 	containersToStart := make(map[int]empty) | ||||
| 	containersToKeep := make(map[kubeletTypes.DockerID]int) | ||||
| 	createPodInfraContainer := false | ||||
|  | ||||
| 	var err error | ||||
| 	var podInfraContainerID kubeletTypes.DockerID | ||||
| 	var changed bool | ||||
| 	podInfraContainer := runningPod.FindContainerByName(PodInfraContainerName) | ||||
| 	if podInfraContainer != nil { | ||||
| 		glog.V(4).Infof("Found pod infra container for %q", podFullName) | ||||
| 		changed, err = dm.podInfraContainerChanged(pod, podInfraContainer) | ||||
| 		if err != nil { | ||||
| 			return PodContainerChangesSpec{}, err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	createPodInfraContainer = true | ||||
| 	if podInfraContainer == nil { | ||||
| 		glog.V(2).Infof("Need to restart pod infra container for %q because it is not found", podFullName) | ||||
| 	} else if changed { | ||||
| 		glog.V(2).Infof("Need to restart pod infra container for %q because it is changed", podFullName) | ||||
| 	} else { | ||||
| 		glog.V(4).Infof("Pod infra container looks good, keep it %q", podFullName) | ||||
| 		createPodInfraContainer = false | ||||
| 		podInfraContainerID = kubeletTypes.DockerID(podInfraContainer.ID) | ||||
| 		containersToKeep[podInfraContainerID] = -1 | ||||
| 	} | ||||
|  | ||||
| 	for index, container := range pod.Spec.Containers { | ||||
| 		expectedHash := HashContainer(&container) | ||||
|  | ||||
| 		c := runningPod.FindContainerByName(container.Name) | ||||
| 		if c == nil { | ||||
| 			if shouldContainerBeRestarted(&container, pod, &podStatus, dm.readinessManager) { | ||||
| 				// If we are here it means that the container is dead and should be restarted, or never existed and should | ||||
| 				// be created. We may be inserting this ID again if the container has changed and it has | ||||
| 				// RestartPolicy::Always, but it's not a big deal. | ||||
| 				glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container) | ||||
| 				containersToStart[index] = empty{} | ||||
| 			} | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		containerID := kubeletTypes.DockerID(c.ID) | ||||
| 		hash := c.Hash | ||||
| 		glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID) | ||||
|  | ||||
| 		if createPodInfraContainer { | ||||
| 			// createPodInfraContainer == true and Container exists | ||||
| 			// If we're creating infra containere everything will be killed anyway | ||||
| 			// If RestartPolicy is Always or OnFailure we restart containers that were running before we | ||||
| 			// killed them when restarting Infra Container. | ||||
| 			if pod.Spec.RestartPolicy != api.RestartPolicyNever { | ||||
| 				glog.V(1).Infof("Infra Container is being recreated. %q will be restarted.", container.Name) | ||||
| 				containersToStart[index] = empty{} | ||||
| 			} | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		// At this point, the container is running and pod infra container is good. | ||||
| 		// We will look for changes and check healthiness for the container. | ||||
| 		containerChanged := hash != 0 && hash != expectedHash | ||||
| 		if containerChanged { | ||||
| 			glog.Infof("pod %q container %q hash changed (%d vs %d), it will be killed and re-created.", podFullName, container.Name, hash, expectedHash) | ||||
| 			containersToStart[index] = empty{} | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		result, err := dm.Prober.Probe(pod, podStatus, container, string(c.ID), c.Created) | ||||
| 		if err != nil { | ||||
| 			// TODO(vmarmol): examine this logic. | ||||
| 			glog.V(2).Infof("probe no-error: %q", container.Name) | ||||
| 			containersToKeep[containerID] = index | ||||
| 			continue | ||||
| 		} | ||||
| 		if result == probe.Success { | ||||
| 			glog.V(4).Infof("probe success: %q", container.Name) | ||||
| 			containersToKeep[containerID] = index | ||||
| 			continue | ||||
| 		} | ||||
| 		glog.Infof("pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result) | ||||
| 		containersToStart[index] = empty{} | ||||
| 	} | ||||
|  | ||||
| 	// After the loop one of the following should be true: | ||||
| 	// - createPodInfraContainer is true and containersToKeep is empty. | ||||
| 	// (In fact, when createPodInfraContainer is false, containersToKeep will not be touched). | ||||
| 	// - createPodInfraContainer is false and containersToKeep contains at least ID of Infra Container | ||||
|  | ||||
| 	// If Infra container is the last running one, we don't want to keep it. | ||||
| 	if !createPodInfraContainer && len(containersToStart) == 0 && len(containersToKeep) == 1 { | ||||
| 		containersToKeep = make(map[kubeletTypes.DockerID]int) | ||||
| 	} | ||||
|  | ||||
| 	return PodContainerChangesSpec{ | ||||
| 		StartInfraContainer: createPodInfraContainer, | ||||
| 		InfraContainerId:    podInfraContainerID, | ||||
| 		ContainersToStart:   containersToStart, | ||||
| 		ContainersToKeep:    containersToKeep, | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func shouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatus *api.PodStatus, readinessManager *kubecontainer.ReadinessManager) bool { | ||||
| 	podFullName := kubecontainer.GetPodFullName(pod) | ||||
|  | ||||
| 	// Get all dead container status. | ||||
| 	var resultStatus []*api.ContainerStatus | ||||
| 	for i, containerStatus := range podStatus.ContainerStatuses { | ||||
| 		if containerStatus.Name == container.Name && containerStatus.State.Termination != nil { | ||||
| 			resultStatus = append(resultStatus, &podStatus.ContainerStatuses[i]) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Set dead containers to unready state. | ||||
| 	for _, c := range resultStatus { | ||||
| 		readinessManager.RemoveReadiness(kubecontainer.TrimRuntimePrefixFromImage(c.ContainerID)) | ||||
| 	} | ||||
|  | ||||
| 	// Check RestartPolicy for dead container. | ||||
| 	if len(resultStatus) > 0 { | ||||
| 		if pod.Spec.RestartPolicy == api.RestartPolicyNever { | ||||
| 			glog.V(4).Infof("Already ran container %q of pod %q, do nothing", container.Name, podFullName) | ||||
| 			return false | ||||
| 		} | ||||
| 		if pod.Spec.RestartPolicy == api.RestartPolicyOnFailure { | ||||
| 			// Check the exit code of last run. Note: This assumes the result is sorted | ||||
| 			// by the created time in reverse order. | ||||
| 			if resultStatus[0].State.Termination.ExitCode == 0 { | ||||
| 				glog.V(4).Infof("Already successfully ran container %q of pod %q, do nothing", container.Name, podFullName) | ||||
| 				return false | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	return true | ||||
| } | ||||
|   | ||||
| @@ -24,18 +24,19 @@ import ( | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||
| 	kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util" | ||||
| 	"github.com/golang/glog" | ||||
| ) | ||||
|  | ||||
| type handlerRunner struct { | ||||
| 	httpGetter       httpGetter | ||||
| 	commandRunner    dockertools.ContainerCommandRunner | ||||
| 	commandRunner    prober.ContainerCommandRunner | ||||
| 	containerManager *dockertools.DockerManager | ||||
| } | ||||
|  | ||||
| // TODO(yifan): Merge commandRunner and containerManager once containerManager implements the ContainerCommandRunner interface. | ||||
| func newHandlerRunner(httpGetter httpGetter, commandRunner dockertools.ContainerCommandRunner, containerManager *dockertools.DockerManager) kubecontainer.HandlerRunner { | ||||
| func newHandlerRunner(httpGetter httpGetter, commandRunner prober.ContainerCommandRunner, containerManager *dockertools.DockerManager) kubecontainer.HandlerRunner { | ||||
| 	return &handlerRunner{ | ||||
| 		httpGetter:       httpGetter, | ||||
| 		commandRunner:    commandRunner, | ||||
|   | ||||
| @@ -46,7 +46,6 @@ import ( | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober" | ||||
| 	kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/probe" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/types" | ||||
| @@ -251,7 +250,8 @@ func NewMainKubelet( | ||||
| 		pullBurst, | ||||
| 		containerLogsDir, | ||||
| 		osInterface, | ||||
| 		klet.networkPlugin) | ||||
| 		klet.networkPlugin, | ||||
| 		nil) | ||||
| 	klet.runner = containerManager | ||||
| 	klet.containerManager = containerManager | ||||
|  | ||||
| @@ -259,6 +259,9 @@ func NewMainKubelet( | ||||
| 	klet.prober = prober.New(klet.runner, klet.readinessManager, klet.containerRefManager, klet.recorder) | ||||
| 	klet.handlerRunner = newHandlerRunner(klet.httpClient, klet.runner, klet.containerManager) | ||||
|  | ||||
| 	// TODO(vmarmol): Remove when the circular dependency is removed :( | ||||
| 	containerManager.Prober = klet.prober | ||||
|  | ||||
| 	runtimeCache, err := kubecontainer.NewRuntimeCache(containerManager) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| @@ -318,7 +321,7 @@ type Kubelet struct { | ||||
| 	// Optional, defaults to /logs/ from /var/log | ||||
| 	logServer http.Handler | ||||
| 	// Optional, defaults to simple Docker implementation | ||||
| 	runner dockertools.ContainerCommandRunner | ||||
| 	runner prober.ContainerCommandRunner | ||||
| 	// Optional, client for http requests, defaults to empty client | ||||
| 	httpClient httpGetter | ||||
|  | ||||
| @@ -911,164 +914,6 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func shouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatus *api.PodStatus, readinessManager *kubecontainer.ReadinessManager) bool { | ||||
| 	podFullName := kubecontainer.GetPodFullName(pod) | ||||
|  | ||||
| 	// Get all dead container status. | ||||
| 	var resultStatus []*api.ContainerStatus | ||||
| 	for i, containerStatus := range podStatus.ContainerStatuses { | ||||
| 		if containerStatus.Name == container.Name && containerStatus.State.Termination != nil { | ||||
| 			resultStatus = append(resultStatus, &podStatus.ContainerStatuses[i]) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Set dead containers to unready state. | ||||
| 	for _, c := range resultStatus { | ||||
| 		readinessManager.RemoveReadiness(kubecontainer.TrimRuntimePrefixFromImage(c.ContainerID)) | ||||
| 	} | ||||
|  | ||||
| 	// Check RestartPolicy for dead container. | ||||
| 	if len(resultStatus) > 0 { | ||||
| 		if pod.Spec.RestartPolicy == api.RestartPolicyNever { | ||||
| 			glog.V(4).Infof("Already ran container %q of pod %q, do nothing", container.Name, podFullName) | ||||
| 			return false | ||||
| 		} | ||||
| 		if pod.Spec.RestartPolicy == api.RestartPolicyOnFailure { | ||||
| 			// Check the exit code of last run. Note: This assumes the result is sorted | ||||
| 			// by the created time in reverse order. | ||||
| 			if resultStatus[0].State.Termination.ExitCode == 0 { | ||||
| 				glog.V(4).Infof("Already successfully ran container %q of pod %q, do nothing", container.Name, podFullName) | ||||
| 				return false | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	return true | ||||
| } | ||||
|  | ||||
| // Structure keeping information on changes that need to happen for a pod. The semantics is as follows: | ||||
| // - startInfraContainer is true if new Infra Containers have to be started and old one (if running) killed. | ||||
| //   Additionally if it is true then containersToKeep have to be empty | ||||
| // - infraContainerId have to be set iff startInfraContainer is false. It stores dockerID of running Infra Container | ||||
| // - containersToStart keeps indices of Specs of containers that have to be started. | ||||
| // - containersToKeep stores mapping from dockerIDs of running containers to indices of their Specs for containers that | ||||
| //   should be kept running. If startInfraContainer is false then it contains an entry for infraContainerId (mapped to -1). | ||||
| //   It shouldn't be the case where containersToStart is empty and containersToKeep contains only infraContainerId. In such case | ||||
| //   Infra Container should be killed, hence it's removed from this map. | ||||
| // - all running containers which are NOT contained in containersToKeep should be killed. | ||||
| type podContainerChangesSpec struct { | ||||
| 	startInfraContainer bool | ||||
| 	infraContainerId    kubeletTypes.DockerID | ||||
| 	containersToStart   map[int]empty | ||||
| 	containersToKeep    map[kubeletTypes.DockerID]int | ||||
| } | ||||
|  | ||||
| func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) (podContainerChangesSpec, error) { | ||||
| 	podFullName := kubecontainer.GetPodFullName(pod) | ||||
| 	uid := pod.UID | ||||
| 	glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid) | ||||
|  | ||||
| 	containersToStart := make(map[int]empty) | ||||
| 	containersToKeep := make(map[kubeletTypes.DockerID]int) | ||||
| 	createPodInfraContainer := false | ||||
|  | ||||
| 	var err error | ||||
| 	var podInfraContainerID kubeletTypes.DockerID | ||||
| 	var changed bool | ||||
| 	podInfraContainer := runningPod.FindContainerByName(dockertools.PodInfraContainerName) | ||||
| 	if podInfraContainer != nil { | ||||
| 		glog.V(4).Infof("Found pod infra container for %q", podFullName) | ||||
| 		changed, err = kl.containerManager.PodInfraContainerChanged(pod, podInfraContainer) | ||||
| 		if err != nil { | ||||
| 			return podContainerChangesSpec{}, err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	createPodInfraContainer = true | ||||
| 	if podInfraContainer == nil { | ||||
| 		glog.V(2).Infof("Need to restart pod infra container for %q because it is not found", podFullName) | ||||
| 	} else if changed { | ||||
| 		glog.V(2).Infof("Need to restart pod infra container for %q because it is changed", podFullName) | ||||
| 	} else { | ||||
| 		glog.V(4).Infof("Pod infra container looks good, keep it %q", podFullName) | ||||
| 		createPodInfraContainer = false | ||||
| 		podInfraContainerID = kubeletTypes.DockerID(podInfraContainer.ID) | ||||
| 		containersToKeep[podInfraContainerID] = -1 | ||||
| 	} | ||||
|  | ||||
| 	for index, container := range pod.Spec.Containers { | ||||
| 		expectedHash := dockertools.HashContainer(&container) | ||||
|  | ||||
| 		c := runningPod.FindContainerByName(container.Name) | ||||
| 		if c == nil { | ||||
| 			if shouldContainerBeRestarted(&container, pod, &podStatus, kl.readinessManager) { | ||||
| 				// If we are here it means that the container is dead and should be restarted, or never existed and should | ||||
| 				// be created. We may be inserting this ID again if the container has changed and it has | ||||
| 				// RestartPolicy::Always, but it's not a big deal. | ||||
| 				glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container) | ||||
| 				containersToStart[index] = empty{} | ||||
| 			} | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		containerID := kubeletTypes.DockerID(c.ID) | ||||
| 		hash := c.Hash | ||||
| 		glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID) | ||||
|  | ||||
| 		if createPodInfraContainer { | ||||
| 			// createPodInfraContainer == true and Container exists | ||||
| 			// If we're creating infra containere everything will be killed anyway | ||||
| 			// If RestartPolicy is Always or OnFailure we restart containers that were running before we | ||||
| 			// killed them when restarting Infra Container. | ||||
| 			if pod.Spec.RestartPolicy != api.RestartPolicyNever { | ||||
| 				glog.V(1).Infof("Infra Container is being recreated. %q will be restarted.", container.Name) | ||||
| 				containersToStart[index] = empty{} | ||||
| 			} | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		// At this point, the container is running and pod infra container is good. | ||||
| 		// We will look for changes and check healthiness for the container. | ||||
| 		containerChanged := hash != 0 && hash != expectedHash | ||||
| 		if containerChanged { | ||||
| 			glog.Infof("pod %q container %q hash changed (%d vs %d), it will be killed and re-created.", podFullName, container.Name, hash, expectedHash) | ||||
| 			containersToStart[index] = empty{} | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		result, err := kl.prober.Probe(pod, podStatus, container, string(c.ID), c.Created) | ||||
| 		if err != nil { | ||||
| 			// TODO(vmarmol): examine this logic. | ||||
| 			glog.V(2).Infof("probe no-error: %q", container.Name) | ||||
| 			containersToKeep[containerID] = index | ||||
| 			continue | ||||
| 		} | ||||
| 		if result == probe.Success { | ||||
| 			glog.V(4).Infof("probe success: %q", container.Name) | ||||
| 			containersToKeep[containerID] = index | ||||
| 			continue | ||||
| 		} | ||||
| 		glog.Infof("pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result) | ||||
| 		containersToStart[index] = empty{} | ||||
| 	} | ||||
|  | ||||
| 	// After the loop one of the following should be true: | ||||
| 	// - createPodInfraContainer is true and containersToKeep is empty. | ||||
| 	// (In fact, when createPodInfraContainer is false, containersToKeep will not be touched). | ||||
| 	// - createPodInfraContainer is false and containersToKeep contains at least ID of Infra Container | ||||
|  | ||||
| 	// If Infra container is the last running one, we don't want to keep it. | ||||
| 	if !createPodInfraContainer && len(containersToStart) == 0 && len(containersToKeep) == 1 { | ||||
| 		containersToKeep = make(map[kubeletTypes.DockerID]int) | ||||
| 	} | ||||
|  | ||||
| 	return podContainerChangesSpec{ | ||||
| 		startInfraContainer: createPodInfraContainer, | ||||
| 		infraContainerId:    podInfraContainerID, | ||||
| 		containersToStart:   containersToStart, | ||||
| 		containersToKeep:    containersToKeep, | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error { | ||||
| 	podFullName := kubecontainer.GetPodFullName(pod) | ||||
| 	uid := pod.UID | ||||
| @@ -1110,14 +955,14 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	containerChanges, err := kl.computePodContainerChanges(pod, runningPod, podStatus) | ||||
| 	containerChanges, err := kl.containerManager.ComputePodContainerChanges(pod, runningPod, podStatus) | ||||
| 	glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if containerChanges.startInfraContainer || (len(containerChanges.containersToKeep) == 0 && len(containerChanges.containersToStart) == 0) { | ||||
| 		if len(containerChanges.containersToKeep) == 0 && len(containerChanges.containersToStart) == 0 { | ||||
| 	if containerChanges.StartInfraContainer || (len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0) { | ||||
| 		if len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0 { | ||||
| 			glog.V(4).Infof("Killing Infra Container for %q because all other containers are dead.", podFullName) | ||||
| 		} else { | ||||
| 			glog.V(4).Infof("Killing Infra Container for %q, will start new one", podFullName) | ||||
| @@ -1131,7 +976,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont | ||||
| 	} else { | ||||
| 		// Otherwise kill any containers in this pod which are not specified as ones to keep. | ||||
| 		for _, container := range runningPod.Containers { | ||||
| 			_, keep := containerChanges.containersToKeep[kubeletTypes.DockerID(container.ID)] | ||||
| 			_, keep := containerChanges.ContainersToKeep[kubeletTypes.DockerID(container.ID)] | ||||
| 			if !keep { | ||||
| 				glog.V(3).Infof("Killing unwanted container %+v", container) | ||||
| 				err = kl.containerManager.KillContainer(container.ID) | ||||
| @@ -1160,8 +1005,8 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont | ||||
| 	kl.volumeManager.SetVolumes(pod.UID, podVolumes) | ||||
|  | ||||
| 	// If we should create infra container then we do it first. | ||||
| 	podInfraContainerID := containerChanges.infraContainerId | ||||
| 	if containerChanges.startInfraContainer && (len(containerChanges.containersToStart) > 0) { | ||||
| 	podInfraContainerID := containerChanges.InfraContainerId | ||||
| 	if containerChanges.StartInfraContainer && (len(containerChanges.ContainersToStart) > 0) { | ||||
| 		glog.V(4).Infof("Creating pod infra container for %q", podFullName) | ||||
| 		podInfraContainerID, err = kl.containerManager.CreatePodInfraContainer(pod, kl, kl.handlerRunner) | ||||
|  | ||||
| @@ -1176,7 +1021,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont | ||||
| 	} | ||||
|  | ||||
| 	// Start everything | ||||
| 	for container := range containerChanges.containersToStart { | ||||
| 	for container := range containerChanges.ContainersToStart { | ||||
| 		glog.V(4).Infof("Creating container %+v", pod.Spec.Containers[container]) | ||||
| 		containerSpec := &pod.Spec.Containers[container] | ||||
| 		if err := kl.pullImage(pod, containerSpec); err != nil { | ||||
|   | ||||
| @@ -108,7 +108,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { | ||||
| 	podManager, fakeMirrorClient := newFakePodManager() | ||||
| 	kubelet.podManager = podManager | ||||
| 	kubelet.containerRefManager = kubecontainer.NewRefManager() | ||||
| 	kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin) | ||||
| 	kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin, nil) | ||||
| 	kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerManager) | ||||
| 	kubelet.podWorkers = newPodWorkers( | ||||
| 		kubelet.runtimeCache, | ||||
| @@ -120,6 +120,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { | ||||
| 		fakeRecorder) | ||||
| 	kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{} | ||||
| 	kubelet.prober = prober.New(nil, kubelet.readinessManager, kubelet.containerRefManager, kubelet.recorder) | ||||
| 	kubelet.containerManager.Prober = kubelet.prober | ||||
| 	kubelet.handlerRunner = newHandlerRunner(&fakeHTTP{}, &fakeContainerCommandRunner{}, kubelet.containerManager) | ||||
| 	kubelet.volumeManager = newVolumeManager() | ||||
| 	return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient} | ||||
|   | ||||
| @@ -26,6 +26,7 @@ import ( | ||||
| 	kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" | ||||
| 	kubeletProber "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/types" | ||||
| ) | ||||
|  | ||||
| @@ -42,7 +43,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) { | ||||
| 	fakeDocker := &dockertools.FakeDockerClient{} | ||||
| 	fakeRecorder := &record.FakeRecorder{} | ||||
| 	np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) | ||||
| 	dockerManager := dockertools.NewDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np) | ||||
| 	dockerManager := dockertools.NewDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, &kubeletProber.FakeProber{}) | ||||
| 	fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager) | ||||
|  | ||||
| 	lock := sync.Mutex{} | ||||
|   | ||||
| @@ -18,13 +18,13 @@ package prober | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"strconv" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" | ||||
| 	kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/probe" | ||||
| 	execprobe "github.com/GoogleCloudPlatform/kubernetes/pkg/probe/exec" | ||||
| 	httprobe "github.com/GoogleCloudPlatform/kubernetes/pkg/probe/http" | ||||
| @@ -42,12 +42,19 @@ type Prober interface { | ||||
| 	Probe(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) | ||||
| } | ||||
|  | ||||
| type ContainerCommandRunner interface { | ||||
| 	RunInContainer(containerID string, cmd []string) ([]byte, error) | ||||
| 	ExecInContainer(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error | ||||
| 	PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error | ||||
| } | ||||
|  | ||||
| // Prober helps to check the liveness/readiness of a container. | ||||
| type prober struct { | ||||
| 	exec   execprobe.ExecProber | ||||
| 	http   httprobe.HTTPProber | ||||
| 	tcp    tcprobe.TCPProber | ||||
| 	runner dockertools.ContainerCommandRunner | ||||
| 	exec execprobe.ExecProber | ||||
| 	http httprobe.HTTPProber | ||||
| 	tcp  tcprobe.TCPProber | ||||
| 	// TODO(vmarmol): Remove when we remove the circular dependency to DockerManager. | ||||
| 	Runner ContainerCommandRunner | ||||
|  | ||||
| 	readinessManager *kubecontainer.ReadinessManager | ||||
| 	refManager       *kubecontainer.RefManager | ||||
| @@ -57,7 +64,7 @@ type prober struct { | ||||
| // NewProber creates a Prober, it takes a command runner and | ||||
| // several container info managers. | ||||
| func New( | ||||
| 	runner dockertools.ContainerCommandRunner, | ||||
| 	runner ContainerCommandRunner, | ||||
| 	readinessManager *kubecontainer.ReadinessManager, | ||||
| 	refManager *kubecontainer.RefManager, | ||||
| 	recorder record.EventRecorder) Prober { | ||||
| @@ -66,7 +73,7 @@ func New( | ||||
| 		exec:   execprobe.New(), | ||||
| 		http:   httprobe.New(), | ||||
| 		tcp:    tcprobe.New(), | ||||
| 		runner: runner, | ||||
| 		Runner: runner, | ||||
|  | ||||
| 		readinessManager: readinessManager, | ||||
| 		refManager:       refManager, | ||||
| @@ -249,7 +256,7 @@ type execInContainer struct { | ||||
|  | ||||
| func (p *prober) newExecInContainer(pod *api.Pod, container api.Container, containerID string) exec.Cmd { | ||||
| 	return execInContainer{func() ([]byte, error) { | ||||
| 		return p.runner.RunInContainer(containerID, container.LivenessProbe.Exec.Command) | ||||
| 		return p.Runner.RunInContainer(containerID, container.LivenessProbe.Exec.Command) | ||||
| 	}} | ||||
| } | ||||
|  | ||||
|   | ||||
							
								
								
									
										31
									
								
								pkg/kubelet/prober/prober_fake.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										31
									
								
								pkg/kubelet/prober/prober_fake.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,31 @@ | ||||
| /* | ||||
| Copyright 2015 Google Inc. All rights reserved. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package prober | ||||
|  | ||||
| import ( | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/probe" | ||||
| ) | ||||
|  | ||||
| var _ Prober = &FakeProber{} | ||||
|  | ||||
| type FakeProber struct { | ||||
| } | ||||
|  | ||||
| func (fp *FakeProber) Probe(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) { | ||||
| 	return probe.Success, nil | ||||
| } | ||||
| @@ -28,6 +28,7 @@ import ( | ||||
| 	kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" | ||||
| 	kubeletProber "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober" | ||||
| 	docker "github.com/fsouza/go-dockerclient" | ||||
| 	cadvisorApi "github.com/google/cadvisor/info/v1" | ||||
| ) | ||||
| @@ -158,7 +159,8 @@ func TestRunOnce(t *testing.T) { | ||||
| 		0, | ||||
| 		"", | ||||
| 		kubecontainer.FakeOS{}, | ||||
| 		kb.networkPlugin) | ||||
| 		kb.networkPlugin, | ||||
| 		&kubeletProber.FakeProber{}) | ||||
| 	kb.containerManager.Puller = &dockertools.FakeDockerPuller{} | ||||
|  | ||||
| 	pods := []*api.Pod{ | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Victor Marmol
					Victor Marmol