Merge pull request #63492 from liggitt/node-heartbeat-close-connections
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

track/close kubelet->API connections on heartbeat failure

xref #48638
xref https://github.com/kubernetes-incubator/kube-aws/issues/598

We're already (typically) tracking kubelet -> API connections and have the ability to force-close them as part of client cert rotation. If we do that tracking unconditionally, we gain the ability to also force-close connections on heartbeat failure. It's a big hammer (it means re-establishing pod watches, etc.), but so is having all of your pods evicted because you didn't heartbeat.

This intentionally does minimal refactoring/extraction of the cert connection-tracking transport, in case we want to backport it:

* The first commit unconditionally sets up the connection-tracking dialer and moves all the cert-management logic inside an if-block that is skipped when no certificate manager is provided (view with whitespace ignored to see what actually changed).
* The second commit plumbs the connection-closing function to the heartbeat loop and calls it on repeated failures.

Follow-ups:

* Consider backporting this to 1.10, 1.9, 1.8.
* Refactor the connection-managing dialer so it is not so tightly bound to client certificate management.

/sig node
/sig api-machinery

```release-note
kubelet: fix hangs in updating Node status after network interruptions/changes between the kubelet and API server
```
Commit: 8220171d8a
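To make the shape of the change easier to follow before reading the diffs, here is a minimal, self-contained sketch of the flow this PR wires up. The identifiers `updateNodeStatus`, `nodeStatusUpdateRetry`, and `closeAllConns` echo names that appear in the diffs below, but the code is only an illustration under simplified assumptions, not the kubelet's implementation: the transport setup hands back a "close all connections" function, and the Node status retry loop calls it from the second consecutive failure onward, so the next attempt re-dials instead of reusing a wedged connection.

```go
package main

import (
	"errors"
	"fmt"
)

// nodeStatusUpdateRetry stands in for the kubelet's retry budget.
const nodeStatusUpdateRetry = 5

// updateNodeStatus mirrors the retry loop touched by this PR: try the
// heartbeat a few times and, starting with the second consecutive failure,
// invoke onRepeatedFailure so stale TCP connections are torn down before the
// next attempt. tryUpdate and onRepeatedFailure are stand-ins for kubelet
// internals.
func updateNodeStatus(tryUpdate func() error, onRepeatedFailure func()) error {
	for i := 0; i < nodeStatusUpdateRetry; i++ {
		if err := tryUpdate(); err != nil {
			if i > 0 && onRepeatedFailure != nil {
				onRepeatedFailure()
			}
			fmt.Printf("error updating node status, will retry: %v\n", err)
			continue
		}
		return nil
	}
	return errors.New("update node status exceeds retry count")
}

func main() {
	// closeAllConns is the function the transport setup returns (see the
	// UpdateTransport change below); here it is only a stub.
	closeAllConns := func() { fmt.Println("force-closing all API server connections") }

	alwaysFailing := func() error { return errors.New("timeout waiting for the API server") }
	if err := updateNodeStatus(alwaysFailing, closeAllConns); err != nil {
		fmt.Println(err)
	}
}
```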
@@ -547,13 +547,13 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
         if err != nil {
             return err
         }
+    }
     // we set exitAfter to five minutes because we use this client configuration to request new certs - if we are unable
     // to request new certs, we will be unable to continue normal operation. Exiting the process allows a wrapper
     // or the bootstrapping credentials to potentially lay down new initial config.
-    if err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute); err != nil {
-        return err
-    }
+    closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute)
+    if err != nil {
+        return err
     }

     kubeClient, err = clientset.NewForConfig(clientConfig)
@@ -591,6 +591,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
     kubeDeps.ExternalKubeClient = externalKubeClient
     if heartbeatClient != nil {
         kubeDeps.HeartbeatClient = heartbeatClient
+        kubeDeps.OnHeartbeatFailure = closeAllConns
     }
     if eventClient != nil {
         kubeDeps.EventClient = eventClient
@@ -38,6 +38,8 @@ import (
 //
 // The config must not already provide an explicit transport.
 //
+// The returned function allows forcefully closing all active connections.
+//
 // The returned transport periodically checks the manager to determine if the
 // certificate has changed. If it has, the transport shuts down all existing client
 // connections, forcing the client to re-handshake with the server and use the

@@ -51,30 +53,15 @@ import (
 //
 // stopCh should be used to indicate when the transport is unused and doesn't need
 // to continue checking the manager.
-func UpdateTransport(stopCh <-chan struct{}, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitAfter time.Duration) error {
+func UpdateTransport(stopCh <-chan struct{}, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitAfter time.Duration) (func(), error) {
     return updateTransport(stopCh, 10*time.Second, clientConfig, clientCertificateManager, exitAfter)
 }

 // updateTransport is an internal method that exposes how often this method checks that the
 // client cert has changed.
-func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitAfter time.Duration) error {
-    if clientConfig.Transport != nil {
-        return fmt.Errorf("there is already a transport configured")
-    }
-    tlsConfig, err := restclient.TLSConfigFor(clientConfig)
-    if err != nil {
-        return fmt.Errorf("unable to configure TLS for the rest client: %v", err)
-    }
-    if tlsConfig == nil {
-        tlsConfig = &tls.Config{}
-    }
-    tlsConfig.Certificates = nil
-    tlsConfig.GetClientCertificate = func(requestInfo *tls.CertificateRequestInfo) (*tls.Certificate, error) {
-        cert := clientCertificateManager.Current()
-        if cert == nil {
-            return &tls.Certificate{Certificate: nil}, nil
-        }
-        return cert, nil
+func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitAfter time.Duration) (func(), error) {
+    if clientConfig.Transport != nil || clientConfig.Dial != nil {
+        return nil, fmt.Errorf("there is already a transport or dialer configured")
     }

     // Custom dialer that will track all connections it creates.
@@ -83,48 +70,67 @@ func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig
         conns:  make(map[*closableConn]struct{}),
     }

-    lastCertAvailable := time.Now()
-    lastCert := clientCertificateManager.Current()
-    go wait.Until(func() {
-        curr := clientCertificateManager.Current()
+    tlsConfig, err := restclient.TLSConfigFor(clientConfig)
+    if err != nil {
+        return nil, fmt.Errorf("unable to configure TLS for the rest client: %v", err)
+    }
+    if tlsConfig == nil {
+        tlsConfig = &tls.Config{}
+    }

-        if exitAfter > 0 {
-            now := time.Now()
-            if curr == nil {
-                // the certificate has been deleted from disk or is otherwise corrupt
-                if now.After(lastCertAvailable.Add(exitAfter)) {
-                    if clientCertificateManager.ServerHealthy() {
-                        glog.Fatalf("It has been %s since a valid client cert was found and the server is responsive, exiting.", exitAfter)
-                    } else {
-                        glog.Errorf("It has been %s since a valid client cert was found, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.", exitAfter)
-                    }
-                }
-            } else {
-                // the certificate is expired
-                if now.After(curr.Leaf.NotAfter) {
-                    if clientCertificateManager.ServerHealthy() {
-                        glog.Fatalf("The currently active client certificate has expired and the server is responsive, exiting.")
-                    } else {
-                        glog.Errorf("The currently active client certificate has expired, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.")
-                    }
-                }
-                lastCertAvailable = now
+    if clientCertificateManager != nil {
+        tlsConfig.Certificates = nil
+        tlsConfig.GetClientCertificate = func(requestInfo *tls.CertificateRequestInfo) (*tls.Certificate, error) {
+            cert := clientCertificateManager.Current()
+            if cert == nil {
+                return &tls.Certificate{Certificate: nil}, nil
             }
+            return cert, nil
         }

-        if curr == nil || lastCert == curr {
-            // Cert hasn't been rotated.
-            return
-        }
-        lastCert = curr
+        lastCertAvailable := time.Now()
+        lastCert := clientCertificateManager.Current()
+        go wait.Until(func() {
+            curr := clientCertificateManager.Current()

-        glog.Infof("certificate rotation detected, shutting down client connections to start using new credentials")
-        // The cert has been rotated. Close all existing connections to force the client
-        // to reperform its TLS handshake with new cert.
-        //
-        // See: https://github.com/kubernetes-incubator/bootkube/pull/663#issuecomment-318506493
-        t.closeAllConns()
-    }, period, stopCh)
+            if exitAfter > 0 {
+                now := time.Now()
+                if curr == nil {
+                    // the certificate has been deleted from disk or is otherwise corrupt
+                    if now.After(lastCertAvailable.Add(exitAfter)) {
+                        if clientCertificateManager.ServerHealthy() {
+                            glog.Fatalf("It has been %s since a valid client cert was found and the server is responsive, exiting.", exitAfter)
+                        } else {
+                            glog.Errorf("It has been %s since a valid client cert was found, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.", exitAfter)
+                        }
+                    }
+                } else {
+                    // the certificate is expired
+                    if now.After(curr.Leaf.NotAfter) {
+                        if clientCertificateManager.ServerHealthy() {
+                            glog.Fatalf("The currently active client certificate has expired and the server is responsive, exiting.")
+                        } else {
+                            glog.Errorf("The currently active client certificate has expired, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.")
+                        }
+                    }
+                    lastCertAvailable = now
+                }
+            }
+
+            if curr == nil || lastCert == curr {
+                // Cert hasn't been rotated.
+                return
+            }
+            lastCert = curr
+
+            glog.Infof("certificate rotation detected, shutting down client connections to start using new credentials")
+            // The cert has been rotated. Close all existing connections to force the client
+            // to reperform its TLS handshake with new cert.
+            //
+            // See: https://github.com/kubernetes-incubator/bootkube/pull/663#issuecomment-318506493
+            t.closeAllConns()
+        }, period, stopCh)
+    }

     clientConfig.Transport = utilnet.SetTransportDefaults(&http.Transport{
         Proxy: http.ProxyFromEnvironment,
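The hunk above moves the TLS setup ahead of the certificate-specific logic and starts the rotation watcher only when a certificate manager is present. The following is a simplified, dependency-free sketch of that watcher pattern: poll for the current client certificate and force-close tracked connections when a new one appears. The real code uses `wait.Until`, the `certificate.Manager` interface, and the additional `exitAfter`/server-health handling shown in the diff; everything named here is a stand-in.

```go
package main

import (
	"crypto/tls"
	"fmt"
	"time"
)

// watchCertRotation polls getCert every period and calls closeAllConns when a
// different certificate shows up, forcing clients to re-handshake with the new
// credentials. This illustrates the loop restructured above; it is not the
// kubelet's code.
func watchCertRotation(stopCh <-chan struct{}, period time.Duration, getCert func() *tls.Certificate, closeAllConns func()) {
	lastCert := getCert()
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			return
		case <-ticker.C:
			curr := getCert()
			if curr == nil || curr == lastCert {
				// No rotation observed yet.
				continue
			}
			lastCert = curr
			fmt.Println("certificate rotation detected, closing client connections")
			closeAllConns()
		}
	}
}

func main() {
	stop := make(chan struct{})
	// A certificate source and a connection closer would come from the
	// certificate manager and the tracking dialer; stubs keep the sketch
	// self-contained.
	go watchCertRotation(stop, time.Second, func() *tls.Certificate { return nil }, func() {})
	time.Sleep(2 * time.Second)
	close(stop)
}
```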
@@ -142,7 +148,8 @@ func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig
     clientConfig.CAData = nil
     clientConfig.CAFile = ""
     clientConfig.Insecure = false
-    return nil
+
+    return t.closeAllConns, nil
 }

 // connTracker is a dialer that tracks all open connections it creates.
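`updateTransport` now returns `t.closeAllConns`, where `t` is the `connTracker` dialer mentioned in the comment above; its definition is unchanged by this PR and therefore not shown in the diff. For readers who want the gist without opening the package, here is an illustrative sketch of such a connection-tracking dialer. The type layout and field names are assumptions made for the example, not the upstream implementation.

```go
package main

import (
	"context"
	"net"
	"net/http"
	"sync"
	"time"
)

// connTracker wraps a net.Dialer and remembers every connection it creates so
// that all of them can be force-closed later (after repeated heartbeat
// failures, or after a client certificate rotation).
type connTracker struct {
	dialer *net.Dialer

	mu    sync.Mutex
	conns map[*trackedConn]struct{}
}

// trackedConn removes itself from its tracker when it is closed.
type trackedConn struct {
	net.Conn
	once   sync.Once
	forget func()
}

func (c *trackedConn) Close() error {
	c.once.Do(c.forget)
	return c.Conn.Close()
}

// DialContext dials like the wrapped dialer but records the resulting connection.
func (t *connTracker) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
	conn, err := t.dialer.DialContext(ctx, network, address)
	if err != nil {
		return nil, err
	}
	tc := &trackedConn{Conn: conn}
	tc.forget = func() {
		t.mu.Lock()
		defer t.mu.Unlock()
		delete(t.conns, tc)
	}
	t.mu.Lock()
	t.conns[tc] = struct{}{}
	t.mu.Unlock()
	return tc, nil
}

// closeAllConns closes every tracked connection; in-flight requests fail and
// the HTTP client re-dials (and re-handshakes) on its next attempt.
func (t *connTracker) closeAllConns() {
	t.mu.Lock()
	conns := t.conns
	t.conns = make(map[*trackedConn]struct{})
	t.mu.Unlock()

	for conn := range conns {
		conn.Close()
	}
}

func main() {
	t := &connTracker{
		dialer: &net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second},
		conns:  make(map[*trackedConn]struct{}),
	}
	// Install the tracking dialer into an HTTP transport and keep the closer
	// around for the heartbeat-failure path.
	client := &http.Client{Transport: &http.Transport{
		Proxy:       http.ProxyFromEnvironment,
		DialContext: t.DialContext,
	}}
	_ = client
	closeAllConns := t.closeAllConns
	_ = closeAllConns
}
```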
@@ -187,7 +187,7 @@ func TestRotateShutsDownConnections(t *testing.T) {
     }

     // Check for a new cert every 10 milliseconds
-    if err := updateTransport(stop, 10*time.Millisecond, c, m, 0); err != nil {
+    if _, err := updateTransport(stop, 10*time.Millisecond, c, m, 0); err != nil {
         t.Fatal(err)
     }

@@ -233,6 +233,7 @@ type Dependencies struct {
     DockerClientConfig  *dockershim.ClientConfig
     EventClient         v1core.EventsGetter
     HeartbeatClient     v1core.CoreV1Interface
+    OnHeartbeatFailure  func()
     KubeClient          clientset.Interface
     ExternalKubeClient  clientset.Interface
     Mounter             mount.Interface
@@ -488,6 +489,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
         nodeName:                   nodeName,
         kubeClient:                 kubeDeps.KubeClient,
         heartbeatClient:            kubeDeps.HeartbeatClient,
+        onRepeatedHeartbeatFailure: kubeDeps.OnHeartbeatFailure,
         rootDirectory:              rootDirectory,
         resyncInterval:             kubeCfg.SyncFrequency.Duration,
         sourcesReady:               config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
@@ -874,6 +876,9 @@ type Kubelet struct {
     iptClient     utilipt.Interface
     rootDirectory string

+    // onRepeatedHeartbeatFailure is called when a heartbeat operation fails more than once. optional.
+    onRepeatedHeartbeatFailure func()
+
     // podWorkers handle syncing Pods in response to events.
     podWorkers PodWorkers

@@ -350,6 +350,9 @@ func (kl *Kubelet) updateNodeStatus() error {
     glog.V(5).Infof("Updating node status")
     for i := 0; i < nodeStatusUpdateRetry; i++ {
         if err := kl.tryUpdateNodeStatus(i); err != nil {
+            if i > 0 && kl.onRepeatedHeartbeatFailure != nil {
+                kl.onRepeatedHeartbeatFailure()
+            }
             glog.Errorf("Error updating node status, will retry: %v", err)
         } else {
             return nil
@@ -679,6 +679,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {

 func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
     attempts := int64(0)
+    failureCallbacks := int64(0)

     // set up a listener that hangs connections
     ln, err := net.Listen("tcp", "127.0.0.1:0")
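The timeout test above depends on a listener that accepts TCP connections but never answers them, so every heartbeat attempt times out. A self-contained version of that trick, using a plain `net/http` client as a stand-in for the kubelet's heartbeat client, might look like this:

```go
package main

import (
	"fmt"
	"net"
	"net/http"
	"time"
)

func main() {
	// Listen on an ephemeral local port and accept connections without ever
	// writing a response, so every request against it times out.
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()

	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				return // listener closed
			}
			// Intentionally hold the connection open and never respond;
			// process exit cleans it up in this sketch.
			_ = conn
		}
	}()

	client := &http.Client{Timeout: 500 * time.Millisecond}
	_, err = client.Get("http://" + ln.Addr().String())
	fmt.Println("request failed as expected:", err)
}
```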
@@ -709,6 +710,9 @@ func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
     kubelet := testKubelet.kubelet
     kubelet.kubeClient = nil // ensure only the heartbeat client is used
     kubelet.heartbeatClient, err = v1core.NewForConfig(config)
+    kubelet.onRepeatedHeartbeatFailure = func() {
+        atomic.AddInt64(&failureCallbacks, 1)
+    }
     kubelet.containerManager = &localCM{
         ContainerManager: cm.NewStubContainerManager(),
         allocatableReservation: v1.ResourceList{
@@ -728,6 +732,10 @@ func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
     if actualAttempts := atomic.LoadInt64(&attempts); actualAttempts < nodeStatusUpdateRetry {
         t.Errorf("Expected at least %d attempts, got %d", nodeStatusUpdateRetry, actualAttempts)
     }
+    // should have gotten multiple failure callbacks
+    if actualFailureCallbacks := atomic.LoadInt64(&failureCallbacks); actualFailureCallbacks < (nodeStatusUpdateRetry - 1) {
+        t.Errorf("Expected %d failure callbacks, got %d", (nodeStatusUpdateRetry - 1), actualFailureCallbacks)
+    }
 }

 func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {