kubelet: Replace GetKubeletDockerContainers with GetPods in syncPod/SyncPods.

This commit is contained in:
Yifan Gu
2015-03-20 14:20:01 -07:00
parent eec6456f51
commit 13250c904f
7 changed files with 121 additions and 87 deletions

View File

@@ -839,31 +839,31 @@ func GetPods(client DockerInterface, all bool) ([]*container.Pod, error) {
// Group containers by pod. // Group containers by pod.
for _, c := range containers { for _, c := range containers {
if len(c.Names) == 0 { if len(c.Names) == 0 {
glog.Warningf("Cannog parse empty docker container name: %#v", c.Names) glog.Warningf("Cannot parse empty docker container name: %#v", c.Names)
continue continue
} }
podFullName, podUID, containerName, hash, err := ParseDockerName(c.Names[0]) dockerName, hash, err := ParseDockerName(c.Names[0])
if err != nil { if err != nil {
glog.Warningf("Parse docker container name %q error: %v", c.Names[0], err) glog.Warningf("Parse docker container name %q error: %v", c.Names[0], err)
continue continue
} }
pod, found := pods[podUID] pod, found := pods[dockerName.PodUID]
if !found { if !found {
name, namespace, err := parsePodFullName(podFullName) name, namespace, err := parsePodFullName(dockerName.PodFullName)
if err != nil { if err != nil {
glog.Warningf("Parse pod full name %q error: %v", podFullName, err) glog.Warningf("Parse pod full name %q error: %v", dockerName.PodFullName, err)
continue continue
} }
pod = &container.Pod{ pod = &container.Pod{
ID: podUID, ID: dockerName.PodUID,
Name: name, Name: name,
Namespace: namespace, Namespace: namespace,
} }
pods[podUID] = pod pods[dockerName.PodUID] = pod
} }
pod.Containers = append(pod.Containers, &container.Container{ pod.Containers = append(pod.Containers, &container.Container{
ID: types.UID(c.ID), ID: types.UID(c.ID),
Name: containerName, Name: dockerName.ContainerName,
Hash: hash, Hash: hash,
Created: c.Created, Created: c.Created,
}) })

View File

@@ -39,6 +39,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
@@ -977,8 +978,8 @@ func parseResolvConf(reader io.Reader) (nameservers []string, searches []string,
} }
// Kill a docker container // Kill a docker container
func (kl *Kubelet) killContainer(dockerContainer *docker.APIContainers) error { func (kl *Kubelet) killContainer(c *container.Container) error {
return kl.killContainerByID(dockerContainer.ID) return kl.killContainerByID(string(c.ID))
} }
func (kl *Kubelet) killContainerByID(ID string) error { func (kl *Kubelet) killContainerByID(ID string) error {
@@ -1072,7 +1073,7 @@ func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error {
} }
// Kill all containers in a pod. Returns the number of containers deleted and an error if one occurs. // Kill all containers in a pod. Returns the number of containers deleted and an error if one occurs.
func (kl *Kubelet) killContainersInPod(pod *api.Pod, dockerContainers dockertools.DockerContainers) (int, error) { func (kl *Kubelet) killContainersInPod(pod *api.Pod, runningPod container.Pod) (int, error) {
podFullName := GetPodFullName(pod) podFullName := GetPodFullName(pod)
count := 0 count := 0
@@ -1080,12 +1081,13 @@ func (kl *Kubelet) killContainersInPod(pod *api.Pod, dockerContainers dockertool
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for _, container := range pod.Spec.Containers { for _, container := range pod.Spec.Containers {
// TODO: Consider being more aggressive: kill all containers with this pod UID, period. // TODO: Consider being more aggressive: kill all containers with this pod UID, period.
if dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, pod.UID, container.Name); found { c := runningPod.FindContainerByName(container.Name)
if c != nil {
count++ count++
wg.Add(1) wg.Add(1)
go func() { go func() {
defer util.HandleCrash() defer util.HandleCrash()
err := kl.killContainer(dockerContainer) err := kl.killContainer(c)
if err != nil { if err != nil {
glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, podFullName) glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, podFullName)
errs <- err errs <- err
@@ -1217,7 +1219,7 @@ type podContainerChangesSpec struct {
containersToKeep map[dockertools.DockerID]int containersToKeep map[dockertools.DockerID]int
} }
func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, containersInPod dockertools.DockerContainers) (podContainerChangesSpec, error) { func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, runningPod container.Pod) (podContainerChangesSpec, error) {
podFullName := GetPodFullName(pod) podFullName := GetPodFullName(pod)
uid := pod.UID uid := pod.UID
glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid) glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)
@@ -1231,9 +1233,12 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, c
containersToKeep := make(map[dockertools.DockerID]int) containersToKeep := make(map[dockertools.DockerID]int)
createPodInfraContainer := false createPodInfraContainer := false
var podStatus api.PodStatus var podStatus api.PodStatus
podInfraContainerID, found := kl.getPodInfraContainer(podFullName, uid, containersInPod)
if found { var podInfraContainerID dockertools.DockerID
podInfraContainer := runningPod.FindContainerByName(dockertools.PodInfraContainerName)
if podInfraContainer != nil {
glog.V(4).Infof("Found infra pod for %q", podFullName) glog.V(4).Infof("Found infra pod for %q", podFullName)
podInfraContainerID = dockertools.DockerID(podInfraContainer.ID)
containersToKeep[podInfraContainerID] = -1 containersToKeep[podInfraContainerID] = -1
podStatus, err = kl.GetPodStatus(podFullName) podStatus, err = kl.GetPodStatus(podFullName)
if err != nil { if err != nil {
@@ -1246,15 +1251,19 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, c
for index, container := range pod.Spec.Containers { for index, container := range pod.Spec.Containers {
expectedHash := dockertools.HashContainer(&container) expectedHash := dockertools.HashContainer(&container)
if dockerContainer, found, hash := containersInPod.FindPodContainer(podFullName, uid, container.Name); found {
containerID := dockertools.DockerID(dockerContainer.ID) c := runningPod.FindContainerByName(container.Name)
if c != nil {
containerID := dockertools.DockerID(c.ID)
hash := c.Hash
glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID) glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID)
if !createPodInfraContainer { if !createPodInfraContainer {
// look for changes in the container. // look for changes in the container.
containerChanged := hash != 0 && hash != expectedHash containerChanged := hash != 0 && hash != expectedHash
if !containerChanged { if !containerChanged {
result, err := kl.probeContainer(pod, podStatus, container, dockerContainer.ID, dockerContainer.Created) result, err := kl.probeContainer(pod, podStatus, container, string(c.ID), c.Created)
if err != nil { if err != nil {
// TODO(vmarmol): examine this logic. // TODO(vmarmol): examine this logic.
glog.V(2).Infof("probe no-error: %q", container.Name) glog.V(2).Infof("probe no-error: %q", container.Name)
@@ -1320,7 +1329,7 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, c
}, nil }, nil
} }
func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dockertools.DockerContainers) error { func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, runningPod container.Pod) error {
podFullName := GetPodFullName(pod) podFullName := GetPodFullName(pod)
uid := pod.UID uid := pod.UID
@@ -1334,7 +1343,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dock
} }
}() }()
containerChanges, err := kl.computePodContainerChanges(pod, hasMirrorPod, containersInPod) containerChanges, err := kl.computePodContainerChanges(pod, hasMirrorPod, runningPod)
glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges) glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
if err != nil { if err != nil {
return err return err
@@ -1346,20 +1355,23 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dock
} else { } else {
glog.V(4).Infof("Killing Infra Container for %q, will start new one", podFullName) glog.V(4).Infof("Killing Infra Container for %q, will start new one", podFullName)
} }
// Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container) // Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
if podInfraContainer, found, _ := containersInPod.FindPodContainer(podFullName, uid, dockertools.PodInfraContainerName); found { // TODO(yifan): Replace with KillPod().
podInfraContainer := runningPod.FindContainerByName(dockertools.PodInfraContainerName)
if podInfraContainer != nil {
if err := kl.killContainer(podInfraContainer); err != nil { if err := kl.killContainer(podInfraContainer); err != nil {
glog.Warningf("Failed to kill pod infra container %q: %v", podInfraContainer.ID, err) glog.Warningf("Failed to kill pod infra container %q: %v", podInfraContainer.ID, err)
} }
} }
_, err = kl.killContainersInPod(pod, containersInPod) _, err = kl.killContainersInPod(pod, runningPod)
if err != nil { if err != nil {
return err return err
} }
} else { } else {
// Otherwise kill any containers in this pod which are not specified as ones to keep. // Otherwise kill any containers in this pod which are not specified as ones to keep.
for id, container := range containersInPod { for _, container := range runningPod.Containers {
_, keep := containerChanges.containersToKeep[id] _, keep := containerChanges.containersToKeep[dockertools.DockerID(container.ID)]
if !keep { if !keep {
glog.V(3).Infof("Killing unwanted container %+v", container) glog.V(3).Infof("Killing unwanted container %+v", container)
err = kl.killContainer(container) err = kl.killContainer(container)
@@ -1516,28 +1528,20 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
glog.V(4).Infof("Desired: %#v", pods) glog.V(4).Infof("Desired: %#v", pods)
var err error var err error
desiredContainers := make(map[dockertools.KubeletContainerName]empty)
desiredPods := make(map[types.UID]empty) desiredPods := make(map[types.UID]empty)
dockerContainers, err := kl.dockerCache.RunningContainers() runningPods, err := kl.dockerCache.GetPods()
if err != nil { if err != nil {
glog.Errorf("Error listing containers: %#v", dockerContainers) glog.Errorf("Error listing containers: %#v", err)
return err return err
} }
// Check for any containers that need starting // Check for any containers that need starting
for ix := range pods { for ix := range pods {
pod := &pods[ix] pod := &pods[ix]
podFullName := GetPodFullName(pod)
uid := pod.UID uid := pod.UID
desiredPods[uid] = empty{} desiredPods[uid] = empty{}
// Add all containers (including net) to the map.
desiredContainers[dockertools.KubeletContainerName{podFullName, uid, dockertools.PodInfraContainerName}] = empty{}
for _, cont := range pod.Spec.Containers {
desiredContainers[dockertools.KubeletContainerName{podFullName, uid, cont.Name}] = empty{}
}
// Run the sync in an async manifest worker. // Run the sync in an async manifest worker.
_, hasMirrorPod := mirrorPods[podFullName] _, hasMirrorPod := mirrorPods[podFullName]
kl.podWorkers.UpdatePod(pod, hasMirrorPod, func() { kl.podWorkers.UpdatePod(pod, hasMirrorPod, func() {
@@ -1561,31 +1565,27 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
// Kill any containers we don't need. // Kill any containers we don't need.
killed := []string{} killed := []string{}
for ix := range dockerContainers { for _, pod := range runningPods {
// Don't kill containers that are in the desired pods. if _, found := desiredPods[pod.ID]; found {
dockerName, _, err := dockertools.ParseDockerName(dockerContainers[ix].Names[0])
_, found := desiredPods[dockerName.PodUID]
if err == nil && found {
// syncPod() will handle this one. // syncPod() will handle this one.
continue continue
} }
_, ok := desiredContainers[*dockerName] // Kill all the containers in the unidentified pod.
if err != nil || !ok { for _, c := range pod.Containers {
// call the networking plugin for teardown // call the networking plugin for teardown
if dockerName.ContainerName == dockertools.PodInfraContainerName { if c.Name == dockertools.PodInfraContainerName {
name, namespace, _ := ParsePodFullName(dockerName.PodFullName) err := kl.networkPlugin.TearDownPod(pod.Namespace, pod.Name, dockertools.DockerID(c.ID))
err := kl.networkPlugin.TearDownPod(namespace, name, dockertools.DockerID(dockerContainers[ix].ID))
if err != nil { if err != nil {
glog.Errorf("Network plugin pre-delete method returned an error: %v", err) glog.Errorf("Network plugin pre-delete method returned an error: %v", err)
} }
} }
glog.V(1).Infof("Killing unwanted container %+v", *dockerName) glog.V(1).Infof("Killing unwanted container %+v", c)
err = kl.killContainer(dockerContainers[ix]) err = kl.killContainer(c)
if err != nil { if err != nil {
glog.Errorf("Error killing container %+v: %v", *dockerName, err) glog.Errorf("Error killing container %+v: %v", c, err)
} else { } else {
killed = append(killed, dockerContainers[ix].ID) killed = append(killed, string(c.ID))
} }
} }
} }

View File

@@ -38,6 +38,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
@@ -87,8 +88,8 @@ func newTestKubelet(t *testing.T) *TestKubelet {
waitGroup := new(sync.WaitGroup) waitGroup := new(sync.WaitGroup)
kubelet.podWorkers = newPodWorkers( kubelet.podWorkers = newPodWorkers(
fakeDockerCache, fakeDockerCache,
func(pod *api.Pod, hasMirrorPod bool, containers dockertools.DockerContainers) error { func(pod *api.Pod, hasMirrorPod bool, runningPod container.Pod) error {
err := kubelet.syncPod(pod, hasMirrorPod, containers) err := kubelet.syncPod(pod, hasMirrorPod, runningPod)
waitGroup.Done() waitGroup.Done()
return err return err
}, },
@@ -313,15 +314,49 @@ func TestKubeletDirsCompat(t *testing.T) {
} }
} }
func apiContainerToContainer(c docker.APIContainers) container.Container {
dockerName, hash, err := dockertools.ParseDockerName(c.Names[0])
if err != nil {
return container.Container{}
}
return container.Container{
ID: types.UID(c.ID),
Name: dockerName.ContainerName,
Hash: hash,
}
}
func dockerContainersToPod(containers dockertools.DockerContainers) container.Pod {
var pod container.Pod
for _, c := range containers {
dockerName, hash, err := dockertools.ParseDockerName(c.Names[0])
if err != nil {
continue
}
pod.Containers = append(pod.Containers, &container.Container{
ID: types.UID(c.ID),
Name: dockerName.ContainerName,
Hash: hash,
Image: c.Image,
})
// TODO(yifan): Only one evaluation is enough.
pod.ID = dockerName.PodUID
name, namespace, _ := ParsePodFullName(dockerName.PodFullName)
pod.Name = name
pod.Namespace = namespace
}
return pod
}
func TestKillContainerWithError(t *testing.T) { func TestKillContainerWithError(t *testing.T) {
containers := []docker.APIContainers{ containers := []docker.APIContainers{
{ {
ID: "1234", ID: "1234",
Names: []string{"/k8s_foo_qux_1234_42"}, Names: []string{"/k8s_foo_qux_new_1234_42"},
}, },
{ {
ID: "5678", ID: "5678",
Names: []string{"/k8s_bar_qux_5678_42"}, Names: []string{"/k8s_bar_qux_new_5678_42"},
}, },
} }
fakeDocker := &dockertools.FakeDockerClient{ fakeDocker := &dockertools.FakeDockerClient{
@@ -334,7 +369,8 @@ func TestKillContainerWithError(t *testing.T) {
kubelet.readiness.set(c.ID, true) kubelet.readiness.set(c.ID, true)
} }
kubelet.dockerClient = fakeDocker kubelet.dockerClient = fakeDocker
err := kubelet.killContainer(&fakeDocker.ContainerList[0]) c := apiContainerToContainer(fakeDocker.ContainerList[0])
err := kubelet.killContainer(&c)
if err == nil { if err == nil {
t.Errorf("expected error, found nil") t.Errorf("expected error, found nil")
} }
@@ -353,11 +389,11 @@ func TestKillContainer(t *testing.T) {
containers := []docker.APIContainers{ containers := []docker.APIContainers{
{ {
ID: "1234", ID: "1234",
Names: []string{"/k8s_foo_qux_1234_42"}, Names: []string{"/k8s_foo_qux_new_1234_42"},
}, },
{ {
ID: "5678", ID: "5678",
Names: []string{"/k8s_bar_qux_5678_42"}, Names: []string{"/k8s_bar_qux_new_5678_42"},
}, },
} }
testKubelet := newTestKubelet(t) testKubelet := newTestKubelet(t)
@@ -371,7 +407,8 @@ func TestKillContainer(t *testing.T) {
kubelet.readiness.set(c.ID, true) kubelet.readiness.set(c.ID, true)
} }
err := kubelet.killContainer(&fakeDocker.ContainerList[0]) c := apiContainerToContainer(fakeDocker.ContainerList[0])
err := kubelet.killContainer(&c)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@@ -916,7 +953,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
} }
pods := []api.Pod{bound} pods := []api.Pod{bound}
kubelet.podManager.SetPods(pods) kubelet.podManager.SetPods(pods)
err := kubelet.syncPod(&bound, false, dockerContainers) err := kubelet.syncPod(&bound, false, dockerContainersToPod(dockerContainers))
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@@ -958,7 +995,7 @@ func TestSyncPodBadHash(t *testing.T) {
} }
pods := []api.Pod{bound} pods := []api.Pod{bound}
kubelet.podManager.SetPods(pods) kubelet.podManager.SetPods(pods)
err := kubelet.syncPod(&bound, false, dockerContainers) err := kubelet.syncPod(&bound, false, dockerContainersToPod(dockerContainers))
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@@ -1013,7 +1050,7 @@ func TestSyncPodUnhealthy(t *testing.T) {
} }
pods := []api.Pod{bound} pods := []api.Pod{bound}
kubelet.podManager.SetPods(pods) kubelet.podManager.SetPods(pods)
err := kubelet.syncPod(&bound, false, dockerContainers) err := kubelet.syncPod(&bound, false, dockerContainersToPod(dockerContainers))
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@@ -1704,7 +1741,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
} }
pods := []api.Pod{bound} pods := []api.Pod{bound}
kubelet.podManager.SetPods(pods) kubelet.podManager.SetPods(pods)
err := kubelet.syncPod(&bound, false, dockerContainers) err := kubelet.syncPod(&bound, false, dockerContainersToPod(dockerContainers))
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@@ -3250,7 +3287,7 @@ func TestCreateMirrorPod(t *testing.T) {
pods := []api.Pod{pod} pods := []api.Pod{pod}
kl.podManager.SetPods(pods) kl.podManager.SetPods(pods)
hasMirrorPod := false hasMirrorPod := false
err := kl.syncPod(&pod, hasMirrorPod, dockertools.DockerContainers{}) err := kl.syncPod(&pod, hasMirrorPod, container.Pod{})
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }

View File

@@ -21,7 +21,6 @@ import (
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
@@ -139,22 +138,16 @@ func (self *podAndContainerCollector) Describe(ch chan<- *prometheus.Desc) {
} }
func (self *podAndContainerCollector) Collect(ch chan<- prometheus.Metric) { func (self *podAndContainerCollector) Collect(ch chan<- prometheus.Metric) {
runningContainers, err := self.containerCache.RunningContainers() runningPods, err := self.containerCache.GetPods()
if err != nil { if err != nil {
glog.Warning("Failed to get running container information while collecting metrics: %v", err) glog.Warning("Failed to get running container information while collecting metrics: %v", err)
return return
} }
// Get a set of running pods. runningContainers := 0
runningPods := make(map[types.UID]struct{}) for _, p := range runningPods {
for _, cont := range runningContainers { runningContainers += len(p.Containers)
containerName, _, err := dockertools.ParseDockerName(cont.Names[0])
if err != nil {
continue
}
runningPods[containerName.PodUID] = struct{}{}
} }
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
runningPodCountDesc, runningPodCountDesc,
prometheus.GaugeValue, prometheus.GaugeValue,
@@ -162,5 +155,5 @@ func (self *podAndContainerCollector) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
runningContainerCountDesc, runningContainerCountDesc,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(len(runningContainers))) float64(runningContainers))
} }

View File

@@ -22,13 +22,14 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog" "github.com/golang/glog"
) )
type syncPodFnType func(*api.Pod, bool, dockertools.DockerContainers) error type syncPodFnType func(*api.Pod, bool, container.Pod) error
type podWorkers struct { type podWorkers struct {
// Protects all per worker fields. // Protects all per worker fields.
@@ -90,14 +91,14 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
glog.Errorf("Error updating docker cache: %v", err) glog.Errorf("Error updating docker cache: %v", err)
return return
} }
containers, err := p.dockerCache.RunningContainers() pods, err := p.dockerCache.GetPods()
if err != nil { if err != nil {
glog.Errorf("Error listing containers while syncing pod: %v", err) glog.Errorf("Error getting pods while syncing pod: %v", err)
return return
} }
err = p.syncPodFn(newWork.pod, newWork.hasMirrorPod, err = p.syncPodFn(newWork.pod, newWork.hasMirrorPod,
containers.FindContainersByPod(newWork.pod.UID, GetPodFullName(newWork.pod))) container.Pods(pods).FindPodByID(newWork.pod.UID))
if err != nil { if err != nil {
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err) glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err) p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)

View File

@@ -23,6 +23,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
) )
@@ -46,7 +47,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
podWorkers := newPodWorkers( podWorkers := newPodWorkers(
fakeDockerCache, fakeDockerCache,
func(pod *api.Pod, hasMirrorPod bool, containers dockertools.DockerContainers) error { func(pod *api.Pod, hasMirrorPod bool, runningPod container.Pod) error {
func() { func() {
lock.Lock() lock.Lock()
defer lock.Unlock() defer lock.Unlock()

View File

@@ -21,6 +21,7 @@ import (
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/golang/glog" "github.com/golang/glog"
) )
@@ -91,11 +92,12 @@ func (kl *Kubelet) runPod(pod api.Pod, retryDelay time.Duration) error {
delay := retryDelay delay := retryDelay
retry := 0 retry := 0
for { for {
dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false) pods, err := dockertools.GetPods(kl.dockerClient, false)
if err != nil { if err != nil {
return fmt.Errorf("failed to get kubelet docker containers: %v", err) return fmt.Errorf("failed to get kubelet pods: %v", err)
} }
running, err := kl.isPodRunning(pod, dockerContainers) p := container.Pods(pods).FindPodByID(pod.UID)
running, err := kl.isPodRunning(pod, p)
if err != nil { if err != nil {
return fmt.Errorf("failed to check pod status: %v", err) return fmt.Errorf("failed to check pod status: %v", err)
} }
@@ -106,7 +108,7 @@ func (kl *Kubelet) runPod(pod api.Pod, retryDelay time.Duration) error {
glog.Infof("pod %q containers not running: syncing", pod.Name) glog.Infof("pod %q containers not running: syncing", pod.Name)
// We don't create mirror pods in this mode; pass a dummy boolean value // We don't create mirror pods in this mode; pass a dummy boolean value
// to sycnPod. // to sycnPod.
if err = kl.syncPod(&pod, false, dockerContainers); err != nil { if err = kl.syncPod(&pod, false, p); err != nil {
return fmt.Errorf("error syncing pod: %v", err) return fmt.Errorf("error syncing pod: %v", err)
} }
if retry >= RunOnceMaxRetries { if retry >= RunOnceMaxRetries {
@@ -121,14 +123,14 @@ func (kl *Kubelet) runPod(pod api.Pod, retryDelay time.Duration) error {
} }
// isPodRunning returns true if all containers of a manifest are running. // isPodRunning returns true if all containers of a manifest are running.
func (kl *Kubelet) isPodRunning(pod api.Pod, dockerContainers dockertools.DockerContainers) (bool, error) { func (kl *Kubelet) isPodRunning(pod api.Pod, runningPod container.Pod) (bool, error) {
for _, container := range pod.Spec.Containers { for _, container := range pod.Spec.Containers {
dockerContainer, found, _ := dockerContainers.FindPodContainer(GetPodFullName(&pod), pod.UID, container.Name) c := runningPod.FindContainerByName(container.Name)
if !found { if c == nil {
glog.Infof("container %q not found", container.Name) glog.Infof("container %q not found", container.Name)
return false, nil return false, nil
} }
inspectResult, err := kl.dockerClient.InspectContainer(dockerContainer.ID) inspectResult, err := kl.dockerClient.InspectContainer(string(c.ID))
if err != nil { if err != nil {
glog.Infof("failed to inspect container %q: %v", container.Name, err) glog.Infof("failed to inspect container %q: %v", container.Name, err)
return false, err return false, err