Kubelet: add KillPod for new runtime API

Pengfei Ni 2016-08-25 21:49:58 +08:00
parent b974c09819
commit 277070e267
5 changed files with 281 additions and 6 deletions

View File

@@ -39,6 +39,8 @@ var (
ErrKillContainer = errors.New("KillContainerError")
ErrVerifyNonRoot = errors.New("VerifyNonRootError")
ErrRunInitContainer = errors.New("RunInitContainerError")
ErrCreatePodSandbox = errors.New("CreatePodSandboxError")
ErrKillPodSandbox = errors.New("KillPodSandboxError")
)
var (
@@ -51,11 +53,13 @@ var (
type SyncAction string
const (
StartContainer SyncAction = "StartContainer"
KillContainer SyncAction = "KillContainer"
SetupNetwork SyncAction = "SetupNetwork"
TeardownNetwork SyncAction = "TeardownNetwork"
InitContainer SyncAction = "InitContainer"
CreatePodSandbox SyncAction = "CreatePodSandbox"
KillPodSandbox SyncAction = "KillPodSandbox"
)
// SyncResult is the result of a sync action.
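
Both new actions feed the existing sync-result plumbing: each step is recorded as a SyncResult, collected into a PodSyncResult, and collapsed into a single error via Error(). A minimal, runnable sketch of that flow (the target values are illustrative; the helpers are the existing kubecontainer ones used later in this commit):

    package main

    import (
        "fmt"

        kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    )

    func main() {
        var result kubecontainer.PodSyncResult

        // A container kill that succeeded: no Fail call, so it contributes no error.
        result.AddSyncResult(kubecontainer.NewSyncResult(kubecontainer.KillContainer, "foo1"))

        // A sandbox kill that failed.
        killSandbox := kubecontainer.NewSyncResult(kubecontainer.KillPodSandbox, "12345678")
        killSandbox.Fail(kubecontainer.ErrKillPodSandbox, "runtime timed out")
        result.AddSyncResult(killSandbox)

        // Error() is nil when every recorded action succeeded, otherwise an
        // aggregated error; the KillPod implementation below relies on this.
        fmt.Println(result.Error())
    }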

View File

@@ -24,13 +24,17 @@ import (
"os"
"path"
"sort"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/types"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/term"
)
@@ -302,6 +306,125 @@ func (m *kubeGenericRuntimeManager) getKubeletContainerStatuses(podSandboxID str
return statuses, nil
}
// generateContainerEvent generates an event for the container.
func (m *kubeGenericRuntimeManager) generateContainerEvent(containerID kubecontainer.ContainerID, eventType, reason, message string) {
ref, ok := m.containerRefManager.GetRef(containerID)
if !ok {
glog.Warningf("No ref for container %q", containerID)
return
}
m.recorder.Event(ref, eventType, reason, message)
}
// executePreStopHook runs the pre-stop lifecycle hooks if applicable and returns the duration in seconds it took.
func (m *kubeGenericRuntimeManager) executePreStopHook(pod *api.Pod, containerID kubecontainer.ContainerID, containerSpec *api.Container, gracePeriod int64) int64 {
glog.V(3).Infof("Running preStop hook for container %q", containerID.String())
start := unversioned.Now()
done := make(chan struct{})
go func() {
defer close(done)
defer utilruntime.HandleCrash()
if msg, err := m.runner.Run(containerID, pod, containerSpec, containerSpec.Lifecycle.PreStop); err != nil {
glog.Errorf("preStop hook for container %q failed: %v", containerSpec.Name, err)
m.generateContainerEvent(containerID, api.EventTypeWarning, events.FailedPreStopHook, msg)
}
}()
select {
case <-time.After(time.Duration(gracePeriod) * time.Second):
glog.V(2).Infof("preStop hook for container %q did not complete in %d seconds", containerID, gracePeriod)
case <-done:
glog.V(3).Infof("preStop hook for container %q completed", containerID)
}
return int64(unversioned.Now().Sub(start.Time).Seconds())
}
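
executePreStopHook runs the hook in a goroutine, then selects between its completion channel and the grace-period deadline: a standard Go bounded-wait pattern. A stripped-down, runnable sketch (runWithDeadline is a hypothetical name, not part of this commit):

    package main

    import (
        "fmt"
        "time"
    )

    // runWithDeadline runs fn in a goroutine and waits at most gracePeriod
    // seconds for it, returning the whole seconds actually consumed.
    func runWithDeadline(gracePeriod int64, fn func()) int64 {
        start := time.Now()
        done := make(chan struct{})
        go func() {
            defer close(done)
            fn()
        }()
        select {
        case <-time.After(time.Duration(gracePeriod) * time.Second):
            fmt.Println("hook did not complete in time")
        case <-done:
            fmt.Println("hook completed")
        }
        return int64(time.Since(start).Seconds())
    }

    func main() {
        used := runWithDeadline(2, func() { time.Sleep(500 * time.Millisecond) })
        fmt.Println("seconds consumed:", used)
    }

As in the original, a timeout only stops the wait; the hook goroutine itself keeps running to completion.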
// killContainer kills a container through the following steps:
// * Run the pre-stop lifecycle hooks (if applicable).
// * Stop the container.
func (m *kubeGenericRuntimeManager) killContainer(pod *api.Pod, containerID kubecontainer.ContainerID, containerSpec *api.Container, reason string, gracePeriodOverride *int64) error {
gracePeriod := int64(minimumGracePeriodInSeconds)
if pod != nil {
switch {
case pod.DeletionGracePeriodSeconds != nil:
gracePeriod = *pod.DeletionGracePeriodSeconds
case pod.Spec.TerminationGracePeriodSeconds != nil:
gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
}
}
glog.V(2).Infof("Killing container %q with %d second grace period", containerID.String(), gracePeriod)
// Run the pre-stop lifecycle hooks if applicable.
if pod != nil && containerSpec != nil && containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil {
gracePeriod = gracePeriod - m.executePreStopHook(pod, containerID, containerSpec, gracePeriod)
}
if gracePeriodOverride == nil {
// always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
if gracePeriod < minimumGracePeriodInSeconds {
gracePeriod = minimumGracePeriodInSeconds
}
} else {
gracePeriod = *gracePeriodOverride
glog.V(3).Infof("Killing container %q, but using %d second grace period override", containerID, gracePeriod)
}
err := m.runtimeService.StopContainer(containerID.ID, gracePeriod)
if err != nil {
glog.Errorf("Container %q termination failed with gracePeriod %d: %v", containerID.String(), gracePeriod, err)
} else {
glog.V(3).Infof("Container %q exited normally", containerID.String())
}
message := fmt.Sprintf("Killing container with id %s", containerID.String())
if reason != "" {
message = fmt.Sprint(message, ":", reason)
}
m.generateContainerEvent(containerID, api.EventTypeNormal, events.KillingContainer, message)
m.containerRefManager.ClearRef(containerID)
return err
}
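
killContainer's grace-period arithmetic reduces to a small pure function: the pod's DeletionGracePeriodSeconds takes precedence over the spec's TerminationGracePeriodSeconds, the seconds consumed by the pre-stop hook are deducted, and an override (hard-kill paths only) replaces the result outright, while the normal path is floored at minimumGracePeriodInSeconds. A hypothetical distillation, runnable as-is:

    package main

    import "fmt"

    const minimumGracePeriodInSeconds = 2

    // resolveGracePeriod is a hypothetical distillation of killContainer's
    // grace-period logic; it is not a function in this commit.
    func resolveGracePeriod(deletion, termination *int64, preStopSeconds int64, override *int64) int64 {
        gracePeriod := int64(minimumGracePeriodInSeconds)
        switch {
        case deletion != nil:
            gracePeriod = *deletion
        case termination != nil:
            gracePeriod = *termination
        }
        gracePeriod -= preStopSeconds // time already spent running the pre-stop hook
        if override != nil {
            return *override // hard-kill paths bypass the floor entirely
        }
        if gracePeriod < minimumGracePeriodInSeconds {
            gracePeriod = minimumGracePeriodInSeconds // avoid unnecessary SIGKILLs
        }
        return gracePeriod
    }

    func main() {
        termination := int64(30)
        fmt.Println(resolveGracePeriod(nil, &termination, 25, nil)) // prints 5
    }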
// killContainersWithSyncResult kills all of the pod's containers in parallel and returns a sync result for each.
func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(pod *api.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (syncResults []*kubecontainer.SyncResult) {
containerResults := make(chan *kubecontainer.SyncResult, len(runningPod.Containers))
wg := sync.WaitGroup{}
wg.Add(len(runningPod.Containers))
for _, container := range runningPod.Containers {
go func(container *kubecontainer.Container) {
defer utilruntime.HandleCrash()
defer wg.Done()
var containerSpec *api.Container
if pod != nil {
for i, c := range pod.Spec.Containers {
if container.Name == c.Name {
containerSpec = &pod.Spec.Containers[i]
break
}
}
}
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name)
if err := m.killContainer(pod, container.ID, containerSpec, "Need to kill Pod", gracePeriodOverride); err != nil {
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
}
containerResults <- killContainerResult
}(container)
}
wg.Wait()
close(containerResults)
for containerResult := range containerResults {
syncResults = append(syncResults, containerResult)
}
return
}
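
killContainersWithSyncResult is a classic fan-out/fan-in: one goroutine per container, results funneled through a channel buffered to the container count so no sender ever blocks, and a close only after the WaitGroup drains. The same pattern in isolation (names invented, runnable as-is):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        containers := []string{"foo1", "foo2", "foo3"}
        results := make(chan string, len(containers)) // buffered: sends never block
        var wg sync.WaitGroup
        wg.Add(len(containers))
        for _, name := range containers {
            go func(name string) { // pass as an argument, as the original does with *Container
                defer wg.Done()
                results <- "killed " + name
            }(name)
        }
        wg.Wait()
        close(results) // safe: every sender has finished
        for r := range results {
            fmt.Println(r)
        }
    }

Passing the container as a goroutine argument, rather than capturing the loop variable, is what makes the original correct under pre-1.22 Go loop semantics.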
// AttachContainer attaches to the container's console
func (m *kubeGenericRuntimeManager) AttachContainer(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) {
return fmt.Errorf("not implemented")

View File

@@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/network"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format"
kubetypes "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/flowcontrol"
)
@@ -45,6 +46,8 @@ const (
kubeRuntimeAPIVersion = "0.1.0"
// The root directory for pod logs
podLogsRootDirectory = "/var/log/pods"
// A minimal shutdown window for avoiding unnecessary SIGKILLs
minimumGracePeriodInSeconds = 2
)
var (
@@ -307,7 +310,77 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *api.Pod, _ api.PodStatus,
// only hard kill paths are allowed to specify a gracePeriodOverride in the kubelet in order to not corrupt user data.
// it is useful when doing SIGKILL for hard eviction scenarios, or max grace period during soft eviction scenarios.
func (m *kubeGenericRuntimeManager) KillPod(pod *api.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) error {
result := m.killPodWithSyncResult(pod, runningPod, gracePeriodOverride)
return result.Error()
}
// killPodWithSyncResult kills a runningPod and returns SyncResult.
// Note: The pod passed in could be *nil* when the kubelet has restarted.
func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *api.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) {
killContainerResults := m.killContainersWithSyncResult(pod, runningPod, gracePeriodOverride)
for _, containerResult := range killContainerResults {
result.AddSyncResult(containerResult)
}
// Teardown network plugin
if len(runningPod.Sandboxes) == 0 {
glog.V(4).Infof("Can not find pod sandbox by UID %q, assuming already removed.", runningPod.ID)
return
}
sandboxID := runningPod.Sandboxes[0].ID.ID
isHostNetwork, err := m.isHostNetwork(sandboxID, pod)
if err != nil {
result.Fail(err)
return
}
if !isHostNetwork {
teardownNetworkResult := kubecontainer.NewSyncResult(kubecontainer.TeardownNetwork, pod.UID)
result.AddSyncResult(teardownNetworkResult)
// Tear down network plugin with sandbox id
if err := m.networkPlugin.TearDownPod(runningPod.Namespace, runningPod.Name, kubecontainer.ContainerID{
Type: m.runtimeName,
ID: sandboxID,
}); err != nil {
message := fmt.Sprintf("Failed to teardown network for pod %q using network plugins %q: %v",
format.Pod(pod), m.networkPlugin.Name(), err)
teardownNetworkResult.Fail(kubecontainer.ErrTeardownNetwork, message)
glog.Error(message)
}
}
// Stop the sandbox; it will be removed in GarbageCollect.
killSandboxResult := kubecontainer.NewSyncResult(kubecontainer.KillPodSandbox, runningPod.ID)
result.AddSyncResult(killSandboxResult)
// Stop all sandboxes belonging to the same pod
for _, podSandbox := range runningPod.Sandboxes {
if err := m.runtimeService.StopPodSandbox(podSandbox.ID.ID); err != nil {
killSandboxResult.Fail(kubecontainer.ErrKillPodSandbox, err.Error())
glog.Errorf("Failed to stop sandbox %q", podSandbox.ID)
}
}
return
}
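
The teardown order here is deliberate: containers first, then the network (skipped for host-network pods), then every sandbox. Sandboxes are only stopped, with removal deferred to GarbageCollect so status stays inspectable. A hypothetical caller that wants per-action detail can walk the PodSyncResult instead of the flattened error that KillPod returns; since killPodWithSyncResult is unexported, such a caller would have to live inside the kuberuntime package:

    // Hypothetical in-package caller sketch, not part of this commit.
    result := m.killPodWithSyncResult(pod, runningPod, nil)
    for _, r := range result.SyncResults {
        if r.Error != nil {
            glog.Errorf("action %q on %v failed: %v (%s)", r.Action, r.Target, r.Error, r.Message)
        }
    }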
// isHostNetwork checks whether the pod is running in host-network mode.
func (m *kubeGenericRuntimeManager) isHostNetwork(podSandBoxID string, pod *api.Pod) (bool, error) {
if pod != nil {
return kubecontainer.IsHostNetworkPod(pod), nil
}
podStatus, err := m.runtimeService.PodSandboxStatus(podSandBoxID)
if err != nil {
return false, err
}
if podStatus.Linux != nil && podStatus.Linux.Namespaces != nil && podStatus.Linux.Namespaces.Options != nil {
if podStatus.Linux.Namespaces.Options.HostNetwork != nil {
return podStatus.Linux.Namespaces.Options.GetHostNetwork(), nil
}
}
return false, nil
}
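
The nil checks in isHostNetwork exist because every level of the sandbox status is an optional protobuf message. Since generated protobuf getters are nil-receiver safe (GetHostNetwork is already used at the leaf above), the ladder could equally be written as a getter chain; a sketch, assuming the generated Get* accessors on the v1alpha1 runtime types:

    // Equivalent check via nil-safe generated getters; a missing message at
    // any level simply yields false, matching the explicit ladder above.
    func isHostNetworkStatus(status *runtimeApi.PodSandboxStatus) bool {
        return status.GetLinux().GetNamespaces().GetOptions().GetHostNetwork()
    }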
// GetPodStatus retrieves the status of the pod, including the

View File

@@ -321,3 +321,75 @@ func TestGetNetNS(t *testing.T) {
assert.Equal(t, "", actual)
assert.Equal(t, "not supported", err.Error())
}
func TestKillPod(t *testing.T) {
fakeRuntime, _, m, err := createTestRuntimeManager()
assert.NoError(t, err)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
Name: "foo",
Namespace: "new",
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "foo1",
Image: "busybox",
},
{
Name: "foo2",
Image: "busybox",
},
},
},
}
// Set up the fake sandbox and fake containers in fakeRuntime.
fakeSandbox, fakeContainers, err := makeAndSetFakePod(m, fakeRuntime, pod)
assert.NoError(t, err)
// Convert the fakeContainers to kubecontainer.Container
containers := make([]*kubecontainer.Container, len(fakeContainers))
for i := range containers {
fakeContainer := fakeContainers[i]
c, err := m.toKubeContainer(&runtimeApi.Container{
Id: fakeContainer.Id,
Metadata: fakeContainer.Metadata,
State: fakeContainer.State,
Image: fakeContainer.Image,
ImageRef: fakeContainer.ImageRef,
Labels: fakeContainer.Labels,
})
if err != nil {
t.Fatalf("unexpected error %v", err)
}
containers[i] = c
}
runningPod := kubecontainer.Pod{
ID: pod.UID,
Name: pod.Name,
Namespace: pod.Namespace,
Containers: []*kubecontainer.Container{containers[0], containers[1]},
Sandboxes: []*kubecontainer.Container{
{
ID: kubecontainer.ContainerID{
ID: fakeSandbox.GetId(),
Type: apitest.FakeRuntimeName,
},
},
},
}
err = m.KillPod(pod, runningPod, nil)
assert.NoError(t, err)
assert.Equal(t, 2, len(fakeRuntime.Containers))
assert.Equal(t, 1, len(fakeRuntime.Sandboxes))
for _, sandbox := range fakeRuntime.Sandboxes {
assert.Equal(t, runtimeApi.PodSandBoxState_NOTREADY, sandbox.GetState())
}
for _, c := range fakeRuntime.Containers {
assert.Equal(t, runtimeApi.ContainerState_EXITED, c.GetState())
}
}

View File

@@ -68,12 +68,15 @@ type NetworkPlugin interface {
// SetUpPod is the method called after the infra container of
// the pod has been created but before the other containers of the
// pod are launched.
// TODO: rename podInfraContainerID to sandboxID
SetUpPod(namespace string, name string, podInfraContainerID kubecontainer.ContainerID) error
// TearDownPod is the method called before a pod's infra container will be deleted
// TODO: rename podInfraContainerID to sandboxID
TearDownPod(namespace string, name string, podInfraContainerID kubecontainer.ContainerID) error
// GetPodNetworkStatus is the method called to obtain the ipv4 or ipv6 addresses of the container
// TODO: rename podInfraContainerID to sandboxID
GetPodNetworkStatus(namespace string, name string, podInfraContainerID kubecontainer.ContainerID) (*PodNetworkStatus, error)
// NetworkStatus returns error if the network plugin is in error state