Close all kubelet->API connections on heartbeat failure
This commit is contained in:
@@ -551,7 +551,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
|
|||||||
// we set exitAfter to five minutes because we use this client configuration to request new certs - if we are unable
|
// we set exitAfter to five minutes because we use this client configuration to request new certs - if we are unable
|
||||||
// to request new certs, we will be unable to continue normal operation. Exiting the process allows a wrapper
|
// to request new certs, we will be unable to continue normal operation. Exiting the process allows a wrapper
|
||||||
// or the bootstrapping credentials to potentially lay down new initial config.
|
// or the bootstrapping credentials to potentially lay down new initial config.
|
||||||
_, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute)
|
closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -591,6 +591,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
|
|||||||
kubeDeps.ExternalKubeClient = externalKubeClient
|
kubeDeps.ExternalKubeClient = externalKubeClient
|
||||||
if heartbeatClient != nil {
|
if heartbeatClient != nil {
|
||||||
kubeDeps.HeartbeatClient = heartbeatClient
|
kubeDeps.HeartbeatClient = heartbeatClient
|
||||||
|
kubeDeps.OnHeartbeatFailure = closeAllConns
|
||||||
}
|
}
|
||||||
if eventClient != nil {
|
if eventClient != nil {
|
||||||
kubeDeps.EventClient = eventClient
|
kubeDeps.EventClient = eventClient
|
||||||
|
@@ -233,6 +233,7 @@ type Dependencies struct {
|
|||||||
DockerClientConfig *dockershim.ClientConfig
|
DockerClientConfig *dockershim.ClientConfig
|
||||||
EventClient v1core.EventsGetter
|
EventClient v1core.EventsGetter
|
||||||
HeartbeatClient v1core.CoreV1Interface
|
HeartbeatClient v1core.CoreV1Interface
|
||||||
|
OnHeartbeatFailure func()
|
||||||
KubeClient clientset.Interface
|
KubeClient clientset.Interface
|
||||||
ExternalKubeClient clientset.Interface
|
ExternalKubeClient clientset.Interface
|
||||||
Mounter mount.Interface
|
Mounter mount.Interface
|
||||||
@@ -488,6 +489,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
|||||||
nodeName: nodeName,
|
nodeName: nodeName,
|
||||||
kubeClient: kubeDeps.KubeClient,
|
kubeClient: kubeDeps.KubeClient,
|
||||||
heartbeatClient: kubeDeps.HeartbeatClient,
|
heartbeatClient: kubeDeps.HeartbeatClient,
|
||||||
|
onRepeatedHeartbeatFailure: kubeDeps.OnHeartbeatFailure,
|
||||||
rootDirectory: rootDirectory,
|
rootDirectory: rootDirectory,
|
||||||
resyncInterval: kubeCfg.SyncFrequency.Duration,
|
resyncInterval: kubeCfg.SyncFrequency.Duration,
|
||||||
sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
|
sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
|
||||||
@@ -873,6 +875,9 @@ type Kubelet struct {
|
|||||||
iptClient utilipt.Interface
|
iptClient utilipt.Interface
|
||||||
rootDirectory string
|
rootDirectory string
|
||||||
|
|
||||||
|
// onRepeatedHeartbeatFailure is called when a heartbeat operation fails more than once. optional.
|
||||||
|
onRepeatedHeartbeatFailure func()
|
||||||
|
|
||||||
// podWorkers handle syncing Pods in response to events.
|
// podWorkers handle syncing Pods in response to events.
|
||||||
podWorkers PodWorkers
|
podWorkers PodWorkers
|
||||||
|
|
||||||
|
@@ -350,6 +350,9 @@ func (kl *Kubelet) updateNodeStatus() error {
|
|||||||
glog.V(5).Infof("Updating node status")
|
glog.V(5).Infof("Updating node status")
|
||||||
for i := 0; i < nodeStatusUpdateRetry; i++ {
|
for i := 0; i < nodeStatusUpdateRetry; i++ {
|
||||||
if err := kl.tryUpdateNodeStatus(i); err != nil {
|
if err := kl.tryUpdateNodeStatus(i); err != nil {
|
||||||
|
if i > 0 && kl.onRepeatedHeartbeatFailure != nil {
|
||||||
|
kl.onRepeatedHeartbeatFailure()
|
||||||
|
}
|
||||||
glog.Errorf("Error updating node status, will retry: %v", err)
|
glog.Errorf("Error updating node status, will retry: %v", err)
|
||||||
} else {
|
} else {
|
||||||
return nil
|
return nil
|
||||||
|
@@ -596,6 +596,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
|
|||||||
|
|
||||||
func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
|
func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
|
||||||
attempts := int64(0)
|
attempts := int64(0)
|
||||||
|
failureCallbacks := int64(0)
|
||||||
|
|
||||||
// set up a listener that hangs connections
|
// set up a listener that hangs connections
|
||||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
@@ -626,6 +627,9 @@ func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
|
|||||||
kubelet := testKubelet.kubelet
|
kubelet := testKubelet.kubelet
|
||||||
kubelet.kubeClient = nil // ensure only the heartbeat client is used
|
kubelet.kubeClient = nil // ensure only the heartbeat client is used
|
||||||
kubelet.heartbeatClient, err = v1core.NewForConfig(config)
|
kubelet.heartbeatClient, err = v1core.NewForConfig(config)
|
||||||
|
kubelet.onRepeatedHeartbeatFailure = func() {
|
||||||
|
atomic.AddInt64(&failureCallbacks, 1)
|
||||||
|
}
|
||||||
kubelet.containerManager = &localCM{
|
kubelet.containerManager = &localCM{
|
||||||
ContainerManager: cm.NewStubContainerManager(),
|
ContainerManager: cm.NewStubContainerManager(),
|
||||||
allocatableReservation: v1.ResourceList{
|
allocatableReservation: v1.ResourceList{
|
||||||
@@ -645,6 +649,10 @@ func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
|
|||||||
if actualAttempts := atomic.LoadInt64(&attempts); actualAttempts < nodeStatusUpdateRetry {
|
if actualAttempts := atomic.LoadInt64(&attempts); actualAttempts < nodeStatusUpdateRetry {
|
||||||
t.Errorf("Expected at least %d attempts, got %d", nodeStatusUpdateRetry, actualAttempts)
|
t.Errorf("Expected at least %d attempts, got %d", nodeStatusUpdateRetry, actualAttempts)
|
||||||
}
|
}
|
||||||
|
// should have gotten multiple failure callbacks
|
||||||
|
if actualFailureCallbacks := atomic.LoadInt64(&failureCallbacks); actualFailureCallbacks < (nodeStatusUpdateRetry - 1) {
|
||||||
|
t.Errorf("Expected %d failure callbacks, got %d", (nodeStatusUpdateRetry - 1), actualFailureCallbacks)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
|
func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
|
||||||
|
Reference in New Issue
Block a user