scheduler: reenable TestPlugin_LifeCycle, increase timeouts
		| @@ -52,6 +52,11 @@ const ( | ||||
| 	pluginRecoveryDelay = 100 * time.Millisecond // delay after scheduler plugin crashes, before we resume scheduling | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	FailedScheduling = "FailedScheduling" | ||||
| 	Scheduled        = "Scheduled" | ||||
| ) | ||||
|  | ||||
| // scheduler abstraction to allow for easier unit testing | ||||
| type schedulerInterface interface { | ||||
| 	sync.Locker // synchronize scheduler plugin operations | ||||
| @@ -757,7 +762,7 @@ func (s *schedulingPlugin) scheduleOne() { | ||||
| 	dest, err := s.config.Algorithm.Schedule(pod, s.config.NodeLister) // call kubeScheduler.Schedule | ||||
| 	if err != nil { | ||||
| 		log.V(1).Infof("Failed to schedule: %+v", pod) | ||||
| 		s.config.Recorder.Eventf(pod, "FailedScheduling", "Error scheduling: %v", err) | ||||
| 		s.config.Recorder.Eventf(pod, FailedScheduling, "Error scheduling: %v", err) | ||||
| 		s.config.Error(pod, err) | ||||
| 		return | ||||
| 	} | ||||
| @@ -770,11 +775,11 @@ func (s *schedulingPlugin) scheduleOne() { | ||||
| 	} | ||||
| 	if err := s.config.Binder.Bind(b); err != nil { | ||||
| 		log.V(1).Infof("Failed to bind pod: %+v", err) | ||||
| 		s.config.Recorder.Eventf(pod, "FailedScheduling", "Binding rejected: %v", err) | ||||
| 		s.config.Recorder.Eventf(pod, FailedScheduling, "Binding rejected: %v", err) | ||||
| 		s.config.Error(pod, err) | ||||
| 		return | ||||
| 	} | ||||
| 	s.config.Recorder.Eventf(pod, "Scheduled", "Successfully assigned %v to %v", pod.Name, dest) | ||||
| 	s.config.Recorder.Eventf(pod, Scheduled, "Successfully assigned %v to %v", pod.Name, dest) | ||||
| } | ||||
|  | ||||
| // this pod may be out of sync with respect to the API server registry: | ||||
|   | ||||
| @@ -17,6 +17,7 @@ limitations under the License. | ||||
| package scheduler | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"net/http" | ||||
| 	"net/http/httptest" | ||||
| @@ -34,7 +35,7 @@ import ( | ||||
|  | ||||
| 	log "github.com/golang/glog" | ||||
| 	mesos "github.com/mesos/mesos-go/mesosproto" | ||||
| 	util "github.com/mesos/mesos-go/mesosutil" | ||||
| 	"github.com/mesos/mesos-go/mesosutil" | ||||
| 	bindings "github.com/mesos/mesos-go/scheduler" | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| 	"github.com/stretchr/testify/mock" | ||||
| @@ -46,26 +47,48 @@ import ( | ||||
| 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" | ||||
| 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" | ||||
| 	mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource" | ||||
| 	"k8s.io/kubernetes/pkg/util" | ||||
| ) | ||||
|  | ||||
| // An apiserver mock which partially mocks the pods API | ||||
| type TestServer struct { | ||||
| 	stats map[string]uint | ||||
| 	nodes map[string]*api.Node | ||||
| 	lock  sync.Mutex // guards above fields | ||||
|  | ||||
| 	server *httptest.Server | ||||
| 	stats  map[string]uint | ||||
| 	lock   sync.Mutex | ||||
| 	t      *testing.T | ||||
| } | ||||
|  | ||||
| func (srv *TestServer) LookupNode(name string) *api.Node { | ||||
| 	srv.lock.Lock() | ||||
| 	defer srv.lock.Unlock() | ||||
|  | ||||
| 	node, _ := api.Scheme.DeepCopy(srv.nodes[name]) | ||||
| 	return node.(*api.Node) | ||||
| } | ||||
|  | ||||
| func (srv *TestServer) WaitForNode(name string) { | ||||
| 	assertext.EventuallyTrue(srv.t, util.ForeverTestTimeout, func() bool { | ||||
| 		return srv.LookupNode(name) != nil | ||||
| 	}) | ||||
| } | ||||
|  | ||||
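WaitForNode blocks until the scheduler has registered the named node with the mock apiserver, bounded by util.ForeverTestTimeout rather than an ad-hoc short timeout. assertext.EventuallyTrue is defined elsewhere in the tree; the following is only a hedged sketch of the poll-until-deadline pattern it stands for, using just the testing and time packages this file already imports — the helper name and poll interval are invented here for illustration.

	// Illustrative sketch of a poll-until-deadline assertion; not the real
	// assertext.EventuallyTrue, and the poll interval is an assumption.
	func eventuallyTrue(t *testing.T, timeout time.Duration, cond func() bool) bool {
		deadline := time.Now().Add(timeout)
		for time.Now().Before(deadline) {
			if cond() {
				return true
			}
			time.Sleep(10 * time.Millisecond)
		}
		t.Errorf("condition not satisfied within %v", timeout)
		return false
	}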
| func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsListWatch) *TestServer { | ||||
| 	ts := TestServer{ | ||||
| 		stats: map[string]uint{}, | ||||
| 		nodes: map[string]*api.Node{}, | ||||
| 		t:     t, | ||||
| 	} | ||||
| 	mux := http.NewServeMux() | ||||
|  | ||||
| 	mux.HandleFunc(testapi.Default.ResourcePath("pods", namespace, ""), func(w http.ResponseWriter, r *http.Request) { | ||||
| 	podListHandler := func(w http.ResponseWriter, r *http.Request) { | ||||
| 		w.WriteHeader(http.StatusOK) | ||||
| 		pods := mockPodListWatch.Pods() | ||||
| 		w.Write([]byte(runtime.EncodeOrDie(testapi.Default.Codec(), &pods))) | ||||
| 	}) | ||||
| 	} | ||||
| 	mux.HandleFunc(testapi.Default.ResourcePath("pods", namespace, ""), podListHandler) | ||||
| 	mux.HandleFunc(testapi.Default.ResourcePath("pods", "", ""), podListHandler) | ||||
|  | ||||
| 	podsPrefix := testapi.Default.ResourcePath("pods", namespace, "") + "/" | ||||
| 	mux.HandleFunc(podsPrefix, func(w http.ResponseWriter, r *http.Request) { | ||||
| @@ -76,7 +99,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis | ||||
| 		defer ts.lock.Unlock() | ||||
| 		ts.stats[name] = ts.stats[name] + 1 | ||||
|  | ||||
| 		p := mockPodListWatch.GetPod(name) | ||||
| 		p := mockPodListWatch.Pod(name) | ||||
| 		if p != nil { | ||||
| 			w.WriteHeader(http.StatusOK) | ||||
| 			w.Write([]byte(runtime.EncodeOrDie(testapi.Default.Codec(), p))) | ||||
| @@ -85,9 +108,33 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis | ||||
| 		w.WriteHeader(http.StatusNotFound) | ||||
| 	}) | ||||
|  | ||||
| 	mux.HandleFunc(testapi.Default.ResourcePath("events", namespace, ""), func(w http.ResponseWriter, r *http.Request) { | ||||
| 		w.WriteHeader(http.StatusOK) | ||||
| 	}) | ||||
| 	mux.HandleFunc( | ||||
| 		testapi.Default.ResourcePath("events", namespace, ""), | ||||
| 		func(w http.ResponseWriter, r *http.Request) { | ||||
| 			w.WriteHeader(http.StatusOK) | ||||
| 		}, | ||||
| 	) | ||||
|  | ||||
| 	mux.HandleFunc( | ||||
| 		testapi.Default.ResourcePath("nodes", "", ""), | ||||
| 		func(w http.ResponseWriter, r *http.Request) { | ||||
| 			var node api.Node | ||||
| 			if err := json.NewDecoder(r.Body).Decode(&node); err != nil { | ||||
| 				w.WriteHeader(http.StatusInternalServerError) | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			ts.lock.Lock() | ||||
| 			defer ts.lock.Unlock() | ||||
| 			ts.nodes[node.Name] = &node | ||||
|  | ||||
| 			if err := json.NewEncoder(w).Encode(node); err != nil { | ||||
| 				w.WriteHeader(http.StatusInternalServerError) | ||||
| 				return | ||||
| 			} | ||||
| 			w.WriteHeader(http.StatusOK) | ||||
| 		}, | ||||
| 	) | ||||
|  | ||||
| 	mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { | ||||
| 		t.Errorf("unexpected request: %v", req.RequestURI) | ||||
| @@ -97,6 +144,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis | ||||
| 	ts.server = httptest.NewServer(mux) | ||||
| 	return &ts | ||||
| } | ||||
|  | ||||
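Besides the namespaced and cluster-wide pod lists, the mock apiserver now accepts node creations, which is how the scheduler's node registration becomes visible to LookupNode and WaitForNode. A minimal usage sketch, assuming the handler paths registered above and a hand-written JSON node body; the strings import is the only addition over what this file already pulls in.

	ts := NewTestServer(t, api.NamespaceDefault, NewMockPodsListWatch(api.PodList{}))
	defer ts.server.Close()

	// register a node through the mocked nodes endpoint
	nodeBody := strings.NewReader(`{"metadata": {"name": "some_hostname"}}`)
	url := ts.server.URL + testapi.Default.ResourcePath("nodes", "", "")
	resp, err := http.Post(url, "application/json", nodeBody)
	if err != nil {
		t.Fatal(err)
	}
	resp.Body.Close()

	ts.WaitForNode("some_hostname") // returns once LookupNode yields a non-nil copy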
| func (ts *TestServer) Stats(name string) uint { | ||||
| 	ts.lock.Lock() | ||||
| 	defer ts.lock.Unlock() | ||||
| @@ -131,13 +179,15 @@ func NewMockPodsListWatch(initialPodList api.PodList) *MockPodsListWatch { | ||||
| 	} | ||||
| 	return &lw | ||||
| } | ||||
|  | ||||
| func (lw *MockPodsListWatch) Pods() api.PodList { | ||||
| 	lw.lock.Lock() | ||||
| 	defer lw.lock.Unlock() | ||||
|  | ||||
| 	return lw.list | ||||
| } | ||||
| func (lw *MockPodsListWatch) GetPod(name string) *api.Pod { | ||||
|  | ||||
| func (lw *MockPodsListWatch) Pod(name string) *api.Pod { | ||||
| 	lw.lock.Lock() | ||||
| 	defer lw.lock.Unlock() | ||||
|  | ||||
| @@ -173,6 +223,7 @@ func (lw *MockPodsListWatch) Modify(pod *api.Pod, notify bool) { | ||||
| 	} | ||||
| 	log.Fatalf("Cannot find pod %v to modify in MockPodsListWatch", pod.Name) | ||||
| } | ||||
|  | ||||
| func (lw *MockPodsListWatch) Delete(pod *api.Pod, notify bool) { | ||||
| 	lw.lock.Lock() | ||||
| 	defer lw.lock.Unlock() | ||||
| @@ -229,16 +280,16 @@ func NewTestPod() (*api.Pod, int) { | ||||
| // Offering some cpus and memory and the 8000-9000 port range | ||||
| func NewTestOffer(id string) *mesos.Offer { | ||||
| 	hostname := "some_hostname" | ||||
| 	cpus := util.NewScalarResource("cpus", 3.75) | ||||
| 	mem := util.NewScalarResource("mem", 940) | ||||
| 	cpus := mesosutil.NewScalarResource("cpus", 3.75) | ||||
| 	mem := mesosutil.NewScalarResource("mem", 940) | ||||
| 	var port8000 uint64 = 8000 | ||||
| 	var port9000 uint64 = 9000 | ||||
| 	ports8000to9000 := mesos.Value_Range{Begin: &port8000, End: &port9000} | ||||
| 	ports := util.NewRangesResource("ports", []*mesos.Value_Range{&ports8000to9000}) | ||||
| 	ports := mesosutil.NewRangesResource("ports", []*mesos.Value_Range{&ports8000to9000}) | ||||
| 	return &mesos.Offer{ | ||||
| 		Id:        util.NewOfferID(id), | ||||
| 		Id:        mesosutil.NewOfferID(id), | ||||
| 		Hostname:  &hostname, | ||||
| 		SlaveId:   util.NewSlaveID(hostname), | ||||
| 		SlaveId:   mesosutil.NewSlaveID(hostname), | ||||
| 		Resources: []*mesos.Resource{cpus, mem, ports}, | ||||
| 	} | ||||
| } | ||||
| @@ -266,9 +317,11 @@ func NewEventObserver() *EventObserver { | ||||
| 		fifo: make(chan Event, 1000), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (o *EventObserver) Event(object runtime.Object, reason, message string) { | ||||
| 	o.fifo <- Event{Object: object, Reason: reason, Message: message} | ||||
| } | ||||
|  | ||||
| func (o *EventObserver) Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) { | ||||
| 	o.fifo <- Event{Object: object, Reason: reason, Message: fmt.Sprintf(messageFmt, args...)} | ||||
| } | ||||
| @@ -278,7 +331,7 @@ func (o *EventObserver) PastEventf(object runtime.Object, timestamp unversioned. | ||||
|  | ||||
| func (a *EventAssertions) Event(observer *EventObserver, pred EventPredicate, msgAndArgs ...interface{}) bool { | ||||
| 	// parse msgAndArgs: first possibly a duration, otherwise a format string with further args | ||||
| 	timeout := time.Second * 2 | ||||
| 	timeout := util.ForeverTestTimeout | ||||
| 	msg := "event not received" | ||||
| 	msgArgStart := 0 | ||||
| 	if len(msgAndArgs) > 0 { | ||||
| @@ -326,6 +379,7 @@ func (a *EventAssertions) Event(observer *EventObserver, pred EventPredicate, ms | ||||
| 		return a.Fail(msg) | ||||
| 	} | ||||
| } | ||||
|  | ||||
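EventObserver stands in for the plugin's event recorder and pushes every recorded event into a buffered channel; EventAssertions.Event drains that channel until the predicate matches or the deadline passes, and the default deadline is now util.ForeverTestTimeout instead of a hard-coded two seconds. The body of that loop is elided in this diff, so the following is only a simplified, hedged sketch of the shape it takes, reusing the variables visible above (observer, pred, timeout, msg, a).

	// simplified sketch of the drain loop behind EventAssertions.Event
	deadline := time.After(timeout) // timeout defaults to util.ForeverTestTimeout
	for {
		select {
		case event := <-observer.fifo:
			if pred(event) {
				return true // matching event observed
			}
			// non-matching events are discarded while waiting
		case <-deadline:
			return a.Fail(msg) // no matching event before the deadline
		}
	}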
| func (a *EventAssertions) EventWithReason(observer *EventObserver, reason string, msgAndArgs ...interface{}) bool { | ||||
| 	return a.Event(observer, func(e Event) bool { | ||||
| 		return e.Reason == reason | ||||
| @@ -362,6 +416,175 @@ func newTaskStatusForTask(task *mesos.TaskInfo, state mesos.TaskState) *mesos.Ta | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type LaunchedTask struct { | ||||
| 	offerId  mesos.OfferID | ||||
| 	taskInfo *mesos.TaskInfo | ||||
| } | ||||
|  | ||||
| type lifecycleTest struct { | ||||
| 	apiServer     *TestServer | ||||
| 	driver        *joinableDriver | ||||
| 	eventObs      *EventObserver | ||||
| 	plugin        *schedulingPlugin | ||||
| 	podsListWatch *MockPodsListWatch | ||||
| 	scheduler     *KubernetesScheduler | ||||
| 	schedulerProc *ha.SchedulerProcess | ||||
| 	t             *testing.T | ||||
| } | ||||
|  | ||||
| func newLifecycleTest(t *testing.T) lifecycleTest { | ||||
| 	assert := &EventAssertions{*assert.New(t)} | ||||
|  | ||||
| 	// create a fake pod watch. We use that below to submit new pods to the scheduler | ||||
| 	podsListWatch := NewMockPodsListWatch(api.PodList{}) | ||||
|  | ||||
| 	// create fake apiserver | ||||
| 	apiServer := NewTestServer(t, api.NamespaceDefault, podsListWatch) | ||||
|  | ||||
| 	// create executor with some data for static pods if set | ||||
| 	executor := mesosutil.NewExecutorInfo( | ||||
| 		mesosutil.NewExecutorID("executor-id"), | ||||
| 		mesosutil.NewCommandInfo("executor-cmd"), | ||||
| 	) | ||||
| 	executor.Data = []byte{0, 1, 2} | ||||
|  | ||||
| 	// create scheduler | ||||
| 	strategy := NewAllocationStrategy( | ||||
| 		podtask.DefaultPredicate, | ||||
| 		podtask.NewDefaultProcurement( | ||||
| 			mresource.DefaultDefaultContainerCPULimit, | ||||
| 			mresource.DefaultDefaultContainerMemLimit, | ||||
| 		), | ||||
| 	) | ||||
|  | ||||
| 	scheduler := New(Config{ | ||||
| 		Executor: executor, | ||||
| 		Client: client.NewOrDie(&client.Config{ | ||||
| 			Host:    apiServer.server.URL, | ||||
| 			Version: testapi.Default.Version(), | ||||
| 		}), | ||||
| 		Scheduler:  NewFCFSPodScheduler(strategy, apiServer.LookupNode), | ||||
| 		Schedcfg:   *schedcfg.CreateDefaultConfig(), | ||||
| 		LookupNode: apiServer.LookupNode, | ||||
| 	}) | ||||
|  | ||||
| 	assert.NotNil(scheduler.client, "client is nil") | ||||
| 	assert.NotNil(scheduler.executor, "executor is nil") | ||||
| 	assert.NotNil(scheduler.offers, "offer registry is nil") | ||||
|  | ||||
| 	// create scheduler process | ||||
| 	schedulerProc := ha.New(scheduler) | ||||
|  | ||||
| 	// get plugin config from it | ||||
| 	config := scheduler.NewPluginConfig( | ||||
| 		schedulerProc.Terminal(), | ||||
| 		http.DefaultServeMux, | ||||
| 		&podsListWatch.ListWatch, | ||||
| 	) | ||||
| 	assert.NotNil(config) | ||||
|  | ||||
| 	// make events observable | ||||
| 	eventObs := NewEventObserver() | ||||
| 	config.Recorder = eventObs | ||||
|  | ||||
| 	// create plugin | ||||
| 	plugin := NewPlugin(config).(*schedulingPlugin) | ||||
| 	assert.NotNil(plugin) | ||||
|  | ||||
| 	// create mock mesos scheduler driver | ||||
| 	driver := &joinableDriver{} | ||||
|  | ||||
| 	return lifecycleTest{ | ||||
| 		apiServer:     apiServer, | ||||
| 		driver:        driver, | ||||
| 		eventObs:      eventObs, | ||||
| 		plugin:        plugin, | ||||
| 		podsListWatch: podsListWatch, | ||||
| 		scheduler:     scheduler, | ||||
| 		schedulerProc: schedulerProc, | ||||
| 		t:             t, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (lt lifecycleTest) Start() <-chan LaunchedTask { | ||||
| 	assert := &EventAssertions{*assert.New(lt.t)} | ||||
| 	lt.plugin.Run(lt.schedulerProc.Terminal()) | ||||
|  | ||||
| 	// init scheduler | ||||
| 	err := lt.scheduler.Init( | ||||
| 		lt.schedulerProc.Master(), | ||||
| 		lt.plugin, | ||||
| 		http.DefaultServeMux, | ||||
| 	) | ||||
| 	assert.NoError(err) | ||||
|  | ||||
| 	lt.driver.On("Start").Return(mesos.Status_DRIVER_RUNNING, nil).Once() | ||||
| 	started := lt.driver.Upon() | ||||
|  | ||||
| 	lt.driver.On("ReconcileTasks", | ||||
| 		mock.AnythingOfType("[]*mesosproto.TaskStatus"), | ||||
| 	).Return(mesos.Status_DRIVER_RUNNING, nil) | ||||
|  | ||||
| 	lt.driver.On("SendFrameworkMessage", | ||||
| 		mock.AnythingOfType("*mesosproto.ExecutorID"), | ||||
| 		mock.AnythingOfType("*mesosproto.SlaveID"), | ||||
| 		mock.AnythingOfType("string"), | ||||
| 	).Return(mesos.Status_DRIVER_RUNNING, nil) | ||||
|  | ||||
| 	launchedTasks := make(chan LaunchedTask, 1) | ||||
| 	launchTasksFunc := func(args mock.Arguments) { | ||||
| 		offerIDs := args.Get(0).([]*mesos.OfferID) | ||||
| 		taskInfos := args.Get(1).([]*mesos.TaskInfo) | ||||
| 		assert.Equal(1, len(offerIDs)) | ||||
| 		assert.Equal(1, len(taskInfos)) | ||||
|  | ||||
| 		launchedTasks <- LaunchedTask{ | ||||
| 			offerId:  *offerIDs[0], | ||||
| 			taskInfo: taskInfos[0], | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	lt.driver.On("LaunchTasks", | ||||
| 		mock.AnythingOfType("[]*mesosproto.OfferID"), | ||||
| 		mock.AnythingOfType("[]*mesosproto.TaskInfo"), | ||||
| 		mock.AnythingOfType("*mesosproto.Filters"), | ||||
| 	).Return(mesos.Status_DRIVER_RUNNING, nil).Run(launchTasksFunc) | ||||
|  | ||||
| 	lt.driver.On("DeclineOffer", | ||||
| 		mock.AnythingOfType("*mesosproto.OfferID"), | ||||
| 		mock.AnythingOfType("*mesosproto.Filters"), | ||||
| 	).Return(mesos.Status_DRIVER_RUNNING, nil) | ||||
|  | ||||
| 	// elect master with mock driver | ||||
| 	driverFactory := ha.DriverFactory(func() (bindings.SchedulerDriver, error) { | ||||
| 		return lt.driver, nil | ||||
| 	}) | ||||
| 	lt.schedulerProc.Elect(driverFactory) | ||||
| 	elected := lt.schedulerProc.Elected() | ||||
|  | ||||
| 	// driver will be started | ||||
| 	<-started | ||||
|  | ||||
| 	// tell scheduler to be registered | ||||
| 	lt.scheduler.Registered( | ||||
| 		lt.driver, | ||||
| 		mesosutil.NewFrameworkID("kubernetes-id"), | ||||
| 		mesosutil.NewMasterInfo("master-id", (192<<24)+(168<<16)+(0<<8)+1, 5050), | ||||
| 	) | ||||
|  | ||||
| 	// wait for being elected | ||||
| 	<-elected | ||||
| 	return launchedTasks | ||||
| } | ||||
|  | ||||
| func (lt lifecycleTest) Close() { | ||||
| 	lt.apiServer.server.Close() | ||||
| } | ||||
|  | ||||
| func (lt lifecycleTest) End() <-chan struct{} { | ||||
| 	return lt.schedulerProc.End() | ||||
| } | ||||
|  | ||||
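lifecycleTest bundles the setup that TestPlugin_LifeCycle previously did inline: mock apiserver, pod list/watch, scheduler, HA scheduler process, plugin, and the joinable mock driver. A minimal sketch of the intended flow, mirroring the test below rather than adding anything new:

	lt := newLifecycleTest(t)
	defer lt.Close() // shuts down the mock apiserver

	launchedTasks := lt.Start() // elects a master, starts the mock driver, registers the framework
	defer lt.End()              // terminates the scheduler process

	// a test then drives lt.podsListWatch, lt.scheduler and lt.driver, and
	// receives every mocked LaunchTasks call on launchedTasks
	_ = launchedTasks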
| // Test to create the scheduler plugin with an empty plugin config | ||||
| func TestPlugin_New(t *testing.T) { | ||||
| 	assert := assert.New(t) | ||||
| @@ -371,167 +594,89 @@ func TestPlugin_New(t *testing.T) { | ||||
| 	assert.NotNil(p) | ||||
| } | ||||
|  | ||||
| // Test to create the scheduler plugin with the config returned by the scheduler, | ||||
| // and play through the whole life cycle of the plugin while creating pods, deleting | ||||
| // TestPlugin_LifeCycle creates a scheduler plugin with the config returned by the scheduler, | ||||
| // and plays through the whole life cycle of the plugin while creating pods, deleting | ||||
| // and failing them. | ||||
| func TestPlugin_LifeCycle(t *testing.T) { | ||||
| 	t.Skip("This test is flaky, see #11901") | ||||
| 	assert := &EventAssertions{*assert.New(t)} | ||||
|  | ||||
| 	// create a fake pod watch. We use that below to submit new pods to the scheduler | ||||
| 	podListWatch := NewMockPodsListWatch(api.PodList{}) | ||||
|  | ||||
| 	// create fake apiserver | ||||
| 	testApiServer := NewTestServer(t, api.NamespaceDefault, podListWatch) | ||||
| 	defer testApiServer.server.Close() | ||||
|  | ||||
| 	// create executor with some data for static pods if set | ||||
| 	executor := util.NewExecutorInfo( | ||||
| 		util.NewExecutorID("executor-id"), | ||||
| 		util.NewCommandInfo("executor-cmd"), | ||||
| 	) | ||||
| 	executor.Data = []byte{0, 1, 2} | ||||
|  | ||||
| 	// create scheduler | ||||
| 	nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc) | ||||
| 	as := NewAllocationStrategy( | ||||
| 		podtask.DefaultPredicate, | ||||
| 		podtask.NewDefaultProcurement(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit)) | ||||
| 	testScheduler := New(Config{ | ||||
| 		Executor: executor, | ||||
| 		Client:   client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Default.Version()}), | ||||
| 		Scheduler: NewFCFSPodScheduler(as, func(node string) *api.Node { | ||||
| 			obj, _, _ := nodeStore.GetByKey(node) | ||||
| 			if obj == nil { | ||||
| 				return nil | ||||
| 			} | ||||
| 			return obj.(*api.Node) | ||||
| 		}), | ||||
| 		Schedcfg: *schedcfg.CreateDefaultConfig(), | ||||
| 	}) | ||||
|  | ||||
| 	assert.NotNil(testScheduler.client, "client is nil") | ||||
| 	assert.NotNil(testScheduler.executor, "executor is nil") | ||||
| 	assert.NotNil(testScheduler.offers, "offer registry is nil") | ||||
|  | ||||
| 	// create scheduler process | ||||
| 	schedulerProcess := ha.New(testScheduler) | ||||
|  | ||||
| 	// get plugin config from it | ||||
| 	c := testScheduler.NewPluginConfig(schedulerProcess.Terminal(), http.DefaultServeMux, &podListWatch.ListWatch) | ||||
| 	assert.NotNil(c) | ||||
|  | ||||
| 	// make events observable | ||||
| 	eventObserver := NewEventObserver() | ||||
| 	c.Recorder = eventObserver | ||||
|  | ||||
| 	// create plugin | ||||
| 	p := NewPlugin(c).(*schedulingPlugin) | ||||
| 	assert.NotNil(p) | ||||
| 	lt := newLifecycleTest(t) | ||||
| 	defer lt.Close() | ||||
|  | ||||
| 	// run plugin | ||||
| 	p.Run(schedulerProcess.Terminal()) | ||||
| 	defer schedulerProcess.End() | ||||
|  | ||||
| 	// init scheduler | ||||
| 	err := testScheduler.Init(schedulerProcess.Master(), p, http.DefaultServeMux) | ||||
| 	assert.NoError(err) | ||||
|  | ||||
| 	// create mock mesos scheduler driver | ||||
| 	mockDriver := &joinableDriver{} | ||||
| 	mockDriver.On("Start").Return(mesos.Status_DRIVER_RUNNING, nil).Once() | ||||
| 	started := mockDriver.Upon() | ||||
|  | ||||
| 	mAny := mock.AnythingOfType | ||||
| 	mockDriver.On("ReconcileTasks", mAny("[]*mesosproto.TaskStatus")).Return(mesos.Status_DRIVER_RUNNING, nil) | ||||
| 	mockDriver.On("SendFrameworkMessage", mAny("*mesosproto.ExecutorID"), mAny("*mesosproto.SlaveID"), mAny("string")). | ||||
| 		Return(mesos.Status_DRIVER_RUNNING, nil) | ||||
|  | ||||
| 	type LaunchedTask struct { | ||||
| 		offerId  mesos.OfferID | ||||
| 		taskInfo *mesos.TaskInfo | ||||
| 	} | ||||
| 	launchedTasks := make(chan LaunchedTask, 1) | ||||
| 	launchTasksCalledFunc := func(args mock.Arguments) { | ||||
| 		offerIDs := args.Get(0).([]*mesos.OfferID) | ||||
| 		taskInfos := args.Get(1).([]*mesos.TaskInfo) | ||||
| 		assert.Equal(1, len(offerIDs)) | ||||
| 		assert.Equal(1, len(taskInfos)) | ||||
| 		launchedTasks <- LaunchedTask{ | ||||
| 			offerId:  *offerIDs[0], | ||||
| 			taskInfo: taskInfos[0], | ||||
| 		} | ||||
| 	} | ||||
| 	mockDriver.On("LaunchTasks", mAny("[]*mesosproto.OfferID"), mAny("[]*mesosproto.TaskInfo"), mAny("*mesosproto.Filters")). | ||||
| 		Return(mesos.Status_DRIVER_RUNNING, nil).Run(launchTasksCalledFunc) | ||||
| 	mockDriver.On("DeclineOffer", mAny("*mesosproto.OfferID"), mAny("*mesosproto.Filters")). | ||||
| 		Return(mesos.Status_DRIVER_RUNNING, nil) | ||||
|  | ||||
| 	// elect master with mock driver | ||||
| 	driverFactory := ha.DriverFactory(func() (bindings.SchedulerDriver, error) { | ||||
| 		return mockDriver, nil | ||||
| 	}) | ||||
| 	schedulerProcess.Elect(driverFactory) | ||||
| 	elected := schedulerProcess.Elected() | ||||
|  | ||||
| 	// driver will be started | ||||
| 	<-started | ||||
|  | ||||
| 	// tell scheduler to be registered | ||||
| 	testScheduler.Registered( | ||||
| 		mockDriver, | ||||
| 		util.NewFrameworkID("kubernetes-id"), | ||||
| 		util.NewMasterInfo("master-id", (192<<24)+(168<<16)+(0<<8)+1, 5050), | ||||
| 	) | ||||
|  | ||||
| 	// wait for being elected | ||||
| 	<-elected | ||||
|  | ||||
| 	//TODO(jdef) refactor things above here into a test suite setup of some sort | ||||
| 	launchedTasks := lt.Start() | ||||
| 	defer lt.End() | ||||
|  | ||||
| 	// fake new, unscheduled pod | ||||
| 	pod, i := NewTestPod() | ||||
| 	podListWatch.Add(pod, true) // notify watchers | ||||
| 	lt.podsListWatch.Add(pod, true) // notify watchers | ||||
|  | ||||
| 	// wait for failedScheduling event because there is no offer | ||||
| 	assert.EventWithReason(eventObserver, "failedScheduling", "failedScheduling event not received") | ||||
| 	assert.EventWithReason(lt.eventObs, FailedScheduling, "failedScheduling event not received") | ||||
|  | ||||
| 	// add some matching offer | ||||
| 	offers := []*mesos.Offer{NewTestOffer(fmt.Sprintf("offer%d", i))} | ||||
| 	testScheduler.ResourceOffers(nil, offers) | ||||
| 	lt.scheduler.ResourceOffers(nil, offers) | ||||
|  | ||||
| 	// first offer is declined because node is not available yet | ||||
| 	lt.apiServer.WaitForNode("some_hostname") | ||||
|  | ||||
| 	// add one more offer | ||||
| 	lt.scheduler.ResourceOffers(nil, offers) | ||||
|  | ||||
| 	// and wait for scheduled pod | ||||
| 	assert.EventWithReason(eventObserver, "scheduled") | ||||
| 	assert.EventWithReason(lt.eventObs, Scheduled) | ||||
| 	select { | ||||
| 	case launchedTask := <-launchedTasks: | ||||
| 		// report back that the task has been staged, and then started by mesos | ||||
| 		testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING)) | ||||
| 		testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING)) | ||||
| 		lt.scheduler.StatusUpdate( | ||||
| 			lt.driver, | ||||
| 			newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING), | ||||
| 		) | ||||
|  | ||||
| 		lt.scheduler.StatusUpdate( | ||||
| 			lt.driver, | ||||
| 			newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING), | ||||
| 		) | ||||
|  | ||||
| 		// check that ExecutorInfo.data has the static pod data | ||||
| 		assert.Len(launchedTask.taskInfo.Executor.Data, 3) | ||||
|  | ||||
| 		// report back that the task has been lost | ||||
| 		mockDriver.AssertNumberOfCalls(t, "SendFrameworkMessage", 0) | ||||
| 		testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_LOST)) | ||||
| 		lt.driver.AssertNumberOfCalls(t, "SendFrameworkMessage", 0) | ||||
|  | ||||
| 		lt.scheduler.StatusUpdate( | ||||
| 			lt.driver, | ||||
| 			newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_LOST), | ||||
| 		) | ||||
|  | ||||
| 		// and wait that framework message is sent to executor | ||||
| 		mockDriver.AssertNumberOfCalls(t, "SendFrameworkMessage", 1) | ||||
| 		lt.driver.AssertNumberOfCalls(t, "SendFrameworkMessage", 1) | ||||
|  | ||||
| 	case <-time.After(5 * time.Second): | ||||
| 	case <-time.After(util.ForeverTestTimeout): | ||||
| 		t.Fatalf("timed out waiting for launchTasks call") | ||||
| 	} | ||||
|  | ||||
| 	offeredNodes := make(map[string]struct{}) | ||||
|  | ||||
| 	// Launch a pod and wait until the scheduler driver is called | ||||
| 	schedulePodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) { | ||||
| 		// wait for failedScheduling event because there is no offer | ||||
| 		assert.EventWithReason(eventObserver, "failedScheduling", "failedScheduling event not received") | ||||
| 		assert.EventWithReason(lt.eventObs, FailedScheduling, "failedScheduling event not received") | ||||
|  | ||||
| 		// supply a matching offer | ||||
| 		testScheduler.ResourceOffers(mockDriver, offers) | ||||
| 		lt.scheduler.ResourceOffers(lt.driver, offers) | ||||
| 		for _, offer := range offers { | ||||
| 			if _, ok := offeredNodes[offer.GetHostname()]; !ok { | ||||
| 				offeredNodes[offer.GetHostname()] = struct{}{} | ||||
| 				lt.apiServer.WaitForNode(offer.GetHostname()) | ||||
|  | ||||
| 				// reoffer since it must have been declined above | ||||
| 				lt.scheduler.ResourceOffers(lt.driver, []*mesos.Offer{offer}) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// and wait to get scheduled | ||||
| 		assert.EventWithReason(eventObserver, "scheduled") | ||||
| 		assert.EventWithReason(lt.eventObs, Scheduled) | ||||
|  | ||||
| 		// wait for driver.launchTasks call | ||||
| 		select { | ||||
| @@ -543,14 +688,15 @@ func TestPlugin_LifeCycle(t *testing.T) { | ||||
| 			} | ||||
| 			t.Fatalf("unknown offer used to start a pod") | ||||
| 			return nil, nil, nil | ||||
| 		case <-time.After(5 * time.Second): | ||||
| 		case <-time.After(util.ForeverTestTimeout): | ||||
| 			t.Fatal("timed out waiting for launchTasks") | ||||
| 			return nil, nil, nil | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Launch a pod and wait until the scheduler driver is called | ||||
| 	launchPodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) { | ||||
| 		podListWatch.Add(pod, true) | ||||
| 		lt.podsListWatch.Add(pod, true) | ||||
| 		return schedulePodWithOffers(pod, offers) | ||||
| 	} | ||||
|  | ||||
| @@ -560,8 +706,15 @@ func TestPlugin_LifeCycle(t *testing.T) { | ||||
| 		pod, launchedTask, offer := launchPodWithOffers(pod, offers) | ||||
| 		if pod != nil { | ||||
| 			// report back status | ||||
| 			testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING)) | ||||
| 			testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING)) | ||||
| 			lt.scheduler.StatusUpdate( | ||||
| 				lt.driver, | ||||
| 				newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING), | ||||
| 			) | ||||
| 			lt.scheduler.StatusUpdate( | ||||
| 				lt.driver, | ||||
| 				newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING), | ||||
| 			) | ||||
|  | ||||
| 			return pod, launchedTask, offer | ||||
| 		} | ||||
|  | ||||
| @@ -577,23 +730,28 @@ func TestPlugin_LifeCycle(t *testing.T) { | ||||
| 	// start another pod | ||||
| 	pod, launchedTask, _ := startTestPod() | ||||
|  | ||||
| 	// mock drvier.KillTask, should be invoked when a pod is deleted | ||||
| 	mockDriver.On("KillTask", mAny("*mesosproto.TaskID")).Return(mesos.Status_DRIVER_RUNNING, nil).Run(func(args mock.Arguments) { | ||||
| 	// mock driver.KillTask, should be invoked when a pod is deleted | ||||
| 	lt.driver.On("KillTask", | ||||
| 		mock.AnythingOfType("*mesosproto.TaskID"), | ||||
| 	).Return(mesos.Status_DRIVER_RUNNING, nil).Run(func(args mock.Arguments) { | ||||
| 		killedTaskId := *(args.Get(0).(*mesos.TaskID)) | ||||
| 		assert.Equal(*launchedTask.taskInfo.TaskId, killedTaskId, "expected same TaskID as during launch") | ||||
| 	}) | ||||
| 	killTaskCalled := mockDriver.Upon() | ||||
| 	killTaskCalled := lt.driver.Upon() | ||||
|  | ||||
| 	// stop it again via the apiserver mock | ||||
| 	podListWatch.Delete(pod, true) // notify watchers | ||||
| 	lt.podsListWatch.Delete(pod, true) // notify watchers | ||||
|  | ||||
| 	// and wait for the driver killTask call with the correct TaskId | ||||
| 	select { | ||||
| 	case <-killTaskCalled: | ||||
| 		// report back that the task is finished | ||||
| 		testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_FINISHED)) | ||||
| 		lt.scheduler.StatusUpdate( | ||||
| 			lt.driver, | ||||
| 			newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_FINISHED), | ||||
| 		) | ||||
|  | ||||
| 	case <-time.After(5 * time.Second): | ||||
| 	case <-time.After(util.ForeverTestTimeout): | ||||
| 		t.Fatal("timed out waiting for KillTask") | ||||
| 	} | ||||
|  | ||||
| @@ -613,8 +771,8 @@ func TestPlugin_LifeCycle(t *testing.T) { | ||||
| 	assert.Equal(offers[1].Id.GetValue(), usedOffer.Id.GetValue()) | ||||
| 	assert.Equal(pod.Spec.NodeName, *usedOffer.Hostname) | ||||
|  | ||||
| 	testScheduler.OfferRescinded(mockDriver, offers[0].Id) | ||||
| 	testScheduler.OfferRescinded(mockDriver, offers[2].Id) | ||||
| 	lt.scheduler.OfferRescinded(lt.driver, offers[0].Id) | ||||
| 	lt.scheduler.OfferRescinded(lt.driver, offers[2].Id) | ||||
|  | ||||
| 	// start pods: | ||||
| 	// - which are failing while binding, | ||||
| @@ -622,15 +780,15 @@ func TestPlugin_LifeCycle(t *testing.T) { | ||||
| 	// - with different states on the apiserver | ||||
|  | ||||
| 	failPodFromExecutor := func(task *mesos.TaskInfo) { | ||||
| 		beforePodLookups := testApiServer.Stats(pod.Name) | ||||
| 		beforePodLookups := lt.apiServer.Stats(pod.Name) | ||||
| 		status := newTaskStatusForTask(task, mesos.TaskState_TASK_FAILED) | ||||
| 		message := messages.CreateBindingFailure | ||||
| 		status.Message = &message | ||||
| 		testScheduler.StatusUpdate(mockDriver, status) | ||||
| 		lt.scheduler.StatusUpdate(lt.driver, status) | ||||
|  | ||||
| 		// wait until pod is looked up at the apiserver | ||||
| 		assertext.EventuallyTrue(t, time.Second, func() bool { | ||||
| 			return testApiServer.Stats(pod.Name) == beforePodLookups+1 | ||||
| 		assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { | ||||
| 			return lt.apiServer.Stats(pod.Name) == beforePodLookups+1 | ||||
| 		}, "expect that reconcileTask will access apiserver for pod %v", pod.Name) | ||||
| 	} | ||||
|  | ||||
| @@ -643,12 +801,12 @@ func TestPlugin_LifeCycle(t *testing.T) { | ||||
| 	// 1. with pod deleted from the apiserver | ||||
| 	//    expected: pod is removed from internal task registry | ||||
| 	pod, launchedTask, _ = launchTestPod() | ||||
| 	podListWatch.Delete(pod, false) // not notifying the watchers | ||||
| 	lt.podsListWatch.Delete(pod, false) // not notifying the watchers | ||||
| 	failPodFromExecutor(launchedTask.taskInfo) | ||||
|  | ||||
| 	podKey, _ := podtask.MakePodKey(api.NewDefaultContext(), pod.Name) | ||||
| 	assertext.EventuallyTrue(t, time.Second, func() bool { | ||||
| 		t, _ := p.api.tasks().ForPod(podKey) | ||||
| 	assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { | ||||
| 		t, _ := lt.plugin.api.tasks().ForPod(podKey) | ||||
| 		return t == nil | ||||
| 	}) | ||||
|  | ||||
| @@ -667,7 +825,7 @@ func TestPlugin_LifeCycle(t *testing.T) { | ||||
| 		meta.BindingHostKey: *usedOffer.Hostname, | ||||
| 	} | ||||
| 	pod.Spec.NodeName = *usedOffer.Hostname | ||||
| 	podListWatch.Modify(pod, true) // notifying the watchers | ||||
| 	lt.podsListWatch.Modify(pod, true) // notifying the watchers | ||||
| 	time.Sleep(time.Second / 2) | ||||
| 	failPodFromExecutor(launchedTask.taskInfo) | ||||
| } | ||||
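The re-enabled test depends on the node lookup wired into NewFCFSPodScheduler: the first offer from a still-unknown host cannot be matched (hence the decline) but leads to the node being registered with the mock apiserver, so the test waits for the node and then replays the offer. A hedged sketch isolating that pattern, using the hostname NewTestOffer hard-codes and the same fixture calls as the test above:

	offers := []*mesos.Offer{NewTestOffer("some-offer-id")}

	lt.scheduler.ResourceOffers(lt.driver, offers) // declined: LookupNode does not know the host yet
	lt.apiServer.WaitForNode("some_hostname")      // node registration has reached the mock apiserver

	lt.scheduler.ResourceOffers(lt.driver, offers) // replay the offer; the pod can now be bound
	assert.EventWithReason(lt.eventObs, Scheduled)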
|   | ||||