Merge pull request #118148 from linxiulei/sched_readyz

Expose /readyz & /livez in kube-scheduler
Authored by: Kubernetes Prow Robot
Committed by: GitHub
Date: 2024-05-29 22:09:17 -07:00
5 changed files with 313 additions and 25 deletions

View File

@@ -167,10 +167,12 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
defer cc.EventBroadcaster.Shutdown()
// Setup healthz checks.
-var checks []healthz.HealthChecker
+var checks, readyzChecks []healthz.HealthChecker
if cc.ComponentConfig.LeaderElection.LeaderElect {
checks = append(checks, cc.LeaderElection.WatchDog)
+readyzChecks = append(readyzChecks, cc.LeaderElection.WatchDog)
}
+readyzChecks = append(readyzChecks, healthz.NewShutdownHealthz(ctx.Done()))
waitingForLeader := make(chan struct{})
isLeader := func() bool {
@@ -184,9 +186,20 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
}
}
+handlerSyncReadyCh := make(chan struct{})
+handlerSyncCheck := healthz.NamedCheck("sched-handler-sync", func(_ *http.Request) error {
+select {
+case <-handlerSyncReadyCh:
+return nil
+default:
+}
+return fmt.Errorf("waiting for handlers to sync")
+})
+readyzChecks = append(readyzChecks, handlerSyncCheck)
// Start up the healthz server.
if cc.SecureServing != nil {
-handler := buildHandlerChain(newHealthzAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
+handler := buildHandlerChain(newHealthEndpointsAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks, readyzChecks), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
if _, _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
// fail early for secure handlers, removing the old error loop from above
@@ -214,6 +227,7 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
logger.Error(err, "waiting for handlers to sync")
}
+close(handlerSyncReadyCh)
logger.V(3).Info("Handlers synced")
}
if !cc.ComponentConfig.DelayCacheUntilActive || cc.LeaderElection == nil {
@@ -288,11 +302,14 @@ func installMetricHandler(pathRecorderMux *mux.PathRecorderMux, informers inform
})
}
-// newHealthzAndMetricsHandler creates a healthz server from the config, and will also
+// newHealthEndpointsAndMetricsHandler creates an API health server from the config, and will also
// embed the metrics handler.
-func newHealthzAndMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, informers informers.SharedInformerFactory, isLeader func() bool, checks ...healthz.HealthChecker) http.Handler {
+// TODO: healthz check is deprecated, please use livez and readyz instead. Will be removed in the future.
+func newHealthEndpointsAndMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, informers informers.SharedInformerFactory, isLeader func() bool, healthzChecks, readyzChecks []healthz.HealthChecker) http.Handler {
pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
-healthz.InstallHandler(pathRecorderMux, checks...)
+healthz.InstallHandler(pathRecorderMux, healthzChecks...)
+healthz.InstallLivezHandler(pathRecorderMux)
+healthz.InstallReadyzHandler(pathRecorderMux, readyzChecks...)
installMetricHandler(pathRecorderMux, informers, isLeader)
slis.SLIMetricsWithReset{}.Install(pathRecorderMux)
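
With this change the scheduler serves /livez and /readyz alongside the now-deprecated /healthz, and readiness is gated on the handler-sync check and on the shutdown channel. Below is a minimal, self-contained sketch (not part of the PR) of that wiring pattern using the same healthz and mux helpers; the package name, check name, channel names, and listen address are illustrative assumptions rather than the scheduler's actual code.

package main

import (
	"fmt"
	"net/http"

	"k8s.io/apiserver/pkg/server/healthz"
	"k8s.io/apiserver/pkg/server/mux"
)

func main() {
	syncedCh := make(chan struct{})
	stopCh := make(chan struct{})

	// Readiness check in the same shape as the sched-handler-sync check above:
	// it fails until syncedCh is closed.
	syncCheck := healthz.NamedCheck("example-handler-sync", func(_ *http.Request) error {
		select {
		case <-syncedCh:
			return nil
		default:
			return fmt.Errorf("waiting for handlers to sync")
		}
	})

	m := mux.NewPathRecorderMux("example")
	healthz.InstallHandler(m)                                                      // legacy /healthz (ping check by default)
	healthz.InstallLivezHandler(m)                                                 // /livez (ping check by default)
	healthz.InstallReadyzHandler(m, syncCheck, healthz.NewShutdownHealthz(stopCh)) // /readyz gated on sync and shutdown

	close(syncedCh) // once initialization finishes, /readyz and /readyz/example-handler-sync return 200
	_ = http.ListenAndServe("127.0.0.1:8081", m)
}

Before syncedCh is closed, /readyz returns 500 while /livez and /healthz stay 200; the per-check subpaths (as the test below exercises via /readyz/sched-handler-sync) behave the same way.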

View File

@@ -118,7 +118,7 @@ func (s *GenericAPIServer) AddLivezChecks(delay time.Duration, checks ...healthz
// that we can register that the api-server is no longer ready while we attempt to gracefully
// shutdown.
func (s *GenericAPIServer) addReadyzShutdownCheck(stopCh <-chan struct{}) error {
-return s.AddReadyzChecks(shutdownCheck{stopCh})
+return s.AddReadyzChecks(healthz.NewShutdownHealthz(stopCh))
}
// installHealthz creates the healthz endpoint for this server
@@ -139,25 +139,6 @@ func (s *GenericAPIServer) installLivez() {
s.livezRegistry.installHandler(s.Handler.NonGoRestfulMux)
}
-// shutdownCheck fails if the embedded channel is closed. This is intended to allow for graceful shutdown sequences
-// for the apiserver.
-type shutdownCheck struct {
-StopCh <-chan struct{}
-}
-func (shutdownCheck) Name() string {
-return "shutdown"
-}
-func (c shutdownCheck) Check(req *http.Request) error {
-select {
-case <-c.StopCh:
-return fmt.Errorf("process is shutting down")
-default:
-}
-return nil
-}
// delayedHealthCheck wraps a health check which will not fail until the explicitly defined delay has elapsed. This
// is intended for use primarily for livez health checks.
func delayedHealthCheck(check healthz.HealthChecker, clock clock.Clock, delay time.Duration) healthz.HealthChecker {
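
The private shutdownCheck type removed above is replaced by the shared healthz.NewShutdownHealthz constructor added in the next file. Any component can register its own check with these registries by implementing the healthz.HealthChecker interface in the same shape. The sketch below (not part of the PR) shows such a custom check; the etcdReachable name and its injected probe are invented purely for illustration.

package health

import (
	"fmt"
	"net/http"

	"k8s.io/apiserver/pkg/server/healthz"
)

// etcdReachable reports failure when its injected connectivity probe errors.
type etcdReachable struct {
	probe func() error
}

// Compile-time assertion that the type satisfies the healthz interface.
var _ healthz.HealthChecker = etcdReachable{}

func (etcdReachable) Name() string { return "etcd-reachable" }

func (c etcdReachable) Check(_ *http.Request) error {
	if err := c.probe(); err != nil {
		return fmt.Errorf("etcd is not reachable: %w", err)
	}
	return nil
}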

View File

@@ -105,6 +105,29 @@ func (i *informerSync) Name() string {
return "informer-sync"
}
+type shutdown struct {
+stopCh <-chan struct{}
+}
+// NewShutdownHealthz returns a new HealthChecker that will fail if the embedded channel is closed.
+// This is intended to allow for graceful shutdown sequences.
+func NewShutdownHealthz(stopCh <-chan struct{}) HealthChecker {
+return &shutdown{stopCh}
+}
+func (s *shutdown) Name() string {
+return "shutdown"
+}
+func (s *shutdown) Check(req *http.Request) error {
+select {
+case <-s.stopCh:
+return fmt.Errorf("process is shutting down")
+default:
+}
+return nil
+}
func (i *informerSync) Check(_ *http.Request) error {
stopCh := make(chan struct{})
// Close stopCh to force checking if informers are synced now.
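
A small behavioral sketch (not part of the PR) of the new shutdown check: the httptest server and the explicit channel close standing in for the real shutdown signal are illustrative assumptions. It shows /readyz flipping from 200 to 500 once the stop channel closes, while /livez keeps returning 200 so a pod that is merely draining is not restarted.

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"

	"k8s.io/apiserver/pkg/server/healthz"
	"k8s.io/apiserver/pkg/server/mux"
)

func main() {
	stopCh := make(chan struct{})

	m := mux.NewPathRecorderMux("example")
	healthz.InstallLivezHandler(m)                                      // liveness: ping check only
	healthz.InstallReadyzHandler(m, healthz.NewShutdownHealthz(stopCh)) // readiness: fails once shutdown begins

	srv := httptest.NewServer(m)
	defer srv.Close()

	get := func(path string) int {
		resp, err := http.Get(srv.URL + path)
		if err != nil {
			panic(err)
		}
		resp.Body.Close()
		return resp.StatusCode
	}

	fmt.Println(get("/livez"), get("/readyz")) // 200 200 while running
	close(stopCh)                              // simulate the start of graceful shutdown
	fmt.Println(get("/livez"), get("/readyz")) // 200 500: readiness drops, liveness holds
}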

View File

@@ -0,0 +1,240 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package serving
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net/http"
"os"
"path"
"strings"
"testing"
"k8s.io/klog/v2/ktesting"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
kubeschedulertesting "k8s.io/kubernetes/cmd/kube-scheduler/app/testing"
"k8s.io/kubernetes/test/integration/framework"
)
func TestHealthEndpoints(t *testing.T) {
server, configStr, _, err := startTestAPIServer(t)
if err != nil {
t.Fatalf("Failed to start kube-apiserver server: %v", err)
}
defer server.TearDownFn()
apiserverConfig, err := os.CreateTemp("", "kubeconfig")
if err != nil {
t.Fatalf("Failed to create config file: %v", err)
}
defer func() {
_ = os.Remove(apiserverConfig.Name())
}()
if _, err = apiserverConfig.WriteString(configStr); err != nil {
t.Fatalf("Failed to write config file: %v", err)
}
brokenConfigStr := strings.ReplaceAll(configStr, "127.0.0.1", "127.0.0.2")
brokenConfig, err := os.CreateTemp("", "kubeconfig")
if err != nil {
t.Fatalf("Failed to create config file: %v", err)
}
if _, err := brokenConfig.WriteString(brokenConfigStr); err != nil {
t.Fatalf("Failed to write config file: %v", err)
}
defer func() {
_ = os.Remove(brokenConfig.Name())
}()
tests := []struct {
name string
path string
useBrokenConfig bool
wantResponseCode int
}{
{
"/healthz",
"/healthz",
false,
http.StatusOK,
},
{
"/livez",
"/livez",
false,
http.StatusOK,
},
{
"/livez with ping check",
"/livez/ping",
false,
http.StatusOK,
},
{
"/readyz",
"/readyz",
false,
http.StatusOK,
},
{
"/readyz with sched-handler-sync",
"/readyz/sched-handler-sync",
false,
http.StatusOK,
},
{
"/readyz with shutdown",
"/readyz/shutdown",
false,
http.StatusOK,
},
{
"/readyz with broken apiserver",
"/readyz",
true,
http.StatusInternalServerError,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt := tt
_, ctx := ktesting.NewTestContext(t)
configFile := apiserverConfig.Name()
if tt.useBrokenConfig {
configFile = brokenConfig.Name()
}
result, err := kubeschedulertesting.StartTestServer(
ctx,
[]string{"--kubeconfig", configFile, "--leader-elect=false", "--authorization-always-allow-paths", tt.path})
if err != nil {
t.Fatalf("Failed to start kube-scheduler server: %v", err)
}
if result.TearDownFn != nil {
defer result.TearDownFn()
}
client, base, err := clientAndURLFromTestServer(result)
if err != nil {
t.Fatalf("Failed to get client from test server: %v", err)
}
req, err := http.NewRequest("GET", base+tt.path, nil)
if err != nil {
t.Fatalf("failed to request: %v", err)
}
r, err := client.Do(req)
if err != nil {
t.Fatalf("failed to GET %s from component: %v", tt.path, err)
}
body, err := io.ReadAll(r.Body)
if err != nil {
t.Fatalf("failed to read response body: %v", err)
}
if err = r.Body.Close(); err != nil {
t.Fatalf("failed to close response body: %v", err)
}
if got, expected := r.StatusCode, tt.wantResponseCode; got != expected {
t.Fatalf("expected http %d at %s of component, got: %d %q", expected, tt.path, got, string(body))
}
})
}
}
// TODO: Make this a util function once there is a unified way to start a testing apiserver so that we can reuse it.
func startTestAPIServer(t *testing.T) (server *kubeapiservertesting.TestServer, apiserverConfig, token string, err error) {
// Insulate this test from picking up in-cluster config when run inside a pod
// We can't assume we have permissions to write to /var/run/secrets/... from a unit test to mock in-cluster config for testing
originalHost := os.Getenv("KUBERNETES_SERVICE_HOST")
if len(originalHost) > 0 {
if err = os.Setenv("KUBERNETES_SERVICE_HOST", ""); err != nil {
return
}
defer func() {
err = os.Setenv("KUBERNETES_SERVICE_HOST", originalHost)
}()
}
// authenticate to apiserver via bearer token
token = "flwqkenfjasasdfmwerasd" // Fake token for testing.
var tokenFile *os.File
tokenFile, err = os.CreateTemp("", "kubeconfig")
if err != nil {
return
}
if _, err = tokenFile.WriteString(fmt.Sprintf(`%s,system:kube-scheduler,system:kube-scheduler,""`, token)); err != nil {
return
}
if err = tokenFile.Close(); err != nil {
return
}
// start apiserver
server = kubeapiservertesting.StartTestServerOrDie(t, nil, []string{
"--token-auth-file", tokenFile.Name(),
"--authorization-mode", "AlwaysAllow",
}, framework.SharedEtcd())
apiserverConfig = fmt.Sprintf(`
apiVersion: v1
kind: Config
clusters:
- cluster:
server: %s
certificate-authority: %s
name: integration
contexts:
- context:
cluster: integration
user: kube-scheduler
name: default-context
current-context: default-context
users:
- name: kube-scheduler
user:
token: %s
`, server.ClientConfig.Host, server.ServerOpts.SecureServing.ServerCert.CertKey.CertFile, token)
return server, apiserverConfig, token, nil
}
func clientAndURLFromTestServer(s kubeschedulertesting.TestServer) (*http.Client, string, error) {
secureInfo := s.Config.SecureServing
secureOptions := s.Options.SecureServing
url := fmt.Sprintf("https://%s", secureInfo.Listener.Addr().String())
url = strings.ReplaceAll(url, "[::]", "127.0.0.1") // switch to IPv4 because the self-signed cert does not support [::]
// read self-signed server cert disk
pool := x509.NewCertPool()
serverCertPath := path.Join(secureOptions.ServerCert.CertDirectory, secureOptions.ServerCert.PairName+".crt")
serverCert, err := os.ReadFile(serverCertPath)
if err != nil {
return nil, "", fmt.Errorf("Failed to read component server cert %q: %w", serverCertPath, err)
}
pool.AppendCertsFromPEM(serverCert)
tr := &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: pool,
},
}
client := &http.Client{Transport: tr}
return client, url, nil
}
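
Outside the integration test, a client can poll the new endpoint the same way. A hedged sketch of such a prober follows; the host, the InsecureSkipVerify transport, and the timeouts are illustrative assumptions. Note that, as the test's --authorization-always-allow-paths flag shows, these paths go through the scheduler's delegated authentication and authorization unless they are explicitly allowed.

package main

import (
	"crypto/tls"
	"fmt"
	"net/http"
	"time"
)

// waitForReady polls /readyz until it returns 200 or the deadline passes.
func waitForReady(base string) error {
	// InsecureSkipVerify keeps the sketch short; a real caller should trust the
	// scheduler's serving certificate, as clientAndURLFromTestServer does above.
	client := &http.Client{
		Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
		Timeout:   2 * time.Second,
	}
	deadline := time.Now().Add(30 * time.Second)
	for time.Now().Before(deadline) {
		resp, err := client.Get(base + "/readyz")
		if err == nil {
			resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				return nil
			}
		}
		time.Sleep(time.Second)
	}
	return fmt.Errorf("scheduler at %s did not report ready before the deadline", base)
}

func main() {
	// 10259 is the scheduler's default secure port; the host is an assumption.
	if err := waitForReady("https://127.0.0.1:10259"); err != nil {
		fmt.Println(err)
	}
}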

View File

@@ -0,0 +1,27 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package serving
import (
"testing"
"k8s.io/kubernetes/test/integration/framework"
)
func TestMain(m *testing.M) {
framework.EtcdMain(m.Run)
}