Change original PodStatus to APIPodStatus, and start using kubelet internal PodStatus in dockertools

Random-Liu 2015-12-04 16:06:25 -08:00
parent b6f68df3c8
commit 3cbdf79f8c
17 changed files with 492 additions and 368 deletions
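For orientation, here is a minimal sketch of the new call pattern this commit introduces, drawn from the Runtime interface and kubelet.go hunks below. The function name syncOnce is hypothetical and the import paths are assumed to match this tree; runningPod is still passed separately until the TODO referencing #17420 below is resolved.

package sketch

import (
    "k8s.io/kubernetes/pkg/api"
    kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    "k8s.io/kubernetes/pkg/util"
)

// syncOnce fetches both status representations in one call and hands them to
// SyncPod, which now takes the api.PodStatus and the kubelet-internal
// *kubecontainer.PodStatus side by side.
func syncOnce(runtime kubecontainer.Runtime, pod *api.Pod, runningPod kubecontainer.Pod,
    pullSecrets []api.Secret, backOff *util.Backoff) error {
    podStatus, apiPodStatus, err := runtime.GetPodStatusAndAPIPodStatus(pod)
    if err != nil {
        return err
    }
    return runtime.SyncPod(pod, runningPod, *apiPodStatus, podStatus, pullSecrets, backOff)
}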

View File

@ -97,7 +97,7 @@ func (s *KubeletExecutorServer) runExecutor(
return nil, fmt.Errorf("PodStatucFunc called before kubelet is initialized")
}
status, err := s.klet.GetRuntime().GetPodStatus(pod)
status, err := s.klet.GetRuntime().GetAPIPodStatus(pod)
if err != nil {
return nil, err
}

View File

@ -36,8 +36,8 @@ type FakeRuntime struct {
PodList []*Pod
AllPodList []*Pod
ImageList []Image
PodStatus api.PodStatus
RawPodStatus RawPodStatus
APIPodStatus api.PodStatus
PodStatus PodStatus
StartedPods []string
KilledPods []string
StartedContainers []string
@ -93,7 +93,7 @@ func (f *FakeRuntime) ClearCalls() {
f.CalledFunctions = []string{}
f.PodList = []*Pod{}
f.AllPodList = []*Pod{}
f.PodStatus = api.PodStatus{}
f.APIPodStatus = api.PodStatus{}
f.StartedPods = []string{}
f.KilledPods = []string{}
f.StartedContainers = []string{}
@ -165,7 +165,7 @@ func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) {
return f.PodList, f.Err
}
func (f *FakeRuntime) SyncPod(pod *api.Pod, _ Pod, _ api.PodStatus, _ []api.Secret, backOff *util.Backoff) error {
func (f *FakeRuntime) SyncPod(pod *api.Pod, _ Pod, _ api.PodStatus, _ *PodStatus, _ []api.Secret, backOff *util.Backoff) error {
f.Lock()
defer f.Unlock()
@ -223,7 +223,16 @@ func (f *FakeRuntime) KillContainerInPod(container api.Container, pod *api.Pod)
return f.Err
}
func (f *FakeRuntime) GetPodStatus(*api.Pod) (*api.PodStatus, error) {
func (f *FakeRuntime) GetAPIPodStatus(*api.Pod) (*api.PodStatus, error) {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "GetAPIPodStatus")
status := f.APIPodStatus
return &status, f.Err
}
func (f *FakeRuntime) GetPodStatus(uid types.UID, name, namespace string) (*PodStatus, error) {
f.Lock()
defer f.Unlock()
@ -232,22 +241,24 @@ func (f *FakeRuntime) GetPodStatus(*api.Pod) (*api.PodStatus, error) {
return &status, f.Err
}
func (f *FakeRuntime) GetRawPodStatus(uid types.UID, name, namespace string) (*RawPodStatus, error) {
func (f *FakeRuntime) ConvertPodStatusToAPIPodStatus(_ *api.Pod, _ *PodStatus) (*api.PodStatus, error) {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "GetRawPodStatus")
status := f.RawPodStatus
f.CalledFunctions = append(f.CalledFunctions, "ConvertPodStatusToAPIPodStatus")
status := f.APIPodStatus
return &status, f.Err
}
func (f *FakeRuntime) ConvertRawToPodStatus(_ *api.Pod, _ *RawPodStatus) (*api.PodStatus, error) {
func (f *FakeRuntime) GetPodStatusAndAPIPodStatus(_ *api.Pod) (*PodStatus, *api.PodStatus, error) {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "ConvertRawToPodStatus")
status := f.PodStatus
return &status, f.Err
// This is only a temporary function; it should be logged as GetAPIPodStatus
f.CalledFunctions = append(f.CalledFunctions, "GetAPIPodStatus")
apiPodStatus := f.APIPodStatus
podStatus := f.PodStatus
return &podStatus, &apiPodStatus, f.Err
}
func (f *FakeRuntime) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
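As a quick illustration of how the updated fake is meant to be used in tests, here is a hedged sketch. It assumes FakeRuntime lives in the same package as the Runtime interface, as this diff suggests; the test itself is hypothetical and not part of the commit.

package container

import (
    "testing"

    "k8s.io/kubernetes/pkg/api"
)

func TestFakeRuntimeRecordsGetAPIPodStatus(t *testing.T) {
    fake := &FakeRuntime{}
    fake.APIPodStatus = api.PodStatus{Phase: api.PodRunning}

    status, err := fake.GetAPIPodStatus(&api.Pod{})
    if err != nil {
        t.Fatalf("unexpected error: %v", err)
    }
    if status.Phase != api.PodRunning {
        t.Errorf("expected phase %q, got %q", api.PodRunning, status.Phase)
    }
    // The fake records every interface method it was asked to run.
    if len(fake.CalledFunctions) != 1 || fake.CalledFunctions[0] != "GetAPIPodStatus" {
        t.Errorf("unexpected call record: %v", fake.CalledFunctions)
    }
}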

View File

@ -43,7 +43,37 @@ type RunContainerOptionsGenerator interface {
// ShouldContainerBeRestarted checks whether a container needs to be restarted.
// TODO(yifan): Think about how to refactor this.
func ShouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatus *api.PodStatus) bool {
func ShouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatus *PodStatus) bool {
podFullName := GetPodFullName(pod)
// Get all dead container status.
var resultStatus []*ContainerStatus
for _, containerStatus := range podStatus.ContainerStatuses {
if containerStatus.Name == container.Name && containerStatus.State == ContainerStateExited {
resultStatus = append(resultStatus, containerStatus)
}
}
// Check RestartPolicy for dead container.
if len(resultStatus) > 0 {
if pod.Spec.RestartPolicy == api.RestartPolicyNever {
glog.V(4).Infof("Already ran container %q of pod %q, do nothing", container.Name, podFullName)
return false
}
if pod.Spec.RestartPolicy == api.RestartPolicyOnFailure {
// Check the exit code of last run. Note: This assumes the result is sorted
// by the created time in reverse order.
if resultStatus[0].ExitCode == 0 {
glog.V(4).Infof("Already successfully ran container %q of pod %q, do nothing", container.Name, podFullName)
return false
}
}
}
return true
}
// TODO (random-liu) This should be removed soon after rkt implements GetPodStatus.
func ShouldContainerBeRestartedOldVersion(container *api.Container, pod *api.Pod, podStatus *api.PodStatus) bool {
podFullName := GetPodFullName(pod)
// Get all dead container status.
@ -72,6 +102,30 @@ func ShouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatu
return true
}
// TODO (random-liu): Converts a PodStatus to a running Pod; this should be deprecated soon.
func ConvertPodStatusToRunningPod(podStatus *PodStatus) Pod {
runningPod := Pod{
ID: podStatus.ID,
Name: podStatus.Name,
Namespace: podStatus.Namespace,
}
for _, containerStatus := range podStatus.ContainerStatuses {
if containerStatus.State != ContainerStateRunning {
continue
}
container := &Container{
ID: containerStatus.ID,
Name: containerStatus.Name,
Image: containerStatus.Image,
Hash: containerStatus.Hash,
Created: containerStatus.CreatedAt.Unix(),
State: containerStatus.State,
}
runningPod.Containers = append(runningPod.Containers, container)
}
return runningPod
}
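To make the intended use of the two helpers above concrete, here is a small in-package sketch; syncDecisionSketch and its logic are illustrative only, not code from the kubelet.

package container

import "k8s.io/kubernetes/pkg/api"

// syncDecisionSketch decides from the internal PodStatus whether any container
// should be restarted, and derives a running Pod for callers (e.g. KillPod)
// that still expect one.
func syncDecisionSketch(pod *api.Pod, podStatus *PodStatus) (restart bool, runningPod Pod) {
    for i := range pod.Spec.Containers {
        // ShouldContainerBeRestarted consults the dead container statuses in
        // podStatus together with pod.Spec.RestartPolicy.
        if ShouldContainerBeRestarted(&pod.Spec.Containers[i], pod, podStatus) {
            restart = true
            break
        }
    }
    // ConvertPodStatusToRunningPod bridges to APIs that still take a running Pod.
    runningPod = ConvertPodStatusToRunningPod(podStatus)
    return restart, runningPod
}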
// HashContainer returns the hash of the container. It is used to compare
// the running container with its desired spec.
func HashContainer(container *api.Container) uint64 {

View File

@ -85,27 +85,27 @@ type Runtime interface {
// GarbageCollect removes dead containers using the specified container gc policy
GarbageCollect(gcPolicy ContainerGCPolicy) error
// Syncs the running pod into the desired pod.
SyncPod(pod *api.Pod, runningPod Pod, podStatus api.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error
// TODO (random-liu) The runningPod will be removed after #17420 is done.
SyncPod(pod *api.Pod, runningPod Pod, apiPodStatus api.PodStatus, podStatus *PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error
// KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.
KillPod(pod *api.Pod, runningPod Pod) error
// GetPodStatus retrieves the status of the pod, including the information of
// GetAPIPodStatus retrieves the api.PodStatus of the pod, including the information of
// all containers in the pod. Clients of this interface assume the
// containers' statuses in a pod always have a deterministic ordering
// (e.g., sorted by name).
// TODO: Rename this to GetAPIPodStatus, and eventually deprecate the
// function in favor of GetRawPodStatus.
GetPodStatus(*api.Pod) (*api.PodStatus, error)
// GetRawPodStatus retrieves the status of the pod, including the
GetAPIPodStatus(*api.Pod) (*api.PodStatus, error)
// GetPodStatus retrieves the status of the pod, including the
// information of all containers in the pod that are visible in the Runtime.
// TODO: Rename this to GetPodStatus to replace the original function.
GetRawPodStatus(uid types.UID, name, namespace string) (*RawPodStatus, error)
// ConvertRawToPodStatus converts the RawPodStatus object to api.PodStatus.
GetPodStatus(uid types.UID, name, namespace string) (*PodStatus, error)
// ConvertPodStatusToAPIPodStatus converts the PodStatus object to api.PodStatus.
// This function is needed because Docker generates some high-level and/or
// pod-level information for api.PodStatus (e.g., check whether the image
// exists to determine the reason).
// TODO: Deprecate this function once we generalize the logic for all
// container runtimes in kubelet.
ConvertRawToPodStatus(*api.Pod, *RawPodStatus) (*api.PodStatus, error)
// exists to determine the reason). We should try generalizing the logic
// for all container runtimes in kubelet and remove this function.
ConvertPodStatusToAPIPodStatus(*api.Pod, *PodStatus) (*api.PodStatus, error)
// Returns both PodStatus and api.PodStatus; this is just a temporary function.
// TODO (random-liu) Remove this method later
GetPodStatusAndAPIPodStatus(*api.Pod) (*PodStatus, *api.PodStatus, error)
// PullImage pulls an image from the network to local storage using the supplied
// secrets if necessary.
PullImage(image ImageSpec, pullSecrets []api.Secret) error
@ -213,17 +213,17 @@ func (c *ContainerID) UnmarshalJSON(data []byte) error {
return c.ParseString(string(data))
}
type ContainerStatus string
type ContainerState string
const (
ContainerStatusRunning ContainerStatus = "running"
ContainerStatusExited ContainerStatus = "exited"
// This unknown encompasses all the statuses that we currently don't care.
ContainerStatusUnknown ContainerStatus = "unknown"
ContainerStateRunning ContainerState = "running"
ContainerStateExited ContainerState = "exited"
// This unknown encompasses all the states that we currently don't care about.
ContainerStateUnknown ContainerState = "unknown"
)
// Container provides the runtime information for a container, such as ID, hash,
// status of the container.
// state of the container.
type Container struct {
// The ID of the container, used by the container runtime to identify
// a container.
@ -239,13 +239,13 @@ type Container struct {
// The timestamp of the creation time of the container.
// TODO(yifan): Consider to move it to api.ContainerStatus.
Created int64
// Status is the status of the container.
Status ContainerStatus
// State is the state of the container.
State ContainerState
}
// RawPodStatus represents the status of the pod and its containers.
// api.PodStatus can be derived from examining RawPodStatus and api.Pod.
type RawPodStatus struct {
// PodStatus represents the status of the pod and its containers.
// api.PodStatus can be derived from examining PodStatus and api.Pod.
type PodStatus struct {
// ID of the pod.
ID types.UID
// Name of the pod.
@ -255,17 +255,17 @@ type RawPodStatus struct {
// IP of the pod.
IP string
// Status of containers in the pod.
ContainerStatuses []*RawContainerStatus
ContainerStatuses []*ContainerStatus
}
// RawPodContainer represents the status of a container.
type RawContainerStatus struct {
// ContainerStatus represents the status of a container.
type ContainerStatus struct {
// ID of the container.
ID ContainerID
// Name of the container.
Name string
// Status of the container.
Status ContainerStatus
State ContainerState
// Creation time of the container.
CreatedAt time.Time
// Start time of the container.
@ -279,7 +279,7 @@ type RawContainerStatus struct {
// ID of the image.
ImageID string
// Hash of the container, used for comparison.
Hash string
Hash uint64
// Number of times that the container has been restarted.
RestartCount int
// A string that explains why the container is in this state.
@ -289,6 +289,28 @@ type RawContainerStatus struct {
Message string
}
// FindContainerStatusByName returns the container status with the given name in the pod status.
// When multiple container statuses share the same name, the first match is returned.
func (podStatus *PodStatus) FindContainerStatusByName(containerName string) *ContainerStatus {
for _, containerStatus := range podStatus.ContainerStatuses {
if containerStatus.Name == containerName {
return containerStatus
}
}
return nil
}
// GetRunningContainerStatuses returns the container statuses of all running containers in the pod.
func (podStatus *PodStatus) GetRunningContainerStatuses() []*ContainerStatus {
runningContainerStatuses := []*ContainerStatus{}
for _, containerStatus := range podStatus.ContainerStatuses {
if containerStatus.State == ContainerStateRunning {
runningContainerStatuses = append(runningContainerStatuses, containerStatus)
}
}
return runningContainerStatuses
}
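A short hedged usage sketch for the two accessors above; reportRunningSketch and the infraName parameter are hypothetical in-package helpers added here for illustration.

package container

import "github.com/golang/glog"

// reportRunningSketch shows how callers are meant to consume the new accessors.
func reportRunningSketch(podStatus *PodStatus, infraName string) {
    // Look up one container status by name; nil means the runtime does not
    // see such a container at all.
    if infra := podStatus.FindContainerStatusByName(infraName); infra == nil {
        glog.V(4).Infof("Pod %q has no status for container %q", podStatus.Name, infraName)
    } else if infra.State != ContainerStateRunning {
        glog.V(4).Infof("Container %q of pod %q is %q", infraName, podStatus.Name, infra.State)
    }
    // Or collect the statuses of every running container in one call.
    for _, cs := range podStatus.GetRunningContainerStatuses() {
        glog.V(4).Infof("Pod %q: container %q (id %v) is running", podStatus.Name, cs.Name, cs.ID)
    }
}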
// Basic information about a container image.
type Image struct {
// ID of the image.

View File

@ -21,6 +21,8 @@ import (
"strings"
docker "github.com/fsouza/go-dockerclient"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
@ -28,16 +30,16 @@ import (
// This file contains helper functions to convert docker API types to runtime
// (kubecontainer) types.
func mapStatus(status string) kubecontainer.ContainerStatus {
// Parse the status string in docker.APIContainers. This could break when
func mapState(state string) kubecontainer.ContainerState {
// Parse the state string in docker.APIContainers. This could break when
// we upgrade docker.
switch {
case strings.HasPrefix(status, "Up"):
return kubecontainer.ContainerStatusRunning
case strings.HasPrefix(status, "Exited"):
return kubecontainer.ContainerStatusExited
case strings.HasPrefix(state, "Up"):
return kubecontainer.ContainerStateRunning
case strings.HasPrefix(state, "Exited"):
return kubecontainer.ContainerStateExited
default:
return kubecontainer.ContainerStatusUnknown
return kubecontainer.ContainerStateUnknown
}
}
@ -58,7 +60,11 @@ func toRuntimeContainer(c *docker.APIContainers) (*kubecontainer.Container, erro
Image: c.Image,
Hash: hash,
Created: c.Created,
Status: mapStatus(c.Status),
// (random-liu) docker uses "status" to indicate whether a container is running or exited,
// whereas in kubernetes we usually use "state" for that and reserve "status" for the
// comprehensive status of the container. Hence the different naming here.
State: mapState(c.Status),
}, nil
}
@ -74,3 +80,31 @@ func toRuntimeImage(image *docker.APIImages) (*kubecontainer.Image, error) {
Size: image.VirtualSize,
}, nil
}
// containerStatusToAPIContainerStatus converts an internal kubecontainer.ContainerStatus to api.ContainerStatus.
func containerStatusToAPIContainerStatus(containerStatus *kubecontainer.ContainerStatus) *api.ContainerStatus {
containerID := DockerPrefix + containerStatus.ID.ID
status := api.ContainerStatus{
Name: containerStatus.Name,
RestartCount: containerStatus.RestartCount,
Image: containerStatus.Image,
ImageID: containerStatus.ImageID,
ContainerID: containerID,
}
switch containerStatus.State {
case kubecontainer.ContainerStateRunning:
status.State.Running = &api.ContainerStateRunning{StartedAt: unversioned.NewTime(containerStatus.StartedAt)}
case kubecontainer.ContainerStateExited:
status.State.Terminated = &api.ContainerStateTerminated{
ExitCode: containerStatus.ExitCode,
Reason: containerStatus.Reason,
Message: containerStatus.Message,
StartedAt: unversioned.NewTime(containerStatus.StartedAt),
FinishedAt: unversioned.NewTime(containerStatus.FinishedAt),
ContainerID: containerID,
}
default:
status.State.Waiting = &api.ContainerStateWaiting{}
}
return &status
}

View File

@ -24,19 +24,19 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
func TestMapStatus(t *testing.T) {
func TestMapState(t *testing.T) {
testCases := []struct {
input string
expected kubecontainer.ContainerStatus
expected kubecontainer.ContainerState
}{
{input: "Up 5 hours", expected: kubecontainer.ContainerStatusRunning},
{input: "Exited (0) 2 hours ago", expected: kubecontainer.ContainerStatusExited},
{input: "Created", expected: kubecontainer.ContainerStatusUnknown},
{input: "Random string", expected: kubecontainer.ContainerStatusUnknown},
{input: "Up 5 hours", expected: kubecontainer.ContainerStateRunning},
{input: "Exited (0) 2 hours ago", expected: kubecontainer.ContainerStateExited},
{input: "Created", expected: kubecontainer.ContainerStateUnknown},
{input: "Random string", expected: kubecontainer.ContainerStateUnknown},
}
for i, test := range testCases {
if actual := mapStatus(test.input); actual != test.expected {
if actual := mapState(test.input); actual != test.expected {
t.Errorf("Test[%d]: expected %q, got %q", i, test.expected, actual)
}
}
@ -56,7 +56,7 @@ func TestToRuntimeContainer(t *testing.T) {
Image: "bar_image",
Hash: 0x5678,
Created: 12345,
Status: kubecontainer.ContainerStatusRunning,
State: kubecontainer.ContainerStateRunning,
}
actual, err := toRuntimeContainer(original)

View File

@ -577,16 +577,16 @@ func TestFindContainersByPod(t *testing.T) {
Namespace: "ns",
Containers: []*kubecontainer.Container{
{
ID: kubetypes.DockerID("foobar").ContainerID(),
Name: "foobar",
Hash: 0x1234,
Status: kubecontainer.ContainerStatusUnknown,
ID: kubetypes.DockerID("foobar").ContainerID(),
Name: "foobar",
Hash: 0x1234,
State: kubecontainer.ContainerStateUnknown,
},
{
ID: kubetypes.DockerID("baz").ContainerID(),
Name: "baz",
Hash: 0x1234,
Status: kubecontainer.ContainerStatusUnknown,
ID: kubetypes.DockerID("baz").ContainerID(),
Name: "baz",
Hash: 0x1234,
State: kubecontainer.ContainerStateUnknown,
},
},
},
@ -596,10 +596,10 @@ func TestFindContainersByPod(t *testing.T) {
Namespace: "ns",
Containers: []*kubecontainer.Container{
{
ID: kubetypes.DockerID("barbar").ContainerID(),
Name: "barbar",
Hash: 0x1234,
Status: kubecontainer.ContainerStatusUnknown,
ID: kubetypes.DockerID("barbar").ContainerID(),
Name: "barbar",
Hash: 0x1234,
State: kubecontainer.ContainerStateUnknown,
},
},
},
@ -638,22 +638,22 @@ func TestFindContainersByPod(t *testing.T) {
Namespace: "ns",
Containers: []*kubecontainer.Container{
{
ID: kubetypes.DockerID("foobar").ContainerID(),
Name: "foobar",
Hash: 0x1234,
Status: kubecontainer.ContainerStatusUnknown,
ID: kubetypes.DockerID("foobar").ContainerID(),
Name: "foobar",
Hash: 0x1234,
State: kubecontainer.ContainerStateUnknown,
},
{
ID: kubetypes.DockerID("barfoo").ContainerID(),
Name: "barfoo",
Hash: 0x1234,
Status: kubecontainer.ContainerStatusUnknown,
ID: kubetypes.DockerID("barfoo").ContainerID(),
Name: "barfoo",
Hash: 0x1234,
State: kubecontainer.ContainerStateUnknown,
},
{
ID: kubetypes.DockerID("baz").ContainerID(),
Name: "baz",
Hash: 0x1234,
Status: kubecontainer.ContainerStatusUnknown,
ID: kubetypes.DockerID("baz").ContainerID(),
Name: "baz",
Hash: 0x1234,
State: kubecontainer.ContainerStateUnknown,
},
},
},
@ -663,10 +663,10 @@ func TestFindContainersByPod(t *testing.T) {
Namespace: "ns",
Containers: []*kubecontainer.Container{
{
ID: kubetypes.DockerID("barbar").ContainerID(),
Name: "barbar",
Hash: 0x1234,
Status: kubecontainer.ContainerStatusUnknown,
ID: kubetypes.DockerID("barbar").ContainerID(),
Name: "barbar",
Hash: 0x1234,
State: kubecontainer.ContainerStateUnknown,
},
},
},
@ -676,10 +676,10 @@ func TestFindContainersByPod(t *testing.T) {
Namespace: "ns",
Containers: []*kubecontainer.Container{
{
ID: kubetypes.DockerID("bazbaz").ContainerID(),
Name: "bazbaz",
Hash: 0x1234,
Status: kubecontainer.ContainerStatusUnknown,
ID: kubetypes.DockerID("bazbaz").ContainerID(),
Name: "bazbaz",
Hash: 0x1234,
State: kubecontainer.ContainerStateUnknown,
},
},
},

View File

@ -250,7 +250,7 @@ func (f *FakeDockerClient) CreateContainer(c docker.CreateContainerOptions) (*do
// Docker likes to add a '/', so copy that behavior.
name := "/" + c.Name
f.Created = append(f.Created, name)
// The newest container should be in front, because we assume so in GetPodStatus()
// The newest container should be in front, because we assume so in GetAPIPodStatus()
f.ContainerList = append([]docker.APIContainers{
{ID: name, Names: []string{name}, Image: c.Config.Image, Labels: c.Config.Labels},
}, f.ContainerList...)
@ -299,7 +299,7 @@ func (f *FakeDockerClient) StopContainer(id string, timeout uint) error {
var newList []docker.APIContainers
for _, container := range f.ContainerList {
if container.ID == id {
// The newest exited container should be in front. Because we assume so in GetPodStatus()
// The newest exited container should be in front. Because we assume so in GetAPIPodStatus()
f.ExitedContainerList = append([]docker.APIContainers{container}, f.ExitedContainerList...)
continue
}

View File

@ -101,7 +101,7 @@ type DockerManager struct {
// deleted.
reasonCache reasonInfoCache
// TODO(yifan): Record the pull failure so we can eliminate the image checking
// in GetPodStatus()?
// in GetAPIPodStatus()?
// Lower level docker image puller.
dockerPuller DockerPuller
@ -307,13 +307,6 @@ var (
ErrContainerCannotRun = errors.New("ContainerCannotRun")
)
// Internal information kept for containers from inspection
type containerStatusResult struct {
status api.ContainerStatus
ip string
err error
}
// determineContainerIP determines the IP address of the given container. It is expected
// that the container passed is the infrastructure container of a pod and the responsibility
// of the caller to ensure that the correct container is passed.
@ -336,186 +329,144 @@ func (dm *DockerManager) determineContainerIP(podNamespace, podName string, cont
return result
}
func (dm *DockerManager) inspectContainer(dockerID, containerName string, pod *api.Pod) *containerStatusResult {
result := containerStatusResult{api.ContainerStatus{}, "", nil}
inspectResult, err := dm.client.InspectContainer(dockerID)
func (dm *DockerManager) inspectContainer(id string, podName, podNamespace string) (*kubecontainer.ContainerStatus, string, error) {
var ip string
iResult, err := dm.client.InspectContainer(id)
if err != nil {
result.err = err
return &result
}
// NOTE (pmorie): this is a seriously fishy if statement. A nil result from
// InspectContainer seems like it should always be paired with a
// non-nil error in the result of InspectContainer.
if inspectResult == nil {
glog.Errorf("Received a nil result from InspectContainer without receiving an error for container ID %v", dockerID)
// Why did we not get an error?
return &result
return nil, ip, err
}
glog.V(4).Infof("Container inspect result: %+v", *iResult)
glog.V(4).Infof("Container inspect result: %+v", *inspectResult)
// TODO: Get k8s container name by parsing the docker name. This will be
// replaced by checking docker labels eventually.
dockerName, hash, err := ParseDockerName(iResult.Name)
if err != nil {
return nil, ip, fmt.Errorf("Unable to parse docker name %q", iResult.Name)
}
containerName := dockerName.ContainerName
var containerInfo *labelledContainerInfo
if containerInfo, err = getContainerInfoFromLabel(inspectResult.Config.Labels); err != nil {
glog.Errorf("Get labelled container info error for container %v: %v", dockerID, err)
if containerInfo, err = getContainerInfoFromLabel(iResult.Config.Labels); err != nil {
glog.Errorf("Get labelled container info error for container %v: %v", id, err)
}
result.status = api.ContainerStatus{
status := kubecontainer.ContainerStatus{
Name: containerName,
RestartCount: containerInfo.RestartCount,
Image: inspectResult.Config.Image,
ImageID: DockerPrefix + inspectResult.Image,
ContainerID: DockerPrefix + dockerID,
Image: iResult.Config.Image,
ImageID: DockerPrefix + iResult.Image,
ID: kubetypes.DockerID(id).ContainerID(),
ExitCode: iResult.State.ExitCode,
CreatedAt: iResult.Created,
Hash: hash,
}
if iResult.State.Running {
status.State = kubecontainer.ContainerStateRunning
status.StartedAt = iResult.State.StartedAt
if containerName == PodInfraContainerName {
ip = dm.determineContainerIP(podNamespace, podName, iResult)
}
return &status, ip, nil
}
if inspectResult.State.Running {
result.status.State.Running = &api.ContainerStateRunning{
StartedAt: unversioned.NewTime(inspectResult.State.StartedAt),
}
if containerName == PodInfraContainerName {
result.ip = dm.determineContainerIP(pod.Namespace, pod.Name, inspectResult)
}
} else if !inspectResult.State.FinishedAt.IsZero() || inspectResult.State.ExitCode != 0 {
// Find containers that have exited or failed to start.
if !iResult.State.FinishedAt.IsZero() || iResult.State.ExitCode != 0 {
// When a container fails to start, State.ExitCode is non-zero and FinishedAt and StartedAt are both zero.
reason := ""
message := inspectResult.State.Error
finishedAt := unversioned.NewTime(inspectResult.State.FinishedAt)
startedAt := unversioned.NewTime(inspectResult.State.StartedAt)
message := iResult.State.Error
finishedAt := iResult.State.FinishedAt
startedAt := iResult.State.StartedAt
// Note: An application might handle OOMKilled gracefully.
// In that case, the container is oom killed, but the exit
// code could be 0.
if inspectResult.State.OOMKilled {
if iResult.State.OOMKilled {
reason = "OOMKilled"
} else if inspectResult.State.ExitCode == 0 {
} else if iResult.State.ExitCode == 0 {
reason = "Completed"
} else if !inspectResult.State.FinishedAt.IsZero() {
} else if !iResult.State.FinishedAt.IsZero() {
reason = "Error"
} else {
// A zero finishedAt together with a non-zero ExitCode occurs when docker fails to start the container.
reason = ErrContainerCannotRun.Error()
// Adjust the time to when docker attempted to run the container; otherwise startedAt and finishedAt will be set to the epoch, which is misleading.
finishedAt = unversioned.NewTime(inspectResult.Created)
startedAt = unversioned.NewTime(inspectResult.Created)
}
result.status.State.Terminated = &api.ContainerStateTerminated{
ExitCode: inspectResult.State.ExitCode,
Message: message,
Reason: reason,
StartedAt: startedAt,
FinishedAt: finishedAt,
ContainerID: DockerPrefix + dockerID,
finishedAt = iResult.Created
startedAt = iResult.Created
}
terminationMessagePath := containerInfo.TerminationMessagePath
if terminationMessagePath != "" {
path, found := inspectResult.Volumes[terminationMessagePath]
if found {
data, err := ioutil.ReadFile(path)
if err != nil {
result.status.State.Terminated.Message = fmt.Sprintf("Error on reading termination-log %s: %v", path, err)
if path, found := iResult.Volumes[terminationMessagePath]; found {
if data, err := ioutil.ReadFile(path); err != nil {
message = fmt.Sprintf("Error on reading termination-log %s: %v", path, err)
} else {
result.status.State.Terminated.Message = string(data)
message = string(data)
}
}
}
status.State = kubecontainer.ContainerStateExited
status.Message = message
status.Reason = reason
status.StartedAt = startedAt
status.FinishedAt = finishedAt
} else {
// Non-running containers that are not terminated could be paused, or created (but not yet
// started), etc. Kubelet doesn't handle these scenarios yet.
status.State = kubecontainer.ContainerStateUnknown
}
return &result
return &status, "", nil
}
// GetPodStatus returns docker related status for all containers in the pod as
// well as the infrastructure container.
func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
// Now we retain restart count of container as a docker label. Each time a container
// restarts, pod will read the restart count from the latest dead container, increment
// it to get the new restart count, and then add a label with the new restart count on
// the newly started container.
// However, there are some limitations of this method:
// 1. When all dead containers were garbage collected, the container status could
// not get the historical value and would be *inaccurate*. Fortunately, the chance
// is really slim.
// 2. When working with old version containers which have no restart count label,
// we can only assume their restart count is 0.
// Anyhow, we only promised "best-effort" restart count reporting, we can just ignore
// these limitations now.
podFullName := kubecontainer.GetPodFullName(pod)
uid := pod.UID
manifest := pod.Spec
var podStatus api.PodStatus
statuses := make(map[string]*api.ContainerStatus, len(pod.Spec.Containers))
expectedContainers := make(map[string]api.Container)
for _, container := range manifest.Containers {
expectedContainers[container.Name] = container
}
expectedContainers[PodInfraContainerName] = api.Container{}
// We have added labels like pod name and pod namespace, it seems that we can do filtered list here.
// However, there may be some old containers without these labels, so at least now we can't do that.
// TODO (random-liu) Add filter when we are sure that all the containers have the labels
containers, err := dm.client.ListContainers(docker.ListContainersOptions{All: true})
// GetAPIPodStatus returns the docker-related status for all containers in the
// pod spec.
func (dm *DockerManager) GetAPIPodStatus(pod *api.Pod) (*api.PodStatus, error) {
// Get the pod status.
podStatus, err := dm.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
if err != nil {
return nil, err
}
return dm.ConvertPodStatusToAPIPodStatus(pod, podStatus)
}
func (dm *DockerManager) ConvertPodStatusToAPIPodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) (*api.PodStatus, error) {
var apiPodStatus api.PodStatus
uid := pod.UID
statuses := make(map[string]*api.ContainerStatus, len(pod.Spec.Containers))
// Create a map of expected containers based on the pod spec.
expectedContainers := make(map[string]api.Container)
for _, container := range pod.Spec.Containers {
expectedContainers[container.Name] = container
}
containerDone := sets.NewString()
// Loop through list of running and exited docker containers to construct
// the statuses. We assume docker returns a list of containers sorted in
// reverse by time.
for _, value := range containers {
if len(value.Names) == 0 {
// NOTE: (random-liu) The Pod IP is generated in kubelet.generatePodStatus(), we have no podStatus.IP now
apiPodStatus.PodIP = podStatus.IP
for _, containerStatus := range podStatus.ContainerStatuses {
cName := containerStatus.Name
if _, ok := expectedContainers[cName]; !ok {
// This would also ignore the infra container.
continue
}
dockerName, _, err := ParseDockerName(value.Names[0])
if err != nil {
if containerDone.Has(cName) {
continue
}
if dockerName.PodFullName != podFullName {
continue
}
if uid != "" && dockerName.PodUID != uid {
continue
}
dockerContainerName := dockerName.ContainerName
_, found := expectedContainers[dockerContainerName]
if !found {
continue
}
if containerDone.Has(dockerContainerName) {
continue
}
// Inspect the container.
result := dm.inspectContainer(value.ID, dockerContainerName, pod)
if result.err != nil {
return nil, result.err
}
if containerStatus, found := statuses[dockerContainerName]; found {
// There should be no alive containers with the same name. Just in case.
if result.status.State.Terminated == nil {
continue
}
containerStatus.LastTerminationState = result.status.State
// Got the last termination state, we do not need to care about the other containers any more
containerDone.Insert(dockerContainerName)
continue
}
if dockerContainerName == PodInfraContainerName {
// Found network container
if result.status.State.Running != nil {
podStatus.PodIP = result.ip
}
status := containerStatusToAPIContainerStatus(containerStatus)
if existing, found := statuses[cName]; found {
existing.LastTerminationState = status.State
containerDone.Insert(cName)
} else {
statuses[dockerContainerName] = &result.status
statuses[cName] = status
}
}
// Handle the containers for which we cannot find any associated active or dead docker containers or are in restart backoff
// Fetch old containers statuses from old pod status.
oldStatuses := make(map[string]api.ContainerStatus, len(manifest.Containers))
oldStatuses := make(map[string]api.ContainerStatus, len(pod.Spec.Containers))
for _, status := range pod.Status.ContainerStatuses {
oldStatuses[status.Name] = status
}
for _, container := range manifest.Containers {
for _, container := range pod.Spec.Containers {
if containerStatus, found := statuses[container.Name]; found {
reasonInfo, ok := dm.reasonCache.Get(uid, container.Name)
if ok && reasonInfo.reason == kubecontainer.ErrCrashLoopBackOff.Error() {
@ -540,6 +491,7 @@ func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
}
// TODO(dchen1107): docker/docker/issues/8365 to figure out if the image exists
reasonInfo, ok := dm.reasonCache.Get(uid, container.Name)
if !ok {
// default position for a container
// At this point there are no active or dead containers, the reasonCache is empty (no entry or the entry has expired)
@ -563,7 +515,7 @@ func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
statuses[container.Name] = &containerStatus
}
podStatus.ContainerStatuses = make([]api.ContainerStatus, 0)
apiPodStatus.ContainerStatuses = make([]api.ContainerStatus, 0)
for containerName, status := range statuses {
if status.State.Waiting != nil {
status.State.Running = nil
@ -573,13 +525,14 @@ func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
status.State.Waiting.Message = reasonInfo.message
}
}
podStatus.ContainerStatuses = append(podStatus.ContainerStatuses, *status)
apiPodStatus.ContainerStatuses = append(apiPodStatus.ContainerStatuses, *status)
}
// Sort the container statuses since clients of this interface expect the list
// of containers in a pod to behave like the output of `docker list`, which has a
// deterministic order.
sort.Sort(kubetypes.SortedContainerStatuses(podStatus.ContainerStatuses))
return &podStatus, nil
sort.Sort(kubetypes.SortedContainerStatuses(apiPodStatus.ContainerStatuses))
return &apiPodStatus, nil
}
// makeEnvList converts EnvVar list to a list of strings, in the form of
@ -961,11 +914,11 @@ func (dm *DockerManager) RemoveImage(image kubecontainer.ImageSpec) error {
}
// podInfraContainerChanged returns true if the pod infra container has changed.
func (dm *DockerManager) podInfraContainerChanged(pod *api.Pod, podInfraContainer *kubecontainer.Container) (bool, error) {
func (dm *DockerManager) podInfraContainerChanged(pod *api.Pod, podInfraContainerStatus *kubecontainer.ContainerStatus) (bool, error) {
networkMode := ""
var ports []api.ContainerPort
dockerPodInfraContainer, err := dm.client.InspectContainer(podInfraContainer.ID.ID)
dockerPodInfraContainer, err := dm.client.InspectContainer(podInfraContainerStatus.ID.ID)
if err != nil {
return false, err
}
@ -992,7 +945,7 @@ func (dm *DockerManager) podInfraContainerChanged(pod *api.Pod, podInfraContaine
Ports: ports,
ImagePullPolicy: podInfraContainerImagePullPolicy,
}
return podInfraContainer.Hash != kubecontainer.HashContainer(expectedPodInfraContainer), nil
return podInfraContainerStatus.Hash != kubecontainer.HashContainer(expectedPodInfraContainer), nil
}
type dockerVersion docker.APIVersion
@ -1270,7 +1223,8 @@ func (dm *DockerManager) GetContainerIP(containerID, interfaceName string) (stri
return string(out), nil
}
// Kills all containers in the specified pod
// TODO: (random-liu) Change running pod to pod status in the future. We can't do it now, because kubelet also uses this function without pod status.
// We can only deprecate this after refactoring kubelet.
func (dm *DockerManager) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
// Send the kills in parallel since they may take a long time. Len + 1 since there
// can be Len errors + the networkPlugin teardown error.
@ -1673,7 +1627,7 @@ type PodContainerChangesSpec struct {
ContainersToKeep map[kubetypes.DockerID]int
}
func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) (PodContainerChangesSpec, error) {
func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, podStatus *kubecontainer.PodStatus) (PodContainerChangesSpec, error) {
start := time.Now()
defer func() {
metrics.ContainerManagerLatency.WithLabelValues("computePodContainerChanges").Observe(metrics.SinceInMicroseconds(start))
@ -1689,33 +1643,33 @@ func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kub
var err error
var podInfraContainerID kubetypes.DockerID
var changed bool
podInfraContainer := runningPod.FindContainerByName(PodInfraContainerName)
if podInfraContainer != nil {
podInfraContainerStatus := podStatus.FindContainerStatusByName(PodInfraContainerName)
if podInfraContainerStatus != nil && podInfraContainerStatus.State == kubecontainer.ContainerStateRunning {
glog.V(4).Infof("Found pod infra container for %q", podFullName)
changed, err = dm.podInfraContainerChanged(pod, podInfraContainer)
changed, err = dm.podInfraContainerChanged(pod, podInfraContainerStatus)
if err != nil {
return PodContainerChangesSpec{}, err
}
}
createPodInfraContainer := true
if podInfraContainer == nil {
if podInfraContainerStatus == nil || podInfraContainerStatus.State != kubecontainer.ContainerStateRunning {
glog.V(2).Infof("Need to restart pod infra container for %q because it is not found", podFullName)
} else if changed {
glog.V(2).Infof("Need to restart pod infra container for %q because it is changed", podFullName)
} else {
glog.V(4).Infof("Pod infra container looks good, keep it %q", podFullName)
createPodInfraContainer = false
podInfraContainerID = kubetypes.DockerID(podInfraContainer.ID.ID)
podInfraContainerID = kubetypes.DockerID(podInfraContainerStatus.ID.ID)
containersToKeep[podInfraContainerID] = -1
}
for index, container := range pod.Spec.Containers {
expectedHash := kubecontainer.HashContainer(&container)
c := runningPod.FindContainerByName(container.Name)
if c == nil {
if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus) {
containerStatus := podStatus.FindContainerStatusByName(container.Name)
if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning {
if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {
// If we are here it means that the container is dead and should be restarted, or never existed and should
// be created. We may be inserting this ID again if the container has changed and it has
// RestartPolicy::Always, but it's not a big deal.
@ -1726,8 +1680,8 @@ func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kub
continue
}
containerID := kubetypes.DockerID(c.ID.ID)
hash := c.Hash
containerID := kubetypes.DockerID(containerStatus.ID.ID)
hash := containerStatus.Hash
glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID)
if createPodInfraContainer {
@ -1753,7 +1707,7 @@ func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kub
continue
}
liveness, found := dm.livenessManager.Get(c.ID)
liveness, found := dm.livenessManager.Get(containerStatus.ID)
if !found || liveness == proberesults.Success {
containersToKeep[containerID] = index
continue
@ -1799,14 +1753,15 @@ func (dm *DockerManager) clearReasonCache(pod *api.Pod, container *api.Container
}
// Sync the running pod to match the specified desired pod.
func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error {
func (dm *DockerManager) SyncPod(pod *api.Pod, _ kubecontainer.Pod, _ api.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error {
start := time.Now()
defer func() {
metrics.ContainerManagerLatency.WithLabelValues("SyncPod").Observe(metrics.SinceInMicroseconds(start))
}()
podFullName := kubecontainer.GetPodFullName(pod)
containerChanges, err := dm.computePodContainerChanges(pod, runningPod, podStatus)
containerChanges, err := dm.computePodContainerChanges(pod, podStatus)
if err != nil {
return err
}
@ -1823,31 +1778,33 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
if len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0 {
glog.V(4).Infof("Killing Infra Container for %q because all other containers are dead.", podFullName)
} else {
glog.V(4).Infof("Killing Infra Container for %q, will start new one", podFullName)
}
// Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
if err := dm.KillPod(pod, runningPod); err != nil {
// TODO: (random-liu) We'll use pod status directly in the future
if err := dm.KillPod(pod, kubecontainer.ConvertPodStatusToRunningPod(podStatus)); err != nil {
return err
}
} else {
// Otherwise kill any containers in this pod which are not specified as ones to keep.
for _, container := range runningPod.Containers {
_, keep := containerChanges.ContainersToKeep[kubetypes.DockerID(container.ID.ID)]
// Otherwise kill any running containers in this pod which are not specified as ones to keep.
runningContainerStatuses := podStatus.GetRunningContainerStatuses()
for _, containerStatus := range runningContainerStatuses {
_, keep := containerChanges.ContainersToKeep[kubetypes.DockerID(containerStatus.ID.ID)]
if !keep {
glog.V(3).Infof("Killing unwanted container %+v", container)
// NOTE: (random-liu) Just log ID or log container status here?
glog.V(3).Infof("Killing unwanted container %+v", containerStatus)
// attempt to find the appropriate container policy
var podContainer *api.Container
var killMessage string
for i, c := range pod.Spec.Containers {
if c.Name == container.Name {
if c.Name == containerStatus.Name {
podContainer = &pod.Spec.Containers[i]
killMessage = containerChanges.ContainersToStart[i]
break
}
}
if err := dm.KillContainerInPod(container.ID, podContainer, pod, killMessage); err != nil {
if err := dm.KillContainerInPod(containerStatus.ID, podContainer, pod, killMessage); err != nil {
glog.Errorf("Error killing container: %v", err)
return err
}
@ -1922,19 +1879,11 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
continue
}
}
containerStatuses := podStatus.ContainerStatuses
// podStatus is generated by GetPodStatus(). In GetPodStatus(), we make sure that ContainerStatuses
// contains statuses of all containers in pod.Spec.Containers.
// ContainerToStart is a subset of pod.Spec.Containers, we should always find a result here.
// For a new container, the RestartCount should be 0
restartCount := 0
for _, containerStatus := range containerStatuses {
// If the container's terminate state is not empty, it exited before. Increment the restart count.
if containerStatus.Name == container.Name && (containerStatus.State.Terminated != nil || containerStatus.LastTerminationState.Terminated != nil) {
restartCount = containerStatus.RestartCount + 1
break
}
containerStatus := podStatus.FindContainerStatusByName(container.Name)
if containerStatus != nil {
restartCount = containerStatus.RestartCount + 1
}
// TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container
@ -2021,33 +1970,18 @@ func getUidFromUser(id string) string {
return id
}
func (dm *DockerManager) doBackOff(pod *api.Pod, container *api.Container, podStatus api.PodStatus, backOff *util.Backoff) bool {
var ts unversioned.Time
for _, containerStatus := range podStatus.ContainerStatuses {
if containerStatus.Name != container.Name {
continue
}
// first failure
if containerStatus.State.Terminated != nil && !containerStatus.State.Terminated.FinishedAt.IsZero() {
ts = containerStatus.State.Terminated.FinishedAt
break
}
// state is waiting and the failure timestamp is in LastTerminationState
if (containerStatus.State.Waiting != nil) && (containerStatus.LastTerminationState.Terminated != nil) {
ts = containerStatus.LastTerminationState.Terminated.FinishedAt
break
}
}
// found a container that requires backoff
if !ts.IsZero() {
func (dm *DockerManager) doBackOff(pod *api.Pod, container *api.Container, podStatus *kubecontainer.PodStatus, backOff *util.Backoff) bool {
containerStatus := podStatus.FindContainerStatusByName(container.Name)
if containerStatus != nil && containerStatus.State == kubecontainer.ContainerStateExited && !containerStatus.FinishedAt.IsZero() {
ts := containerStatus.FinishedAt
// found a container that requires backoff
dockerName := KubeletContainerName{
PodFullName: kubecontainer.GetPodFullName(pod),
PodUID: pod.UID,
ContainerName: container.Name,
}
stableName, _ := BuildDockerName(dockerName, container)
if backOff.IsInBackOffSince(stableName, ts.Time) {
if backOff.IsInBackOffSince(stableName, ts) {
if ref, err := kubecontainer.GenerateContainerRef(pod, container); err == nil {
dm.recorder.Eventf(ref, api.EventTypeWarning, kubecontainer.BackOffStartContainer, "Back-off restarting failed docker container")
}
@ -2056,7 +1990,8 @@ func (dm *DockerManager) doBackOff(pod *api.Pod, container *api.Container, podSt
glog.Infof("%s", err.Error())
return true
}
backOff.Next(stableName, ts.Time)
backOff.Next(stableName, ts)
}
dm.clearReasonCache(pod, container)
return false
@ -2096,10 +2031,66 @@ func (dm *DockerManager) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy
return dm.containerGC.GarbageCollect(gcPolicy)
}
func (dm *DockerManager) GetRawPodStatus(uid types.UID, name, namespace string) (*kubecontainer.RawPodStatus, error) {
return nil, fmt.Errorf("Not implemented yet")
func (dm *DockerManager) GetPodStatus(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
podStatus := &kubecontainer.PodStatus{ID: uid, Name: name, Namespace: namespace}
// We now retain the restart count of a container as a docker label. Each time a
// container restarts, the kubelet reads the restart count from the latest dead
// container, increments it to get the new restart count, and then adds a label
// with the new restart count to the newly started container.
// However, this method has some limitations:
// 1. When all dead containers have been garbage collected, the container status
// cannot recover the historical value and will be *inaccurate*. Fortunately,
// the chance of this is slim.
// 2. For old containers that have no restart count label, we can only assume
// their restart count is 0.
// Since we only promised "best-effort" restart count reporting, we can ignore
// these limitations for now.
var containerStatuses []*kubecontainer.ContainerStatus
// We have added labels like pod name and pod namespace, so it seems we could do a filtered list here.
// However, there may still be old containers without these labels, so for now we can't do that.
// TODO (random-liu) Do only one list and pass in the list result in the future
// TODO (random-liu) Add filter when we are sure that all the containers have the labels
containers, err := dm.client.ListContainers(docker.ListContainersOptions{All: true})
if err != nil {
return podStatus, err
}
// Loop through list of running and exited docker containers to construct
// the statuses. We assume docker returns a list of containers sorted in
// reverse by time.
// TODO: optimization: set maximum number of containers per container name to examine.
for _, c := range containers {
if len(c.Names) == 0 {
continue
}
dockerName, _, err := ParseDockerName(c.Names[0])
if err != nil {
continue
}
if dockerName.PodUID != uid {
continue
}
result, ip, err := dm.inspectContainer(c.ID, name, namespace)
if err != nil {
return podStatus, err
}
containerStatuses = append(containerStatuses, result)
if ip != "" {
podStatus.IP = ip
}
}
podStatus.ContainerStatuses = containerStatuses
return podStatus, nil
}
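The label-based restart count bookkeeping described in the comment at the top of GetPodStatus can be sketched as below. This is purely illustrative: the real label schema is handled by getContainerInfoFromLabel and the docker name/label helpers referenced in the diff, so the label key and helper name here are assumptions, not the actual implementation.

package sketch

import "strconv"

// restartCountLabelKey is a placeholder; the real key is defined by the
// label helpers referenced above, which are not shown in this diff.
const restartCountLabelKey = "restartCount"

// nextRestartCountSketch returns the restart count to stamp on a container
// that is about to be (re)started, given the labels of the latest dead
// container with the same name (nil if no dead predecessor was found).
func nextRestartCountSketch(latestDeadContainerLabels map[string]string) int {
    if latestDeadContainerLabels == nil {
        // No dead predecessor (e.g. it was garbage collected): best-effort
        // reporting falls back to 0.
        return 0
    }
    count, err := strconv.Atoi(latestDeadContainerLabels[restartCountLabelKey])
    if err != nil {
        // An old container without the label is assumed to have restart
        // count 0, so the new container gets 1.
        return 1
    }
    return count + 1
}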
func (dm *DockerManager) ConvertRawToPodStatus(_ *api.Pod, _ *kubecontainer.RawPodStatus) (*api.PodStatus, error) {
return nil, fmt.Errorf("Not implemented yet")
func (dm *DockerManager) GetPodStatusAndAPIPodStatus(pod *api.Pod) (*kubecontainer.PodStatus, *api.PodStatus, error) {
// Get the pod status.
podStatus, err := dm.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
if err != nil {
return nil, nil, err
}
var apiPodStatus *api.PodStatus
apiPodStatus, err = dm.ConvertPodStatusToAPIPodStatus(pod, podStatus)
return podStatus, apiPodStatus, err
}

View File

@ -548,7 +548,7 @@ func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, p
}
runningPod := kubecontainer.Pods(runningPods).FindPodByID(pod.UID)
podStatus, err := dm.GetPodStatus(pod)
podStatus, apiPodStatus, err := dm.GetPodStatusAndAPIPodStatus(pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -556,7 +556,7 @@ func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, p
if backOff == nil {
backOff = util.NewBackOff(time.Second, time.Minute)
}
err = dm.SyncPod(pod, runningPod, *podStatus, []api.Secret{}, backOff)
err = dm.SyncPod(pod, runningPod, *apiPodStatus, podStatus, []api.Secret{}, backOff)
if err != nil && !expectErr {
t.Errorf("unexpected error: %v", err)
} else if err == nil && expectErr {
@ -920,7 +920,7 @@ func TestSyncPodWithPullPolicy(t *testing.T) {
}
runSyncPod(t, dm, fakeDocker, pod, nil, true)
statuses, err := dm.GetPodStatus(pod)
statuses, err := dm.GetAPIPodStatus(pod)
if err != nil {
t.Errorf("unable to get pod status")
}
@ -930,7 +930,7 @@ func TestSyncPodWithPullPolicy(t *testing.T) {
if containerStatus.State.Running != nil && expectedStatusMap[c.Name].Running != nil {
expectedStatusMap[c.Name].Running.StartedAt = containerStatus.State.Running.StartedAt
}
assert.Equal(t, containerStatus.State, expectedStatusMap[c.Name], "for container %s", c.Name)
assert.Equal(t, expectedStatusMap[c.Name], containerStatus.State, "for container %s", c.Name)
}
}
@ -1050,7 +1050,7 @@ func TestSyncPodWithRestartPolicy(t *testing.T) {
}
}
func TestGetPodStatusWithLastTermination(t *testing.T) {
func TestGetAPIPodStatusWithLastTermination(t *testing.T) {
dm, fakeDocker := newTestDockerManager()
containers := []api.Container{
{Name: "succeeded"},
@ -1131,7 +1131,7 @@ func TestGetPodStatusWithLastTermination(t *testing.T) {
runSyncPod(t, dm, fakeDocker, pod, nil, false)
// Check if we can retrieve the pod status.
status, err := dm.GetPodStatus(pod)
status, err := dm.GetAPIPodStatus(pod)
if err != nil {
t.Fatalf("unexpected error %v", err)
}
@ -1245,10 +1245,8 @@ func TestSyncPodBackoff(t *testing.T) {
}
}
}
func TestGetPodCreationFailureReason(t *testing.T) {
dm, fakeDocker := newTestDockerManager()
// Inject the creation failure error to docker.
failureReason := "RunContainerError"
fakeDocker.Errors = map[string]error{
@ -1275,7 +1273,7 @@ func TestGetPodCreationFailureReason(t *testing.T) {
runSyncPod(t, dm, fakeDocker, pod, nil, true)
// Check if we can retrieve the pod status.
status, err := dm.GetPodStatus(pod)
status, err := dm.GetAPIPodStatus(pod)
if err != nil {
t.Fatalf("unexpected error %v", err)
}
@ -1320,7 +1318,7 @@ func TestGetPodPullImageFailureReason(t *testing.T) {
}})
runSyncPod(t, dm, fakeDocker, pod, nil, true)
// Check if we can retrieve the pod status.
status, err := dm.GetPodStatus(pod)
status, err := dm.GetAPIPodStatus(pod)
if err != nil {
t.Fatalf("unexpected error %v", err)
}
@ -1357,7 +1355,7 @@ func TestGetRestartCount(t *testing.T) {
// Helper function for verifying the restart count.
verifyRestartCount := func(pod *api.Pod, expectedCount int) api.PodStatus {
runSyncPod(t, dm, fakeDocker, pod, nil, false)
status, err := dm.GetPodStatus(pod)
status, err := dm.GetAPIPodStatus(pod)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
@ -1369,7 +1367,7 @@ func TestGetRestartCount(t *testing.T) {
}
killOneContainer := func(pod *api.Pod) {
status, err := dm.GetPodStatus(pod)
status, err := dm.GetAPIPodStatus(pod)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
@ -1684,7 +1682,7 @@ func TestSyncPodWithHostNetwork(t *testing.T) {
}
}
func TestGetPodStatusSortedContainers(t *testing.T) {
func TestGetAPIPodStatusSortedContainers(t *testing.T) {
dm, fakeDocker := newTestDockerManager()
specContainerList := []api.Container{}
expectedOrder := []string{}
@ -1723,7 +1721,7 @@ func TestGetPodStatusSortedContainers(t *testing.T) {
},
}
for i := 0; i < 5; i++ {
status, err := dm.GetPodStatus(pod)
status, err := dm.GetAPIPodStatus(pod)
if err != nil {
t.Fatalf("unexpected error %v", err)
}

View File

@ -1679,7 +1679,9 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
// state of a newly started container with the apiserver before the kubelet restarted, so
// it's OK to pretend like the kubelet started them after it restarted.
var podStatus api.PodStatus
var apiPodStatus api.PodStatus
var podStatus *kubecontainer.PodStatus
if updateType == kubetypes.SyncPodCreate {
// This is the first time we are syncing the pod. Record the latency
// since kubelet first saw the pod if firstSeenTime is set.
@ -1687,20 +1689,23 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
}
podStatus = pod.Status
podStatus.StartTime = &unversioned.Time{Time: start}
kl.statusManager.SetPodStatus(pod, podStatus)
apiPodStatus = pod.Status
apiPodStatus.StartTime = &unversioned.Time{Time: start}
kl.statusManager.SetPodStatus(pod, apiPodStatus)
podStatus = &kubecontainer.PodStatus{
ID: pod.UID,
Name: pod.Name,
Namespace: pod.Namespace,
}
glog.V(3).Infof("Not generating pod status for new pod %q", podFullName)
} else {
var err error
podStatus, err = kl.generatePodStatus(pod)
// TODO (random-liu): It's strange that generatePodStatus produces a podStatus in
// phase Failed, Pending, etc., even with empty ContainerStatuses, and we still keep
// going. This may need refactoring.
podStatusPtr, apiPodStatusPtr, err := kl.containerRuntime.GetPodStatusAndAPIPodStatus(pod)
if err != nil {
glog.Errorf("Unable to get status for pod %q (uid %q): %v", podFullName, uid, err)
return err
}
apiPodStatus = *apiPodStatusPtr
podStatus = podStatusPtr
}
pullSecrets, err := kl.getPullSecretsForPod(pod)
@ -1709,7 +1714,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
return err
}
err = kl.containerRuntime.SyncPod(pod, runningPod, podStatus, pullSecrets, kl.backOff)
err = kl.containerRuntime.SyncPod(pod, runningPod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
if err != nil {
return err
}
@ -1724,7 +1729,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
} else if kl.shaper != nil {
status, found := kl.statusManager.GetPodStatus(pod.UID)
if !found {
statusPtr, err := kl.containerRuntime.GetPodStatus(pod)
statusPtr, err := kl.containerRuntime.GetAPIPodStatus(pod)
if err != nil {
glog.Errorf("Error getting pod for bandwidth shaping")
return err
@ -1831,7 +1836,7 @@ func (kl *Kubelet) cleanupBandwidthLimits(allPods []*api.Pod) error {
}
status, found := kl.statusManager.GetPodStatus(pod.UID)
if !found {
statusPtr, err := kl.containerRuntime.GetPodStatus(pod)
statusPtr, err := kl.containerRuntime.GetAPIPodStatus(pod)
if err != nil {
return err
}
@ -3067,6 +3072,8 @@ func getPodReadyCondition(spec *api.PodSpec, containerStatuses []api.ContainerSt
// By passing the pod directly, this method avoids pod lookup, which requires
// grabbing a lock.
// TODO (random-liu): The api.PodStatus here is named podStatus, which may be confusing; the same
// naming appears in other functions after the refactoring. Rename them later.
func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
start := time.Now()
@ -3088,7 +3095,7 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
}
spec := &pod.Spec
podStatus, err := kl.containerRuntime.GetPodStatus(pod)
podStatus, err := kl.containerRuntime.GetAPIPodStatus(pod)
if err != nil {
// Error handling

View File

@ -4026,7 +4026,7 @@ func TestCleanupBandwidthLimits(t *testing.T) {
},
inputCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"},
expectResetCIDRs: []string{"2.3.4.5/32", "5.6.7.8/32"},
expectedCalls: []string{"GetPodStatus"},
expectedCalls: []string{"GetAPIPodStatus"},
name: "pod running",
},
{
@ -4077,7 +4077,7 @@ func TestCleanupBandwidthLimits(t *testing.T) {
},
inputCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"},
expectResetCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"},
expectedCalls: []string{"GetPodStatus"},
expectedCalls: []string{"GetAPIPodStatus"},
name: "pod not running",
},
{
@ -4135,7 +4135,7 @@ func TestCleanupBandwidthLimits(t *testing.T) {
testKube := newTestKubelet(t)
testKube.kubelet.shaper = shaper
testKube.fakeRuntime.PodStatus = *test.status
testKube.fakeRuntime.APIPodStatus = *test.status
if test.cacheStatus {
for _, pod := range test.pods {

View File

@ -35,7 +35,7 @@ type HandlerRunner struct {
}
type podStatusProvider interface {
GetPodStatus(pod *api.Pod) (*api.PodStatus, error)
GetAPIPodStatus(pod *api.Pod) (*api.PodStatus, error)
}
func NewHandlerRunner(httpGetter kubetypes.HttpGetter, commandRunner kubecontainer.ContainerCommandRunner, containerManager podStatusProvider) kubecontainer.HandlerRunner {
@ -86,7 +86,7 @@ func resolvePort(portReference intstr.IntOrString, container *api.Container) (in
func (hr *HandlerRunner) runHTTPHandler(pod *api.Pod, container *api.Container, handler *api.Handler) error {
host := handler.HTTPGet.Host
if len(host) == 0 {
status, err := hr.containerManager.GetPodStatus(pod)
status, err := hr.containerManager.GetAPIPodStatus(pod)
if err != nil {
glog.Errorf("Unable to get pod info, event handlers may be invalid.")
return err

View File

@ -53,8 +53,8 @@ type GenericPLEG struct {
}
type containerInfo struct {
podID types.UID
status kubecontainer.ContainerStatus
podID types.UID
state kubecontainer.ContainerState
}
func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
@ -79,20 +79,20 @@ func (g *GenericPLEG) Start() {
go util.Until(g.relist, g.relistPeriod, util.NeverStop)
}
func generateEvent(podID types.UID, cid string, oldStatus, newStatus kubecontainer.ContainerStatus) *PodLifecycleEvent {
if newStatus == oldStatus {
func generateEvent(podID types.UID, cid string, oldState, newState kubecontainer.ContainerState) *PodLifecycleEvent {
if newState == oldState {
return nil
}
switch newStatus {
case kubecontainer.ContainerStatusRunning:
switch newState {
case kubecontainer.ContainerStateRunning:
return &PodLifecycleEvent{ID: podID, Type: ContainerStarted, Data: cid}
case kubecontainer.ContainerStatusExited:
case kubecontainer.ContainerStateExited:
return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid}
case kubecontainer.ContainerStatusUnknown:
case kubecontainer.ContainerStateUnknown:
// Don't generate any event if the state is unknown.
return nil
default:
panic(fmt.Sprintf("unrecognized container status: %v", newStatus))
panic(fmt.Sprintf("unrecognized container state: %v", newState))
}
return nil
}
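
The renamed generateEvent maps a container state transition to at most one pod lifecycle event: a change to Running yields ContainerStarted, a change to Exited yields ContainerDied, Unknown and no-change yield nothing, and anything else panics. The sketch below reproduces that mapping over local stand-ins for the kubecontainer and PLEG types so it compiles on its own; it is an illustration, not the real package.

```go
package main

import "fmt"

// Stand-ins for kubecontainer.ContainerState and the PLEG event types.
type ContainerState string

const (
	ContainerStateRunning ContainerState = "running"
	ContainerStateExited  ContainerState = "exited"
	ContainerStateUnknown ContainerState = "unknown"
)

type PodLifecycleEventType string

const (
	ContainerStarted PodLifecycleEventType = "ContainerStarted"
	ContainerDied    PodLifecycleEventType = "ContainerDied"
)

type PodLifecycleEvent struct {
	ID   string
	Type PodLifecycleEventType
	Data string
}

// generateEvent mirrors the logic in the hunk above: only a state change
// to Running or Exited yields an event; Unknown is deliberately ignored.
func generateEvent(podID, cid string, oldState, newState ContainerState) *PodLifecycleEvent {
	if newState == oldState {
		return nil
	}
	switch newState {
	case ContainerStateRunning:
		return &PodLifecycleEvent{ID: podID, Type: ContainerStarted, Data: cid}
	case ContainerStateExited:
		return &PodLifecycleEvent{ID: podID, Type: ContainerDied, Data: cid}
	case ContainerStateUnknown:
		// No event for containers whose state cannot be determined.
		return nil
	default:
		panic(fmt.Sprintf("unrecognized container state: %v", newState))
	}
}

func main() {
	fmt.Println(generateEvent("1234", "c1", ContainerStateUnknown, ContainerStateRunning)) // ContainerStarted
	fmt.Println(generateEvent("1234", "c2", ContainerStateRunning, ContainerStateExited))  // ContainerDied
	fmt.Println(generateEvent("1234", "c3", ContainerStateRunning, ContainerStateRunning)) // <nil>
}
```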
@ -115,18 +115,18 @@ func (g *GenericPLEG) relist() {
for _, p := range pods {
for _, c := range p.Containers {
cid := c.ID.ID
// Get the existing container info. Defaults to status unknown.
oldStatus := kubecontainer.ContainerStatusUnknown
// Get the existing container info. Defaults to state unknown.
oldState := kubecontainer.ContainerStateUnknown
if info, ok := g.containers[cid]; ok {
oldStatus = info.status
oldState = info.state
}
// Generate an event if required.
glog.V(7).Infof("GenericPLEG: %v/%v: %v -> %v", p.ID, cid, oldStatus, c.Status)
if e := generateEvent(p.ID, cid, oldStatus, c.Status); e != nil {
glog.V(7).Infof("GenericPLEG: %v/%v: %v -> %v", p.ID, cid, oldState, c.State)
if e := generateEvent(p.ID, cid, oldState, c.State); e != nil {
events = append(events, e)
}
// Write to the new cache.
containers[cid] = containerInfo{podID: p.ID, status: c.Status}
containers[cid] = containerInfo{podID: p.ID, state: c.State}
}
}
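
relist works off a flat map keyed by container ID: the previous pass supplies the old state, defaulting to unknown for containers seen for the first time, and a fresh map is rebuilt from the current pod list. A minimal sketch of that diffing pass follows; the types are again local stand-ins, and the printed transition takes the place of the event emission shown above.

```go
package main

import "fmt"

// Stand-ins for the PLEG cache entry and the container/pod types it observes.
type ContainerState string

const ContainerStateUnknown ContainerState = "unknown"

type containerInfo struct {
	podID string
	state ContainerState
}

type Container struct {
	ID    string
	State ContainerState
}

type Pod struct {
	ID         string
	Containers []*Container
}

// relist rebuilds the container cache and reports each observed transition,
// defaulting the old state to unknown for containers not seen before.
func relist(old map[string]containerInfo, pods []*Pod) map[string]containerInfo {
	current := make(map[string]containerInfo)
	for _, p := range pods {
		for _, c := range p.Containers {
			oldState := ContainerStateUnknown
			if info, ok := old[c.ID]; ok {
				oldState = info.state
			}
			if oldState != c.State {
				fmt.Printf("%v/%v: %v -> %v\n", p.ID, c.ID, oldState, c.State)
			}
			// Write to the new cache.
			current[c.ID] = containerInfo{podID: p.ID, state: c.State}
		}
	}
	return current
}

func main() {
	pods := []*Pod{{ID: "1234", Containers: []*Container{{ID: "c1", State: "running"}}}}
	cache := relist(nil, pods) // first pass: unknown -> running
	pods[0].Containers[0].State = "exited"
	cache = relist(cache, pods) // second pass: running -> exited
	_ = cache
}
```

Because the cache is replaced wholesale on every pass, containers that disappear from the pod list simply age out; there is no separate deletion path.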

View File

@ -57,10 +57,10 @@ func getEventsFromChannel(ch <-chan *PodLifecycleEvent) []*PodLifecycleEvent {
return events
}
func createTestContainer(ID string, status kubecontainer.ContainerStatus) *kubecontainer.Container {
func createTestContainer(ID string, state kubecontainer.ContainerState) *kubecontainer.Container {
return &kubecontainer.Container{
ID: kubecontainer.ContainerID{Type: testContainerRuntimeType, ID: ID},
Status: status,
ID: kubecontainer.ContainerID{Type: testContainerRuntimeType, ID: ID},
State: state,
}
}
@ -93,15 +93,15 @@ func TestRelisting(t *testing.T) {
{
ID: "1234",
Containers: []*kubecontainer.Container{
createTestContainer("c1", kubecontainer.ContainerStatusExited),
createTestContainer("c2", kubecontainer.ContainerStatusRunning),
createTestContainer("c3", kubecontainer.ContainerStatusUnknown),
createTestContainer("c1", kubecontainer.ContainerStateExited),
createTestContainer("c2", kubecontainer.ContainerStateRunning),
createTestContainer("c3", kubecontainer.ContainerStateUnknown),
},
},
{
ID: "4567",
Containers: []*kubecontainer.Container{
createTestContainer("c1", kubecontainer.ContainerStatusExited),
createTestContainer("c1", kubecontainer.ContainerStateExited),
},
},
}
@ -124,14 +124,14 @@ func TestRelisting(t *testing.T) {
{
ID: "1234",
Containers: []*kubecontainer.Container{
createTestContainer("c2", kubecontainer.ContainerStatusExited),
createTestContainer("c3", kubecontainer.ContainerStatusRunning),
createTestContainer("c2", kubecontainer.ContainerStateExited),
createTestContainer("c3", kubecontainer.ContainerStateRunning),
},
},
{
ID: "4567",
Containers: []*kubecontainer.Container{
createTestContainer("c4", kubecontainer.ContainerStatusRunning),
createTestContainer("c4", kubecontainer.ContainerStateRunning),
},
},
}

View File

@ -788,14 +788,14 @@ func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
var pods []*kubecontainer.Pod
for _, u := range units {
if strings.HasPrefix(u.Name, kubernetesUnitPrefix) {
var status kubecontainer.ContainerStatus
var state kubecontainer.ContainerState
switch {
case u.SubState == "running":
status = kubecontainer.ContainerStatusRunning
state = kubecontainer.ContainerStateRunning
default:
status = kubecontainer.ContainerStatusExited
state = kubecontainer.ContainerStateExited
}
if !all && status != kubecontainer.ContainerStatusRunning {
if !all && state != kubecontainer.ContainerStateRunning {
continue
}
pod, _, err := r.readServiceFile(u.Name)
@ -804,7 +804,7 @@ func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
continue
}
for _, c := range pod.Containers {
c.Status = status
c.State = state
}
pods = append(pods, pod)
}
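
In the rkt GetPods hunks, the only signal used to derive a container state is the systemd unit's SubState: "running" maps to ContainerStateRunning and everything else to ContainerStateExited, with non-running units skipped unless all pods were requested. The sketch below isolates that filter; the unit type and the "k8s_" prefix value are assumptions standing in for go-systemd's unit status and the real kubernetesUnitPrefix constant defined elsewhere in rkt.go.

```go
package main

import (
	"fmt"
	"strings"
)

const kubernetesUnitPrefix = "k8s_" // assumed value; the real constant lives elsewhere in rkt.go

type ContainerState string

const (
	ContainerStateRunning ContainerState = "running"
	ContainerStateExited  ContainerState = "exited"
)

// unit is a stand-in for the systemd unit status consumed in GetPods.
type unit struct {
	Name     string
	SubState string
}

// unitState maps a systemd SubState onto the coarse container state used above.
func unitState(u unit) ContainerState {
	if u.SubState == "running" {
		return ContainerStateRunning
	}
	return ContainerStateExited
}

// selectUnits keeps kubernetes-managed units, dropping exited ones unless all is set.
func selectUnits(units []unit, all bool) []unit {
	var kept []unit
	for _, u := range units {
		if !strings.HasPrefix(u.Name, kubernetesUnitPrefix) {
			continue
		}
		if state := unitState(u); !all && state != ContainerStateRunning {
			continue
		}
		kept = append(kept, u)
	}
	return kept
}

func main() {
	units := []unit{
		{Name: "k8s_abc.service", SubState: "running"},
		{Name: "k8s_def.service", SubState: "dead"},
		{Name: "sshd.service", SubState: "running"},
	}
	fmt.Println(len(selectUnits(units, false))) // 1: only the running kubernetes unit
	fmt.Println(len(selectUnits(units, true)))  // 2: the exited kubernetes unit is included too
}
```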
@ -839,9 +839,9 @@ func (r *Runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
return nil
}
// getPodStatus reads the service file and invokes 'rkt status $UUID' to get the
// getAPIPodStatus reads the service file and invokes 'rkt status $UUID' to get the
// pod's status.
func (r *Runtime) getPodStatus(serviceName string) (*api.PodStatus, error) {
func (r *Runtime) getAPIPodStatus(serviceName string) (*api.PodStatus, error) {
var status api.PodStatus
// TODO(yifan): Get rkt uuid from the service file name.
@ -865,10 +865,10 @@ func (r *Runtime) getPodStatus(serviceName string) (*api.PodStatus, error) {
return &status, nil
}
// GetPodStatus returns the status of the given pod.
func (r *Runtime) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
// GetAPIPodStatus returns the status of the given pod.
func (r *Runtime) GetAPIPodStatus(pod *api.Pod) (*api.PodStatus, error) {
serviceName := makePodServiceFileName(pod.UID)
return r.getPodStatus(serviceName)
return r.getAPIPodStatus(serviceName)
}
func (r *Runtime) Type() string {
@ -988,7 +988,7 @@ func (r *Runtime) IsImagePresent(image kubecontainer.ImageSpec) (bool, error) {
}
// SyncPod syncs the running pod to match the specified desired pod.
func (r *Runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error {
func (r *Runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, _ *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error {
podFullName := format.Pod(pod)
// Add references to all containers.
@ -1003,7 +1003,7 @@ func (r *Runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus
c := runningPod.FindContainerByName(container.Name)
if c == nil {
if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus) {
if kubecontainer.ShouldContainerBeRestartedOldVersion(&container, pod, &podStatus) {
glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
// TODO(yifan): Containers in one pod are fate-sharing at this moment, see:
// https://github.com/appc/spec/issues/276.
@ -1383,10 +1383,16 @@ func (r *Runtime) RemoveImage(image kubecontainer.ImageSpec) error {
return nil
}
func (r *Runtime) GetRawPodStatus(uid types.UID, name, namespace string) (*kubecontainer.RawPodStatus, error) {
func (r *Runtime) GetPodStatus(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
return nil, fmt.Errorf("Not implemented yet")
}
func (r *Runtime) ConvertRawToPodStatus(_ *api.Pod, _ *kubecontainer.RawPodStatus) (*api.PodStatus, error) {
func (r *Runtime) ConvertPodStatusToAPIPodStatus(_ *api.Pod, _ *kubecontainer.PodStatus) (*api.PodStatus, error) {
return nil, fmt.Errorf("Not implemented yet")
}
func (r *Runtime) GetPodStatusAndAPIPodStatus(pod *api.Pod) (*kubecontainer.PodStatus, *api.PodStatus, error) {
podStatus, err := r.GetAPIPodStatus(pod)
return nil, podStatus, err
}
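
The rkt runtime only stubs the new internal-status path: GetPodStatus and ConvertPodStatusToAPIPodStatus return a not-implemented error, while the combined GetPodStatusAndAPIPodStatus forwards to GetAPIPodStatus and returns a nil internal status. A caller written against this transitional surface therefore has to tolerate a nil internal status. The following is a sketch of that defensive call pattern under stand-in types; transitionalRuntime and rktLike are illustrative names, not the real interfaces.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for api.PodStatus and the kubelet-internal kubecontainer.PodStatus.
type APIPodStatus struct{ Phase string }
type InternalPodStatus struct{ Name string }

// transitionalRuntime models the combined call exposed during the migration.
type transitionalRuntime interface {
	GetPodStatusAndAPIPodStatus(podName string) (*InternalPodStatus, *APIPodStatus, error)
}

// rktLike mimics the rkt stub above: no internal status yet, API status only.
type rktLike struct{}

func (r *rktLike) GetPodStatusAndAPIPodStatus(podName string) (*InternalPodStatus, *APIPodStatus, error) {
	if podName == "" {
		return nil, nil, errors.New("pod name required")
	}
	return nil, &APIPodStatus{Phase: "Running"}, nil
}

// describe shows the defensive handling a caller needs while some runtimes
// have not implemented the internal status yet and return nil for it.
func describe(rt transitionalRuntime, podName string) (string, error) {
	internal, apiStatus, err := rt.GetPodStatusAndAPIPodStatus(podName)
	if err != nil {
		return "", err
	}
	if internal == nil {
		return "api-only: " + apiStatus.Phase, nil
	}
	return "internal: " + internal.Name, nil
}

func main() {
	out, _ := describe(&rktLike{}, "demo")
	fmt.Println(out) // api-only: Running
}
```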

View File

@ -132,7 +132,8 @@ func (kl *Kubelet) runPod(pod *api.Pod, retryDelay time.Duration) error {
// isPodRunning returns true if all containers of a manifest are running.
func (kl *Kubelet) isPodRunning(pod *api.Pod, runningPod container.Pod) (bool, error) {
status, err := kl.containerRuntime.GetPodStatus(pod)
// TODO (random-liu): change this to use the new pod status
status, err := kl.containerRuntime.GetAPIPodStatus(pod)
if err != nil {
glog.Infof("Failed to get the status of pod %q: %v", kubecontainer.GetPodFullName(pod), err)
return false, err
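
isPodRunning is documented above to return true only if every container of the manifest is running, but the hunk is truncated before the loop that enforces this. The check below is therefore an assumed reconstruction over stand-in types: a pod counts as running only when each container status reports a running container, and an empty status counts as not running.

```go
package main

import "fmt"

// Stand-ins for the container statuses carried in an api.PodStatus.
type ContainerStatus struct {
	Name    string
	Running bool // the real type records richer state; a bool is enough for the sketch
}

type PodStatus struct {
	ContainerStatuses []ContainerStatus
}

// isPodRunning returns true only if every container in the status is running,
// mirroring the documented behavior of the kubelet helper above.
func isPodRunning(status PodStatus) bool {
	if len(status.ContainerStatuses) == 0 {
		return false
	}
	for _, cs := range status.ContainerStatuses {
		if !cs.Running {
			return false
		}
	}
	return true
}

func main() {
	running := PodStatus{ContainerStatuses: []ContainerStatus{{Name: "a", Running: true}}}
	mixed := PodStatus{ContainerStatuses: []ContainerStatus{{Name: "a", Running: true}, {Name: "b"}}}
	fmt.Println(isPodRunning(running), isPodRunning(mixed)) // true false
}
```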