Emit an event and retry when the healthz server fails to start on kube-proxy.
This commit is contained in:
		| @@ -462,10 +462,17 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx | |||||||
| 	eventBroadcaster := record.NewBroadcaster() | 	eventBroadcaster := record.NewBroadcaster() | ||||||
| 	recorder := eventBroadcaster.NewRecorder(scheme, v1.EventSource{Component: "kube-proxy", Host: hostname}) | 	recorder := eventBroadcaster.NewRecorder(scheme, v1.EventSource{Component: "kube-proxy", Host: hostname}) | ||||||
|  |  | ||||||
|  | 	nodeRef := &v1.ObjectReference{ | ||||||
|  | 		Kind:      "Node", | ||||||
|  | 		Name:      hostname, | ||||||
|  | 		UID:       types.UID(hostname), | ||||||
|  | 		Namespace: "", | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	var healthzServer *healthcheck.HealthzServer | 	var healthzServer *healthcheck.HealthzServer | ||||||
| 	var healthzUpdater healthcheck.HealthzUpdater | 	var healthzUpdater healthcheck.HealthzUpdater | ||||||
| 	if len(config.HealthzBindAddress) > 0 { | 	if len(config.HealthzBindAddress) > 0 { | ||||||
| 		healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration) | 		healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef) | ||||||
| 		healthzUpdater = healthzServer | 		healthzUpdater = healthzServer | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -572,13 +579,6 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx | |||||||
| 		iptInterface.AddReloadFunc(proxier.Sync) | 		iptInterface.AddReloadFunc(proxier.Sync) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	nodeRef := &v1.ObjectReference{ |  | ||||||
| 		Kind:      "Node", |  | ||||||
| 		Name:      hostname, |  | ||||||
| 		UID:       types.UID(hostname), |  | ||||||
| 		Namespace: "", |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return &ProxyServer{ | 	return &ProxyServer{ | ||||||
| 		Client:                 client, | 		Client:                 client, | ||||||
| 		EventClient:            eventClient, | 		EventClient:            eventClient, | ||||||
|   | |||||||
| @@ -22,6 +22,7 @@ go_library( | |||||||
|         "//vendor/k8s.io/api/core/v1:go_default_library", |         "//vendor/k8s.io/api/core/v1:go_default_library", | ||||||
|         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", |         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", | ||||||
|         "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", |         "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", | ||||||
|  |         "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", | ||||||
|         "//vendor/k8s.io/client-go/tools/record:go_default_library", |         "//vendor/k8s.io/client-go/tools/record:go_default_library", | ||||||
|     ], |     ], | ||||||
| ) | ) | ||||||
|   | |||||||
| @@ -31,10 +31,13 @@ import ( | |||||||
| 	"k8s.io/api/core/v1" | 	"k8s.io/api/core/v1" | ||||||
| 	"k8s.io/apimachinery/pkg/types" | 	"k8s.io/apimachinery/pkg/types" | ||||||
| 	"k8s.io/apimachinery/pkg/util/clock" | 	"k8s.io/apimachinery/pkg/util/clock" | ||||||
|  | 	"k8s.io/apimachinery/pkg/util/wait" | ||||||
| 	"k8s.io/client-go/tools/record" | 	"k8s.io/client-go/tools/record" | ||||||
| 	"k8s.io/kubernetes/pkg/api" | 	"k8s.io/kubernetes/pkg/api" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | var nodeHealthzRetryInterval = 60 * time.Second | ||||||
|  |  | ||||||
| // Server serves HTTP endpoints for each service name, with results | // Server serves HTTP endpoints for each service name, with results | ||||||
| // based on the endpoints.  If there are 0 endpoints for a service, it returns a | // based on the endpoints.  If there are 0 endpoints for a service, it returns a | ||||||
| // 503 "Service Unavailable" error (telling LBs not to use this node).  If there | // 503 "Service Unavailable" error (telling LBs not to use this node).  If there | ||||||
| @@ -161,7 +164,7 @@ func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) err | |||||||
| 						Namespace: nsn.Namespace, | 						Namespace: nsn.Namespace, | ||||||
| 						Name:      nsn.Name, | 						Name:      nsn.Name, | ||||||
| 						UID:       types.UID(nsn.String()), | 						UID:       types.UID(nsn.String()), | ||||||
| 					}, api.EventTypeWarning, "FailedToStartHealthcheck", msg) | 					}, api.EventTypeWarning, "FailedToStartServiceHealthcheck", msg) | ||||||
| 			} | 			} | ||||||
| 			glog.Error(msg) | 			glog.Error(msg) | ||||||
| 			continue | 			continue | ||||||
| @@ -259,16 +262,18 @@ type HealthzServer struct { | |||||||
| 	addr          string | 	addr          string | ||||||
| 	port          int32 | 	port          int32 | ||||||
| 	healthTimeout time.Duration | 	healthTimeout time.Duration | ||||||
|  | 	recorder      record.EventRecorder | ||||||
|  | 	nodeRef       *v1.ObjectReference | ||||||
|  |  | ||||||
| 	lastUpdated atomic.Value | 	lastUpdated atomic.Value | ||||||
| } | } | ||||||
|  |  | ||||||
| // NewDefaultHealthzServer returns a default healthz http server. | // NewDefaultHealthzServer returns a default healthz http server. | ||||||
| func NewDefaultHealthzServer(addr string, healthTimeout time.Duration) *HealthzServer { | func NewDefaultHealthzServer(addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *HealthzServer { | ||||||
| 	return newHealthzServer(nil, nil, nil, addr, healthTimeout) | 	return newHealthzServer(nil, nil, nil, addr, healthTimeout, recorder, nodeRef) | ||||||
| } | } | ||||||
|  |  | ||||||
| func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *HealthzServer { | func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *HealthzServer { | ||||||
| 	if listener == nil { | 	if listener == nil { | ||||||
| 		listener = stdNetListener{} | 		listener = stdNetListener{} | ||||||
| 	} | 	} | ||||||
| @@ -284,6 +289,8 @@ func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c | |||||||
| 		clock:         c, | 		clock:         c, | ||||||
| 		addr:          addr, | 		addr:          addr, | ||||||
| 		healthTimeout: healthTimeout, | 		healthTimeout: healthTimeout, | ||||||
|  | 		recorder:      recorder, | ||||||
|  | 		nodeRef:       nodeRef, | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -297,19 +304,26 @@ func (hs *HealthzServer) Run() { | |||||||
| 	serveMux := http.NewServeMux() | 	serveMux := http.NewServeMux() | ||||||
| 	serveMux.Handle("/healthz", healthzHandler{hs: hs}) | 	serveMux.Handle("/healthz", healthzHandler{hs: hs}) | ||||||
| 	server := hs.httpFactory.New(hs.addr, serveMux) | 	server := hs.httpFactory.New(hs.addr, serveMux) | ||||||
|  |  | ||||||
|  | 	go wait.Until(func() { | ||||||
|  | 		glog.V(3).Infof("Starting goroutine for healthz on %s", hs.addr) | ||||||
|  |  | ||||||
| 		listener, err := hs.listener.Listen(hs.addr) | 		listener, err := hs.listener.Listen(hs.addr) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 		glog.Errorf("Failed to start healthz on %s: %v", hs.addr, err) | 			msg := fmt.Sprintf("Failed to start node healthz on %s: %v", hs.addr, err) | ||||||
|  | 			if hs.recorder != nil { | ||||||
|  | 				hs.recorder.Eventf(hs.nodeRef, api.EventTypeWarning, "FailedToStartNodeHealthcheck", msg) | ||||||
|  | 			} | ||||||
|  | 			glog.Error(msg) | ||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
| 	go func() { |  | ||||||
| 		glog.V(3).Infof("Starting goroutine for healthz on %s", hs.addr) |  | ||||||
| 		if err := server.Serve(listener); err != nil { | 		if err := server.Serve(listener); err != nil { | ||||||
| 			glog.Errorf("Healhz closed: %v", err) | 			glog.Errorf("Healthz closed with error: %v", err) | ||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
| 		glog.Errorf("Unexpected healhz closed.") | 		glog.Errorf("Unexpected healthz closed.") | ||||||
| 	}() | 	}, nodeHealthzRetryInterval, wait.NeverStop) | ||||||
| } | } | ||||||
|  |  | ||||||
| type healthzHandler struct { | type healthzHandler struct { | ||||||
|   | |||||||
| @@ -368,7 +368,7 @@ func TestHealthzServer(t *testing.T) { | |||||||
| 	httpFactory := newFakeHTTPServerFactory() | 	httpFactory := newFakeHTTPServerFactory() | ||||||
| 	fakeClock := clock.NewFakeClock(time.Now()) | 	fakeClock := clock.NewFakeClock(time.Now()) | ||||||
|  |  | ||||||
| 	hs := newHealthzServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second) | 	hs := newHealthzServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil) | ||||||
| 	server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs}) | 	server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs}) | ||||||
|  |  | ||||||
| 	// Should return 200 "OK" by default. | 	// Should return 200 "OK" by default. | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Zihong Zheng
					Zihong Zheng