Use BindingHostKey annotation to detect scheduled pods in k8sm-scheduler

Previously, the NodeName field of the pod spec was used for this. As a
consequence, pods with a fixed, pre-set NodeName were never scheduled by the
k8sm-scheduler, which led, for example, to a failing e2e intra-pod test.
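
In essence, the notion of "already scheduled" moves from a NodeName test to an
annotation test. A minimal sketch of the two predicates (api is the upstream
kubernetes api package; the helper names here are illustrative, the real
helper introduced below is annotatedForExecutor):

// before: any pod with a non-empty NodeName counted as scheduled,
// including pods whose NodeName was pre-set by the user
func alreadyScheduledOld(pod *api.Pod) bool {
	return pod.Spec.NodeName != ""
}

// after: only pods that the k8sm-scheduler itself assigned to a Mesos
// slave carry the BindingHostKey annotation
func alreadyScheduledNew(pod *api.Pod) bool {
	_, ok := pod.ObjectMeta.Annotations["k8s.mesosphere.io/bindingHost"]
	return ok
}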

Fixes mesosphere/kubernetes-mesos#388
Author: Dr. Stefan Schimanski
Date: 2015-07-13 16:34:43 +02:00
Parent: b0d31fb794
Commit: f59b5f503b

7 changed files with 199 additions and 84 deletions

View File

@@ -437,25 +437,6 @@ func (k *KubernetesExecutor) attemptSuicide(driver bindings.ExecutorDriver, abor
// async continuation of LaunchTask
func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId string, pod *api.Pod) {
//HACK(jdef): cloned binding construction from k8s plugin/pkg/scheduler/scheduler.go
binding := &api.Binding{
ObjectMeta: api.ObjectMeta{
Namespace: pod.Namespace,
Name: pod.Name,
Annotations: make(map[string]string),
},
Target: api.ObjectReference{
Kind: "Node",
Name: pod.Annotations[meta.BindingHostKey],
},
}
// forward the annotations that the scheduler wants to apply
for k, v := range pod.Annotations {
binding.Annotations[k] = v
}
deleteTask := func() {
k.lock.Lock()
defer k.lock.Unlock()
@@ -463,17 +444,57 @@ func (k *KubernetesExecutor) launchTask(driver bindings.ExecutorDriver, taskId s
k.resetSuicideWatch(driver)
}
log.Infof("Binding '%v/%v' to '%v' with annotations %+v...", pod.Namespace, pod.Name, binding.Target.Name, binding.Annotations)
ctx := api.WithNamespace(api.NewContext(), binding.Namespace)
// TODO(k8s): use Pods interface for binding once clusters are upgraded
// return b.Pods(binding.Namespace).Bind(binding)
err := k.client.Post().Namespace(api.NamespaceValue(ctx)).Resource("bindings").Body(binding).Do().Error()
if err != nil {
deleteTask()
k.sendStatus(driver, newStatus(mutil.NewTaskID(taskId), mesos.TaskState_TASK_FAILED,
messages.CreateBindingFailure))
return
if pod.Spec.NodeName == "" {
//HACK(jdef): cloned binding construction from k8s plugin/pkg/scheduler/scheduler.go
binding := &api.Binding{
ObjectMeta: api.ObjectMeta{
Namespace: pod.Namespace,
Name: pod.Name,
Annotations: make(map[string]string),
},
Target: api.ObjectReference{
Kind: "Node",
Name: pod.Annotations[meta.BindingHostKey],
},
}
// forward the annotations that the scheduler wants to apply
for k, v := range pod.Annotations {
binding.Annotations[k] = v
}
// create binding on apiserver
log.Infof("Binding '%v/%v' to '%v' with annotations %+v...", pod.Namespace, pod.Name, binding.Target.Name, binding.Annotations)
ctx := api.WithNamespace(api.NewContext(), binding.Namespace)
err := k.client.Post().Namespace(api.NamespaceValue(ctx)).Resource("bindings").Body(binding).Do().Error()
if err != nil {
deleteTask()
k.sendStatus(driver, newStatus(mutil.NewTaskID(taskId), mesos.TaskState_TASK_FAILED,
messages.CreateBindingFailure))
return
}
} else {
// post annotations update to apiserver
patch := struct {
Metadata struct {
Annotations map[string]string `json:"annotations"`
} `json:"metadata"`
}{}
patch.Metadata.Annotations = pod.Annotations
patchJson, _ := json.Marshal(patch)
log.V(4).Infof("Patching annotations %v of pod %v/%v: %v", pod.Annotations, pod.Namespace, pod.Name, string(patchJson))
err := k.client.Patch(api.MergePatchType).RequestURI(pod.SelfLink).Body(patchJson).Do().Error()
if err != nil {
log.Errorf("Error updating annotations of ready-to-launch pod %v/%v: %v", pod.Namespace, pod.Name, err)
deleteTask()
k.sendStatus(driver, newStatus(mutil.NewTaskID(taskId), mesos.TaskState_TASK_FAILED,
messages.AnnotationUpdateFailure))
return
}
}
podFullName := container.GetPodFullName(pod)
// allow a recently failed-over scheduler the chance to recover the task/pod binding:
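
For reference, a standalone sketch of the merge-patch body that the new else
branch sends to the apiserver (the annotation values here are invented for
illustration):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// annotations as the scheduler might have set them (values made up)
	annotations := map[string]string{
		"k8s.mesosphere.io/bindingHost": "slave-1",
		"k8s.mesosphere.io/taskId":      "task-42",
	}
	// same anonymous struct shape as in launchTask above
	patch := struct {
		Metadata struct {
			Annotations map[string]string `json:"annotations"`
		} `json:"metadata"`
	}{}
	patch.Metadata.Annotations = annotations
	patchJson, err := json.Marshal(patch)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(patchJson))
	// {"metadata":{"annotations":{"k8s.mesosphere.io/bindingHost":"slave-1","k8s.mesosphere.io/taskId":"task-42"}}}
}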

View File

@@ -29,4 +29,6 @@ const (
UnmarshalTaskDataFailure = "unmarshal-task-data-failure"
TaskLostAck = "task-lost-ack" // executor acknowledgement of forwarded TASK_LOST framework message
Kamikaze = "kamikaze"
WrongSlaveFailure = "pod-for-wrong-slave-failure"
AnnotationUpdateFailure = "annotation-update-failure"
)

View File

@@ -18,7 +18,10 @@ package meta
// kubernetes api object annotations
const (
BindingHostKey = "k8s.mesosphere.io/bindingHost"
// The BindingHostKey pod annotation marks a pod as being assigned to a Mesos
// slave. The pod has already been, or will be, launched on the slave as a task.
BindingHostKey = "k8s.mesosphere.io/bindingHost"
TaskIdKey = "k8s.mesosphere.io/taskId"
SlaveIdKey = "k8s.mesosphere.io/slaveId"
OfferIdKey = "k8s.mesosphere.io/offerId"
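
These keys are ordinary pod annotations, so any component holding the pod
object can inspect them. A toy illustration (a plain map standing in for
pod.Annotations, values invented; assumes fmt is imported):

annotations := map[string]string{
	"k8s.mesosphere.io/bindingHost": "slave-1",
	"k8s.mesosphere.io/taskId":      "task-42",
}
if host, ok := annotations["k8s.mesosphere.io/bindingHost"]; ok {
	fmt.Println("pod runs (or will run) as a task on", host)
}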

View File

@@ -151,6 +151,11 @@ func (b *binder) rollback(task *podtask.T, err error) error {
}
// assumes that: caller has acquired scheduler lock and that the task is still pending
//
// bind does not actually do the binding itself, but launches the pod as a Mesos task. The
// kubernetes executor on the slave eventually does the binding. This differs from the
// upstream scheduler, where the scheduler itself does the binding and the kubelet
// notices it and launches the pod.
func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (err error) {
// sanity check: ensure that the task hasAcceptedOffer(), it's possible that between
// Schedule() and now that the offer for this task was rescinded or invalidated.
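
To make the flow described in the bind comment above concrete, here is a
self-contained toy model of the two phases (simplified stand-in structs, not
the real k8s/Mesos types):

package main

import "fmt"

const bindingHostKey = "k8s.mesosphere.io/bindingHost"

// toyPod is a simplified stand-in for api.Pod.
type toyPod struct {
	name        string
	nodeName    string // stays empty until the executor binds
	annotations map[string]string
}

// schedulerLaunch mirrors binder.bind: it only annotates the pod and
// "launches" it as a Mesos task; no kubernetes binding happens here.
func schedulerLaunch(p *toyPod, slave string) {
	if p.annotations == nil {
		p.annotations = map[string]string{}
	}
	p.annotations[bindingHostKey] = slave
	fmt.Printf("scheduler: launched %s as a Mesos task on %s\n", p.name, slave)
}

// executorBind mirrors the executor's launchTask: the final, real binding.
func executorBind(p *toyPod) {
	p.nodeName = p.annotations[bindingHostKey]
	fmt.Printf("executor: bound %s to %s\n", p.name, p.nodeName)
}

func main() {
	p := &toyPod{name: "nginx"}
	schedulerLaunch(p, "slave-1") // annotated, but nodeName still ""
	executorBind(p)               // nodeName is set only here
}
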
@@ -193,16 +198,7 @@ func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *pod
oemCt := pod.Spec.Containers
pod.Spec.Containers = append([]api.Container{}, oemCt...) // (shallow) clone before mod
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
} else {
oemAnn := pod.Annotations
pod.Annotations = make(map[string]string)
for k, v := range oemAnn {
pod.Annotations[k] = v
}
}
pod.Annotations[annotation.BindingHostKey] = machine
annotateForExecutorOnSlave(&pod, machine)
task.SaveRecoveryInfo(pod.Annotations)
for _, entry := range task.Spec.PortMap {
@@ -237,6 +233,31 @@ type kubeScheduler struct {
defaultContainerMemLimit mresource.MegaBytes
}
// annotatedForExecutor checks whether a pod is assigned to a Mesos slave, and
// possibly already launched. The pod may, but need not, already be scheduled
// in the kubernetes sense, i.e. the NodeName field might still be empty.
func annotatedForExecutor(pod *api.Pod) bool {
_, ok := pod.ObjectMeta.Annotations[annotation.BindingHostKey]
return ok
}
// annotateForExecutorOnSlave sets the BindingHostKey annotation which
// marks the pod to be processed by the scheduler and launched as a Mesos
// task. The executor on the slave will do the final binding to finish the
// scheduling in the kubernetes sense.
func annotateForExecutorOnSlave(pod *api.Pod, slave string) {
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
} else {
oemAnn := pod.Annotations
pod.Annotations = make(map[string]string)
for k, v := range oemAnn {
pod.Annotations[k] = v
}
}
pod.Annotations[annotation.BindingHostKey] = slave
}
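
Note the copy-on-write: the function builds a fresh annotations map rather
than mutating the pod's existing one, so a caller still holding a reference to
the old map is unaffected. For example (hypothetical values):

orig := map[string]string{"foo": "bar"}
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Annotations: orig}}
annotateForExecutorOnSlave(pod, "slave-1")
// orig still has only "foo"; pod.Annotations is a new map containing
// "foo" plus annotation.BindingHostKey set to "slave-1"
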
// Schedule implements the Scheduler interface of Kubernetes.
// It returns the selectedMachine's name and error (if there's any).
func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.MinionLister) (string, error) {
@@ -441,7 +462,7 @@ func (q *queuer) Run(done <-chan struct{}) {
}
pod := p.(*Pod)
if pod.Spec.NodeName != "" {
if annotatedForExecutor(pod.Pod) {
log.V(3).Infof("dequeuing pod for scheduling: %v", pod.Pod.Name)
q.dequeue(pod.GetUID())
} else {
@@ -490,7 +511,7 @@ func (q *queuer) yield() *api.Pod {
log.Warningf("yield unable to understand pod object %+v, will skip: %v", pod, err)
} else if !q.podUpdates.Poll(podName, queue.POP_EVENT) {
log.V(1).Infof("yield popped a transitioning pod, skipping: %+v", pod)
} else if pod.Spec.NodeName != "" {
} else if annotatedForExecutor(pod) {
// should never happen if enqueuePods is filtering properly
log.Warningf("yield popped an already-scheduled pod, skipping: %+v", pod)
} else {
@@ -798,7 +819,7 @@ func (s *schedulingPlugin) reconcilePod(oldPod api.Pod) {
return
}
if oldPod.Spec.NodeName != pod.Spec.NodeName {
if pod.Spec.NodeName == "" {
if annotatedForExecutor(pod) {
// pod is unscheduled.
// it's possible that we dropped the pod in the scheduler error handler
// because of task misalignment with the pod (task.Has(podtask.Launched) == true)

View File

@@ -37,6 +37,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/queue"
schedcfg "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/config"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/ha"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask"
mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource"
log "github.com/golang/glog"
@@ -189,8 +190,11 @@ func (lw *MockPodsListWatch) Delete(pod *api.Pod, notify bool) {
}
// Create a pod with a sequential number, requiring one port
func NewTestPod(i int) *api.Pod {
name := fmt.Sprintf("pod%d", i)
var currentPodNum int = 0
func NewTestPod() (*api.Pod, int) {
currentPodNum = currentPodNum + 1
name := fmt.Sprintf("pod%d", currentPodNum)
return &api.Pod{
TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
ObjectMeta: api.ObjectMeta{
@@ -203,7 +207,7 @@ func NewTestPod(i int) *api.Pod {
{
Ports: []api.ContainerPort{
{
ContainerPort: 8000 + i,
ContainerPort: 8000 + currentPodNum,
Protocol: api.ProtocolTCP,
},
},
@@ -211,7 +215,7 @@ func NewTestPod(i int) *api.Pod {
},
},
Status: api.PodStatus{
PodIP: fmt.Sprintf("1.2.3.%d", 4+i),
PodIP: fmt.Sprintf("1.2.3.%d", 4+currentPodNum),
Conditions: []api.PodCondition{
{
Type: api.PodReady,
@@ -219,12 +223,12 @@ func NewTestPod(i int) *api.Pod {
},
},
},
}
}, currentPodNum
}
// Offering some cpus and memory and the 8000-9000 port range
func NewTestOffer(i int) *mesos.Offer {
hostname := fmt.Sprintf("h%d", i)
func NewTestOffer(id string) *mesos.Offer {
hostname := "some_hostname"
cpus := util.NewScalarResource("cpus", 3.75)
mem := util.NewScalarResource("mem", 940)
var port8000 uint64 = 8000
@@ -232,7 +236,7 @@ func NewTestOffer(i int) *mesos.Offer {
ports8000to9000 := mesos.Value_Range{Begin: &port8000, End: &port9000}
ports := util.NewRangesResource("ports", []*mesos.Value_Range{&ports8000to9000})
return &mesos.Offer{
Id: util.NewOfferID(fmt.Sprintf("offer%d", i)),
Id: util.NewOfferID(id),
Hostname: &hostname,
SlaveId: util.NewSlaveID(hostname),
Resources: []*mesos.Resource{cpus, mem, ports},
@@ -435,11 +439,20 @@ func TestPlugin_LifeCycle(t *testing.T) {
mockDriver.On("SendFrameworkMessage", mAny("*mesosproto.ExecutorID"), mAny("*mesosproto.SlaveID"), mAny("string")).
Return(mesos.Status_DRIVER_RUNNING, nil)
launchedTasks := make(chan *mesos.TaskInfo, 1)
type LaunchedTask struct {
offerId mesos.OfferID
taskInfo *mesos.TaskInfo
}
launchedTasks := make(chan LaunchedTask, 1)
launchTasksCalledFunc := func(args mock.Arguments) {
offerIDs := args.Get(0).([]*mesos.OfferID)
taskInfos := args.Get(1).([]*mesos.TaskInfo)
assert.Equal(1, len(offerIDs))
assert.Equal(1, len(taskInfos))
launchedTasks <- taskInfos[0]
launchedTasks <- LaunchedTask{
offerId: *offerIDs[0],
taskInfo: taskInfos[0],
}
}
mockDriver.On("LaunchTasks", mAny("[]*mesosproto.OfferID"), mAny("[]*mesosproto.TaskInfo"), mAny("*mesosproto.Filters")).
Return(mesos.Status_DRIVER_RUNNING, nil).Run(launchTasksCalledFunc)
@@ -469,30 +482,30 @@ func TestPlugin_LifeCycle(t *testing.T) {
//TODO(jdef) refactor things above here into a test suite setup of some sort
// fake new, unscheduled pod
pod1 := NewTestPod(1)
podListWatch.Add(pod1, true) // notify watchers
pod, i := NewTestPod()
podListWatch.Add(pod, true) // notify watchers
// wait for failedScheduling event because there is no offer
assert.EventWithReason(eventObserver, "failedScheduling", "failedScheduling event not received")
// add some matching offer
offers1 := []*mesos.Offer{NewTestOffer(1)}
testScheduler.ResourceOffers(nil, offers1)
offers := []*mesos.Offer{NewTestOffer(fmt.Sprintf("offer%d", i))}
testScheduler.ResourceOffers(nil, offers)
// and wait for scheduled pod
assert.EventWithReason(eventObserver, "scheduled")
select {
case launchedTask := <-launchedTasks:
// report back that the task has been staged, and then started by mesos
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask, mesos.TaskState_TASK_STAGING))
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask, mesos.TaskState_TASK_RUNNING))
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING))
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING))
// check that ExecutorInfo.data has the static pod data
assert.Len(launchedTask.Executor.Data, 3)
assert.Len(launchedTask.taskInfo.Executor.Data, 3)
// report back that the task has been lost
mockDriver.AssertNumberOfCalls(t, "SendFrameworkMessage", 0)
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask, mesos.TaskState_TASK_LOST))
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_LOST))
// and wait that framework message is sent to executor
mockDriver.AssertNumberOfCalls(t, "SendFrameworkMessage", 1)
@@ -501,20 +514,15 @@ func TestPlugin_LifeCycle(t *testing.T) {
t.Fatalf("timed out waiting for launchTasks call")
}
// start another pod
podNum := 2
startPod := func() (*api.Pod, *mesos.TaskInfo, *mesos.Offer) {
podNum = podNum + 1
// create pod
pod := NewTestPod(podNum)
podListWatch.Add(pod, true) // notify watchers
// define generic pod startup
startPodWithOffers := func(pod *api.Pod, offers []*mesos.Offer) (*api.Pod, *LaunchedTask, *mesos.Offer) {
// notify watchers of new pod
podListWatch.Add(pod, true)
// wait for failedScheduling event because there is no offer
assert.EventWithReason(eventObserver, "failedScheduling", "failedScheduling event not received")
// supply a matching offer
offers := []*mesos.Offer{NewTestOffer(podNum)}
testScheduler.ResourceOffers(mockDriver, offers)
// and wait to get scheduled
@@ -523,9 +531,15 @@ func TestPlugin_LifeCycle(t *testing.T) {
// wait for driver.launchTasks call
select {
case launchedTask := <-launchedTasks:
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask, mesos.TaskState_TASK_STAGING))
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask, mesos.TaskState_TASK_RUNNING))
return pod, launchedTask, offers[0]
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_STAGING))
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_RUNNING))
for _, offer := range offers {
if offer.Id.GetValue() == launchedTask.offerId.GetValue() {
return pod, &launchedTask, offer
}
}
t.Fatalf("unknown offer used to start a pod")
return nil, nil, nil
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for launchTasks")
@@ -533,12 +547,19 @@ func TestPlugin_LifeCycle(t *testing.T) {
}
}
pod, launchedTask, _ := startPod()
startTestPod := func() (*api.Pod, *LaunchedTask, *mesos.Offer) {
pod, i := NewTestPod()
offers := []*mesos.Offer{NewTestOffer(fmt.Sprintf("offer%d", i))}
return startPodWithOffers(pod, offers)
}
// start another pod
pod, launchedTask, _ := startTestPod()
// mock driver.KillTask, should be invoked when a pod is deleted
mockDriver.On("KillTask", mAny("*mesosproto.TaskID")).Return(mesos.Status_DRIVER_RUNNING, nil).Run(func(args mock.Arguments) {
killedTaskId := *(args.Get(0).(*mesos.TaskID))
assert.Equal(*launchedTask.TaskId, killedTaskId, "expected same TaskID as during launch")
assert.Equal(*launchedTask.taskInfo.TaskId, killedTaskId, "expected same TaskID as during launch")
})
killTaskCalled := mockDriver.Upon()
@@ -549,12 +570,31 @@ func TestPlugin_LifeCycle(t *testing.T) {
select {
case <-killTaskCalled:
// report back that the task is finished
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask, mesos.TaskState_TASK_FINISHED))
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_FINISHED))
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for KillTask")
}
// start a pod with a given NodeName and check that it is scheduled to the right host
pod, i = NewTestPod()
pod.Spec.NodeName = "hostname1"
offers = []*mesos.Offer{}
for j := 0; j < 3; j++ {
offer := NewTestOffer(fmt.Sprintf("offer%d_%d", i, j))
hostname := fmt.Sprintf("hostname%d", j)
offer.Hostname = &hostname
offers = append(offers, offer)
}
_, _, usedOffer := startPodWithOffers(pod, offers)
assert.Equal(offers[1].Id.GetValue(), usedOffer.Id.GetValue())
assert.Equal(pod.Spec.NodeName, *usedOffer.Hostname)
testScheduler.OfferRescinded(mockDriver, offers[0].Id)
testScheduler.OfferRescinded(mockDriver, offers[2].Id)
// start pods:
// - which are failing while binding,
// - leading to reconciliation
@@ -574,26 +614,30 @@ func TestPlugin_LifeCycle(t *testing.T) {
}
// 1. with pod deleted from the apiserver
pod, launchedTask, _ = startPod()
pod, launchedTask, _ = startTestPod()
podListWatch.Delete(pod, false) // not notifying the watchers
failPodFromExecutor(launchedTask)
failPodFromExecutor(launchedTask.taskInfo)
// 2. with pod still on the apiserver, not bound
pod, launchedTask, _ = startPod()
failPodFromExecutor(launchedTask)
pod, launchedTask, _ = startTestPod()
failPodFromExecutor(launchedTask.taskInfo)
// 3. with pod still on the apiserver, bound i.e. host!=""
pod, launchedTask, usedOffer := startPod()
pod.Spec.NodeName = *usedOffer.Hostname
pod, launchedTask, usedOffer = startTestPod()
pod.Annotations = map[string]string{
meta.BindingHostKey: *usedOffer.Hostname,
}
podListWatch.Modify(pod, false) // not notifying the watchers
failPodFromExecutor(launchedTask)
failPodFromExecutor(launchedTask.taskInfo)
// 4. with pod still on the apiserver, bound i.e. host!="", notified via ListWatch
pod, launchedTask, usedOffer = startPod()
pod.Spec.NodeName = *usedOffer.Hostname
pod, launchedTask, usedOffer = startTestPod()
pod.Annotations = map[string]string{
meta.BindingHostKey: *usedOffer.Hostname,
}
podListWatch.Modify(pod, true) // notifying the watchers
time.Sleep(time.Second / 2)
failPodFromExecutor(launchedTask)
failPodFromExecutor(launchedTask.taskInfo)
}
func TestDeleteOne_NonexistentPod(t *testing.T) {

View File

@@ -213,6 +213,11 @@ func (t *T) AcceptOffer(offer *mesos.Offer) bool {
return false
}
// if the user has specified a target host, make sure this offer is for that host
if t.Pod.Spec.NodeName != "" && offer.GetHostname() != t.Pod.Spec.NodeName {
return false
}
// check ports
if _, err := t.mapper.Generate(t, offer); err != nil {
log.V(3).Info(err)
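
This is the scheduler-side counterpart of the NodeName handling: a pod with a
pre-set NodeName now only matches offers from exactly that host, which is what
the three-offer test above asserts. Reduced to a toy predicate:

// offerMatchesNodeName is an illustrative reduction of the check above;
// an empty NodeName means "no host constraint".
func offerMatchesNodeName(offerHostname, nodeName string) bool {
	return nodeName == "" || offerHostname == nodeName
}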