Merge pull request #110504 from pohly/kubelet-shutdown-test
kubelet: convert node shutdown manager to contextual logging
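The pattern applied throughout the diff below: instead of calling the global `klog` package, the node shutdown manager receives a `klog.Logger` through its `Config` and logs via that injected instance, so callers (and tests) control where output goes. A minimal sketch of that idea, assuming trimmed stand-in types; only `Logger`, `klog.TODO()`, and the `m.logger` call style come from the diff, the rest is illustrative:

```go
package main

import (
	"k8s.io/klog/v2"
)

// config carries the dependencies a component needs, including its logger.
// The real type in this PR is nodeshutdown.Config; only Logger matters here.
type config struct {
	Logger klog.Logger
}

// shutdownManager is a stand-in for the real managerImpl.
type shutdownManager struct {
	logger klog.Logger
}

func newShutdownManager(conf *config) *shutdownManager {
	return &shutdownManager{logger: conf.Logger}
}

func (m *shutdownManager) processShutdownEvent() {
	// Before: klog.V(1).InfoS("Shutdown manager processing shutdown event")
	// After: the injected logger is used instead of the global klog state.
	m.logger.V(1).Info("Shutdown manager processing shutdown event")
}

func main() {
	// Production caller: no context is plumbed through NewMainKubelet yet,
	// so the kubelet passes klog.TODO() as a placeholder logger.
	m := newShutdownManager(&config{Logger: klog.TODO()})
	m.processShutdownEvent()
}
```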
@@ -334,6 +334,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
nodeStatusMaxImages int32,
seccompDefault bool,
) (*Kubelet, error) {
+logger := klog.TODO()
+
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
@@ -819,6 +821,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,

// setup node shutdown manager
shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{
+Logger: logger,
ProbeManager: klet.probeManager,
Recorder: kubeDeps.Recorder,
NodeRef: nodeRef,
@@ -44,6 +44,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
+"k8s.io/klog/v2/ktesting"
cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
@@ -149,6 +150,8 @@ func newTestKubeletWithImageList(
imageList []kubecontainer.Image,
controllerAttachDetachEnabled bool,
initFakeVolumePlugin bool) *TestKubelet {
+logger, _ := ktesting.NewTestContext(t)
+
fakeRuntime := &containertest.FakeRuntime{
ImageList: imageList,
// Set ready conditions by default.
@@ -321,6 +324,7 @@ func newTestKubeletWithImageList(

// setup shutdown manager
shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{
+Logger: logger,
ProbeManager: kubelet.probeManager,
Recorder: fakeRecorder,
NodeRef: nodeRef,
@@ -21,6 +21,7 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
+"k8s.io/klog/v2"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@@ -37,6 +38,7 @@ type Manager interface {

// Config represents Manager configuration
type Config struct {
+Logger klog.Logger
ProbeManager prober.Manager
Recorder record.EventRecorder
NodeRef *v1.ObjectReference
@@ -67,6 +67,7 @@ type dbusInhibiter interface {

// managerImpl has functions that can be used to interact with the Node Shutdown Manager.
type managerImpl struct {
+logger klog.Logger
recorder record.EventRecorder
nodeRef *v1.ObjectReference
probeManager prober.Manager
@@ -118,6 +119,7 @@ func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) {
conf.Clock = clock.RealClock{}
}
manager := &managerImpl{
+logger: conf.Logger,
probeManager: conf.ProbeManager,
recorder: conf.Recorder,
nodeRef: conf.NodeRef,
@@ -131,7 +133,7 @@ func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) {
Path: filepath.Join(conf.StateDirectory, localStorageStateFile),
},
}
-klog.InfoS("Creating node shutdown manager",
+manager.logger.Info("Creating node shutdown manager",
"shutdownGracePeriodRequested", conf.ShutdownGracePeriodRequested,
"shutdownGracePeriodCriticalPods", conf.ShutdownGracePeriodCriticalPods,
"shutdownGracePeriodByPodPriority", shutdownGracePeriodByPodPriority,
@@ -159,7 +161,7 @@ func (m *managerImpl) setMetrics() {
sta := state{}
err := m.storage.Load(&sta)
if err != nil {
-klog.ErrorS(err, "Failed to load graceful shutdown state")
+m.logger.Error(err, "Failed to load graceful shutdown state")
} else {
if !sta.StartTime.IsZero() {
metrics.GracefulShutdownStartTime.Set(timestamp(sta.StartTime))
@@ -184,10 +186,10 @@ func (m *managerImpl) Start() error {
}

time.Sleep(dbusReconnectPeriod)
-klog.V(1).InfoS("Restarting watch for node shutdown events")
+m.logger.V(1).Info("Restarting watch for node shutdown events")
stop, err = m.start()
if err != nil {
-klog.ErrorS(err, "Unable to watch the node for shutdown events")
+m.logger.Error(err, "Unable to watch the node for shutdown events")
}
}
}()
@@ -255,11 +257,11 @@ func (m *managerImpl) start() (chan struct{}, error) {
select {
case isShuttingDown, ok := <-events:
if !ok {
-klog.ErrorS(err, "Ended to watching the node for shutdown events")
+m.logger.Error(err, "Ended to watching the node for shutdown events")
close(stop)
return
}
-klog.V(1).InfoS("Shutdown manager detected new shutdown event, isNodeShuttingDownNow", "event", isShuttingDown)
+m.logger.V(1).Info("Shutdown manager detected new shutdown event, isNodeShuttingDownNow", "event", isShuttingDown)

var shutdownType string
if isShuttingDown {
@@ -267,7 +269,7 @@ func (m *managerImpl) start() (chan struct{}, error) {
} else {
shutdownType = "cancelled"
}
-klog.V(1).InfoS("Shutdown manager detected new shutdown event", "event", shutdownType)
+m.logger.V(1).Info("Shutdown manager detected new shutdown event", "event", shutdownType)
if isShuttingDown {
m.recorder.Event(m.nodeRef, v1.EventTypeNormal, kubeletevents.NodeShutdown, "Shutdown manager detected shutdown event")
} else {
@@ -316,12 +318,12 @@ func (m *managerImpl) ShutdownStatus() error {
}

func (m *managerImpl) processShutdownEvent() error {
-klog.V(1).InfoS("Shutdown manager processing shutdown event")
+m.logger.V(1).Info("Shutdown manager processing shutdown event")
activePods := m.getPods()

defer func() {
m.dbusCon.ReleaseInhibitLock(m.inhibitLock)
-klog.V(1).InfoS("Shutdown manager completed processing shutdown event, node will shutdown shortly")
+m.logger.V(1).Info("Shutdown manager completed processing shutdown event, node will shutdown shortly")
}()

if m.enableMetrics && m.storage != nil {
@@ -330,7 +332,7 @@ func (m *managerImpl) processShutdownEvent() error {
StartTime: startTime,
})
if err != nil {
-klog.ErrorS(err, "Failed to store graceful shutdown state")
+m.logger.Error(err, "Failed to store graceful shutdown state")
}
metrics.GracefulShutdownStartTime.Set(timestamp(startTime))
metrics.GracefulShutdownEndTime.Set(0)
@@ -342,7 +344,7 @@ func (m *managerImpl) processShutdownEvent() error {
EndTime: endTime,
})
if err != nil {
-klog.ErrorS(err, "Failed to store graceful shutdown state")
+m.logger.Error(err, "Failed to store graceful shutdown state")
}
metrics.GracefulShutdownStartTime.Set(timestamp(endTime))
}()
@@ -369,7 +371,7 @@ func (m *managerImpl) processShutdownEvent() error {
gracePeriodOverride = *pod.Spec.TerminationGracePeriodSeconds
}

-klog.V(1).InfoS("Shutdown manager killing pod with gracePeriod", "pod", klog.KObj(pod), "gracePeriod", gracePeriodOverride)
+m.logger.V(1).Info("Shutdown manager killing pod with gracePeriod", "pod", klog.KObj(pod), "gracePeriod", gracePeriodOverride)

if err := m.killPodFunc(pod, false, &gracePeriodOverride, func(status *v1.PodStatus) {
// set the pod status to failed (unless it was already in a successful terminal phase)
@@ -379,9 +381,9 @@ func (m *managerImpl) processShutdownEvent() error {
status.Message = nodeShutdownMessage
status.Reason = nodeShutdownReason
}); err != nil {
-klog.V(1).InfoS("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err)
+m.logger.V(1).Info("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err)
} else {
-klog.V(1).InfoS("Shutdown manager finished killing pod", "pod", klog.KObj(pod))
+m.logger.V(1).Info("Shutdown manager finished killing pod", "pod", klog.KObj(pod))
}
}(pod, group)
}
@@ -399,7 +401,7 @@ func (m *managerImpl) processShutdownEvent() error {
case <-doneCh:
timer.Stop()
case <-timer.C():
-klog.V(1).InfoS("Shutdown manager pod killing time out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority)
+m.logger.V(1).Info("Shutdown manager pod killing time out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority)
}
}
@@ -20,7 +20,6 @@ limitations under the License.
package nodeshutdown

import (
-"bytes"
"fmt"
"os"
"strings"
@@ -35,7 +34,8 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
featuregatetesting "k8s.io/component-base/featuregate/testing"
-"k8s.io/klog/v2"
+"k8s.io/klog/v2/ktesting"
+_ "k8s.io/klog/v2/ktesting/init" // activate ktesting command line flags
"k8s.io/kubernetes/pkg/apis/scheduling"
pkgfeatures "k8s.io/kubernetes/pkg/features"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
@@ -211,6 +211,8 @@ func TestManager(t *testing.T) {

for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
+logger, _ := ktesting.NewTestContext(t)
+
activePodsFunc := func() []*v1.Pod {
return tc.activePods
}
@@ -243,6 +245,7 @@ func TestManager(t *testing.T) {
fakeRecorder := &record.FakeRecorder{}
nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
manager, _ := NewManager(&Config{
+Logger: logger,
ProbeManager: proberManager,
Recorder: fakeRecorder,
NodeRef: nodeRef,
@@ -326,6 +329,7 @@ func TestFeatureEnabled(t *testing.T) {
}
for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
+logger, _ := ktesting.NewTestContext(t)
activePodsFunc := func() []*v1.Pod {
return nil
}
@@ -339,6 +343,7 @@ func TestFeatureEnabled(t *testing.T) {
nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}

manager, _ := NewManager(&Config{
+Logger: logger,
ProbeManager: proberManager,
Recorder: fakeRecorder,
NodeRef: nodeRef,
@@ -355,6 +360,7 @@ func TestFeatureEnabled(t *testing.T) {
}

func TestRestart(t *testing.T) {
+logger, _ := ktesting.NewTestContext(t)
systemDbusTmp := systemDbus
defer func() {
systemDbus = systemDbusTmp
@@ -393,6 +399,7 @@ func TestRestart(t *testing.T) {
fakeRecorder := &record.FakeRecorder{}
nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
manager, _ := NewManager(&Config{
+Logger: logger,
ProbeManager: proberManager,
Recorder: fakeRecorder,
NodeRef: nodeRef,
@@ -617,23 +624,6 @@ func Test_groupByPriority(t *testing.T) {
}
}
-
-type buffer struct {
-b bytes.Buffer
-rw sync.RWMutex
-}
-
-func (b *buffer) String() string {
-b.rw.RLock()
-defer b.rw.RUnlock()
-return b.b.String()
-}
-
-func (b *buffer) Write(p []byte) (n int, err error) {
-b.rw.Lock()
-defer b.rw.Unlock()
-return b.b.Write(p)
-}

func Test_managerImpl_processShutdownEvent(t *testing.T) {
var (
probeManager = probetest.FakeManager{}
@@ -657,10 +647,10 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
clock clock.Clock
}
tests := []struct {
-name string
-fields fields
-wantErr bool
-exceptOutputContains string
+name string
+fields fields
+wantErr bool
+expectedOutputContains string
}{
{
name: "kill pod func take too long",
@@ -692,20 +682,17 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
clock: fakeclock,
dbusCon: &fakeDbus{},
},
-wantErr: false,
-exceptOutputContains: "Shutdown manager pod killing time out",
+wantErr: false,
+expectedOutputContains: "Shutdown manager pod killing time out",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
-l := klog.Level(1)
-l.Set("1")
-// hijack the klog output
-tmpWriteBuffer := new(buffer)
-klog.SetOutput(tmpWriteBuffer)
-klog.LogToStderr(false)
+logger, _ := ktesting.NewTestContext(t)

m := &managerImpl{
+logger: logger,
recorder: tt.fields.recorder,
nodeRef: tt.fields.nodeRef,
probeManager: tt.fields.probeManager,
@@ -722,11 +709,17 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
if err := m.processShutdownEvent(); (err != nil) != tt.wantErr {
t.Errorf("managerImpl.processShutdownEvent() error = %v, wantErr %v", err, tt.wantErr)
}
-klog.Flush()

-log := tmpWriteBuffer.String()
-if !strings.Contains(log, tt.exceptOutputContains) {
-t.Errorf("managerImpl.processShutdownEvent() should log %s, got %s", tt.exceptOutputContains, log)
+underlier, ok := logger.GetSink().(ktesting.Underlier)
+if !ok {
+t.Fatalf("Should have had a ktesting LogSink, got %T", logger.GetSink())
+}
+
+log := underlier.GetBuffer().String()
+if !strings.Contains(log, tt.expectedOutputContains) {
+// Log will be shown on failure. To see it
+// during a successful run use "go test -v".
+t.Errorf("managerImpl.processShutdownEvent() should have logged %s, see actual output above.", tt.expectedOutputContains)
}
})
}
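The test-side change in the last hunk replaces the old klog output hijacking (custom buffer, `klog.SetOutput`, `klog.LogToStderr`) with ktesting's buffered sink. Roughly how that capture works in isolation, as a hedged sketch: the package name and test body below are placeholders, while the `ktesting.NewTestContext`, `ktesting.Underlier`, and `GetBuffer` calls are the ones the diff itself uses.

```go
package example

import (
	"strings"
	"testing"

	"k8s.io/klog/v2/ktesting"
	_ "k8s.io/klog/v2/ktesting/init" // activate ktesting command line flags, as in the diff
)

func TestLogCapture(t *testing.T) {
	// Per-test logger; code under test would receive it via Config{Logger: logger}.
	logger, _ := ktesting.NewTestContext(t)

	logger.V(1).Info("Shutdown manager pod killing time out")

	// Read back what was logged through the ktesting sink.
	underlier, ok := logger.GetSink().(ktesting.Underlier)
	if !ok {
		t.Fatalf("Should have had a ktesting LogSink, got %T", logger.GetSink())
	}
	if !strings.Contains(underlier.GetBuffer().String(), "pod killing time out") {
		t.Error("expected message was not logged")
	}
}
```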