kubelet: move PodManager and MirrorClient to a subpackage

This change moves pod_manager.go and mirror_client.go to a separate package.
Also made necessary, minor changes to facilitate testing.
This commit is contained in:
Yu-Ju Hong 2015-10-12 16:28:23 -07:00
parent 9766f25902
commit 2c76c55bb9
10 changed files with 140 additions and 120 deletions

View File

@ -52,6 +52,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/envvars" "k8s.io/kubernetes/pkg/kubelet/envvars"
"k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/prober" "k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/kubelet/rkt" "k8s.io/kubernetes/pkg/kubelet/rkt"
"k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/status"
@ -393,7 +394,7 @@ func NewMainKubelet(
klet.lastTimestampRuntimeUp = time.Now() klet.lastTimestampRuntimeUp = time.Now()
klet.runner = klet.containerRuntime klet.runner = klet.containerRuntime
klet.podManager = newBasicPodManager(klet.kubeClient) klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))
klet.prober = prober.New(klet.runner, containerRefManager, recorder) klet.prober = prober.New(klet.runner, containerRefManager, recorder)
klet.probeManager = prober.NewManager( klet.probeManager = prober.NewManager(
@ -455,7 +456,7 @@ type Kubelet struct {
// safe and should only be access by the main kubelet syncloop goroutine. // safe and should only be access by the main kubelet syncloop goroutine.
sourcesSeen sets.String sourcesSeen sets.String
podManager podManager podManager kubepod.Manager
// Needed to report events for containers belonging to deleted/modified pods. // Needed to report events for containers belonging to deleted/modified pods.
// Tracks references for reporting events // Tracks references for reporting events
@ -1271,7 +1272,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
// Before returning, regenerate status and store it in the cache. // Before returning, regenerate status and store it in the cache.
defer func() { defer func() {
if isStaticPod(pod) && mirrorPod == nil { if kubepod.IsStaticPod(pod) && mirrorPod == nil {
// No need to cache the status because the mirror pod does not // No need to cache the status because the mirror pod does not
// exist yet. // exist yet.
return return
@ -1398,7 +1399,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
} }
} }
if isStaticPod(pod) { if kubepod.IsStaticPod(pod) {
if mirrorPod != nil && !kl.podManager.IsMirrorPodOf(mirrorPod, pod) { if mirrorPod != nil && !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
// The mirror pod is semantically different from the static pod. Remove // The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later. // it. The mirror pod will get recreated later.
@ -2015,7 +2016,7 @@ func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) {
sort.Sort(podsByCreationTime(pods)) sort.Sort(podsByCreationTime(pods))
for _, pod := range pods { for _, pod := range pods {
kl.podManager.AddPod(pod) kl.podManager.AddPod(pod)
if isMirrorPod(pod) { if kubepod.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start) kl.handleMirrorPod(pod, start)
continue continue
} }
@ -2040,7 +2041,7 @@ func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) {
start := time.Now() start := time.Now()
for _, pod := range pods { for _, pod := range pods {
kl.podManager.UpdatePod(pod) kl.podManager.UpdatePod(pod)
if isMirrorPod(pod) { if kubepod.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start) kl.handleMirrorPod(pod, start)
continue continue
} }
@ -2055,7 +2056,7 @@ func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) {
start := time.Now() start := time.Now()
for _, pod := range pods { for _, pod := range pods {
kl.podManager.DeletePod(pod) kl.podManager.DeletePod(pod)
if isMirrorPod(pod) { if kubepod.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start) kl.handleMirrorPod(pod, start)
continue continue
} }

View File

@ -45,6 +45,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/container"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/prober" "k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -80,7 +81,7 @@ type TestKubelet struct {
fakeRuntime *kubecontainer.FakeRuntime fakeRuntime *kubecontainer.FakeRuntime
fakeCadvisor *cadvisor.Mock fakeCadvisor *cadvisor.Mock
fakeKubeClient *testclient.Fake fakeKubeClient *testclient.Fake
fakeMirrorClient *fakeMirrorClient fakeMirrorClient *kubepod.FakeMirrorClient
} }
func newTestKubelet(t *testing.T) *TestKubelet { func newTestKubelet(t *testing.T) *TestKubelet {
@ -116,8 +117,8 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.daemonEndpoints = &api.NodeDaemonEndpoints{} kubelet.daemonEndpoints = &api.NodeDaemonEndpoints{}
mockCadvisor := &cadvisor.Mock{} mockCadvisor := &cadvisor.Mock{}
kubelet.cadvisor = mockCadvisor kubelet.cadvisor = mockCadvisor
podManager, fakeMirrorClient := newFakePodManager() fakeMirrorClient := kubepod.NewFakeMirrorClient()
kubelet.podManager = podManager kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient)
kubelet.containerRefManager = kubecontainer.NewRefManager() kubelet.containerRefManager = kubecontainer.NewRefManager()
diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, DiskSpacePolicy{}) diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, DiskSpacePolicy{})
if err != nil { if err != nil {

View File

@ -14,18 +14,17 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package kubelet package pod
import ( import (
"sync" "sync"
"testing"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
type fakeMirrorClient struct { type FakeMirrorClient struct {
mirrorPodLock sync.RWMutex mirrorPodLock sync.RWMutex
// Note that a real mirror manager does not store the mirror pods in // Note that a real mirror manager does not store the mirror pods in
// itself. This fake manager does this to track calls. // itself. This fake manager does this to track calls.
@ -34,7 +33,15 @@ type fakeMirrorClient struct {
deleteCounts map[string]int deleteCounts map[string]int
} }
func (fmc *fakeMirrorClient) CreateMirrorPod(pod *api.Pod) error { func NewFakeMirrorClient() *FakeMirrorClient {
m := FakeMirrorClient{}
m.mirrorPods = sets.NewString()
m.createCounts = make(map[string]int)
m.deleteCounts = make(map[string]int)
return &m
}
func (fmc *FakeMirrorClient) CreateMirrorPod(pod *api.Pod) error {
fmc.mirrorPodLock.Lock() fmc.mirrorPodLock.Lock()
defer fmc.mirrorPodLock.Unlock() defer fmc.mirrorPodLock.Unlock()
podFullName := kubecontainer.GetPodFullName(pod) podFullName := kubecontainer.GetPodFullName(pod)
@ -43,7 +50,7 @@ func (fmc *fakeMirrorClient) CreateMirrorPod(pod *api.Pod) error {
return nil return nil
} }
func (fmc *fakeMirrorClient) DeleteMirrorPod(podFullName string) error { func (fmc *FakeMirrorClient) DeleteMirrorPod(podFullName string) error {
fmc.mirrorPodLock.Lock() fmc.mirrorPodLock.Lock()
defer fmc.mirrorPodLock.Unlock() defer fmc.mirrorPodLock.Unlock()
fmc.mirrorPods.Delete(podFullName) fmc.mirrorPods.Delete(podFullName)
@ -51,65 +58,26 @@ func (fmc *fakeMirrorClient) DeleteMirrorPod(podFullName string) error {
return nil return nil
} }
func newFakeMirrorClient() *fakeMirrorClient { func (fmc *FakeMirrorClient) HasPod(podFullName string) bool {
m := fakeMirrorClient{}
m.mirrorPods = sets.NewString()
m.createCounts = make(map[string]int)
m.deleteCounts = make(map[string]int)
return &m
}
func (fmc *fakeMirrorClient) HasPod(podFullName string) bool {
fmc.mirrorPodLock.RLock() fmc.mirrorPodLock.RLock()
defer fmc.mirrorPodLock.RUnlock() defer fmc.mirrorPodLock.RUnlock()
return fmc.mirrorPods.Has(podFullName) return fmc.mirrorPods.Has(podFullName)
} }
func (fmc *fakeMirrorClient) NumOfPods() int { func (fmc *FakeMirrorClient) NumOfPods() int {
fmc.mirrorPodLock.RLock() fmc.mirrorPodLock.RLock()
defer fmc.mirrorPodLock.RUnlock() defer fmc.mirrorPodLock.RUnlock()
return fmc.mirrorPods.Len() return fmc.mirrorPods.Len()
} }
func (fmc *fakeMirrorClient) GetPods() []string { func (fmc *FakeMirrorClient) GetPods() []string {
fmc.mirrorPodLock.RLock() fmc.mirrorPodLock.RLock()
defer fmc.mirrorPodLock.RUnlock() defer fmc.mirrorPodLock.RUnlock()
return fmc.mirrorPods.List() return fmc.mirrorPods.List()
} }
func (fmc *fakeMirrorClient) GetCounts(podFullName string) (int, int) { func (fmc *FakeMirrorClient) GetCounts(podFullName string) (int, int) {
fmc.mirrorPodLock.RLock() fmc.mirrorPodLock.RLock()
defer fmc.mirrorPodLock.RUnlock() defer fmc.mirrorPodLock.RUnlock()
return fmc.createCounts[podFullName], fmc.deleteCounts[podFullName] return fmc.createCounts[podFullName], fmc.deleteCounts[podFullName]
} }
func TestParsePodFullName(t *testing.T) {
type nameTuple struct {
Name string
Namespace string
}
successfulCases := map[string]nameTuple{
"bar_foo": {Name: "bar", Namespace: "foo"},
"bar.org_foo.com": {Name: "bar.org", Namespace: "foo.com"},
"bar-bar_foo": {Name: "bar-bar", Namespace: "foo"},
}
failedCases := []string{"barfoo", "bar_foo_foo", ""}
for podFullName, expected := range successfulCases {
name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
if err != nil {
t.Errorf("unexpected error when parsing the full name: %v", err)
continue
}
if name != expected.Name || namespace != expected.Namespace {
t.Errorf("expected name %q, namespace %q; got name %q, namespace %q",
expected.Name, expected.Namespace, name, namespace)
}
}
for _, podFullName := range failedCases {
_, _, err := kubecontainer.ParsePodFullName(podFullName)
if err == nil {
t.Errorf("expected error when parsing the full name, got none")
}
}
}

View File

@ -14,13 +14,12 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package kubelet package pod
import ( import (
"sync" "sync"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
) )
@ -40,7 +39,7 @@ import (
// When a static pod gets deleted, the associated orphaned mirror pod will // When a static pod gets deleted, the associated orphaned mirror pod will
// also be removed. // also be removed.
type podManager interface { type Manager interface {
GetPods() []*api.Pod GetPods() []*api.Pod
GetPodByFullName(podFullName string) (*api.Pod, bool) GetPodByFullName(podFullName string) (*api.Pod, bool)
GetPodByName(namespace, name string) (*api.Pod, bool) GetPodByName(namespace, name string) (*api.Pod, bool)
@ -60,13 +59,13 @@ type podManager interface {
DeleteOrphanedMirrorPods() DeleteOrphanedMirrorPods()
TranslatePodUID(uid types.UID) types.UID TranslatePodUID(uid types.UID) types.UID
IsMirrorPodOf(mirrorPod, pod *api.Pod) bool IsMirrorPodOf(mirrorPod, pod *api.Pod) bool
mirrorClient MirrorClient
} }
// All maps in basicPodManager should be set by calling UpdatePods(); // All maps in basicManager should be set by calling UpdatePods();
// individual arrays/maps are not immutable and no other methods should attempt // individual arrays/maps are not immutable and no other methods should attempt
// to modify them. // to modify them.
type basicPodManager struct { type basicManager struct {
// Protects all internal maps. // Protects all internal maps.
lock sync.RWMutex lock sync.RWMutex
@ -80,24 +79,24 @@ type basicPodManager struct {
mirrorPodByFullName map[string]*api.Pod mirrorPodByFullName map[string]*api.Pod
// A mirror pod client to create/delete mirror pods. // A mirror pod client to create/delete mirror pods.
mirrorClient MirrorClient
} }
func newBasicPodManager(apiserverClient client.Interface) *basicPodManager { func NewBasicPodManager(client MirrorClient) Manager {
pm := &basicPodManager{} pm := &basicManager{}
pm.mirrorClient = newBasicMirrorClient(apiserverClient) pm.MirrorClient = client
pm.SetPods(nil) pm.SetPods(nil)
return pm return pm
} }
// Set the internal pods based on the new pods. // Set the internal pods based on the new pods.
func (pm *basicPodManager) SetPods(newPods []*api.Pod) { func (pm *basicManager) SetPods(newPods []*api.Pod) {
pm.lock.Lock() pm.lock.Lock()
defer pm.lock.Unlock() defer pm.lock.Unlock()
pm.setPods(newPods) pm.setPods(newPods)
} }
func (pm *basicPodManager) setPods(newPods []*api.Pod) { func (pm *basicManager) setPods(newPods []*api.Pod) {
podByUID := make(map[types.UID]*api.Pod) podByUID := make(map[types.UID]*api.Pod)
mirrorPodByUID := make(map[types.UID]*api.Pod) mirrorPodByUID := make(map[types.UID]*api.Pod)
podByFullName := make(map[string]*api.Pod) podByFullName := make(map[string]*api.Pod)
@ -105,7 +104,7 @@ func (pm *basicPodManager) setPods(newPods []*api.Pod) {
for _, pod := range newPods { for _, pod := range newPods {
podFullName := kubecontainer.GetPodFullName(pod) podFullName := kubecontainer.GetPodFullName(pod)
if isMirrorPod(pod) { if IsMirrorPod(pod) {
mirrorPodByUID[pod.UID] = pod mirrorPodByUID[pod.UID] = pod
mirrorPodByFullName[podFullName] = pod mirrorPodByFullName[podFullName] = pod
} else { } else {
@ -120,15 +119,15 @@ func (pm *basicPodManager) setPods(newPods []*api.Pod) {
pm.mirrorPodByFullName = mirrorPodByFullName pm.mirrorPodByFullName = mirrorPodByFullName
} }
func (pm *basicPodManager) AddPod(pod *api.Pod) { func (pm *basicManager) AddPod(pod *api.Pod) {
pm.UpdatePod(pod) pm.UpdatePod(pod)
} }
func (pm *basicPodManager) UpdatePod(pod *api.Pod) { func (pm *basicManager) UpdatePod(pod *api.Pod) {
pm.lock.Lock() pm.lock.Lock()
defer pm.lock.Unlock() defer pm.lock.Unlock()
podFullName := kubecontainer.GetPodFullName(pod) podFullName := kubecontainer.GetPodFullName(pod)
if isMirrorPod(pod) { if IsMirrorPod(pod) {
pm.mirrorPodByUID[pod.UID] = pod pm.mirrorPodByUID[pod.UID] = pod
pm.mirrorPodByFullName[podFullName] = pod pm.mirrorPodByFullName[podFullName] = pod
} else { } else {
@ -137,11 +136,11 @@ func (pm *basicPodManager) UpdatePod(pod *api.Pod) {
} }
} }
func (pm *basicPodManager) DeletePod(pod *api.Pod) { func (pm *basicManager) DeletePod(pod *api.Pod) {
pm.lock.Lock() pm.lock.Lock()
defer pm.lock.Unlock() defer pm.lock.Unlock()
podFullName := kubecontainer.GetPodFullName(pod) podFullName := kubecontainer.GetPodFullName(pod)
if isMirrorPod(pod) { if IsMirrorPod(pod) {
delete(pm.mirrorPodByUID, pod.UID) delete(pm.mirrorPodByUID, pod.UID)
delete(pm.mirrorPodByFullName, podFullName) delete(pm.mirrorPodByFullName, podFullName)
} else { } else {
@ -151,14 +150,14 @@ func (pm *basicPodManager) DeletePod(pod *api.Pod) {
} }
// GetPods returns the regular pods bound to the kubelet and their spec. // GetPods returns the regular pods bound to the kubelet and their spec.
func (pm *basicPodManager) GetPods() []*api.Pod { func (pm *basicManager) GetPods() []*api.Pod {
pm.lock.RLock() pm.lock.RLock()
defer pm.lock.RUnlock() defer pm.lock.RUnlock()
return podsMapToPods(pm.podByUID) return podsMapToPods(pm.podByUID)
} }
// GetPodsAndMirrorPods returns the both regular and mirror pods. // GetPodsAndMirrorPods returns the both regular and mirror pods.
func (pm *basicPodManager) GetPodsAndMirrorPods() ([]*api.Pod, []*api.Pod) { func (pm *basicManager) GetPodsAndMirrorPods() ([]*api.Pod, []*api.Pod) {
pm.lock.RLock() pm.lock.RLock()
defer pm.lock.RUnlock() defer pm.lock.RUnlock()
pods := podsMapToPods(pm.podByUID) pods := podsMapToPods(pm.podByUID)
@ -167,20 +166,20 @@ func (pm *basicPodManager) GetPodsAndMirrorPods() ([]*api.Pod, []*api.Pod) {
} }
// Returns all pods (including mirror pods). // Returns all pods (including mirror pods).
func (pm *basicPodManager) getAllPods() []*api.Pod { func (pm *basicManager) getAllPods() []*api.Pod {
return append(podsMapToPods(pm.podByUID), podsMapToPods(pm.mirrorPodByUID)...) return append(podsMapToPods(pm.podByUID), podsMapToPods(pm.mirrorPodByUID)...)
} }
// GetPodByName provides the (non-mirror) pod that matches namespace and name, // GetPodByName provides the (non-mirror) pod that matches namespace and name,
// as well as whether the pod was found. // as well as whether the pod was found.
func (pm *basicPodManager) GetPodByName(namespace, name string) (*api.Pod, bool) { func (pm *basicManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
podFullName := kubecontainer.BuildPodFullName(name, namespace) podFullName := kubecontainer.BuildPodFullName(name, namespace)
return pm.GetPodByFullName(podFullName) return pm.GetPodByFullName(podFullName)
} }
// GetPodByName returns the (non-mirror) pod that matches full name, as well as // GetPodByName returns the (non-mirror) pod that matches full name, as well as
// whether the pod was found. // whether the pod was found.
func (pm *basicPodManager) GetPodByFullName(podFullName string) (*api.Pod, bool) { func (pm *basicManager) GetPodByFullName(podFullName string) (*api.Pod, bool) {
pm.lock.RLock() pm.lock.RLock()
defer pm.lock.RUnlock() defer pm.lock.RUnlock()
pod, ok := pm.podByFullName[podFullName] pod, ok := pm.podByFullName[podFullName]
@ -191,7 +190,7 @@ func (pm *basicPodManager) GetPodByFullName(podFullName string) (*api.Pod, bool)
// Otherwise, return the original UID. All public-facing functions should // Otherwise, return the original UID. All public-facing functions should
// perform this translation for UIDs because user may provide a mirror pod UID, // perform this translation for UIDs because user may provide a mirror pod UID,
// which is not recognized by internal Kubelet functions. // which is not recognized by internal Kubelet functions.
func (pm *basicPodManager) TranslatePodUID(uid types.UID) types.UID { func (pm *basicManager) TranslatePodUID(uid types.UID) types.UID {
if uid == "" { if uid == "" {
return uid return uid
} }
@ -207,7 +206,7 @@ func (pm *basicPodManager) TranslatePodUID(uid types.UID) types.UID {
return uid return uid
} }
func (pm *basicPodManager) getOrphanedMirrorPodNames() []string { func (pm *basicManager) getOrphanedMirrorPodNames() []string {
pm.lock.RLock() pm.lock.RLock()
defer pm.lock.RUnlock() defer pm.lock.RUnlock()
var podFullNames []string var podFullNames []string
@ -221,16 +220,16 @@ func (pm *basicPodManager) getOrphanedMirrorPodNames() []string {
// Delete all mirror pods which do not have associated static pods. This method // Delete all mirror pods which do not have associated static pods. This method
// sends deletion requets to the API server, but does NOT modify the internal // sends deletion requets to the API server, but does NOT modify the internal
// pod storage in basicPodManager. // pod storage in basicManager.
func (pm *basicPodManager) DeleteOrphanedMirrorPods() { func (pm *basicManager) DeleteOrphanedMirrorPods() {
podFullNames := pm.getOrphanedMirrorPodNames() podFullNames := pm.getOrphanedMirrorPodNames()
for _, podFullName := range podFullNames { for _, podFullName := range podFullNames {
pm.mirrorClient.DeleteMirrorPod(podFullName) pm.MirrorClient.DeleteMirrorPod(podFullName)
} }
} }
// Returns true if mirrorPod is a correct representation of pod; false otherwise. // Returns true if mirrorPod is a correct representation of pod; false otherwise.
func (pm *basicPodManager) IsMirrorPodOf(mirrorPod, pod *api.Pod) bool { func (pm *basicManager) IsMirrorPodOf(mirrorPod, pod *api.Pod) bool {
// Check name and namespace first. // Check name and namespace first.
if pod.Name != mirrorPod.Name || pod.Namespace != mirrorPod.Namespace { if pod.Name != mirrorPod.Name || pod.Namespace != mirrorPod.Namespace {
return false return false
@ -246,14 +245,14 @@ func podsMapToPods(UIDMap map[types.UID]*api.Pod) []*api.Pod {
return pods return pods
} }
func (pm *basicPodManager) GetMirrorPodByPod(pod *api.Pod) (*api.Pod, bool) { func (pm *basicManager) GetMirrorPodByPod(pod *api.Pod) (*api.Pod, bool) {
pm.lock.RLock() pm.lock.RLock()
defer pm.lock.RUnlock() defer pm.lock.RUnlock()
mirrorPod, ok := pm.mirrorPodByFullName[kubecontainer.GetPodFullName(pod)] mirrorPod, ok := pm.mirrorPodByFullName[kubecontainer.GetPodFullName(pod)]
return mirrorPod, ok return mirrorPod, ok
} }
func (pm *basicPodManager) GetPodByMirrorPod(mirrorPod *api.Pod) (*api.Pod, bool) { func (pm *basicManager) GetPodByMirrorPod(mirrorPod *api.Pod) (*api.Pod, bool) {
pm.lock.RLock() pm.lock.RLock()
defer pm.lock.RUnlock() defer pm.lock.RUnlock()
pod, ok := pm.podByFullName[kubecontainer.GetPodFullName(mirrorPod)] pod, ok := pm.podByFullName[kubecontainer.GetPodFullName(mirrorPod)]

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package kubelet package pod
import ( import (
"reflect" "reflect"
@ -25,11 +25,10 @@ import (
) )
// Stub out mirror client for testing purpose. // Stub out mirror client for testing purpose.
func newFakePodManager() (*basicPodManager, *fakeMirrorClient) { func newTestManager() (*basicManager, *FakeMirrorClient) {
podManager := newBasicPodManager(nil) fakeMirrorClient := NewFakeMirrorClient()
fakeMirrorClient := newFakeMirrorClient() manager := NewBasicPodManager(fakeMirrorClient).(*basicManager)
podManager.mirrorClient = fakeMirrorClient return manager, fakeMirrorClient
return podManager, fakeMirrorClient
} }
// Tests that pods/maps are properly set after the pod update, and the basic // Tests that pods/maps are properly set after the pod update, and the basic
@ -67,7 +66,7 @@ func TestGetSetPods(t *testing.T) {
staticPod, staticPod,
} }
updates := append(expectedPods, mirrorPod) updates := append(expectedPods, mirrorPod)
podManager, _ := newFakePodManager() podManager, _ := newTestManager()
podManager.SetPods(updates) podManager.SetPods(updates)
// Tests that all regular pods are recorded corrrectly. // Tests that all regular pods are recorded corrrectly.

View File

@ -14,11 +14,9 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package kubelet package pod
import ( import (
"fmt"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
@ -27,8 +25,7 @@ import (
) )
// Mirror client is used to create/delete a mirror pod. // Mirror client is used to create/delete a mirror pod.
type MirrorClient interface {
type mirrorClient interface {
CreateMirrorPod(*api.Pod) error CreateMirrorPod(*api.Pod) error
DeleteMirrorPod(string) error DeleteMirrorPod(string) error
} }
@ -39,7 +36,7 @@ type basicMirrorClient struct {
apiserverClient client.Interface apiserverClient client.Interface
} }
func newBasicMirrorClient(apiserverClient client.Interface) *basicMirrorClient { func NewBasicMirrorClient(apiserverClient client.Interface) MirrorClient {
return &basicMirrorClient{apiserverClient: apiserverClient} return &basicMirrorClient{apiserverClient: apiserverClient}
} }
@ -78,22 +75,12 @@ func (mc *basicMirrorClient) DeleteMirrorPod(podFullName string) error {
return nil return nil
} }
// Helper functions. func IsStaticPod(pod *api.Pod) bool {
func getPodSource(pod *api.Pod) (string, error) { source, err := kubetypes.GetPodSource(pod)
if pod.Annotations != nil {
if source, ok := pod.Annotations[kubetypes.ConfigSourceAnnotationKey]; ok {
return source, nil
}
}
return "", fmt.Errorf("cannot get source of pod %q", pod.UID)
}
func isStaticPod(pod *api.Pod) bool {
source, err := getPodSource(pod)
return err == nil && source != kubetypes.ApiserverSource return err == nil && source != kubetypes.ApiserverSource
} }
func isMirrorPod(pod *api.Pod) bool { func IsMirrorPod(pod *api.Pod) bool {
if value, ok := pod.Annotations[kubetypes.ConfigMirrorAnnotationKey]; !ok { if value, ok := pod.Annotations[kubetypes.ConfigMirrorAnnotationKey]; !ok {
return false return false
} else { } else {

View File

@ -0,0 +1,54 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pod
import (
"testing"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
func TestParsePodFullName(t *testing.T) {
type nameTuple struct {
Name string
Namespace string
}
successfulCases := map[string]nameTuple{
"bar_foo": {Name: "bar", Namespace: "foo"},
"bar.org_foo.com": {Name: "bar.org", Namespace: "foo.com"},
"bar-bar_foo": {Name: "bar-bar", Namespace: "foo"},
}
failedCases := []string{"barfoo", "bar_foo_foo", ""}
for podFullName, expected := range successfulCases {
name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
if err != nil {
t.Errorf("unexpected error when parsing the full name: %v", err)
continue
}
if name != expected.Name || namespace != expected.Namespace {
t.Errorf("expected name %q, namespace %q; got name %q, namespace %q",
expected.Name, expected.Namespace, name, namespace)
}
}
for _, podFullName := range failedCases {
_, _, err := kubecontainer.ParsePodFullName(podFullName)
if err == nil {
t.Errorf("expected error when parsing the full name, got none")
}
}
}

View File

@ -26,14 +26,14 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cadvisor"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/status"
) )
func TestRunOnce(t *testing.T) { func TestRunOnce(t *testing.T) {
cadvisor := &cadvisor.Mock{} cadvisor := &cadvisor.Mock{}
cadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) cadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
podManager := kubepod.NewBasicPodManager(kubepod.NewFakeMirrorClient())
podManager, _ := newFakePodManager()
diskSpaceManager, _ := newDiskSpaceManager(cadvisor, DiskSpacePolicy{}) diskSpaceManager, _ := newDiskSpaceManager(cadvisor, DiskSpacePolicy{})
fakeRuntime := &kubecontainer.FakeRuntime{} fakeRuntime := &kubecontainer.FakeRuntime{}

View File

@ -89,6 +89,16 @@ func GetValidatedSources(sources []string) ([]string, error) {
return validated, nil return validated, nil
} }
// GetPodSource returns the source of the pod based on the annotation.
func GetPodSource(pod *api.Pod) (string, error) {
if pod.Annotations != nil {
if source, ok := pod.Annotations[ConfigSourceAnnotationKey]; ok {
return source, nil
}
}
return "", fmt.Errorf("cannot get source of pod %q", pod.UID)
}
// SyncPodType classifies pod updates, eg: create, update. // SyncPodType classifies pod updates, eg: create, update.
type SyncPodType int type SyncPodType int

View File

@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/capabilities" "k8s.io/kubernetes/pkg/capabilities"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/securitycontext"
) )
@ -82,7 +83,7 @@ func canRunPod(pod *api.Pod) error {
// Determined whether the specified pod is allowed to use host networking // Determined whether the specified pod is allowed to use host networking
func allowHostNetwork(pod *api.Pod) (bool, error) { func allowHostNetwork(pod *api.Pod) (bool, error) {
podSource, err := getPodSource(pod) podSource, err := kubetypes.GetPodSource(pod)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -96,7 +97,7 @@ func allowHostNetwork(pod *api.Pod) (bool, error) {
// Determined whether the specified pod is allowed to use host networking // Determined whether the specified pod is allowed to use host networking
func allowHostPID(pod *api.Pod) (bool, error) { func allowHostPID(pod *api.Pod) (bool, error) {
podSource, err := getPodSource(pod) podSource, err := kubetypes.GetPodSource(pod)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -110,7 +111,7 @@ func allowHostPID(pod *api.Pod) (bool, error) {
// Determined whether the specified pod is allowed to use host ipc // Determined whether the specified pod is allowed to use host ipc
func allowHostIPC(pod *api.Pod) (bool, error) { func allowHostIPC(pod *api.Pod) (bool, error) {
podSource, err := getPodSource(pod) podSource, err := kubetypes.GetPodSource(pod)
if err != nil { if err != nil {
return false, err return false, err
} }