From 814b065928f86bb27f5e8ad973e4c0cfe8343d4a Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Mon, 7 May 2018 12:16:46 -0400 Subject: [PATCH] Close all kubelet->API connections on heartbeat failure --- cmd/kubelet/app/server.go | 3 ++- pkg/kubelet/kubelet.go | 5 +++++ pkg/kubelet/kubelet_node_status.go | 3 +++ pkg/kubelet/kubelet_node_status_test.go | 8 ++++++++ 4 files changed, 18 insertions(+), 1 deletion(-) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index a0b4dcf112d..9e76391246d 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -551,7 +551,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) { // we set exitAfter to five minutes because we use this client configuration to request new certs - if we are unable // to request new certs, we will be unable to continue normal operation. Exiting the process allows a wrapper // or the bootstrapping credentials to potentially lay down new initial config. - _, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute) + closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute) if err != nil { return err } @@ -591,6 +591,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) { kubeDeps.ExternalKubeClient = externalKubeClient if heartbeatClient != nil { kubeDeps.HeartbeatClient = heartbeatClient + kubeDeps.OnHeartbeatFailure = closeAllConns } if eventClient != nil { kubeDeps.EventClient = eventClient diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 05a5064c603..5e94bf539a2 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -233,6 +233,7 @@ type Dependencies struct { DockerClientConfig *dockershim.ClientConfig EventClient v1core.EventsGetter HeartbeatClient v1core.CoreV1Interface + OnHeartbeatFailure func() KubeClient clientset.Interface ExternalKubeClient clientset.Interface Mounter mount.Interface @@ -488,6 +489,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, nodeName: nodeName, kubeClient: kubeDeps.KubeClient, heartbeatClient: kubeDeps.HeartbeatClient, + onRepeatedHeartbeatFailure: kubeDeps.OnHeartbeatFailure, rootDirectory: rootDirectory, resyncInterval: kubeCfg.SyncFrequency.Duration, sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources), @@ -873,6 +875,9 @@ type Kubelet struct { iptClient utilipt.Interface rootDirectory string + // onRepeatedHeartbeatFailure is called when a heartbeat operation fails more than once. optional. + onRepeatedHeartbeatFailure func() + // podWorkers handle syncing Pods in response to events. podWorkers PodWorkers diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 3d842b2f72a..0e06424c4d7 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -350,6 +350,9 @@ func (kl *Kubelet) updateNodeStatus() error { glog.V(5).Infof("Updating node status") for i := 0; i < nodeStatusUpdateRetry; i++ { if err := kl.tryUpdateNodeStatus(i); err != nil { + if i > 0 && kl.onRepeatedHeartbeatFailure != nil { + kl.onRepeatedHeartbeatFailure() + } glog.Errorf("Error updating node status, will retry: %v", err) } else { return nil diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index 10c3f7f8565..126bb47a7c3 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -596,6 +596,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) { func TestUpdateExistingNodeStatusTimeout(t *testing.T) { attempts := int64(0) + failureCallbacks := int64(0) // set up a listener that hangs connections ln, err := net.Listen("tcp", "127.0.0.1:0") @@ -626,6 +627,9 @@ func TestUpdateExistingNodeStatusTimeout(t *testing.T) { kubelet := testKubelet.kubelet kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.heartbeatClient, err = v1core.NewForConfig(config) + kubelet.onRepeatedHeartbeatFailure = func() { + atomic.AddInt64(&failureCallbacks, 1) + } kubelet.containerManager = &localCM{ ContainerManager: cm.NewStubContainerManager(), allocatableReservation: v1.ResourceList{ @@ -645,6 +649,10 @@ func TestUpdateExistingNodeStatusTimeout(t *testing.T) { if actualAttempts := atomic.LoadInt64(&attempts); actualAttempts < nodeStatusUpdateRetry { t.Errorf("Expected at least %d attempts, got %d", nodeStatusUpdateRetry, actualAttempts) } + // should have gotten multiple failure callbacks + if actualFailureCallbacks := atomic.LoadInt64(&failureCallbacks); actualFailureCallbacks < (nodeStatusUpdateRetry - 1) { + t.Errorf("Expected %d failure callbacks, got %d", (nodeStatusUpdateRetry - 1), actualFailureCallbacks) + } } func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {