Rename SchedulerLoop -> Controller
This commit is contained in:
Binary file not shown.
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||||||
limitations under the License.
|
limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package schedulerloop
|
package controller
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
@@ -35,11 +35,11 @@ const (
|
|||||||
Scheduled = "Scheduled"
|
Scheduled = "Scheduled"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SchedulerLoop interface {
|
type Controller interface {
|
||||||
Run(<-chan struct{})
|
Run(<-chan struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
type schedulerLoop struct {
|
type controller struct {
|
||||||
algorithm algorithm.SchedulerAlgorithm
|
algorithm algorithm.SchedulerAlgorithm
|
||||||
binder binder.Binder
|
binder binder.Binder
|
||||||
nextPod func() *api.Pod
|
nextPod func() *api.Pod
|
||||||
@@ -51,8 +51,8 @@ type schedulerLoop struct {
|
|||||||
|
|
||||||
func New(client *client.Client, algorithm algorithm.SchedulerAlgorithm,
|
func New(client *client.Client, algorithm algorithm.SchedulerAlgorithm,
|
||||||
recorder record.EventRecorder, nextPod func() *api.Pod, error func(pod *api.Pod, schedulingErr error),
|
recorder record.EventRecorder, nextPod func() *api.Pod, error func(pod *api.Pod, schedulingErr error),
|
||||||
binder binder.Binder, started chan<- struct{}) SchedulerLoop {
|
binder binder.Binder, started chan<- struct{}) Controller {
|
||||||
return &schedulerLoop{
|
return &controller{
|
||||||
algorithm: algorithm,
|
algorithm: algorithm,
|
||||||
binder: binder,
|
binder: binder,
|
||||||
nextPod: nextPod,
|
nextPod: nextPod,
|
||||||
@@ -63,14 +63,14 @@ func New(client *client.Client, algorithm algorithm.SchedulerAlgorithm,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *schedulerLoop) Run(done <-chan struct{}) {
|
func (s *controller) Run(done <-chan struct{}) {
|
||||||
defer close(s.started)
|
defer close(s.started)
|
||||||
go runtime.Until(s.scheduleOne, recoveryDelay, done)
|
go runtime.Until(s.scheduleOne, recoveryDelay, done)
|
||||||
}
|
}
|
||||||
|
|
||||||
// hacked from GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/scheduler.go,
|
// hacked from GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/scheduler.go,
|
||||||
// with the Modeler stuff removed since we don't use it because we have mesos.
|
// with the Modeler stuff removed since we don't use it because we have mesos.
|
||||||
func (s *schedulerLoop) scheduleOne() {
|
func (s *controller) scheduleOne() {
|
||||||
pod := s.nextPod()
|
pod := s.nextPod()
|
||||||
|
|
||||||
// pods which are pre-scheduled (i.e. NodeName is set) are deleted by the kubelet
|
// pods which are pre-scheduled (i.e. NodeName is set) are deleted by the kubelet
|
||||||
@@ -14,6 +14,7 @@ See the License for the specific language governing permissions and
|
|||||||
limitations under the License.
|
limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
// Package schedulerloop implement the scheduler loop which waits for pod
|
// Package controller implements the scheduling controller which waits for pod
|
||||||
// events from the queuer and passes them to the SchedulerAlgorithm.
|
// events from the queuer (i.e. from the apiserver), passes them to the
|
||||||
package schedulerloop
|
// SchedulerAlgorithm and in case of success to the binder which does the launch.
|
||||||
|
package controller
|
||||||
@@ -29,11 +29,11 @@ import (
|
|||||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/algorithm"
|
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/algorithm"
|
||||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/algorithm/podschedulers"
|
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/algorithm/podschedulers"
|
||||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/binder"
|
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/binder"
|
||||||
|
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/controller"
|
||||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/deleter"
|
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/deleter"
|
||||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/errorhandler"
|
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/errorhandler"
|
||||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework"
|
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework"
|
||||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/podreconciler"
|
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/podreconciler"
|
||||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/schedulerloop"
|
|
||||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
|
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
|
||||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
|
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
|
||||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
|
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
|
||||||
@@ -47,11 +47,11 @@ import (
|
|||||||
type sched struct {
|
type sched struct {
|
||||||
podReconciler podreconciler.PodReconciler
|
podReconciler podreconciler.PodReconciler
|
||||||
framework framework.Framework
|
framework framework.Framework
|
||||||
loop schedulerloop.SchedulerLoop
|
controller controller.Controller
|
||||||
|
|
||||||
// unsafe state, needs to be guarded, especially changes to podtask.T objects
|
// unsafe state, needs to be guarded, especially changes to podtask.T objects
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
taskRegistry podtask.Registry
|
taskRegistry podtask.Registry
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(c *config.Config, fw framework.Framework, ps podschedulers.PodScheduler,
|
func New(c *config.Config, fw framework.Framework, ps podschedulers.PodScheduler,
|
||||||
@@ -108,12 +108,12 @@ func New(c *config.Config, fw framework.Framework, ps podschedulers.PodScheduler
|
|||||||
podtask.InstallDebugHandlers(core.Tasks(), mux)
|
podtask.InstallDebugHandlers(core.Tasks(), mux)
|
||||||
})
|
})
|
||||||
|
|
||||||
core.loop = schedulerloop.New(client, algorithm, recorder, q.Yield, errorHandler.Error, binder, startLatch)
|
core.controller = controller.New(client, algorithm, recorder, q.Yield, errorHandler.Error, binder, startLatch)
|
||||||
return core
|
return core
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *sched) Run(done <-chan struct{}) {
|
func (c *sched) Run(done <-chan struct{}) {
|
||||||
c.loop.Run(done)
|
c.controller.Run(done)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *sched) Reconcile(t *podtask.T) {
|
func (c *sched) Reconcile(t *podtask.T) {
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ package scheduler
|
|||||||
|
|
||||||
// Created from contrib/mesos/docs/scheduler.monopic:
|
// Created from contrib/mesos/docs/scheduler.monopic:
|
||||||
//
|
//
|
||||||
|
//
|
||||||
// ┌───────────────────────────────────────────────────────────────────────┐
|
// ┌───────────────────────────────────────────────────────────────────────┐
|
||||||
// │ ┌───────────────────────────────────────┐ ┌─┴──────────────────────┐ ┌───────────────┐
|
// │ ┌───────────────────────────────────────┐ ┌─┴──────────────────────┐ ┌───────────────┐
|
||||||
// ┌─────▼─────┐ │Queuer │ Await() │ podUpdates │ │ │
|
// ┌─────▼─────┐ │Queuer │ Await() │ podUpdates │ │ │
|
||||||
@@ -31,7 +32,7 @@ package scheduler
|
|||||||
// ││ │ │ ┌────────────────────┼─────────────────┐
|
// ││ │ │ ┌────────────────────┼─────────────────┐
|
||||||
// ┌───────────────────┼┼──────────────────────────────────────┐ │ ┌───────────────────┼────┼───────────┐ │ │
|
// ┌───────────────────┼┼──────────────────────────────────────┐ │ ┌───────────────────┼────┼───────────┐ │ │
|
||||||
// ┌───────────▼──────────┐┌───────┴┴───────┐ ┌───────────────────┐ ┌──┴─┴─┴──────┐ ┌────────┴────┴───┐ ┌────▼────────▼─────────────┐ │
|
// ┌───────────▼──────────┐┌───────┴┴───────┐ ┌───────────────────┐ ┌──┴─┴─┴──────┐ ┌────────┴────┴───┐ ┌────▼────────▼─────────────┐ │
|
||||||
// │Binder (task launcher)││Deleter │ │PodReconciler │ │SchedulerLoop│ │ ErrorHandler │ │SchedulerAlgorithm │ │
|
// │Binder (task launcher)││Deleter │ │PodReconciler │ │Controller │ │ ErrorHandler │ │SchedulerAlgorithm │ │
|
||||||
// │- Bind(binding) ││- DeleteOne(pod)│ │- Reconcile(pod) │ │- Run() │ │- Error(pod, err)│ │- Schedule(pod) -> NodeName│ │
|
// │- Bind(binding) ││- DeleteOne(pod)│ │- Reconcile(pod) │ │- Run() │ │- Error(pod, err)│ │- Schedule(pod) -> NodeName│ │
|
||||||
// │ ││ │◀──│ │ │ │──▶│ │ │ │ │
|
// │ ││ │◀──│ │ │ │──▶│ │ │ │ │
|
||||||
// │ ┌─────┐││ ┌─────┐ │ │ ┌─────┐ │ │ ┌─────┐ │ │ ┌─────┐ │ │┌─────┐ │ │
|
// │ ┌─────┐││ ┌─────┐ │ │ ┌─────┐ │ │ ┌─────┐ │ │ ┌─────┐ │ │┌─────┐ │ │
|
||||||
|
|||||||
@@ -36,8 +36,8 @@ import (
|
|||||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
|
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
|
||||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components"
|
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components"
|
||||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/algorithm/podschedulers"
|
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/algorithm/podschedulers"
|
||||||
|
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/controller"
|
||||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework"
|
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework"
|
||||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/schedulerloop"
|
|
||||||
schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
|
schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
|
||||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
|
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
|
||||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
|
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
|
||||||
@@ -601,7 +601,7 @@ func TestScheduler_LifeCycle(t *testing.T) {
|
|||||||
lt.podsListWatch.Add(pod, true) // notify watchers
|
lt.podsListWatch.Add(pod, true) // notify watchers
|
||||||
|
|
||||||
// wait for failedScheduling event because there is no offer
|
// wait for failedScheduling event because there is no offer
|
||||||
assert.EventWithReason(lt.eventObs, schedulerloop.FailedScheduling, "failedScheduling event not received")
|
assert.EventWithReason(lt.eventObs, controller.FailedScheduling, "failedScheduling event not received")
|
||||||
|
|
||||||
// add some matching offer
|
// add some matching offer
|
||||||
offers := []*mesos.Offer{NewTestOffer(fmt.Sprintf("offer%d", i))}
|
offers := []*mesos.Offer{NewTestOffer(fmt.Sprintf("offer%d", i))}
|
||||||
@@ -614,7 +614,7 @@ func TestScheduler_LifeCycle(t *testing.T) {
|
|||||||
lt.framework.ResourceOffers(nil, offers)
|
lt.framework.ResourceOffers(nil, offers)
|
||||||
|
|
||||||
// and wait for scheduled pod
|
// and wait for scheduled pod
|
||||||
assert.EventWithReason(lt.eventObs, schedulerloop.Scheduled)
|
assert.EventWithReason(lt.eventObs, controller.Scheduled)
|
||||||
select {
|
select {
|
||||||
case launchedTask := <-launchedTasks:
|
case launchedTask := <-launchedTasks:
|
||||||
// report back that the task has been staged, and then started by mesos
|
// report back that the task has been staged, and then started by mesos
|
||||||
@@ -651,7 +651,7 @@ func TestScheduler_LifeCycle(t *testing.T) {
|
|||||||
// Launch a pod and wait until the scheduler driver is called
|
// Launch a pod and wait until the scheduler driver is called
|
||||||
schedulePodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
|
schedulePodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
|
||||||
// wait for failedScheduling event because there is no offer
|
// wait for failedScheduling event because there is no offer
|
||||||
assert.EventWithReason(lt.eventObs, schedulerloop.FailedScheduling, "failedScheduling event not received")
|
assert.EventWithReason(lt.eventObs, controller.FailedScheduling, "failedScheduling event not received")
|
||||||
|
|
||||||
// supply a matching offer
|
// supply a matching offer
|
||||||
lt.framework.ResourceOffers(lt.driver, offers)
|
lt.framework.ResourceOffers(lt.driver, offers)
|
||||||
@@ -666,7 +666,7 @@ func TestScheduler_LifeCycle(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// and wait to get scheduled
|
// and wait to get scheduled
|
||||||
assert.EventWithReason(lt.eventObs, schedulerloop.Scheduled)
|
assert.EventWithReason(lt.eventObs, controller.Scheduled)
|
||||||
|
|
||||||
// wait for driver.launchTasks call
|
// wait for driver.launchTasks call
|
||||||
select {
|
select {
|
||||||
|
|||||||
@@ -764,7 +764,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
|
|||||||
recorder := broadcaster.NewRecorder(api.EventSource{Component: "scheduler"})
|
recorder := broadcaster.NewRecorder(api.EventSource{Component: "scheduler"})
|
||||||
broadcaster.StartRecordingToSink(client.Events(""))
|
broadcaster.StartRecordingToSink(client.Events(""))
|
||||||
|
|
||||||
// create scheduler loop
|
// create scheduler core with all components arranged around it
|
||||||
lw := cache.NewListWatchFromClient(client, "pods", api.NamespaceAll, fields.Everything())
|
lw := cache.NewListWatchFromClient(client, "pods", api.NamespaceAll, fields.Everything())
|
||||||
sched := components.New(sc, framework, fcfs, client, recorder, schedulerProcess.Terminal(), s.mux, lw)
|
sched := components.New(sc, framework, fcfs, client, recorder, schedulerProcess.Terminal(), s.mux, lw)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user