Merge pull request #121914 from siyuanfoundation/health-rf
k8s.io/apiserver: refactor GenericAPIServer healthz code.
This commit is contained in:
		@@ -799,15 +799,14 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
 | 
			
		||||
		preShutdownHooks:       map[string]preShutdownHookEntry{},
 | 
			
		||||
		disabledPostStartHooks: c.DisabledPostStartHooks,
 | 
			
		||||
 | 
			
		||||
		healthzChecks:    c.HealthzChecks,
 | 
			
		||||
		livezChecks:      c.LivezChecks,
 | 
			
		||||
		readyzChecks:     c.ReadyzChecks,
 | 
			
		||||
		healthzRegistry:  healthCheckRegistry{path: "/healthz", checks: c.HealthzChecks},
 | 
			
		||||
		livezRegistry:    healthCheckRegistry{path: "/livez", checks: c.LivezChecks, clock: clock.RealClock{}},
 | 
			
		||||
		readyzRegistry:   healthCheckRegistry{path: "/readyz", checks: c.ReadyzChecks},
 | 
			
		||||
		livezGracePeriod: c.LivezGracePeriod,
 | 
			
		||||
 | 
			
		||||
		DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer),
 | 
			
		||||
 | 
			
		||||
		maxRequestBodyBytes: c.MaxRequestBodyBytes,
 | 
			
		||||
		livezClock:          clock.RealClock{},
 | 
			
		||||
 | 
			
		||||
		lifecycleSignals:       c.lifecycleSignals,
 | 
			
		||||
		ShutdownSendRetryAfter: c.ShutdownSendRetryAfter,
 | 
			
		||||
 
 | 
			
		||||
@@ -60,7 +60,6 @@ import (
 | 
			
		||||
	"k8s.io/kube-openapi/pkg/handler3"
 | 
			
		||||
	openapiutil "k8s.io/kube-openapi/pkg/util"
 | 
			
		||||
	"k8s.io/kube-openapi/pkg/validation/spec"
 | 
			
		||||
	"k8s.io/utils/clock"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Info about an API group.
 | 
			
		||||
@@ -191,19 +190,11 @@ type GenericAPIServer struct {
 | 
			
		||||
	preShutdownHooksCalled bool
 | 
			
		||||
 | 
			
		||||
	// healthz checks
 | 
			
		||||
	healthzLock            sync.Mutex
 | 
			
		||||
	healthzChecks          []healthz.HealthChecker
 | 
			
		||||
	healthzChecksInstalled bool
 | 
			
		||||
	// livez checks
 | 
			
		||||
	livezLock            sync.Mutex
 | 
			
		||||
	livezChecks          []healthz.HealthChecker
 | 
			
		||||
	livezChecksInstalled bool
 | 
			
		||||
	// readyz checks
 | 
			
		||||
	readyzLock            sync.Mutex
 | 
			
		||||
	readyzChecks          []healthz.HealthChecker
 | 
			
		||||
	readyzChecksInstalled bool
 | 
			
		||||
	livezGracePeriod      time.Duration
 | 
			
		||||
	livezClock            clock.Clock
 | 
			
		||||
	healthzRegistry healthCheckRegistry
 | 
			
		||||
	readyzRegistry  healthCheckRegistry
 | 
			
		||||
	livezRegistry   healthCheckRegistry
 | 
			
		||||
 | 
			
		||||
	livezGracePeriod time.Duration
 | 
			
		||||
 | 
			
		||||
	// auditing. The backend is started before the server starts listening.
 | 
			
		||||
	AuditBackend audit.Backend
 | 
			
		||||
@@ -330,7 +321,7 @@ func (s *GenericAPIServer) PreShutdownHooks() map[string]preShutdownHookEntry {
 | 
			
		||||
	return s.preShutdownHooks
 | 
			
		||||
}
 | 
			
		||||
func (s *GenericAPIServer) HealthzChecks() []healthz.HealthChecker {
 | 
			
		||||
	return s.healthzChecks
 | 
			
		||||
	return s.healthzRegistry.checks
 | 
			
		||||
}
 | 
			
		||||
func (s *GenericAPIServer) ListedPaths() []string {
 | 
			
		||||
	return s.listedPathProvider.ListedPaths()
 | 
			
		||||
 
 | 
			
		||||
@@ -19,12 +19,60 @@ package server
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/apiserver/pkg/server/healthz"
 | 
			
		||||
	"k8s.io/utils/clock"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// healthMux is an interface describing the methods InstallHandler requires.
 | 
			
		||||
type healthMux interface {
 | 
			
		||||
	Handle(pattern string, handler http.Handler)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type healthCheckRegistry struct {
 | 
			
		||||
	path            string
 | 
			
		||||
	lock            sync.Mutex
 | 
			
		||||
	checks          []healthz.HealthChecker
 | 
			
		||||
	checksInstalled bool
 | 
			
		||||
	clock           clock.Clock
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (reg *healthCheckRegistry) addHealthChecks(checks ...healthz.HealthChecker) error {
 | 
			
		||||
	return reg.addDelayedHealthChecks(0, checks...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (reg *healthCheckRegistry) addDelayedHealthChecks(delay time.Duration, checks ...healthz.HealthChecker) error {
 | 
			
		||||
	if delay > 0 && reg.clock == nil {
 | 
			
		||||
		return fmt.Errorf("nil clock in healthCheckRegistry for %s endpoint", reg.path)
 | 
			
		||||
	}
 | 
			
		||||
	reg.lock.Lock()
 | 
			
		||||
	defer reg.lock.Unlock()
 | 
			
		||||
	if reg.checksInstalled {
 | 
			
		||||
		return fmt.Errorf("unable to add because the %s endpoint has already been created", reg.path)
 | 
			
		||||
	}
 | 
			
		||||
	if delay > 0 {
 | 
			
		||||
		for _, check := range checks {
 | 
			
		||||
			reg.checks = append(reg.checks, delayedHealthCheck(check, reg.clock, delay))
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		reg.checks = append(reg.checks, checks...)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (reg *healthCheckRegistry) installHandler(mux healthMux) {
 | 
			
		||||
	reg.installHandlerWithHealthyFunc(mux, nil)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (reg *healthCheckRegistry) installHandlerWithHealthyFunc(mux healthMux, firstTimeHealthy func()) {
 | 
			
		||||
	reg.lock.Lock()
 | 
			
		||||
	defer reg.lock.Unlock()
 | 
			
		||||
	reg.checksInstalled = true
 | 
			
		||||
	healthz.InstallPathHandlerWithHealthyFunc(mux, reg.path, firstTimeHealthy, reg.checks...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddHealthChecks adds HealthCheck(s) to health endpoints (healthz, livez, readyz) but
 | 
			
		||||
// configures the liveness grace period to be zero, which means we expect this health check
 | 
			
		||||
// to immediately indicate that the apiserver is unhealthy.
 | 
			
		||||
@@ -47,40 +95,23 @@ func (s *GenericAPIServer) AddBootSequenceHealthChecks(checks ...healthz.HealthC
 | 
			
		||||
// addHealthChecks adds health checks to healthz, livez, and readyz. The delay passed in will set
 | 
			
		||||
// a corresponding grace period on livez.
 | 
			
		||||
func (s *GenericAPIServer) addHealthChecks(livezGracePeriod time.Duration, checks ...healthz.HealthChecker) error {
 | 
			
		||||
	s.healthzLock.Lock()
 | 
			
		||||
	defer s.healthzLock.Unlock()
 | 
			
		||||
	if s.healthzChecksInstalled {
 | 
			
		||||
		return fmt.Errorf("unable to add because the healthz endpoint has already been created")
 | 
			
		||||
	}
 | 
			
		||||
	s.healthzChecks = append(s.healthzChecks, checks...)
 | 
			
		||||
	if err := s.AddLivezChecks(livezGracePeriod, checks...); err != nil {
 | 
			
		||||
	if err := s.healthzRegistry.addHealthChecks(checks...); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	return s.AddReadyzChecks(checks...)
 | 
			
		||||
	if err := s.livezRegistry.addDelayedHealthChecks(livezGracePeriod, checks...); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	return s.readyzRegistry.addHealthChecks(checks...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddReadyzChecks allows you to add a HealthCheck to readyz.
 | 
			
		||||
func (s *GenericAPIServer) AddReadyzChecks(checks ...healthz.HealthChecker) error {
 | 
			
		||||
	s.readyzLock.Lock()
 | 
			
		||||
	defer s.readyzLock.Unlock()
 | 
			
		||||
	if s.readyzChecksInstalled {
 | 
			
		||||
		return fmt.Errorf("unable to add because the readyz endpoint has already been created")
 | 
			
		||||
	}
 | 
			
		||||
	s.readyzChecks = append(s.readyzChecks, checks...)
 | 
			
		||||
	return nil
 | 
			
		||||
	return s.readyzRegistry.addHealthChecks(checks...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddLivezChecks allows you to add a HealthCheck to livez.
 | 
			
		||||
func (s *GenericAPIServer) AddLivezChecks(delay time.Duration, checks ...healthz.HealthChecker) error {
 | 
			
		||||
	s.livezLock.Lock()
 | 
			
		||||
	defer s.livezLock.Unlock()
 | 
			
		||||
	if s.livezChecksInstalled {
 | 
			
		||||
		return fmt.Errorf("unable to add because the livez endpoint has already been created")
 | 
			
		||||
	}
 | 
			
		||||
	for _, check := range checks {
 | 
			
		||||
		s.livezChecks = append(s.livezChecks, delayedHealthCheck(check, s.livezClock, delay))
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
	return s.livezRegistry.addDelayedHealthChecks(delay, checks...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// addReadyzShutdownCheck is a convenience function for adding a readyz shutdown check, so
 | 
			
		||||
@@ -92,29 +123,20 @@ func (s *GenericAPIServer) addReadyzShutdownCheck(stopCh <-chan struct{}) error
 | 
			
		||||
 | 
			
		||||
// installHealthz creates the healthz endpoint for this server
 | 
			
		||||
func (s *GenericAPIServer) installHealthz() {
 | 
			
		||||
	s.healthzLock.Lock()
 | 
			
		||||
	defer s.healthzLock.Unlock()
 | 
			
		||||
	s.healthzChecksInstalled = true
 | 
			
		||||
	healthz.InstallHandler(s.Handler.NonGoRestfulMux, s.healthzChecks...)
 | 
			
		||||
	s.healthzRegistry.installHandler(s.Handler.NonGoRestfulMux)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// installReadyz creates the readyz endpoint for this server.
 | 
			
		||||
func (s *GenericAPIServer) installReadyz() {
 | 
			
		||||
	s.readyzLock.Lock()
 | 
			
		||||
	defer s.readyzLock.Unlock()
 | 
			
		||||
	s.readyzChecksInstalled = true
 | 
			
		||||
	healthz.InstallReadyzHandlerWithHealthyFunc(s.Handler.NonGoRestfulMux, func() {
 | 
			
		||||
		// note: InstallReadyzHandlerWithHealthyFunc guarantees that this is called only once
 | 
			
		||||
	s.readyzRegistry.installHandlerWithHealthyFunc(s.Handler.NonGoRestfulMux, func() {
 | 
			
		||||
		// note: installHandlerWithHealthyFunc guarantees that this is called only once
 | 
			
		||||
		s.lifecycleSignals.HasBeenReady.Signal()
 | 
			
		||||
	}, s.readyzChecks...)
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// installLivez creates the livez endpoint for this server.
 | 
			
		||||
func (s *GenericAPIServer) installLivez() {
 | 
			
		||||
	s.livezLock.Lock()
 | 
			
		||||
	defer s.livezLock.Unlock()
 | 
			
		||||
	s.livezChecksInstalled = true
 | 
			
		||||
	healthz.InstallLivezHandler(s.Handler.NonGoRestfulMux, s.livezChecks...)
 | 
			
		||||
	s.livezRegistry.installHandler(s.Handler.NonGoRestfulMux)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// shutdownCheck fails if the embedded channel is closed. This is intended to allow for graceful shutdown sequences
 | 
			
		||||
 
 | 
			
		||||
@@ -142,12 +142,6 @@ func InstallReadyzHandler(mux mux, checks ...HealthChecker) {
 | 
			
		||||
	InstallPathHandler(mux, "/readyz", checks...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// InstallReadyzHandlerWithHealthyFunc is like InstallReadyzHandler, but in addition call firstTimeReady
 | 
			
		||||
// the first time /readyz succeeds.
 | 
			
		||||
func InstallReadyzHandlerWithHealthyFunc(mux mux, firstTimeReady func(), checks ...HealthChecker) {
 | 
			
		||||
	InstallPathHandlerWithHealthyFunc(mux, "/readyz", firstTimeReady, checks...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// InstallLivezHandler registers handlers for liveness checking on the path
 | 
			
		||||
// "/livez" to mux. *All handlers* for mux must be specified in
 | 
			
		||||
// exactly one call to InstallHandler. Calling InstallHandler more
 | 
			
		||||
 
 | 
			
		||||
@@ -313,7 +313,7 @@ func (s cacheSyncWaiterStub) WaitForCacheSync(_ <-chan struct{}) map[reflect.Typ
 | 
			
		||||
	return s.startedByInformerType
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestInstallReadyzHandlerWithHealthyFunc(t *testing.T) {
 | 
			
		||||
func TestInstallPathHandlerWithHealthyFunc(t *testing.T) {
 | 
			
		||||
	mux := http.NewServeMux()
 | 
			
		||||
	readyzCh := make(chan struct{})
 | 
			
		||||
 | 
			
		||||
@@ -321,7 +321,7 @@ func TestInstallReadyzHandlerWithHealthyFunc(t *testing.T) {
 | 
			
		||||
	hasBeenReadyFn := func() {
 | 
			
		||||
		hasBeenReadyCounter++
 | 
			
		||||
	}
 | 
			
		||||
	InstallReadyzHandlerWithHealthyFunc(mux, hasBeenReadyFn, readyOnChanClose{readyzCh})
 | 
			
		||||
	InstallPathHandlerWithHealthyFunc(mux, "/readyz", hasBeenReadyFn, readyOnChanClose{readyzCh})
 | 
			
		||||
 | 
			
		||||
	// scenario 1: expect the check to fail since the channel hasn't been closed
 | 
			
		||||
	req, err := http.NewRequest("GET", fmt.Sprintf("http://example.com%s", "/readyz"), nil)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user