Add SyncPod() to DockerManager and use in Kubelet.
This allows us to abstract away the logic of syncing a pod by the runtime. It will allow other runtimes to perform their own sync as well.
This commit is contained in:
@@ -34,6 +34,7 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
|
||||
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/lifecycle"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
|
||||
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
|
||||
@@ -93,6 +94,15 @@ type DockerManager struct {
|
||||
// with prober.
|
||||
// Health check prober.
|
||||
Prober prober.Prober
|
||||
|
||||
// Generator of runtime container options.
|
||||
generator kubecontainer.RunContainerOptionsGenerator
|
||||
|
||||
// Runner of lifecycle events.
|
||||
runner kubecontainer.HandlerRunner
|
||||
|
||||
// Hooks injected into the container runtime.
|
||||
runtimeHooks kubecontainer.RuntimeHooks
|
||||
}
|
||||
|
||||
func NewDockerManager(
|
||||
@@ -106,7 +116,10 @@ func NewDockerManager(
|
||||
containerLogsDir string,
|
||||
osInterface kubecontainer.OSInterface,
|
||||
networkPlugin network.NetworkPlugin,
|
||||
prober prober.Prober) *DockerManager {
|
||||
prober prober.Prober,
|
||||
generator kubecontainer.RunContainerOptionsGenerator,
|
||||
httpClient kubeletTypes.HttpGetter,
|
||||
runtimeHooks kubecontainer.RuntimeHooks) *DockerManager {
|
||||
// Work out the location of the Docker runtime, defaulting to /var/lib/docker
|
||||
// if there are any problems.
|
||||
dockerRoot := "/var/lib/docker"
|
||||
@@ -138,7 +151,7 @@ func NewDockerManager(
|
||||
}
|
||||
|
||||
reasonCache := stringCache{cache: lru.New(maxReasonCacheEntries)}
|
||||
return &DockerManager{
|
||||
dm := &DockerManager{
|
||||
client: client,
|
||||
recorder: recorder,
|
||||
readinessManager: readinessManager,
|
||||
@@ -151,7 +164,11 @@ func NewDockerManager(
|
||||
containerLogsDir: containerLogsDir,
|
||||
networkPlugin: networkPlugin,
|
||||
Prober: prober,
|
||||
generator: generator,
|
||||
runtimeHooks: runtimeHooks,
|
||||
}
|
||||
dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm)
|
||||
return dm
|
||||
}
|
||||
|
||||
// A cache which stores strings keyed by <pod_UID>_<container_name>.
|
||||
@@ -737,6 +754,7 @@ func (dm *DockerManager) ListImages() ([]kubecontainer.Image, error) {
|
||||
return images, nil
|
||||
}
|
||||
|
||||
// TODO(vmarmol): Consider unexporting.
|
||||
// PullImage pulls an image from network to local storage.
|
||||
func (dm *DockerManager) PullImage(image string) error {
|
||||
return dm.Puller.Pull(image)
|
||||
@@ -1061,6 +1079,7 @@ func (dm *DockerManager) KillContainerInPod(container api.Container, pod *api.Po
|
||||
return dm.killContainer(targetContainer.ID)
|
||||
}
|
||||
|
||||
// TODO(vmarmol): Unexport this as it is no longer used externally.
|
||||
// KillContainer kills a container identified by containerID.
|
||||
// Internally, it invokes docker's StopContainer API with a timeout of 10s.
|
||||
// TODO: Deprecate this function in favor of KillContainerInPod.
|
||||
@@ -1084,14 +1103,15 @@ func (dm *DockerManager) killContainer(containerID types.UID) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO(vmarmol): Unexport this as it is no longer used externally.
|
||||
// Run a single container from a pod. Returns the docker container ID
|
||||
func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, generator kubecontainer.RunContainerOptionsGenerator, runner kubecontainer.HandlerRunner, netMode, ipcMode string) (kubeletTypes.DockerID, error) {
|
||||
func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, netMode, ipcMode string) (kubeletTypes.DockerID, error) {
|
||||
ref, err := kubecontainer.GenerateContainerRef(pod, container)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
|
||||
}
|
||||
|
||||
opts, err := generator.GenerateRunContainerOptions(pod, container, netMode, ipcMode)
|
||||
opts, err := dm.generator.GenerateRunContainerOptions(pod, container, netMode, ipcMode)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -1107,7 +1127,7 @@ func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, ge
|
||||
}
|
||||
|
||||
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
|
||||
handlerErr := runner.Run(id, pod, container, container.Lifecycle.PostStart)
|
||||
handlerErr := dm.runner.Run(id, pod, container, container.Lifecycle.PostStart)
|
||||
if handlerErr != nil {
|
||||
dm.killContainer(types.UID(id))
|
||||
return kubeletTypes.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
|
||||
@@ -1127,8 +1147,8 @@ func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, ge
|
||||
return kubeletTypes.DockerID(id), err
|
||||
}
|
||||
|
||||
// CreatePodInfraContainer starts the pod infra container for a pod. Returns the docker container ID of the newly created container.
|
||||
func (dm *DockerManager) CreatePodInfraContainer(pod *api.Pod, generator kubecontainer.RunContainerOptionsGenerator, runner kubecontainer.HandlerRunner) (kubeletTypes.DockerID, error) {
|
||||
// createPodInfraContainer starts the pod infra container for a pod. Returns the docker container ID of the newly created container.
|
||||
func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubeletTypes.DockerID, error) {
|
||||
// Use host networking if specified.
|
||||
netNamespace := ""
|
||||
var ports []api.ContainerPort
|
||||
@@ -1172,7 +1192,7 @@ func (dm *DockerManager) CreatePodInfraContainer(pod *api.Pod, generator kubecon
|
||||
dm.recorder.Eventf(ref, "pulled", "Successfully pulled image %q", container.Image)
|
||||
}
|
||||
|
||||
id, err := dm.RunContainer(pod, container, generator, runner, netNamespace, "")
|
||||
id, err := dm.RunContainer(pod, container, netNamespace, "")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -1211,8 +1231,7 @@ type PodContainerChangesSpec struct {
|
||||
ContainersToKeep map[kubeletTypes.DockerID]int
|
||||
}
|
||||
|
||||
// TODO(vmarmol): This will soon be made non-public when its only use is internal.
|
||||
func (dm *DockerManager) ComputePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) (PodContainerChangesSpec, error) {
|
||||
func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) (PodContainerChangesSpec, error) {
|
||||
podFullName := kubecontainer.GetPodFullName(pod)
|
||||
uid := pod.UID
|
||||
glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)
|
||||
@@ -1318,3 +1337,97 @@ func (dm *DockerManager) ComputePodContainerChanges(pod *api.Pod, runningPod kub
|
||||
ContainersToKeep: containersToKeep,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Pull the image for the specified pod and container.
|
||||
func (dm *DockerManager) pullImage(pod *api.Pod, container *api.Container) error {
|
||||
present, err := dm.IsImagePresent(container.Image)
|
||||
if err != nil {
|
||||
ref, err := kubecontainer.GenerateContainerRef(pod, container)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
|
||||
}
|
||||
if ref != nil {
|
||||
dm.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err)
|
||||
}
|
||||
return fmt.Errorf("failed to inspect image %q: %v", container.Image, err)
|
||||
}
|
||||
|
||||
if !dm.runtimeHooks.ShouldPullImage(pod, container, present) {
|
||||
return nil
|
||||
}
|
||||
|
||||
err = dm.PullImage(container.Image)
|
||||
dm.runtimeHooks.ReportImagePull(pod, container, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Sync the running pod to match the specified desired pod.
|
||||
func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) error {
|
||||
podFullName := kubecontainer.GetPodFullName(pod)
|
||||
containerChanges, err := dm.computePodContainerChanges(pod, runningPod, podStatus)
|
||||
glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if containerChanges.StartInfraContainer || (len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0) {
|
||||
if len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0 {
|
||||
glog.V(4).Infof("Killing Infra Container for %q because all other containers are dead.", podFullName)
|
||||
} else {
|
||||
glog.V(4).Infof("Killing Infra Container for %q, will start new one", podFullName)
|
||||
}
|
||||
|
||||
// Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
|
||||
err = dm.KillPod(runningPod)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// Otherwise kill any containers in this pod which are not specified as ones to keep.
|
||||
for _, container := range runningPod.Containers {
|
||||
_, keep := containerChanges.ContainersToKeep[kubeletTypes.DockerID(container.ID)]
|
||||
if !keep {
|
||||
glog.V(3).Infof("Killing unwanted container %+v", container)
|
||||
err = dm.KillContainer(container.ID)
|
||||
if err != nil {
|
||||
glog.Errorf("Error killing container: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we should create infra container then we do it first.
|
||||
podInfraContainerID := containerChanges.InfraContainerId
|
||||
if containerChanges.StartInfraContainer && (len(containerChanges.ContainersToStart) > 0) {
|
||||
glog.V(4).Infof("Creating pod infra container for %q", podFullName)
|
||||
podInfraContainerID, err = dm.createPodInfraContainer(pod)
|
||||
|
||||
// Call the networking plugin
|
||||
if err == nil {
|
||||
err = dm.networkPlugin.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID)
|
||||
}
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to create pod infra container: %v; Skipping pod %q", err, podFullName)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Start everything
|
||||
for container := range containerChanges.ContainersToStart {
|
||||
glog.V(4).Infof("Creating container %+v", pod.Spec.Containers[container])
|
||||
containerSpec := &pod.Spec.Containers[container]
|
||||
if err := dm.pullImage(pod, containerSpec); err != nil {
|
||||
glog.Warningf("Failed to pull image %q from pod %q and container %q: %v", containerSpec.Image, kubecontainer.GetPodFullName(pod), containerSpec.Name, err)
|
||||
continue
|
||||
}
|
||||
// TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container
|
||||
namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID)
|
||||
_, err := dm.RunContainer(pod, containerSpec, namespaceMode, namespaceMode)
|
||||
if err != nil {
|
||||
// TODO(bburns) : Perhaps blacklist a container after N failures?
|
||||
glog.Errorf("Error running pod %q container %q: %v", kubecontainer.GetPodFullName(pod), containerSpec.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user