Add stopCh to apiserver & context to kublet commands
Remove SetupSignalContext call from the apiserver Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>
This commit is contained in:
		 Darren Shepherd
					Darren Shepherd
				
			
				
					committed by
					
						![rafaelbreno[commit]](/assets/img/avatar_default.png) rafaelbreno[commit]
						rafaelbreno[commit]
					
				
			
			
				
	
			
			
			![rafaelbreno[commit]](/assets/img/avatar_default.png) rafaelbreno[commit]
						rafaelbreno[commit]
					
				
			
						parent
						
							1d6158557c
						
					
				
				
					commit
					4e407fa5b5
				
			| @@ -24,6 +24,7 @@ import ( | |||||||
|  |  | ||||||
| 	"github.com/spf13/cobra/doc" | 	"github.com/spf13/cobra/doc" | ||||||
| 	"github.com/spf13/pflag" | 	"github.com/spf13/pflag" | ||||||
|  | 	"k8s.io/apiserver/pkg/server" | ||||||
| 	"k8s.io/kubernetes/cmd/genutils" | 	"k8s.io/kubernetes/cmd/genutils" | ||||||
| 	apiservapp "k8s.io/kubernetes/cmd/kube-apiserver/app" | 	apiservapp "k8s.io/kubernetes/cmd/kube-apiserver/app" | ||||||
| 	cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app" | 	cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app" | ||||||
| @@ -54,7 +55,7 @@ func main() { | |||||||
| 	switch module { | 	switch module { | ||||||
| 	case "kube-apiserver": | 	case "kube-apiserver": | ||||||
| 		// generate docs for kube-apiserver | 		// generate docs for kube-apiserver | ||||||
| 		apiserver := apiservapp.NewAPIServerCommand() | 		apiserver := apiservapp.NewAPIServerCommand(server.SetupSignalHandler()) | ||||||
| 		doc.GenMarkdownTree(apiserver, outDir) | 		doc.GenMarkdownTree(apiserver, outDir) | ||||||
| 	case "kube-controller-manager": | 	case "kube-controller-manager": | ||||||
| 		// generate docs for kube-controller-manager | 		// generate docs for kube-controller-manager | ||||||
| @@ -70,7 +71,7 @@ func main() { | |||||||
| 		doc.GenMarkdownTree(scheduler, outDir) | 		doc.GenMarkdownTree(scheduler, outDir) | ||||||
| 	case "kubelet": | 	case "kubelet": | ||||||
| 		// generate docs for kubelet | 		// generate docs for kubelet | ||||||
| 		kubelet := kubeletapp.NewKubeletCommand() | 		kubelet := kubeletapp.NewKubeletCommand(server.SetupSignalContext()) | ||||||
| 		doc.GenMarkdownTree(kubelet, outDir) | 		doc.GenMarkdownTree(kubelet, outDir) | ||||||
| 	case "kubeadm": | 	case "kubeadm": | ||||||
| 		// resets global flags created by kubelet or other commands e.g. | 		// resets global flags created by kubelet or other commands e.g. | ||||||
|   | |||||||
| @@ -26,6 +26,7 @@ import ( | |||||||
| 	mangen "github.com/cpuguy83/go-md2man/v2/md2man" | 	mangen "github.com/cpuguy83/go-md2man/v2/md2man" | ||||||
| 	"github.com/spf13/cobra" | 	"github.com/spf13/cobra" | ||||||
| 	"github.com/spf13/pflag" | 	"github.com/spf13/pflag" | ||||||
|  | 	"k8s.io/apiserver/pkg/server" | ||||||
| 	"k8s.io/cli-runtime/pkg/genericiooptions" | 	"k8s.io/cli-runtime/pkg/genericiooptions" | ||||||
| 	kubectlcmd "k8s.io/kubectl/pkg/cmd" | 	kubectlcmd "k8s.io/kubectl/pkg/cmd" | ||||||
| 	"k8s.io/kubernetes/cmd/genutils" | 	"k8s.io/kubernetes/cmd/genutils" | ||||||
| @@ -62,7 +63,7 @@ func main() { | |||||||
| 	switch module { | 	switch module { | ||||||
| 	case "kube-apiserver": | 	case "kube-apiserver": | ||||||
| 		// generate manpage for kube-apiserver | 		// generate manpage for kube-apiserver | ||||||
| 		apiserver := apiservapp.NewAPIServerCommand() | 		apiserver := apiservapp.NewAPIServerCommand(server.SetupSignalHandler()) | ||||||
| 		genMarkdown(apiserver, "", outDir) | 		genMarkdown(apiserver, "", outDir) | ||||||
| 		for _, c := range apiserver.Commands() { | 		for _, c := range apiserver.Commands() { | ||||||
| 			genMarkdown(c, "kube-apiserver", outDir) | 			genMarkdown(c, "kube-apiserver", outDir) | ||||||
| @@ -90,7 +91,7 @@ func main() { | |||||||
| 		} | 		} | ||||||
| 	case "kubelet": | 	case "kubelet": | ||||||
| 		// generate manpage for kubelet | 		// generate manpage for kubelet | ||||||
| 		kubelet := kubeletapp.NewKubeletCommand() | 		kubelet := kubeletapp.NewKubeletCommand(server.SetupSignalContext()) | ||||||
| 		genMarkdown(kubelet, "", outDir) | 		genMarkdown(kubelet, "", outDir) | ||||||
| 		for _, c := range kubelet.Commands() { | 		for _, c := range kubelet.Commands() { | ||||||
| 			genMarkdown(c, "kubelet", outDir) | 			genMarkdown(c, "kubelet", outDir) | ||||||
|   | |||||||
| @@ -22,6 +22,7 @@ import ( | |||||||
| 	"os" | 	"os" | ||||||
| 	_ "time/tzdata" // for timeZone support in CronJob | 	_ "time/tzdata" // for timeZone support in CronJob | ||||||
|  |  | ||||||
|  | 	"k8s.io/apiserver/pkg/server" | ||||||
| 	"k8s.io/component-base/cli" | 	"k8s.io/component-base/cli" | ||||||
| 	_ "k8s.io/component-base/logs/json/register"          // for JSON log format registration | 	_ "k8s.io/component-base/logs/json/register"          // for JSON log format registration | ||||||
| 	_ "k8s.io/component-base/metrics/prometheus/clientgo" // load all the prometheus client-go plugins | 	_ "k8s.io/component-base/metrics/prometheus/clientgo" // load all the prometheus client-go plugins | ||||||
| @@ -30,7 +31,7 @@ import ( | |||||||
| ) | ) | ||||||
|  |  | ||||||
| func main() { | func main() { | ||||||
| 	command := app.NewAPIServerCommand() | 	command := app.NewAPIServerCommand(server.SetupSignalHandler()) | ||||||
| 	code := cli.Run(command) | 	code := cli.Run(command) | ||||||
| 	os.Exit(code) | 	os.Exit(code) | ||||||
| } | } | ||||||
|   | |||||||
| @@ -63,7 +63,7 @@ func init() { | |||||||
| } | } | ||||||
|  |  | ||||||
| // NewAPIServerCommand creates a *cobra.Command object with default parameters | // NewAPIServerCommand creates a *cobra.Command object with default parameters | ||||||
| func NewAPIServerCommand() *cobra.Command { | func NewAPIServerCommand(stopCh <-chan struct{}) *cobra.Command { | ||||||
| 	_, featureGate := utilversion.DefaultComponentGlobalsRegistry.ComponentGlobalsOrRegister( | 	_, featureGate := utilversion.DefaultComponentGlobalsRegistry.ComponentGlobalsOrRegister( | ||||||
| 		utilversion.DefaultKubeComponent, utilversion.DefaultBuildEffectiveVersion(), utilfeature.DefaultMutableFeatureGate) | 		utilversion.DefaultKubeComponent, utilversion.DefaultBuildEffectiveVersion(), utilfeature.DefaultMutableFeatureGate) | ||||||
| 	s := options.NewServerRunOptions() | 	s := options.NewServerRunOptions() | ||||||
| @@ -108,7 +108,7 @@ cluster's shared state through which all other components interact.`, | |||||||
| 			} | 			} | ||||||
| 			// add feature enablement metrics | 			// add feature enablement metrics | ||||||
| 			featureGate.AddMetrics() | 			featureGate.AddMetrics() | ||||||
| 			return Run(cmd.Context(), completedOptions) | 			return Run(cmd.Context(), completedOptions, stopCh) | ||||||
| 		}, | 		}, | ||||||
| 		Args: func(cmd *cobra.Command, args []string) error { | 		Args: func(cmd *cobra.Command, args []string) error { | ||||||
| 			for _, arg := range args { | 			for _, arg := range args { | ||||||
| @@ -119,7 +119,6 @@ cluster's shared state through which all other components interact.`, | |||||||
| 			return nil | 			return nil | ||||||
| 		}, | 		}, | ||||||
| 	} | 	} | ||||||
| 	cmd.SetContext(genericapiserver.SetupSignalContext()) |  | ||||||
|  |  | ||||||
| 	fs := cmd.Flags() | 	fs := cmd.Flags() | ||||||
| 	namedFlagSets := s.Flags() | 	namedFlagSets := s.Flags() | ||||||
| @@ -137,7 +136,7 @@ cluster's shared state through which all other components interact.`, | |||||||
| } | } | ||||||
|  |  | ||||||
| // Run runs the specified APIServer.  This should never exit. | // Run runs the specified APIServer.  This should never exit. | ||||||
| func Run(ctx context.Context, opts options.CompletedOptions) error { | func Run(ctx context.Context, opts options.CompletedOptions, stopCh <-chan struct{}) error { | ||||||
| 	// To help debugging, immediately log version | 	// To help debugging, immediately log version | ||||||
| 	klog.Infof("Version: %+v", version.Get()) | 	klog.Infof("Version: %+v", version.Get()) | ||||||
|  |  | ||||||
|   | |||||||
| @@ -60,7 +60,6 @@ import ( | |||||||
| 	"k8s.io/apimachinery/pkg/util/sets" | 	"k8s.io/apimachinery/pkg/util/sets" | ||||||
| 	"k8s.io/apimachinery/pkg/util/validation/field" | 	"k8s.io/apimachinery/pkg/util/validation/field" | ||||||
| 	"k8s.io/apimachinery/pkg/util/wait" | 	"k8s.io/apimachinery/pkg/util/wait" | ||||||
| 	genericapiserver "k8s.io/apiserver/pkg/server" |  | ||||||
| 	"k8s.io/apiserver/pkg/server/healthz" | 	"k8s.io/apiserver/pkg/server/healthz" | ||||||
| 	utilfeature "k8s.io/apiserver/pkg/util/feature" | 	utilfeature "k8s.io/apiserver/pkg/util/feature" | ||||||
| 	clientset "k8s.io/client-go/kubernetes" | 	clientset "k8s.io/client-go/kubernetes" | ||||||
| @@ -134,7 +133,7 @@ const ( | |||||||
| ) | ) | ||||||
|  |  | ||||||
| // NewKubeletCommand creates a *cobra.Command object with default parameters | // NewKubeletCommand creates a *cobra.Command object with default parameters | ||||||
| func NewKubeletCommand() *cobra.Command { | func NewKubeletCommand(ctx context.Context) *cobra.Command { | ||||||
| 	cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError) | 	cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError) | ||||||
| 	cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc) | 	cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc) | ||||||
| 	kubeletFlags := options.NewKubeletFlags() | 	kubeletFlags := options.NewKubeletFlags() | ||||||
| @@ -271,7 +270,6 @@ is checked every 20 seconds (also configurable with a flag).`, | |||||||
| 			if err := checkPermissions(); err != nil { | 			if err := checkPermissions(); err != nil { | ||||||
| 				klog.ErrorS(err, "kubelet running with insufficient permissions") | 				klog.ErrorS(err, "kubelet running with insufficient permissions") | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			// make the kubelet's config safe for logging | 			// make the kubelet's config safe for logging | ||||||
| 			config := kubeletServer.KubeletConfiguration.DeepCopy() | 			config := kubeletServer.KubeletConfiguration.DeepCopy() | ||||||
| 			for k := range config.StaticPodURLHeader { | 			for k := range config.StaticPodURLHeader { | ||||||
| @@ -280,9 +278,6 @@ is checked every 20 seconds (also configurable with a flag).`, | |||||||
| 			// log the kubelet's config for inspection | 			// log the kubelet's config for inspection | ||||||
| 			klog.V(5).InfoS("KubeletConfiguration", "configuration", klog.Format(config)) | 			klog.V(5).InfoS("KubeletConfiguration", "configuration", klog.Format(config)) | ||||||
|  |  | ||||||
| 			// set up signal context for kubelet shutdown |  | ||||||
| 			ctx := genericapiserver.SetupSignalContext() |  | ||||||
|  |  | ||||||
| 			utilfeature.DefaultMutableFeatureGate.AddMetrics() | 			utilfeature.DefaultMutableFeatureGate.AddMetrics() | ||||||
| 			// run the kubelet | 			// run the kubelet | ||||||
| 			return Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate) | 			return Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate) | ||||||
|   | |||||||
| @@ -24,6 +24,7 @@ package main | |||||||
| import ( | import ( | ||||||
| 	"os" | 	"os" | ||||||
|  |  | ||||||
|  | 	"k8s.io/apiserver/pkg/server" | ||||||
| 	"k8s.io/component-base/cli" | 	"k8s.io/component-base/cli" | ||||||
| 	_ "k8s.io/component-base/logs/json/register"          // for JSON log format registration | 	_ "k8s.io/component-base/logs/json/register"          // for JSON log format registration | ||||||
| 	_ "k8s.io/component-base/metrics/prometheus/clientgo" // for client metric registration | 	_ "k8s.io/component-base/metrics/prometheus/clientgo" // for client metric registration | ||||||
| @@ -32,7 +33,7 @@ import ( | |||||||
| ) | ) | ||||||
|  |  | ||||||
| func main() { | func main() { | ||||||
| 	command := app.NewKubeletCommand() | 	command := app.NewKubeletCommand(server.SetupSignalContext()) | ||||||
| 	code := cli.Run(command) | 	code := cli.Run(command) | ||||||
| 	os.Exit(code) | 	os.Exit(code) | ||||||
| } | } | ||||||
|   | |||||||
| @@ -47,6 +47,7 @@ AwEHoUQDQgAEH6cuzP8XuD5wal6wf9M6xDljTOPLX2i8uIp/C/ASqiIGUeeKQtX0 | |||||||
| // APIServer is a server which manages apiserver. | // APIServer is a server which manages apiserver. | ||||||
| type APIServer struct { | type APIServer struct { | ||||||
| 	storageConfig storagebackend.Config | 	storageConfig storagebackend.Config | ||||||
|  | 	stopCh        chan struct{} | ||||||
| 	cancel        func(error) | 	cancel        func(error) | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -54,6 +55,7 @@ type APIServer struct { | |||||||
| func NewAPIServer(storageConfig storagebackend.Config) *APIServer { | func NewAPIServer(storageConfig storagebackend.Config) *APIServer { | ||||||
| 	return &APIServer{ | 	return &APIServer{ | ||||||
| 		storageConfig: storageConfig, | 		storageConfig: storageConfig, | ||||||
|  | 		stopCh:        make(chan struct{}), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -112,7 +114,7 @@ func (a *APIServer) Start(ctx context.Context) error { | |||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		err = apiserver.Run(ctx, completedOptions) | 		err = apiserver.Run(ctx, completedOptions, a.stopCh) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			errCh <- fmt.Errorf("run apiserver error: %w", err) | 			errCh <- fmt.Errorf("run apiserver error: %w", err) | ||||||
| 			return | 			return | ||||||
| @@ -132,6 +134,10 @@ func (a *APIServer) Stop() error { | |||||||
| 	if a.cancel != nil { | 	if a.cancel != nil { | ||||||
| 		a.cancel(errors.New("stopping API server")) | 		a.cancel(errors.New("stopping API server")) | ||||||
| 	} | 	} | ||||||
|  | 	if a.stopCh != nil { | ||||||
|  | 		close(a.stopCh) | ||||||
|  | 		a.stopCh = nil | ||||||
|  | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user