Add killPodNow to kubelet

This commit is contained in:
derekwaynecarr
2016-05-06 14:07:24 -04:00
parent 5a99fcd21b
commit 6fefb428c1
7 changed files with 380 additions and 74 deletions

View File

@@ -40,12 +40,18 @@ type fakePodWorkers struct {
t TestingInterface
}
func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType kubetypes.SyncPodType, updateComplete func()) {
status, err := f.cache.Get(pod.UID)
func (f *fakePodWorkers) UpdatePod(options *UpdatePodOptions) {
status, err := f.cache.Get(options.Pod.UID)
if err != nil {
f.t.Errorf("Unexpected error: %v", err)
}
if err := f.syncPodFn(pod, mirrorPod, status, kubetypes.SyncPodUpdate); err != nil {
if err := f.syncPodFn(syncPodOptions{
mirrorPod: options.MirrorPod,
pod: options.Pod,
podStatus: status,
updateType: options.UpdateType,
killPodOptions: options.KillPodOptions,
}); err != nil {
f.t.Errorf("Unexpected error: %v", err)
}
}
@@ -67,18 +73,28 @@ func newPod(uid, name string) *api.Pod {
}
}
func createPodWorkers() (*podWorkers, map[types.UID][]string) {
// syncPodRecord is a record of a sync pod call
type syncPodRecord struct {
name string
updateType kubetypes.SyncPodType
}
func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) {
lock := sync.Mutex{}
processed := make(map[types.UID][]string)
processed := make(map[types.UID][]syncPodRecord)
fakeRecorder := &record.FakeRecorder{}
fakeRuntime := &containertest.FakeRuntime{}
fakeCache := containertest.NewFakeCache(fakeRuntime)
podWorkers := newPodWorkers(
func(pod *api.Pod, mirrorPod *api.Pod, status *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
func(options syncPodOptions) error {
func() {
lock.Lock()
defer lock.Unlock()
processed[pod.UID] = append(processed[pod.UID], pod.Name)
pod := options.pod
processed[pod.UID] = append(processed[pod.UID], syncPodRecord{
name: pod.Name,
updateType: options.updateType,
})
}()
return nil
},
@@ -115,12 +131,15 @@ func TestUpdatePod(t *testing.T) {
numPods := 20
for i := 0; i < numPods; i++ {
for j := i; j < numPods; j++ {
podWorkers.UpdatePod(newPod(string(j), string(i)), nil, kubetypes.SyncPodCreate, func() {})
podWorkers.UpdatePod(&UpdatePodOptions{
Pod: newPod(string(j), string(i)),
UpdateType: kubetypes.SyncPodCreate,
})
}
}
drainWorkers(podWorkers, numPods)
if len(processed) != 20 {
if len(processed) != numPods {
t.Errorf("Not all pods processed: %v", len(processed))
return
}
@@ -133,22 +152,65 @@ func TestUpdatePod(t *testing.T) {
first := 0
last := len(processed[uid]) - 1
if processed[uid][first] != string(0) {
if processed[uid][first].name != string(0) {
t.Errorf("Pod %v: incorrect order %v, %v", i, first, processed[uid][first])
}
if processed[uid][last] != string(i) {
if processed[uid][last].name != string(i) {
t.Errorf("Pod %v: incorrect order %v, %v", i, last, processed[uid][last])
}
}
}
func TestUpdatePodDoesNotForgetSyncPodKill(t *testing.T) {
podWorkers, processed := createPodWorkers()
numPods := 20
for i := 0; i < numPods; i++ {
pod := newPod(string(i), string(i))
podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodCreate,
})
podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodKill,
})
podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodUpdate,
})
}
drainWorkers(podWorkers, numPods)
if len(processed) != numPods {
t.Errorf("Not all pods processed: %v", len(processed))
return
}
for i := 0; i < numPods; i++ {
uid := types.UID(i)
// each pod should be processed two times (create, kill, but not update)
syncPodRecords := processed[uid]
if len(syncPodRecords) < 2 {
t.Errorf("Pod %v processed %v times, but expected at least 2", i, len(syncPodRecords))
continue
}
if syncPodRecords[0].updateType != kubetypes.SyncPodCreate {
t.Errorf("Pod %v event was %v, but expected %v", i, syncPodRecords[0].updateType, kubetypes.SyncPodCreate)
}
if syncPodRecords[1].updateType != kubetypes.SyncPodKill {
t.Errorf("Pod %v event was %v, but expected %v", i, syncPodRecords[1].updateType, kubetypes.SyncPodKill)
}
}
}
func TestForgetNonExistingPodWorkers(t *testing.T) {
podWorkers, _ := createPodWorkers()
numPods := 20
for i := 0; i < numPods; i++ {
podWorkers.UpdatePod(newPod(string(i), "name"), nil, kubetypes.SyncPodUpdate, func() {})
podWorkers.UpdatePod(&UpdatePodOptions{
Pod: newPod(string(i), "name"),
UpdateType: kubetypes.SyncPodUpdate,
})
}
drainWorkers(podWorkers, numPods)
@@ -183,13 +245,13 @@ type simpleFakeKubelet struct {
wg sync.WaitGroup
}
func (kl *simpleFakeKubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, status *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, status
func (kl *simpleFakeKubelet) syncPod(options syncPodOptions) error {
kl.pod, kl.mirrorPod, kl.podStatus = options.pod, options.mirrorPod, options.podStatus
return nil
}
func (kl *simpleFakeKubelet) syncPodWithWaitGroup(pod *api.Pod, mirrorPod *api.Pod, status *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, status
func (kl *simpleFakeKubelet) syncPodWithWaitGroup(options syncPodOptions) error {
kl.pod, kl.mirrorPod, kl.podStatus = options.pod, options.mirrorPod, options.podStatus
kl.wg.Done()
return nil
}
@@ -240,8 +302,16 @@ func TestFakePodWorkers(t *testing.T) {
for i, tt := range tests {
kubeletForRealWorkers.wg.Add(1)
realPodWorkers.UpdatePod(tt.pod, tt.mirrorPod, kubetypes.SyncPodUpdate, func() {})
fakePodWorkers.UpdatePod(tt.pod, tt.mirrorPod, kubetypes.SyncPodUpdate, func() {})
realPodWorkers.UpdatePod(&UpdatePodOptions{
Pod: tt.pod,
MirrorPod: tt.mirrorPod,
UpdateType: kubetypes.SyncPodUpdate,
})
fakePodWorkers.UpdatePod(&UpdatePodOptions{
Pod: tt.pod,
MirrorPod: tt.mirrorPod,
UpdateType: kubetypes.SyncPodUpdate,
})
kubeletForRealWorkers.wg.Wait()
@@ -258,3 +328,26 @@ func TestFakePodWorkers(t *testing.T) {
}
}
}
// TestKillPodNowFunc tests the blocking kill pod function works with pod workers as expected.
func TestKillPodNowFunc(t *testing.T) {
podWorkers, processed := createPodWorkers()
killPodFunc := killPodNow(podWorkers)
pod := newPod("test", "test")
gracePeriodOverride := int64(0)
err := killPodFunc(pod, api.PodStatus{Phase: api.PodFailed, Reason: "reason", Message: "message"}, &gracePeriodOverride)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if len(processed) != 1 {
t.Errorf("len(processed) expected: %v, actual: %v", 1, len(processed))
return
}
syncPodRecords := processed[pod.UID]
if len(syncPodRecords) != 1 {
t.Errorf("Pod processed %v times, but expected %v", len(syncPodRecords), 1)
}
if syncPodRecords[0].updateType != kubetypes.SyncPodKill {
t.Errorf("Pod update type was %v, but expected %v", syncPodRecords[0].updateType, kubetypes.SyncPodKill)
}
}