pleg: add an internal clock for testability
Also add tests for the health check.
commit 4846c1e1b2
parent 94368df91a
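The hunks below inject a clock through the util package: production code passes util.RealClock{}, while the PLEG tests build a fake clock with util.NewFakeClock(...) and advance it explicitly with Step. The clock abstraction itself is not part of this diff; the following is a minimal sketch, assuming only the calls that actually appear in the hunks (Now and Since on the interface, plus NewFakeClock and Step on the fake). The real types in k8s.io/kubernetes/pkg/util may define more methods.

package clocksketch

import "time"

// Clock is the minimal surface the PLEG needs; only Now and Since, the two
// calls visible in this diff, are included.
type Clock interface {
	Now() time.Time
	Since(t time.Time) time.Duration
}

// RealClock delegates to the time package, mirroring how util.RealClock{} is
// used in NewMainKubelet below.
type RealClock struct{}

func (RealClock) Now() time.Time                  { return time.Now() }
func (RealClock) Since(t time.Time) time.Duration { return time.Since(t) }

// FakeClock is a hand-rolled stand-in for util.FakeClock: tests pin it to a
// fixed instant and advance it explicitly with Step, which makes time-based
// checks such as Healthy() deterministic.
type FakeClock struct {
	now time.Time
}

func NewFakeClock(t time.Time) *FakeClock { return &FakeClock{now: t} }

func (f *FakeClock) Now() time.Time                  { return f.now }
func (f *FakeClock) Since(t time.Time) time.Duration { return f.now.Sub(t) }
func (f *FakeClock) Step(d time.Duration)            { f.now = f.now.Add(d) }

With an injected clock, the two-minute health threshold in Healthy() can be crossed instantly in a test instead of waiting in real time, which is what the new TestHealthy at the end of this diff does.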
@@ -447,7 +447,7 @@ func NewMainKubelet(
 		return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
 	}
 
-	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache)
+	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, util.RealClock{})
 	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, configureCBR0, klet.isContainerRuntimeVersionCompatible)
 	klet.updatePodCIDR(podCIDR)
 
@@ -194,7 +194,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 	}
 	kubelet.workQueue = queue.NewBasicWorkQueue()
 	// Relist period does not affect the tests.
-	kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil)
+	kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil, util.RealClock{})
 	kubelet.clock = fakeClock
 	kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
 	return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeClock, nil}
@@ -24,6 +24,7 @@ import (
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/types"
+	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/atomic"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
@@ -57,6 +58,8 @@ type GenericPLEG struct {
 	relistTime atomic.Value
 	// Cache for storing the runtime states required for syncing pods.
 	cache kubecontainer.Cache
+	// For testability.
+	clock util.Clock
 }
 
 // plegContainerState has a one-to-one mapping to the
@@ -92,13 +95,14 @@ type podRecord struct {
 type podRecords map[types.UID]*podRecord
 
 func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
-	relistPeriod time.Duration, cache kubecontainer.Cache) PodLifecycleEventGenerator {
+	relistPeriod time.Duration, cache kubecontainer.Cache, clock util.Clock) PodLifecycleEventGenerator {
 	return &GenericPLEG{
 		relistPeriod: relistPeriod,
 		runtime:      runtime,
 		eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
 		podRecords:   make(podRecords),
 		cache:        cache,
+		clock:        clock,
 	}
 }
 
@@ -121,7 +125,7 @@ func (g *GenericPLEG) Healthy() (bool, error) {
 	// relisting time, which can vary significantly. Set a conservative
 	// threshold so that we don't cause kubelet to be restarted unnecessarily.
 	threshold := 2 * time.Minute
-	if time.Since(relistTime) > threshold {
+	if g.clock.Since(relistTime) > threshold {
 		return false, fmt.Errorf("pleg was last seen active at %v", relistTime)
 	}
 	return true, nil
@@ -178,7 +182,7 @@ func (g *GenericPLEG) relist() {
 		metrics.PLEGRelistInterval.Observe(metrics.SinceInMicroseconds(lastRelistTime))
 	}
 
-	timestamp := time.Now()
+	timestamp := g.clock.Now()
 	// Update the relist time.
 	g.updateRelisTime(timestamp)
 	defer func() {
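Healthy() above compares the injected clock against a relistTime value, and relist() records each pass via g.updateRelisTime(timestamp); the helpers that read and write the relistTime atomic.Value field sit outside this diff. A plausible sketch, assuming the value holds a time.Time and substituting the standard library's sync/atomic.Value for the imported pkg/util/atomic package, with getRelistTime as a hypothetical reader name:

package plegsketch

import (
	"sync/atomic"
	"time"
)

// relistRecorder sketches the bookkeeping around the relistTime field: relist()
// stores the current timestamp and Healthy() reads it back to measure how long
// ago the last relist happened.
type relistRecorder struct {
	relistTime atomic.Value // holds a time.Time
}

// updateRelisTime mirrors the call made from relist() in the hunk above.
func (g *relistRecorder) updateRelisTime(timestamp time.Time) {
	g.relistTime.Store(timestamp)
}

// getRelistTime is a hypothetical reader for the value Healthy() compares
// against the two-minute threshold.
func (g *relistRecorder) getRelistTime() time.Time {
	val := g.relistTime.Load()
	if val == nil {
		return time.Time{}
	}
	return val.(time.Time)
}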
@@ -287,7 +291,7 @@ func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
 		g.cache.Delete(pid)
 		return nil
 	}
-	timestamp := time.Now()
+	timestamp := g.clock.Now()
 	// TODO: Consider adding a new runtime method
 	// GetPodStatus(pod *kubecontainer.Pod) so that Docker can avoid listing
 	// all containers again.
@@ -37,10 +37,12 @@ const (
 type TestGenericPLEG struct {
 	pleg    *GenericPLEG
 	runtime *containertest.FakeRuntime
+	clock   *util.FakeClock
 }
 
 func newTestGenericPLEG() *TestGenericPLEG {
 	fakeRuntime := &containertest.FakeRuntime{}
+	clock := util.NewFakeClock(time.Time{})
 	// The channel capacity should be large enough to hold all events in a
 	// single test.
 	pleg := &GenericPLEG{
@@ -48,8 +50,9 @@ func newTestGenericPLEG() *TestGenericPLEG {
 		runtime:      fakeRuntime,
 		eventChannel: make(chan *PodLifecycleEvent, 100),
 		podRecords:   make(podRecords),
+		clock:        clock,
 	}
-	return &TestGenericPLEG{pleg: pleg, runtime: fakeRuntime}
+	return &TestGenericPLEG{pleg: pleg, runtime: fakeRuntime, clock: clock}
 }
 
 func getEventsFromChannel(ch <-chan *PodLifecycleEvent) []*PodLifecycleEvent {
@@ -222,6 +225,7 @@ func newTestGenericPLEGWithRuntimeMock() (*GenericPLEG, *containertest.Mock) {
 		eventChannel: make(chan *PodLifecycleEvent, 100),
 		podRecords:   make(podRecords),
 		cache:        kubecontainer.NewCache(),
+		clock:        util.RealClock{},
 	}
 	return pleg, runtimeMock
 }
@@ -318,3 +322,22 @@ func TestRemoveCacheEntry(t *testing.T) {
 	assert.Equal(t, &kubecontainer.PodStatus{ID: pods[0].ID}, actualStatus)
 	assert.Equal(t, nil, actualErr)
 }
+
+func TestHealthy(t *testing.T) {
+	testPleg := newTestGenericPLEG()
+	pleg, _, clock := testPleg.pleg, testPleg.runtime, testPleg.clock
+	ok, _ := pleg.Healthy()
+	assert.True(t, ok, "pleg should be healthy")
+
+	// Advance the clock without any relisting.
+	clock.Step(time.Minute * 10)
+	ok, _ = pleg.Healthy()
+	assert.False(t, ok, "pleg should be unhealthy")
+
+	// Relist and then advance the time by 1 minute. pleg should be healthy
+	// because this is within the allowed limit.
+	pleg.relist()
+	clock.Step(time.Minute * 1)
+	ok, _ = pleg.Healthy()
+	assert.True(t, ok, "pleg should be healthy")
+}