Revert "Gracefully delete pods from the Kubelet"
@@ -30,7 +30,6 @@ import (
func noDefault(*api.Pod) error { return nil }

func TestDecodeSinglePod(t *testing.T) {
grace := int64(30)
pod := &api.Pod{
TypeMeta: api.TypeMeta{
APIVersion: "",
@@ -41,9 +40,8 @@ func TestDecodeSinglePod(t *testing.T) {
Namespace: "mynamespace",
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
TerminationGracePeriodSeconds: &grace,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
Name: "image",
Image: "test/image",
@@ -95,7 +93,6 @@ func TestDecodeSinglePod(t *testing.T) {
}

func TestDecodePodList(t *testing.T) {
grace := int64(30)
pod := &api.Pod{
TypeMeta: api.TypeMeta{
APIVersion: "",
@@ -106,9 +103,8 @@ func TestDecodePodList(t *testing.T) {
Namespace: "mynamespace",
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
TerminationGracePeriodSeconds: &grace,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
Name: "image",
Image: "test/image",

@@ -209,8 +209,9 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
for _, ref := range filtered {
name := kubecontainer.GetPodFullName(ref)
if existing, found := pods[name]; found {
if checkAndUpdatePod(existing, ref) {
if !reflect.DeepEqual(existing.Spec, ref.Spec) {
// this is an update
existing.Spec = ref.Spec
updates.Pods = append(updates.Pods, existing)
continue
}
@@ -251,8 +252,9 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
name := kubecontainer.GetPodFullName(ref)
if existing, found := oldPods[name]; found {
pods[name] = existing
if checkAndUpdatePod(existing, ref) {
if !reflect.DeepEqual(existing.Spec, ref.Spec) {
// this is an update
existing.Spec = ref.Spec
updates.Pods = append(updates.Pods, existing)
continue
}
@@ -323,23 +325,6 @@ func filterInvalidPods(pods []*api.Pod, source string, recorder record.EventReco
return
}

// checkAndUpdatePod updates existing if ref makes a meaningful change and returns true, or
// returns false if there was no update.
func checkAndUpdatePod(existing, ref *api.Pod) bool {
// TODO: it would be better to update the whole object and only preserve certain things
// like the source annotation or the UID (to ensure safety)
if reflect.DeepEqual(existing.Spec, ref.Spec) &&
reflect.DeepEqual(existing.DeletionTimestamp, ref.DeletionTimestamp) &&
reflect.DeepEqual(existing.DeletionGracePeriodSeconds, ref.DeletionGracePeriodSeconds) {
return false
}
// this is an update
existing.Spec = ref.Spec
existing.DeletionTimestamp = ref.DeletionTimestamp
existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
return true
}

// Sync sends a copy of the current state through the update channel.
func (s *podStorage) Sync() {
s.updateLock.Lock()

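Note (illustrative, not part of the diff): the merge hunks above replace checkAndUpdatePod, which also propagated the graceful-deletion fields, with a plain spec comparison. A minimal standalone Go sketch of the two comparison strategies, using a hypothetical miniPod stand-in for api.Pod:

package podconfigsketch

import (
	"reflect"
	"time"
)

// miniPod is a hypothetical stand-in for api.Pod, holding only the fields compared here.
type miniPod struct {
	Spec                       string
	DeletionTimestamp          *time.Time
	DeletionGracePeriodSeconds *int64
}

// specOnlyChanged mirrors the restored merge behavior: only the spec decides
// whether an update is propagated to the kubelet.
func specOnlyChanged(existing, ref *miniPod) bool {
	return !reflect.DeepEqual(existing.Spec, ref.Spec)
}

// gracefulChanged mirrors the reverted checkAndUpdatePod: the spec plus the
// graceful-deletion metadata decide, and a change copies all three fields over.
func gracefulChanged(existing, ref *miniPod) bool {
	if reflect.DeepEqual(existing.Spec, ref.Spec) &&
		reflect.DeepEqual(existing.DeletionTimestamp, ref.DeletionTimestamp) &&
		reflect.DeepEqual(existing.DeletionGracePeriodSeconds, ref.DeletionGracePeriodSeconds) {
		return false
	}
	existing.Spec = ref.Spec
	existing.DeletionTimestamp = ref.DeletionTimestamp
	existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
	return true
}
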
@@ -163,7 +163,6 @@ func TestReadContainerManifestFromFile(t *testing.T) {

func TestReadPodsFromFile(t *testing.T) {
hostname := "random-test-hostname"
grace := int64(30)
var testCases = []struct {
desc string
pod runtime.Object
@@ -193,10 +192,9 @@ func TestReadPodsFromFile(t *testing.T) {
SelfLink: getSelfLink("test-"+hostname, "mynamespace"),
},
Spec: api.PodSpec{
NodeName: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
TerminationGracePeriodSeconds: &grace,
NodeName: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
Name: "image",
Image: "test/image",
@@ -229,10 +227,9 @@ func TestReadPodsFromFile(t *testing.T) {
SelfLink: getSelfLink("12345-"+hostname, kubelet.NamespaceDefault),
},
Spec: api.PodSpec{
NodeName: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
TerminationGracePeriodSeconds: &grace,
NodeName: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
Name: "image",
Image: "test/image",

@@ -120,7 +120,6 @@ func TestExtractInvalidManifest(t *testing.T) {
func TestExtractPodsFromHTTP(t *testing.T) {
hostname := "different-value"

grace := int64(30)
var testCases = []struct {
desc string
pods runtime.Object
@@ -154,11 +153,9 @@ func TestExtractPodsFromHTTP(t *testing.T) {
SelfLink: getSelfLink("foo-"+hostname, "mynamespace"),
},
Spec: api.PodSpec{
NodeName: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
TerminationGracePeriodSeconds: &grace,

NodeName: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
Name: "1",
Image: "foo",
@@ -209,11 +206,9 @@ func TestExtractPodsFromHTTP(t *testing.T) {
SelfLink: getSelfLink("foo-"+hostname, kubelet.NamespaceDefault),
},
Spec: api.PodSpec{
NodeName: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
TerminationGracePeriodSeconds: &grace,

NodeName: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
Name: "1",
Image: "foo",
@@ -231,11 +226,9 @@ func TestExtractPodsFromHTTP(t *testing.T) {
SelfLink: getSelfLink("bar-"+hostname, kubelet.NamespaceDefault),
},
Spec: api.PodSpec{
NodeName: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
TerminationGracePeriodSeconds: &grace,

NodeName: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
Name: "2",
Image: "bar",

@@ -163,13 +163,13 @@ func (f *FakeRuntime) SyncPod(pod *api.Pod, _ Pod, _ api.PodStatus, _ []api.Secr
return f.Err
}

func (f *FakeRuntime) KillPod(pod *api.Pod, runningPod Pod) error {
func (f *FakeRuntime) KillPod(pod Pod) error {
f.Lock()
defer f.Unlock()

f.CalledFunctions = append(f.CalledFunctions, "KillPod")
f.KilledPods = append(f.KilledPods, string(runningPod.ID))
for _, c := range runningPod.Containers {
f.KilledPods = append(f.KilledPods, string(pod.ID))
for _, c := range pod.Containers {
f.KilledContainers = append(f.KilledContainers, c.Name)
}
return f.Err

@@ -53,8 +53,8 @@ type Runtime interface {
GetPods(all bool) ([]*Pod, error)
// Syncs the running pod into the desired pod.
SyncPod(pod *api.Pod, runningPod Pod, podStatus api.PodStatus, pullSecrets []api.Secret) error
// KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.
KillPod(pod *api.Pod, runningPod Pod) error
// KillPod kills all the containers of a pod.
KillPod(pod Pod) error
// GetPodStatus retrieves the status of the pod, including the information of
// all containers in the pod.
GetPodStatus(*api.Pod) (*api.PodStatus, error)

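Note (illustrative, not part of the diff): the hunk above restores the narrower KillPod signature on the Runtime interface. A rough sketch of the two shapes side by side, with stand-in types (Pod, APIPod) rather than the real kubecontainer/api types:

package runtimesketch

// Pod is a stand-in for kubecontainer.Pod, the runtime's view of a running pod.
type Pod struct {
	ID, Name, Namespace string
}

// APIPod is a stand-in for *api.Pod; in the reverted design it may be nil
// when only the running pod is known.
type APIPod struct {
	DeletionGracePeriodSeconds *int64
}

// gracefulRuntime mirrors the interface shape removed by this revert.
type gracefulRuntime interface {
	// KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.
	KillPod(pod *APIPod, runningPod Pod) error
}

// simpleRuntime mirrors the interface shape restored by this revert.
type simpleRuntime interface {
	// KillPod kills all the containers of a pod.
	KillPod(pod Pod) error
}
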
@@ -54,17 +54,8 @@ const (

maxReasonCacheEntries = 200

// In order to avoid unnecessary SIGKILLs, give every container a minimum grace
// period after SIGTERM. Docker will guarantee the termination, but SIGTERM is
// potentially dangerous.
// TODO: evaluate whether there are scenarios in which SIGKILL is preferable to
// SIGTERM for certain process types, which may justify setting this to 0.
minimumGracePeriodInSeconds = 2

kubernetesNameLabel = "io.kubernetes.pod.name"
kubernetesPodLabel = "io.kubernetes.pod.data"
kubernetesTerminationGracePeriodLabel = "io.kubernetes.pod.terminationGracePeriod"
kubernetesContainerLabel = "io.kubernetes.container.name"
kubernetesPodLabel = "io.kubernetes.pod.data"
kubernetesContainerLabel = "io.kubernetes.container.name"
)

// DockerManager implements the Runtime interface.

@@ -573,19 +564,12 @@ func (dm *DockerManager) runContainer(
if len(containerHostname) > hostnameMaxLen {
containerHostname = containerHostname[:hostnameMaxLen]
}

// Pod information is recorded on the container as labels to preserve it in the event the pod is deleted
// while the Kubelet is down and there is no information available to recover the pod. This includes
// termination information like the termination grace period and the pre stop hooks.
// TODO: keep these labels up to date if the pod changes
namespacedName := types.NamespacedName{pod.Namespace, pod.Name}
labels := map[string]string{
kubernetesNameLabel: namespacedName.String(),
}
if pod.Spec.TerminationGracePeriodSeconds != nil {
labels[kubernetesTerminationGracePeriodLabel] = strconv.FormatInt(*pod.Spec.TerminationGracePeriodSeconds, 10)
"io.kubernetes.pod.name": namespacedName.String(),
}
if container.Lifecycle != nil && container.Lifecycle.PreStop != nil {
glog.V(1).Infof("Setting preStop hook")
// TODO: This is kind of hacky, we should really just encode the bits we need.
data, err := latest.Codec.Encode(pod)
if err != nil {

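Note (illustrative, not part of the diff): the removed runContainer code stores the pod's termination grace period in a Docker label so it can be recovered if the API pod object is no longer available when the container has to be killed. A minimal sketch of that label round-trip, with hypothetical helper names:

package dockersketch

import "strconv"

// Label key taken from the removed constant kubernetesTerminationGracePeriodLabel.
const terminationGracePeriodLabel = "io.kubernetes.pod.terminationGracePeriod"

// writeGracePeriodLabel mirrors the removed runContainer logic: record the grace
// period on the container so it survives a kubelet restart or pod deletion.
func writeGracePeriodLabel(labels map[string]string, gracePeriodSeconds *int64) {
	if gracePeriodSeconds != nil {
		labels[terminationGracePeriodLabel] = strconv.FormatInt(*gracePeriodSeconds, 10)
	}
}

// readGracePeriodLabel mirrors the removed containerAndPodFromLabels fallback:
// recover the grace period from the label when no pod object is available.
func readGracePeriodLabel(labels map[string]string) (int64, bool) {
	period, ok := labels[terminationGracePeriodLabel]
	if !ok {
		return 0, false
	}
	seconds, err := strconv.ParseInt(period, 10, 64)
	if err != nil {
		return 0, false
	}
	return seconds, true
}
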
@@ -1048,12 +1032,12 @@ func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream
}

// Kills all containers in the specified pod
func (dm *DockerManager) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
func (dm *DockerManager) KillPod(pod kubecontainer.Pod) error {
// Send the kills in parallel since they may take a long time. Len + 1 since there
// can be Len errors + the networkPlugin teardown error.
errs := make(chan error, len(runningPod.Containers)+1)
errs := make(chan error, len(pod.Containers)+1)
wg := sync.WaitGroup{}
for _, container := range runningPod.Containers {
for _, container := range pod.Containers {
wg.Add(1)
go func(container *kubecontainer.Container) {
defer util.HandleCrash()
@@ -1061,24 +1045,15 @@ func (dm *DockerManager) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) err
// TODO: Handle this without signaling the pod infra container to
// adapt to the generic container runtime.
if container.Name == PodInfraContainerName {
err := dm.networkPlugin.TearDownPod(runningPod.Namespace, runningPod.Name, kubeletTypes.DockerID(container.ID))
err := dm.networkPlugin.TearDownPod(pod.Namespace, pod.Name, kubeletTypes.DockerID(container.ID))
if err != nil {
glog.Errorf("Failed tearing down the infra container: %v", err)
errs <- err
}
}
var containerSpec *api.Container
if pod != nil {
for i, c := range pod.Spec.Containers {
if c.Name == container.Name {
containerSpec = &pod.Spec.Containers[i]
break
}
}
}
err := dm.killContainer(container.ID, containerSpec, pod)
err := dm.killContainer(container.ID)
if err != nil {
glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, runningPod.ID)
glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, pod.ID)
errs <- err
}
wg.Done()
@@ -1096,128 +1071,75 @@ func (dm *DockerManager) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) err
return nil
}

// KillContainerInPod kills a container in the pod. It must be passed either a container ID or a container and pod,
// and will attempt to lookup the other information if missing.
func (dm *DockerManager) KillContainerInPod(containerID types.UID, container *api.Container, pod *api.Pod) error {
switch {
case len(containerID) == 0:
// Locate the container.
pods, err := dm.GetPods(false)
if err != nil {
return err
}
targetPod := kubecontainer.Pods(pods).FindPod(kubecontainer.GetPodFullName(pod), pod.UID)
targetContainer := targetPod.FindContainerByName(container.Name)
if targetContainer == nil {
return fmt.Errorf("unable to find container %q in pod %q", container.Name, targetPod.Name)
}
containerID = targetContainer.ID

case container == nil || pod == nil:
// Read information about the container from labels
inspect, err := dm.client.InspectContainer(string(containerID))
if err != nil {
return err
}
storedPod, storedContainer, cerr := containerAndPodFromLabels(inspect)
if cerr != nil {
glog.Errorf("unable to access pod data from container: %v", err)
}
if container == nil {
container = storedContainer
}
if pod == nil {
pod = storedPod
}
// KillContainerInPod kills a container in the pod.
func (dm *DockerManager) KillContainerInPod(container api.Container, pod *api.Pod) error {
// Locate the container.
pods, err := dm.GetPods(false)
if err != nil {
return err
}
return dm.killContainer(containerID, container, pod)
targetPod := kubecontainer.Pods(pods).FindPod(kubecontainer.GetPodFullName(pod), pod.UID)
targetContainer := targetPod.FindContainerByName(container.Name)
if targetContainer == nil {
return fmt.Errorf("unable to find container %q in pod %q", container.Name, targetPod.Name)
}
return dm.killContainer(targetContainer.ID)
}

// killContainer accepts a containerID and an optional container or pod containing shutdown policies. Invoke
// KillContainerInPod if information must be retrieved first.
func (dm *DockerManager) killContainer(containerID types.UID, container *api.Container, pod *api.Pod) error {
// TODO(vmarmol): Unexport this as it is no longer used externally.
// KillContainer kills a container identified by containerID.
// Internally, it invokes docker's StopContainer API with a timeout of 10s.
// TODO: Deprecate this function in favor of KillContainerInPod.
func (dm *DockerManager) KillContainer(containerID types.UID) error {
return dm.killContainer(containerID)
}

func (dm *DockerManager) killContainer(containerID types.UID) error {
ID := string(containerID)
name := ID
if container != nil {
name = fmt.Sprintf("%s %s", name, container.Name)
glog.V(2).Infof("Killing container with id %q", ID)
inspect, err := dm.client.InspectContainer(ID)
if err != nil {
return err
}
if pod != nil {
name = fmt.Sprintf("%s %s/%s", name, pod.Namespace, pod.Name)
var found bool
var preStop string
if inspect != nil && inspect.Config != nil && inspect.Config.Labels != nil {
preStop, found = inspect.Config.Labels[kubernetesPodLabel]
}

gracePeriod := int64(minimumGracePeriodInSeconds)
if pod != nil && pod.DeletionGracePeriodSeconds != nil {
gracePeriod = *pod.DeletionGracePeriodSeconds
}
glog.V(2).Infof("Killing container %q with %d second grace period", name, gracePeriod)

if pod != nil && container != nil && container.Lifecycle != nil && container.Lifecycle.PreStop != nil {
glog.V(4).Infof("Running preStop hook for container %q", name)
start := util.Now()
// TODO: timebox PreStop execution to at most gracePeriod
if err := dm.runner.Run(ID, pod, container, container.Lifecycle.PreStop); err != nil {
glog.Errorf("preStop hook for container %q failed: %v", name, err)
}
gracePeriod -= int64(util.Now().Sub(start.Time).Seconds())
}

dm.readinessManager.RemoveReadiness(ID)

// always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
if gracePeriod < minimumGracePeriodInSeconds {
gracePeriod = minimumGracePeriodInSeconds
}
err := dm.client.StopContainer(ID, uint(gracePeriod))
ref, ok := dm.containerRefManager.GetRef(ID)
if !ok {
glog.Warningf("No ref for pod '%q'", name)
} else {
// TODO: pass reason down here, and state, or move this call up the stack.
dm.recorder.Eventf(ref, "killing", "Killing %v", ID)
}
return err
}

var errNoPodOnContainer = fmt.Errorf("no pod information labels on Docker container")

// containerAndPodFromLabels tries to load the appropriate container info off of a Docker container's labels
func containerAndPodFromLabels(inspect *docker.Container) (pod *api.Pod, container *api.Container, err error) {
if inspect == nil && inspect.Config == nil && inspect.Config.Labels == nil {
return nil, nil, errNoPodOnContainer
}
labels := inspect.Config.Labels

// the pod data may not be set
if body, found := labels[kubernetesPodLabel]; found {
pod = &api.Pod{}
if err = latest.Codec.DecodeInto([]byte(body), pod); err == nil {
name := labels[kubernetesContainerLabel]
if found {
var pod api.Pod
err := latest.Codec.DecodeInto([]byte(preStop), &pod)
if err != nil {
glog.Errorf("Failed to decode prestop: %s, %s", preStop, ID)
} else {
name := inspect.Config.Labels[kubernetesContainerLabel]
var container *api.Container
for ix := range pod.Spec.Containers {
if pod.Spec.Containers[ix].Name == name {
container = &pod.Spec.Containers[ix]
break
}
}
if container == nil {
err = fmt.Errorf("unable to find container %s in pod %v", name, pod)
}
} else {
pod = nil
}
}

// attempt to find the default grace period if we didn't commit a pod, but set the generic metadata
// field (the one used by kill)
if pod == nil {
if period, ok := labels[kubernetesTerminationGracePeriodLabel]; ok {
if seconds, err := strconv.ParseInt(period, 10, 64); err == nil {
pod = &api.Pod{}
pod.DeletionGracePeriodSeconds = &seconds
if container != nil {
glog.V(1).Infof("Running preStop hook")
if err := dm.runner.Run(ID, &pod, container, container.Lifecycle.PreStop); err != nil {
glog.Errorf("failed to run preStop hook: %v", err)
}
} else {
glog.Errorf("unable to find container %v, %s", pod, name)
}
}
}

return
dm.readinessManager.RemoveReadiness(ID)
err = dm.client.StopContainer(ID, 10)
ref, ok := dm.containerRefManager.GetRef(ID)
if !ok {
glog.Warningf("No ref for pod '%v'", ID)
} else {
// TODO: pass reason down here, and state, or move this call up the stack.
dm.recorder.Eventf(ref, "killing", "Killing %v", ID)
}
return err
}

// Run a single container from a pod. Returns the docker container ID

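Note (illustrative, not part of the diff): the removed killContainer derives the Docker stop timeout from the pod's DeletionGracePeriodSeconds (defaulting to minimumGracePeriodInSeconds), subtracts the time spent running the preStop hook, and clamps the result back up to the minimum before calling StopContainer; the restored code simply stops with a fixed 10 second timeout. A compact sketch of that arithmetic, with a hypothetical helper name:

package gracesketch

import "time"

const minimumGracePeriodInSeconds = 2

// effectiveGracePeriod mirrors the grace-period bookkeeping in the removed killContainer:
// start from the pod's deletion grace period, charge the preStop hook's runtime against it,
// and never drop below the minimum shutdown window.
func effectiveGracePeriod(deletionGracePeriodSeconds *int64, preStopDuration time.Duration) int64 {
	gracePeriod := int64(minimumGracePeriodInSeconds)
	if deletionGracePeriodSeconds != nil {
		gracePeriod = *deletionGracePeriodSeconds
	}
	// preStop execution eats into the budget.
	gracePeriod -= int64(preStopDuration.Seconds())
	// Always give the container a minimal window to avoid an unnecessary SIGKILL.
	if gracePeriod < minimumGracePeriodInSeconds {
		gracePeriod = minimumGracePeriodInSeconds
	}
	return gracePeriod
}

// The result is what the removed code handed to Docker's stop call,
// e.g. dm.client.StopContainer(ID, uint(gracePeriod)).
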
@@ -1245,7 +1167,7 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
handlerErr := dm.runner.Run(id, pod, container, container.Lifecycle.PostStart)
if handlerErr != nil {
dm.killContainer(types.UID(id), container, pod)
dm.killContainer(types.UID(id))
return kubeletTypes.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
}
}
@@ -1358,11 +1280,6 @@ func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kub
containersToKeep := make(map[kubeletTypes.DockerID]int)
createPodInfraContainer := false

if pod.DeletionTimestamp != nil {
glog.V(4).Infof("Pod is terminating %q", podFullName)
return PodContainerChangesSpec{}, nil
}

var err error
var podInfraContainerID kubeletTypes.DockerID
var changed bool
@@ -1516,7 +1433,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
}

// Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
err = dm.KillPod(pod, runningPod)
err = dm.KillPod(runningPod)
if err != nil {
return err
}
@@ -1526,15 +1443,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
_, keep := containerChanges.ContainersToKeep[kubeletTypes.DockerID(container.ID)]
if !keep {
glog.V(3).Infof("Killing unwanted container %+v", container)
// attempt to find the appropriate container policy
var podContainer *api.Container
for i, c := range pod.Spec.Containers {
if c.Name == container.Name {
podContainer = &pod.Spec.Containers[i]
break
}
}
err = dm.KillContainerInPod(container.ID, podContainer, pod)
err = dm.KillContainer(container.ID)
if err != nil {
glog.Errorf("Error killing container: %v", err)
}

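Note (illustrative, not part of the diff): before killing an unwanted running container, the removed SyncPod code first looked up the matching spec in pod.Spec.Containers so the per-container shutdown policy (preStop hook, grace period) could be honored; the restored code kills by container ID alone. A small sketch of that lookup, with stand-in types:

package syncsketch

// containerSpec is a stand-in for api.Container.
type containerSpec struct {
	Name string
}

// findContainerSpec mirrors the removed lookup: return the spec whose name
// matches the running container, or nil if none does.
func findContainerSpec(specs []containerSpec, name string) *containerSpec {
	for i := range specs {
		if specs[i].Name == name {
			return &specs[i]
		}
	}
	// nil means the container is killed without a per-container shutdown policy
	return nil
}
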
@@ -417,7 +417,7 @@ func TestKillContainerInPod(t *testing.T) {
manager.readinessManager.SetReadiness(c.ID, true)
}

if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err != nil {
if err := manager.KillContainerInPod(pod.Spec.Containers[0], pod); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Assert the container has been stopped.
@@ -490,14 +490,14 @@ func TestKillContainerInPodWithPreStop(t *testing.T) {
manager.readinessManager.SetReadiness(c.ID, true)
}

if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err != nil {
if err := manager.KillContainerInPod(pod.Spec.Containers[0], pod); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Assert the container has been stopped.
if err := fakeDocker.AssertStopped([]string{containerToKill.ID}); err != nil {
t.Errorf("container was not stopped correctly: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "create_exec", "start_exec", "stop"})
verifyCalls(t, fakeDocker, []string{"list", "inspect_container", "create_exec", "start_exec", "stop"})
if !reflect.DeepEqual(expectedCmd, fakeDocker.execCmd) {
t.Errorf("expected: %v, got %v", expectedCmd, fakeDocker.execCmd)
}
@@ -534,7 +534,7 @@ func TestKillContainerInPodWithError(t *testing.T) {
manager.readinessManager.SetReadiness(c.ID, true)
}

if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err == nil {
if err := manager.KillContainerInPod(pod.Spec.Containers[0], pod); err == nil {
t.Errorf("expected error, found nil")
}

@@ -1030,7 +1030,7 @@ func TestSyncPodDeletesWithNoPodInfraContainer(t *testing.T) {

verifyCalls(t, fakeDocker, []string{
// Kill the container since pod infra container is not running.
"stop",
"inspect_container", "stop",
// Create pod infra container.
"create", "start", "inspect_container",
// Create container.
@@ -1105,7 +1105,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
// Check the pod infra container.
"inspect_container",
// Kill the duplicated container.
"stop",
"inspect_container", "stop",
})
// Expect one of the duplicates to be killed.
if len(fakeDocker.Stopped) != 1 || (fakeDocker.Stopped[0] != "1234" && fakeDocker.Stopped[0] != "4567") {
@@ -1159,7 +1159,7 @@ func TestSyncPodBadHash(t *testing.T) {
// Check the pod infra container.
"inspect_container",
// Kill and restart the bad hash container.
"stop", "create", "start",
"inspect_container", "stop", "create", "start",
})

if err := fakeDocker.AssertStopped([]string{"1234"}); err != nil {
@@ -1217,7 +1217,7 @@ func TestSyncPodsUnhealthy(t *testing.T) {
// Check the pod infra container.
"inspect_container",
// Kill the unhealthy container.
"stop",
"inspect_container", "stop",
// Restart the unhealthy container.
"create", "start",
})
@@ -1426,7 +1426,7 @@ func TestSyncPodWithRestartPolicy(t *testing.T) {
// Check the pod infra container.
"inspect_container",
// Stop the last pod infra container.
"stop",
"inspect_container", "stop",
},
[]string{},
[]string{"9876"},

@@ -1056,8 +1056,8 @@ func parseResolvConf(reader io.Reader) (nameservers []string, searches []string,
}

// Kill all running containers in a pod (includes the pod infra container).
func (kl *Kubelet) killPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
return kl.containerRuntime.KillPod(pod, runningPod)
func (kl *Kubelet) killPod(pod kubecontainer.Pod) error {
return kl.containerRuntime.KillPod(pod)
}

type empty struct{}
@@ -1103,7 +1103,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
// Kill pods we can't run.
err := canRunPod(pod)
if err != nil {
kl.killPod(pod, runningPod)
kl.killPod(runningPod)
return err
}

@@ -1418,7 +1418,7 @@ func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty,
}()
glog.V(1).Infof("Killing unwanted pod %q", pod.Name)
// Stop the containers.
err = kl.killPod(nil, *pod)
err = kl.killPod(*pod)
if err != nil {
glog.Errorf("Failed killing the pod %q: %v", pod.Name, err)
return

@@ -1155,7 +1155,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
// Create the container.
"create", "start",
// Kill the container since event handler fails.
"stop",
"inspect_container", "stop",
// Get pod status.
"list", "inspect_container", "inspect_container",
// Get pods for deleting orphaned volumes.

@@ -671,11 +671,11 @@ func (r *runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
}

// KillPod invokes 'systemctl kill' to kill the unit that runs the pod.
func (r *runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
glog.V(4).Infof("Rkt is killing pod: name %q.", runningPod.Name)
func (r *runtime) KillPod(pod kubecontainer.Pod) error {
glog.V(4).Infof("Rkt is killing pod: name %q.", pod.Name)

// TODO(yifan): More graceful stop. Replace with StopUnit and wait for a timeout.
r.systemd.KillUnit(makePodServiceFileName(runningPod.ID), int32(syscall.SIGKILL))
r.systemd.KillUnit(makePodServiceFileName(pod.ID), int32(syscall.SIGKILL))
return r.systemd.Reload()
}

@@ -880,7 +880,7 @@ func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus

if restartPod {
// TODO(yifan): Handle network plugin.
if err := r.KillPod(pod, runningPod); err != nil {
if err := r.KillPod(runningPod); err != nil {
return err
}
if err := r.RunPod(pod); err != nil {

@@ -22,7 +22,6 @@ import (
"sync"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@@ -136,24 +135,13 @@ func (s *statusManager) syncBatch() error {
}
// TODO: make me easier to express from client code
statusPod, err = s.kubeClient.Pods(statusPod.Namespace).Get(statusPod.Name)
if errors.IsNotFound(err) {
glog.V(3).Infof("Pod %q was deleted on the server", pod.Name)
return nil
}
if err == nil {
statusPod.Status = status
_, err = s.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
// TODO: handle conflict as a retry, make that easier too.
statusPod, err = s.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
if err == nil {
glog.V(3).Infof("Status for pod %q updated successfully", pod.Name)

if statusPod.DeletionTimestamp == nil || !allTerminated(statusPod.Status.ContainerStatuses) {
return nil
}
if err := s.kubeClient.Pods(statusPod.Namespace).Delete(statusPod.Name, api.NewDeleteOptions(0)); err == nil {
glog.V(3).Infof("Pod %q fully terminated and removed from etcd", statusPod.Name)
return nil
}
return nil
}
}

@@ -163,14 +151,3 @@ func (s *statusManager) syncBatch() error {
s.DeletePodStatus(podFullName)
return fmt.Errorf("error updating status for pod %q: %v", pod.Name, err)
}

// allTerminated returns true if every status is terminated, or the status list
// is empty.
func allTerminated(statuses []api.ContainerStatus) bool {
for _, status := range statuses {
if status.State.Terminated == nil {
return false
}
}
return true
}

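Note (illustrative, not part of the diff): the removed status-manager logic finalized deletion only once the pod had a DeletionTimestamp and every container status was terminated, and then deleted the pod with a zero grace period since nothing was left running. A small sketch of that final check, with stand-in types:

package statussketch

import "time"

// containerStatus is a stand-in for api.ContainerStatus; Terminated stands in
// for State.Terminated != nil.
type containerStatus struct {
	Terminated bool
}

// allTerminated mirrors the removed helper: true if every status is terminated,
// or the status list is empty.
func allTerminated(statuses []containerStatus) bool {
	for _, status := range statuses {
		if !status.Terminated {
			return false
		}
	}
	return true
}

// shouldFinalizeDeletion mirrors the removed syncBatch step: remove the pod from
// the API server only after it was marked for deletion and all containers stopped.
func shouldFinalizeDeletion(deletionTimestamp *time.Time, statuses []containerStatus) bool {
	return deletionTimestamp != nil && allTerminated(statuses)
}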