kubernetes/cmd/kube-scheduler/app/server.go
Patrick Ohly 21d1bcd6b8 initialize logging after flag parsing
It wasn't documented that InitLogs already uses the log flush frequency, so
some commands have called it before parsing (for example, kubectl in the
original code for logs.go). The flag never had an effect in such commands.

Fixing this turned into a major refactoring of how commands set up flags and
run their Cobra command:

- component-base/logs: implicitely registering flags during package init is an
  anti-pattern that makes it impossible to use the package in commands which
  want full control over their command line. Logging flags must be added
  explicitly now, something that the new cli.Run does automatically.

- component-base/logs: AddFlags would have crashed in kubectl-convert if it
  had been called because it relied on the global pflag.CommandLine. This
  has been fixed and kubectl-convert now has the same --log-flush-frequency
  flag as other commands.

- component-base/logs/testinit: an exception are tests where flag.CommandLine has
  to be used. This new package can be imported to add flags to that
  once per test program.

- Normalization of the klog command line flags was inconsistent. Some commands
  unintentionally didn't normalize to the recommended format with hyphens. This
  gets fixed for sample programs, but not for production programs because
  it would be a breaking change.

This refactoring has the following user-visible effects:

- The validation error for `go run ./cmd/kube-apiserver --logging-format=json
  --add-dir-header` now references `add-dir-header` instead of `add_dir_header`.

- `staging/src/k8s.io/cloud-provider/sample` uses flags with hyphen instead of
  underscore.

- `--log-flush-frequency` is not listed anymore in the --logging-format flag's
  `non-default formats don't honor these flags` usage text because it will also
  work for non-default formats once it is needed.

- `cmd/kubelet`: the description of `--logging-format` uses hyphens instead of
  underscores for the flags, which now matches what the command is using.

- `staging/src/k8s.io/component-base/logs/example/cmd`: added logging flags.

- `apiextensions-apiserver` no longer prints a useless stack trace for `main`
  when command line parsing raises an error.
2021-09-30 13:46:49 +02:00

344 lines
12 KiB
Go

/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app implements a Server object for running the scheduler.
package app
import (
"context"
"fmt"
"net/http"
"os"
goruntime "runtime"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authorization/authorizer"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server"
genericfilters "k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/mux"
"k8s.io/apiserver/pkg/server/routes"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/leaderelection"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/cli/globalflag"
"k8s.io/component-base/configz"
"k8s.io/component-base/logs"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/term"
"k8s.io/component-base/version"
"k8s.io/component-base/version/verflag"
"k8s.io/klog/v2"
schedulerserverconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
"k8s.io/kubernetes/cmd/kube-scheduler/app/options"
"k8s.io/kubernetes/pkg/scheduler"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
"k8s.io/kubernetes/pkg/scheduler/metrics/resources"
"k8s.io/kubernetes/pkg/scheduler/profile"
)
// Option configures a framework.Registry.
type Option func(runtime.Registry) error
// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
opts, err := options.NewOptions()
if err != nil {
klog.Fatalf("unable to initialize command options: %v", err)
}
namedFlagSets := opts.Flags()
cmd := &cobra.Command{
Use: "kube-scheduler",
Long: `The Kubernetes scheduler is a control plane process which assigns
Pods to Nodes. The scheduler determines which Nodes are valid placements for
each Pod in the scheduling queue according to constraints and available
resources. The scheduler then ranks each valid Node and binds the Pod to a
suitable Node. Multiple different schedulers may be used within a cluster;
kube-scheduler is the reference implementation.
See [scheduling](https://kubernetes.io/docs/concepts/scheduling-eviction/)
for more information about scheduling and the kube-scheduler component.`,
RunE: func(cmd *cobra.Command, args []string) error {
if err := opts.Complete(&namedFlagSets); err != nil {
return err
}
if err := runCommand(cmd, opts, registryOptions...); err != nil {
return err
}
return nil
},
SilenceErrors: true,
SilenceUsage: true,
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
if len(arg) > 0 {
return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
}
}
return nil
},
}
cmd.SetGlobalNormalizationFunc(pflag.CommandLine.GetNormalizeFunc())
fs := cmd.Flags()
verflag.AddFlags(namedFlagSets.FlagSet("global"))
globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}
cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)
cmd.MarkFlagFilename("config", "yaml", "yml", "json")
return cmd
}
// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
verflag.PrintAndExitIfRequested()
cliflag.PrintFlags(cmd.Flags())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
stopCh := server.SetupSignalHandler()
<-stopCh
cancel()
}()
cc, sched, err := Setup(ctx, opts, registryOptions...)
if err != nil {
return err
}
return Run(ctx, cc, sched)
}
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
// To help debugging, immediately log version
klog.InfoS("Starting Kubernetes Scheduler", "version", version.Get())
// Configz registration.
if cz, err := configz.New("componentconfig"); err == nil {
cz.Set(cc.ComponentConfig)
} else {
return fmt.Errorf("unable to register configz: %s", err)
}
// Prepare the event broadcaster.
cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
// Setup healthz checks.
var checks []healthz.HealthChecker
if cc.ComponentConfig.LeaderElection.LeaderElect {
checks = append(checks, cc.LeaderElection.WatchDog)
}
waitingForLeader := make(chan struct{})
isLeader := func() bool {
select {
case _, ok := <-waitingForLeader:
// if channel is closed, we are leading
return !ok
default:
// channel is open, we are waiting for a leader
return false
}
}
// Start up the healthz server.
if cc.SecureServing != nil {
handler := buildHandlerChain(newHealthzAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
// TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
// fail early for secure handlers, removing the old error loop from above
return fmt.Errorf("failed to start secure server: %v", err)
}
}
// Start all informers.
cc.InformerFactory.Start(ctx.Done())
// DynInformerFactory can be nil in tests.
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.Start(ctx.Done())
}
// Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(ctx.Done())
// DynInformerFactory can be nil in tests.
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
}
// If leader election is enabled, runCommand via LeaderElector until done and exit.
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
close(waitingForLeader)
sched.Run(ctx)
},
OnStoppedLeading: func() {
select {
case <-ctx.Done():
// We were asked to terminate. Exit 0.
klog.Info("Requested to terminate. Exiting.")
os.Exit(0)
default:
// We lost the lock.
klog.Exitf("leaderelection lost")
}
},
}
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
leaderElector.Run(ctx)
return fmt.Errorf("lost lease")
}
// Leader election is disabled, so runCommand inline until done.
close(waitingForLeader)
sched.Run(ctx)
return fmt.Errorf("finished without leader elect")
}
// buildHandlerChain wraps the given handler with the standard filters.
func buildHandlerChain(handler http.Handler, authn authenticator.Request, authz authorizer.Authorizer) http.Handler {
requestInfoResolver := &apirequest.RequestInfoFactory{}
failedHandler := genericapifilters.Unauthorized(scheme.Codecs)
handler = genericapifilters.WithAuthorization(handler, authz, scheme.Codecs)
handler = genericapifilters.WithAuthentication(handler, authn, failedHandler, nil)
handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithHTTPLogging(handler)
handler = genericfilters.WithPanicRecovery(handler, requestInfoResolver)
return handler
}
func installMetricHandler(pathRecorderMux *mux.PathRecorderMux, informers informers.SharedInformerFactory, isLeader func() bool) {
configz.InstallHandler(pathRecorderMux)
pathRecorderMux.Handle("/metrics", legacyregistry.HandlerWithReset())
resourceMetricsHandler := resources.Handler(informers.Core().V1().Pods().Lister())
pathRecorderMux.HandleFunc("/metrics/resources", func(w http.ResponseWriter, req *http.Request) {
if !isLeader() {
return
}
resourceMetricsHandler.ServeHTTP(w, req)
})
}
// newHealthzAndMetricsHandler creates a healthz server from the config, and will also
// embed the metrics handler.
func newHealthzAndMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, informers informers.SharedInformerFactory, isLeader func() bool, checks ...healthz.HealthChecker) http.Handler {
pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
healthz.InstallHandler(pathRecorderMux, checks...)
installMetricHandler(pathRecorderMux, informers, isLeader)
if config.EnableProfiling {
routes.Profiling{}.Install(pathRecorderMux)
if config.EnableContentionProfiling {
goruntime.SetBlockProfileRate(1)
}
routes.DebugFlags{}.Install(pathRecorderMux, "v", routes.StringFlagPutHandler(logs.GlogSetter))
}
return pathRecorderMux
}
func getRecorderFactory(cc *schedulerserverconfig.CompletedConfig) profile.RecorderFactory {
return func(name string) events.EventRecorder {
return cc.EventBroadcaster.NewRecorder(name)
}
}
// WithPlugin creates an Option based on plugin name and factory. Please don't remove this function: it is used to register out-of-tree plugins,
// hence there are no references to it from the kubernetes scheduler code base.
func WithPlugin(name string, factory runtime.PluginFactory) Option {
return func(registry runtime.Registry) error {
return registry.Register(name, factory)
}
}
// Setup creates a completed config and a scheduler based on the command args and options
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
if errs := opts.Validate(); len(errs) > 0 {
return nil, nil, utilerrors.NewAggregate(errs)
}
c, err := opts.Config()
if err != nil {
return nil, nil, err
}
// Get the completed config
cc := c.Complete()
outOfTreeRegistry := make(runtime.Registry)
for _, option := range outOfTreeRegistryOptions {
if err := option(outOfTreeRegistry); err != nil {
return nil, nil, err
}
}
recorderFactory := getRecorderFactory(&cc)
completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)
// Create the scheduler.
sched, err := scheduler.New(cc.Client,
cc.InformerFactory,
cc.DynInformerFactory,
recorderFactory,
ctx.Done(),
scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
scheduler.WithKubeConfig(cc.KubeConfig),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
scheduler.WithLegacyPolicySource(cc.LegacyPolicySource),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
// Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging
completedProfiles = append(completedProfiles, profile)
}),
)
if err != nil {
return nil, nil, err
}
if err := options.LogOrWriteConfig(opts.WriteConfigTo, &cc.ComponentConfig, completedProfiles); err != nil {
return nil, nil, err
}
return &cc, sched, nil
}