Merge pull request #82072 from draveness/feature/use-context-instead-of-channel
feat(scheduler): use context in scheduler package
This commit is contained in:
@@ -136,7 +136,6 @@ func runCommand(cmd *cobra.Command, args []string, opts *options.Options, regist
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
// Get the completed config
|
||||
cc := c.Complete()
|
||||
|
||||
@@ -151,11 +150,14 @@ func runCommand(cmd *cobra.Command, args []string, opts *options.Options, regist
|
||||
return fmt.Errorf("unable to register configz: %s", err)
|
||||
}
|
||||
|
||||
return Run(cc, stopCh, registryOptions...)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
return Run(ctx, cc, registryOptions...)
|
||||
}
|
||||
|
||||
// Run executes the scheduler based on the given configuration. It only return on error or when stopCh is closed.
|
||||
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, registryOptions ...Option) error {
|
||||
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
|
||||
func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, registryOptions ...Option) error {
|
||||
// To help debugging, immediately log version
|
||||
klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())
|
||||
|
||||
@@ -172,7 +174,7 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, regis
|
||||
cc.PodInformer,
|
||||
cc.Recorder,
|
||||
cc.ComponentConfig.AlgorithmSource,
|
||||
stopCh,
|
||||
ctx.Done(),
|
||||
scheduler.WithName(cc.ComponentConfig.SchedulerName),
|
||||
scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
|
||||
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
|
||||
@@ -190,7 +192,7 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, regis
|
||||
|
||||
// Prepare the event broadcaster.
|
||||
if cc.Broadcaster != nil && cc.EventClient != nil {
|
||||
cc.Broadcaster.StartRecordingToSink(stopCh)
|
||||
cc.Broadcaster.StartRecordingToSink(ctx.Done())
|
||||
}
|
||||
if cc.LeaderElectionBroadcaster != nil && cc.CoreEventClient != nil {
|
||||
cc.LeaderElectionBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
|
||||
@@ -205,53 +207,36 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, regis
|
||||
if cc.InsecureServing != nil {
|
||||
separateMetrics := cc.InsecureMetricsServing != nil
|
||||
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
|
||||
if err := cc.InsecureServing.Serve(handler, 0, stopCh); err != nil {
|
||||
if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
|
||||
return fmt.Errorf("failed to start healthz server: %v", err)
|
||||
}
|
||||
}
|
||||
if cc.InsecureMetricsServing != nil {
|
||||
handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
|
||||
if err := cc.InsecureMetricsServing.Serve(handler, 0, stopCh); err != nil {
|
||||
if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {
|
||||
return fmt.Errorf("failed to start metrics server: %v", err)
|
||||
}
|
||||
}
|
||||
if cc.SecureServing != nil {
|
||||
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
|
||||
// TODO: handle stoppedCh returned by c.SecureServing.Serve
|
||||
if _, err := cc.SecureServing.Serve(handler, 0, stopCh); err != nil {
|
||||
if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
|
||||
// fail early for secure handlers, removing the old error loop from above
|
||||
return fmt.Errorf("failed to start secure server: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Start all informers.
|
||||
go cc.PodInformer.Informer().Run(stopCh)
|
||||
cc.InformerFactory.Start(stopCh)
|
||||
go cc.PodInformer.Informer().Run(ctx.Done())
|
||||
cc.InformerFactory.Start(ctx.Done())
|
||||
|
||||
// Wait for all caches to sync before scheduling.
|
||||
cc.InformerFactory.WaitForCacheSync(stopCh)
|
||||
|
||||
// Prepare a reusable runCommand function.
|
||||
run := func(ctx context.Context) {
|
||||
sched.Run()
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-stopCh:
|
||||
cancel()
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}()
|
||||
cc.InformerFactory.WaitForCacheSync(ctx.Done())
|
||||
|
||||
// If leader election is enabled, runCommand via LeaderElector until done and exit.
|
||||
if cc.LeaderElection != nil {
|
||||
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
|
||||
OnStartedLeading: run,
|
||||
OnStartedLeading: sched.Run,
|
||||
OnStoppedLeading: func() {
|
||||
klog.Fatalf("leaderelection lost")
|
||||
},
|
||||
@@ -267,7 +252,7 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, regis
|
||||
}
|
||||
|
||||
// Leader election is disabled, so runCommand inline until done.
|
||||
run(ctx)
|
||||
sched.Run(ctx)
|
||||
return fmt.Errorf("finished without leader elect")
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package testing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
@@ -62,9 +63,9 @@ type Logger interface {
|
||||
// files that because Golang testing's call to os.Exit will not give a stop channel go routine
|
||||
// enough time to remove temporary files.
|
||||
func StartTestServer(t Logger, customFlags []string) (result TestServer, err error) {
|
||||
stopCh := make(chan struct{})
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
tearDown := func() {
|
||||
close(stopCh)
|
||||
cancel()
|
||||
if len(result.TmpDir) != 0 {
|
||||
os.RemoveAll(result.TmpDir)
|
||||
}
|
||||
@@ -119,11 +120,11 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err
|
||||
}
|
||||
|
||||
errCh := make(chan error)
|
||||
go func(stopCh <-chan struct{}) {
|
||||
if err := app.Run(config.Complete(), stopCh); err != nil {
|
||||
go func(ctx context.Context) {
|
||||
if err := app.Run(ctx, config.Complete()); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}(stopCh)
|
||||
}(ctx)
|
||||
|
||||
t.Logf("Waiting for /healthz to be ok...")
|
||||
client, err := kubernetes.NewForConfig(config.LoopbackClientConfig)
|
||||
|
||||
Reference in New Issue
Block a user