Split binder, deleter, podScheduler initialion from NewSchedulerLoop
This commit is contained in:
		@@ -427,6 +427,7 @@ type lifecycleTest struct {
 | 
			
		||||
	driver        *mmock.JoinableDriver
 | 
			
		||||
	eventObs      *EventObserver
 | 
			
		||||
	loop          operations.SchedulerLoopInterface
 | 
			
		||||
	podReconciler *operations.PodReconciler
 | 
			
		||||
	podsListWatch *MockPodsListWatch
 | 
			
		||||
	scheduler     *MesosScheduler
 | 
			
		||||
	schedulerProc *ha.SchedulerProcess
 | 
			
		||||
@@ -485,7 +486,7 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
 | 
			
		||||
	// create scheduler loop
 | 
			
		||||
	fw := &MesosFramework{MesosScheduler: mesosScheduler}
 | 
			
		||||
	eventObs := NewEventObserver()
 | 
			
		||||
	loop := operations.NewSchedulerLoop(&c, fw, client, eventObs, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch)
 | 
			
		||||
	loop, _ := operations.NewScheduler(&c, fw, client, eventObs, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch)
 | 
			
		||||
	assert.NotNil(loop)
 | 
			
		||||
 | 
			
		||||
	// create mock mesos scheduler driver
 | 
			
		||||
@@ -510,7 +511,7 @@ func (lt lifecycleTest) Start() <-chan LaunchedTask {
 | 
			
		||||
	// init scheduler
 | 
			
		||||
	err := lt.scheduler.Init(
 | 
			
		||||
		lt.schedulerProc.Master(),
 | 
			
		||||
		lt.loop,
 | 
			
		||||
		lt.podReconciler,
 | 
			
		||||
		http.DefaultServeMux,
 | 
			
		||||
	)
 | 
			
		||||
	assert.NoError(err)
 | 
			
		||||
 
 | 
			
		||||
@@ -42,9 +42,6 @@ const (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type SchedulerLoopInterface interface {
 | 
			
		||||
	ReconcilePodTask(t *podtask.T)
 | 
			
		||||
 | 
			
		||||
	// execute the Scheduling plugin, should start a go routine and return immediately
 | 
			
		||||
	Run(<-chan struct{})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -55,12 +52,11 @@ type SchedulerLoop struct {
 | 
			
		||||
	error     func(*api.Pod, error)
 | 
			
		||||
	recorder  record.EventRecorder
 | 
			
		||||
	client    *client.Client
 | 
			
		||||
	pr        *PodReconciler
 | 
			
		||||
	starting  chan struct{} // startup latch
 | 
			
		||||
	started   chan<- struct{} // startup latch
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewSchedulerLoop(c *config.Config, fw types.Framework, client *client.Client, recorder record.EventRecorder,
 | 
			
		||||
	terminate <-chan struct{}, mux *http.ServeMux, podsWatcher *cache.ListWatch) *SchedulerLoop {
 | 
			
		||||
func NewScheduler(c *config.Config, fw types.Framework, client *client.Client, recorder record.EventRecorder,
 | 
			
		||||
	terminate <-chan struct{}, mux *http.ServeMux, podsWatcher *cache.ListWatch) (SchedulerLoopInterface, *PodReconciler) {
 | 
			
		||||
 | 
			
		||||
	// Watch and queue pods that need scheduling.
 | 
			
		||||
	updates := make(chan queue.Entry, c.UpdatesBacklog)
 | 
			
		||||
@@ -74,10 +70,10 @@ func NewSchedulerLoop(c *config.Config, fw types.Framework, client *client.Clien
 | 
			
		||||
	q := queuer.New(podUpdates)
 | 
			
		||||
	podDeleter := NewDeleter(fw, q)
 | 
			
		||||
	podReconciler := NewPodReconciler(fw, client, q, podDeleter)
 | 
			
		||||
	bo := backoff.New(c.InitialPodBackoff.Duration, c.MaxPodBackoff.Duration)
 | 
			
		||||
	eh := NewErrorHandler(fw, bo, q)
 | 
			
		||||
 | 
			
		||||
	startLatch := make(chan struct{})
 | 
			
		||||
	eventBroadcaster := record.NewBroadcaster()
 | 
			
		||||
 | 
			
		||||
	runtime.On(startLatch, func() {
 | 
			
		||||
		eventBroadcaster.StartRecordingToSink(client.Events(""))
 | 
			
		||||
		reflector.Run() // TODO(jdef) should listen for termination
 | 
			
		||||
@@ -87,20 +83,27 @@ func NewSchedulerLoop(c *config.Config, fw types.Framework, client *client.Clien
 | 
			
		||||
		q.InstallDebugHandlers(mux)
 | 
			
		||||
		podtask.InstallDebugHandlers(fw.Tasks(), mux)
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	return NewSchedulerLoop(c, fw, client, recorder, podUpdates, q, startLatch), podReconciler
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewSchedulerLoop(c *config.Config, fw types.Framework, client *client.Client,
 | 
			
		||||
	recorder record.EventRecorder, podUpdates queue.FIFO, q *queuer.Queuer,
 | 
			
		||||
	started chan<- struct{}) *SchedulerLoop {
 | 
			
		||||
	bo := backoff.New(c.InitialPodBackoff.Duration, c.MaxPodBackoff.Duration)
 | 
			
		||||
	return &SchedulerLoop{
 | 
			
		||||
		algorithm: NewSchedulerAlgorithm(fw, podUpdates),
 | 
			
		||||
		binder:    NewBinder(fw),
 | 
			
		||||
		nextPod:   q.Yield,
 | 
			
		||||
		error:     eh.Error,
 | 
			
		||||
		error:     NewErrorHandler(fw, bo, q).Error,
 | 
			
		||||
		recorder:  recorder,
 | 
			
		||||
		client:    client,
 | 
			
		||||
		pr:        podReconciler,
 | 
			
		||||
		starting:  startLatch,
 | 
			
		||||
		started:   started,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *SchedulerLoop) Run(done <-chan struct{}) {
 | 
			
		||||
	defer close(s.starting)
 | 
			
		||||
	defer close(s.started)
 | 
			
		||||
	go runtime.Until(s.scheduleOne, recoveryDelay, done)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -141,7 +144,3 @@ func (s *SchedulerLoop) scheduleOne() {
 | 
			
		||||
	}
 | 
			
		||||
	s.recorder.Eventf(pod, Scheduled, "Successfully assigned %v to %v", pod.Name, dest)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *SchedulerLoop) ReconcilePodTask(t *podtask.T) {
 | 
			
		||||
	s.pr.Reconcile(t)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -56,8 +56,7 @@ import (
 | 
			
		||||
 | 
			
		||||
// KubernetesScheduler implements:
 | 
			
		||||
// 1: A mesos scheduler.
 | 
			
		||||
// 2: A kubernetes scheduler plugin.
 | 
			
		||||
// 3: A kubernetes pod.Registry.
 | 
			
		||||
// 2: A kubernetes pod.Registry.
 | 
			
		||||
type MesosScheduler struct {
 | 
			
		||||
	// We use a lock here to avoid races
 | 
			
		||||
	// between invoking the mesos callback
 | 
			
		||||
@@ -93,9 +92,8 @@ type MesosScheduler struct {
 | 
			
		||||
	taskRegistry podtask.Registry
 | 
			
		||||
 | 
			
		||||
	// via deferred init
 | 
			
		||||
 | 
			
		||||
	loop               operations.SchedulerLoopInterface
 | 
			
		||||
	reconciler         *operations.TasksReconciler
 | 
			
		||||
	podReconciler      *operations.PodReconciler
 | 
			
		||||
	tasksReconciler    *operations.TasksReconciler
 | 
			
		||||
	reconcileCooldown  time.Duration
 | 
			
		||||
	asRegisteredMaster proc.Doer
 | 
			
		||||
	terminate          <-chan struct{} // signal chan, closes when we should kill background tasks
 | 
			
		||||
@@ -172,7 +170,7 @@ func New(config Config) *MesosScheduler {
 | 
			
		||||
	return k
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (k *MesosScheduler) Init(electedMaster proc.Process, sl operations.SchedulerLoopInterface, mux *http.ServeMux) error {
 | 
			
		||||
func (k *MesosScheduler) Init(electedMaster proc.Process, pr *operations.PodReconciler, mux *http.ServeMux) error {
 | 
			
		||||
	log.V(1).Infoln("initializing kubernetes mesos scheduler")
 | 
			
		||||
 | 
			
		||||
	k.asRegisteredMaster = proc.DoerFunc(func(a proc.Action) <-chan error {
 | 
			
		||||
@@ -182,7 +180,7 @@ func (k *MesosScheduler) Init(electedMaster proc.Process, sl operations.Schedule
 | 
			
		||||
		return electedMaster.Do(a)
 | 
			
		||||
	})
 | 
			
		||||
	k.terminate = electedMaster.Done()
 | 
			
		||||
	k.loop = sl
 | 
			
		||||
	k.podReconciler = pr
 | 
			
		||||
	k.offers.Init(k.terminate)
 | 
			
		||||
	k.InstallDebugHandlers(mux)
 | 
			
		||||
	k.nodeRegistrator.Run(k.terminate)
 | 
			
		||||
@@ -223,8 +221,8 @@ func (k *MesosScheduler) InstallDebugHandlers(mux *http.ServeMux) {
 | 
			
		||||
			w.WriteHeader(http.StatusNoContent)
 | 
			
		||||
		}))
 | 
			
		||||
	}
 | 
			
		||||
	requestReconciliation("/debug/actions/requestExplicit", k.reconciler.RequestExplicit)
 | 
			
		||||
	requestReconciliation("/debug/actions/requestImplicit", k.reconciler.RequestImplicit)
 | 
			
		||||
	requestReconciliation("/debug/actions/requestExplicit", k.tasksReconciler.RequestExplicit)
 | 
			
		||||
	requestReconciliation("/debug/actions/requestImplicit", k.tasksReconciler.RequestImplicit)
 | 
			
		||||
 | 
			
		||||
	wrappedHandler("/debug/actions/kamikaze", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 | 
			
		||||
		slaves := k.slaveHostNames.SlaveIDs()
 | 
			
		||||
@@ -257,7 +255,7 @@ func (k *MesosScheduler) Registered(drv bindings.SchedulerDriver, fid *mesos.Fra
 | 
			
		||||
	k.registered = true
 | 
			
		||||
 | 
			
		||||
	k.onRegistration.Do(func() { k.onInitialRegistration(drv) })
 | 
			
		||||
	k.reconciler.RequestExplicit()
 | 
			
		||||
	k.tasksReconciler.RequestExplicit()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Reregistered is called when the scheduler re-registered with the master successfully.
 | 
			
		||||
@@ -270,7 +268,7 @@ func (k *MesosScheduler) Reregistered(drv bindings.SchedulerDriver, mi *mesos.Ma
 | 
			
		||||
	k.registered = true
 | 
			
		||||
 | 
			
		||||
	k.onRegistration.Do(func() { k.onInitialRegistration(drv) })
 | 
			
		||||
	k.reconciler.RequestExplicit()
 | 
			
		||||
	k.tasksReconciler.RequestExplicit()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// perform one-time initialization actions upon the first registration event received from Mesos.
 | 
			
		||||
@@ -290,13 +288,13 @@ func (k *MesosScheduler) onInitialRegistration(driver bindings.SchedulerDriver)
 | 
			
		||||
	r1 := k.makeTaskRegistryReconciler()
 | 
			
		||||
	r2 := k.makePodRegistryReconciler()
 | 
			
		||||
 | 
			
		||||
	k.reconciler = operations.NewTasksReconciler(k.asRegisteredMaster, k.makeCompositeReconciler(r1, r2),
 | 
			
		||||
	k.tasksReconciler = operations.NewTasksReconciler(k.asRegisteredMaster, k.makeCompositeReconciler(r1, r2),
 | 
			
		||||
		k.reconcileCooldown, k.schedulerConfig.ExplicitReconciliationAbortTimeout.Duration, k.terminate)
 | 
			
		||||
	go k.reconciler.Run(driver)
 | 
			
		||||
	go k.tasksReconciler.Run(driver)
 | 
			
		||||
 | 
			
		||||
	if k.reconcileInterval > 0 {
 | 
			
		||||
		ri := time.Duration(k.reconcileInterval) * time.Second
 | 
			
		||||
		time.AfterFunc(k.schedulerConfig.InitialImplicitReconciliationDelay.Duration, func() { runtime.Until(k.reconciler.RequestImplicit, ri, k.terminate) })
 | 
			
		||||
		time.AfterFunc(k.schedulerConfig.InitialImplicitReconciliationDelay.Duration, func() { runtime.Until(k.tasksReconciler.RequestImplicit, ri, k.terminate) })
 | 
			
		||||
		log.Infof("will perform implicit task reconciliation at interval: %v after %v", ri, k.schedulerConfig.InitialImplicitReconciliationDelay.Duration)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@@ -392,7 +390,7 @@ func (k *MesosScheduler) StatusUpdate(driver bindings.SchedulerDriver, taskStatu
 | 
			
		||||
	case mesos.TaskState_TASK_FAILED, mesos.TaskState_TASK_ERROR:
 | 
			
		||||
		if task, _ := k.taskRegistry.UpdateStatus(taskStatus); task != nil {
 | 
			
		||||
			if task.Has(podtask.Launched) && !task.Has(podtask.Bound) {
 | 
			
		||||
				go k.loop.ReconcilePodTask(task)
 | 
			
		||||
				go k.podReconciler.Reconcile(task)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
@@ -440,7 +438,7 @@ func (k *MesosScheduler) reconcileTerminalTask(driver bindings.SchedulerDriver,
 | 
			
		||||
	} else if taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_TERMINATED || taskStatus.GetReason() == mesos.TaskStatus_REASON_EXECUTOR_UNREGISTERED {
 | 
			
		||||
		// attempt to prevent dangling pods in the pod and task registries
 | 
			
		||||
		log.V(1).Infof("request explicit reconciliation to clean up for task %v after executor reported (terminated/unregistered)", taskStatus.TaskId.GetValue())
 | 
			
		||||
		k.reconciler.RequestExplicit()
 | 
			
		||||
		k.tasksReconciler.RequestExplicit()
 | 
			
		||||
	} else if taskStatus.GetState() == mesos.TaskState_TASK_LOST && state == podtask.StateRunning && taskStatus.ExecutorId != nil && taskStatus.SlaveId != nil {
 | 
			
		||||
		//TODO(jdef) this may not be meaningful once we have proper checkpointing and master detection
 | 
			
		||||
		//If we're reconciling and receive this then the executor may be
 | 
			
		||||
 
 | 
			
		||||
@@ -765,14 +765,14 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
 | 
			
		||||
	eventBroadcaster := record.NewBroadcaster()
 | 
			
		||||
	recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"})
 | 
			
		||||
	lw := cache.NewListWatchFromClient(client, "pods", api.NamespaceAll, fields.Everything())
 | 
			
		||||
	loop := operations.NewSchedulerLoop(sc, fw, client, recorder, schedulerProcess.Terminal(), s.mux, lw)
 | 
			
		||||
	loop, pr := operations.NewScheduler(sc, fw, client, recorder, schedulerProcess.Terminal(), s.mux, lw)
 | 
			
		||||
 | 
			
		||||
	runtime.On(mesosScheduler.Registration(), func() { loop.Run(schedulerProcess.Terminal()) })
 | 
			
		||||
	runtime.On(mesosScheduler.Registration(), s.newServiceWriter(schedulerProcess.Terminal()))
 | 
			
		||||
 | 
			
		||||
	driverFactory := ha.DriverFactory(func() (drv bindings.SchedulerDriver, err error) {
 | 
			
		||||
		log.V(1).Infoln("performing deferred initialization")
 | 
			
		||||
		if err = mesosScheduler.Init(schedulerProcess.Master(), loop, s.mux); err != nil {
 | 
			
		||||
		if err = mesosScheduler.Init(schedulerProcess.Master(), pr, s.mux); err != nil {
 | 
			
		||||
			return nil, fmt.Errorf("failed to initialize pod scheduler: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		log.V(1).Infoln("deferred init complete")
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user