Revert "Merge pull request #15976 from mesosphere/sur-k8sm-475-error-checking"
The given merge will be rebased manually and appended to the scheduler refactoring. This reverts commit8d923afe23, reversing changes made tod7458ddd4c.
This commit is contained in:
		@@ -278,18 +278,7 @@ func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (str
 | 
				
			|||||||
			log.Infof("aborting Schedule, pod has been deleted %+v", pod)
 | 
								log.Infof("aborting Schedule, pod has been deleted %+v", pod)
 | 
				
			||||||
			return "", noSuchPodErr
 | 
								return "", noSuchPodErr
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							return k.doSchedule(k.api.tasks().Register(k.api.createPodTask(ctx, pod)))
 | 
				
			||||||
		task, err := k.api.createPodTask(ctx, pod)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			return "", err
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		task, err = k.api.tasks().Register(task)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			return "", err
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		return k.doSchedule(task)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	//TODO(jdef) it's possible that the pod state has diverged from what
 | 
						//TODO(jdef) it's possible that the pod state has diverged from what
 | 
				
			||||||
	//we knew previously, we should probably update the task.Pod state here
 | 
						//we knew previously, we should probably update the task.Pod state here
 | 
				
			||||||
@@ -305,7 +294,7 @@ func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (str
 | 
				
			|||||||
			// but we're going to let someone else handle it, probably the mesos task error handler
 | 
								// but we're going to let someone else handle it, probably the mesos task error handler
 | 
				
			||||||
			return "", fmt.Errorf("task %s has already been launched, aborting schedule", task.ID)
 | 
								return "", fmt.Errorf("task %s has already been launched, aborting schedule", task.ID)
 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
			return k.doSchedule(task)
 | 
								return k.doSchedule(task, nil)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	default:
 | 
						default:
 | 
				
			||||||
@@ -313,18 +302,16 @@ func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (str
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// doSchedule schedules the given task and returns the machine the task is scheduled on
 | 
					// Call ScheduleFunc and subtract some resources, returning the name of the machine the task is scheduled on
 | 
				
			||||||
// or an error if the scheduling failed.
 | 
					func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
 | 
				
			||||||
func (k *kubeScheduler) doSchedule(task *podtask.T) (string, error) {
 | 
					 | 
				
			||||||
	var offer offers.Perishable
 | 
						var offer offers.Perishable
 | 
				
			||||||
	var err error
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if task.HasAcceptedOffer() {
 | 
						if task.HasAcceptedOffer() {
 | 
				
			||||||
		// verify that the offer is still on the table
 | 
							// verify that the offer is still on the table
 | 
				
			||||||
		var ok bool
 | 
							offerId := task.GetOfferId()
 | 
				
			||||||
		offer, ok = k.api.offers().Get(task.GetOfferId())
 | 
							if offer, ok := k.api.offers().Get(offerId); ok && !offer.HasExpired() {
 | 
				
			||||||
 | 
								// skip tasks that have already have assigned offers
 | 
				
			||||||
		if !ok || offer.HasExpired() {
 | 
								offer = task.Offer
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
			task.Offer.Release()
 | 
								task.Offer.Release()
 | 
				
			||||||
			task.Reset()
 | 
								task.Reset()
 | 
				
			||||||
			if err = k.api.tasks().Update(task); err != nil {
 | 
								if err = k.api.tasks().Update(task); err != nil {
 | 
				
			||||||
@@ -332,46 +319,36 @@ func (k *kubeScheduler) doSchedule(task *podtask.T) (string, error) {
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if err == nil && offer == nil {
 | 
				
			||||||
	if offer == nil {
 | 
					 | 
				
			||||||
		offer, err = k.api.algorithm().SchedulePod(k.api.offers(), k.api, task)
 | 
							offer, err = k.api.algorithm().SchedulePod(k.api.offers(), k.api, task)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return "", err
 | 
							return "", err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					 | 
				
			||||||
	details := offer.Details()
 | 
						details := offer.Details()
 | 
				
			||||||
	if details == nil {
 | 
						if details == nil {
 | 
				
			||||||
		return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID)
 | 
							return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					 | 
				
			||||||
	slaveId := details.GetSlaveId().GetValue()
 | 
						slaveId := details.GetSlaveId().GetValue()
 | 
				
			||||||
	slaveHostName := k.api.slaveHostNameFor(slaveId)
 | 
						if slaveHostName := k.api.slaveHostNameFor(slaveId); slaveHostName == "" {
 | 
				
			||||||
	if slaveHostName == "" {
 | 
					 | 
				
			||||||
		// not much sense in Release()ing the offer here since its owner died
 | 
							// not much sense in Release()ing the offer here since its owner died
 | 
				
			||||||
		offer.Release()
 | 
							offer.Release()
 | 
				
			||||||
		k.api.offers().Invalidate(details.Id.GetValue())
 | 
							k.api.offers().Invalidate(details.Id.GetValue())
 | 
				
			||||||
		return "", fmt.Errorf("Slave disappeared (%v) while scheduling task %v", slaveId, task.ID)
 | 
							return "", fmt.Errorf("Slave disappeared (%v) while scheduling task %v", slaveId, task.ID)
 | 
				
			||||||
	}
 | 
						} else {
 | 
				
			||||||
 | 
							if task.Offer != nil && task.Offer != offer {
 | 
				
			||||||
 | 
								return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if task.Offer != nil && task.Offer != offer {
 | 
							task.Offer = offer
 | 
				
			||||||
		return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer)
 | 
							k.api.algorithm().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here?
 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	task.Offer = offer
 | 
							if err := k.api.tasks().Update(task); err != nil {
 | 
				
			||||||
	if err := k.api.algorithm().Procurement()(task, details); err != nil {
 | 
								offer.Release()
 | 
				
			||||||
		offer.Release()
 | 
								return "", err
 | 
				
			||||||
		task.Reset()
 | 
							}
 | 
				
			||||||
		return "", err
 | 
							return slaveHostName, nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					 | 
				
			||||||
	if err := k.api.tasks().Update(task); err != nil {
 | 
					 | 
				
			||||||
		offer.Release()
 | 
					 | 
				
			||||||
		return "", err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return slaveHostName, nil
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type queuer struct {
 | 
					type queuer struct {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -880,17 +880,11 @@ func TestDeleteOne_PendingPod(t *testing.T) {
 | 
				
			|||||||
			UID:       "foo0",
 | 
								UID:       "foo0",
 | 
				
			||||||
			Namespace: api.NamespaceDefault,
 | 
								Namespace: api.NamespaceDefault,
 | 
				
			||||||
		}}}
 | 
							}}}
 | 
				
			||||||
 | 
						_, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{}))
 | 
				
			||||||
	task, err := podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{})
 | 
					 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		t.Fatalf("failed to create task: %v", err)
 | 
							t.Fatalf("failed to create task: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	_, err = reg.Register(task)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		t.Fatalf("failed to register task: %v", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// preconditions
 | 
						// preconditions
 | 
				
			||||||
	qr := newQueuer(nil)
 | 
						qr := newQueuer(nil)
 | 
				
			||||||
	qr.podQueue.Add(pod, queue.ReplaceExisting)
 | 
						qr.podQueue.Add(pod, queue.ReplaceExisting)
 | 
				
			||||||
@@ -923,13 +917,7 @@ func TestDeleteOne_Running(t *testing.T) {
 | 
				
			|||||||
			UID:       "foo0",
 | 
								UID:       "foo0",
 | 
				
			||||||
			Namespace: api.NamespaceDefault,
 | 
								Namespace: api.NamespaceDefault,
 | 
				
			||||||
		}}}
 | 
							}}}
 | 
				
			||||||
 | 
						task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{}))
 | 
				
			||||||
	task, err := podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{})
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		t.Fatalf("unexpected error: %v", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	task, err = reg.Register(task)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		t.Fatalf("unexpected error: %v", err)
 | 
							t.Fatalf("unexpected error: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -41,7 +41,7 @@ const (
 | 
				
			|||||||
type Registry interface {
 | 
					type Registry interface {
 | 
				
			||||||
	// register the specified task with this registry, as long as the current error
 | 
						// register the specified task with this registry, as long as the current error
 | 
				
			||||||
	// condition is nil. if no errors occur then return a copy of the registered task.
 | 
						// condition is nil. if no errors occur then return a copy of the registered task.
 | 
				
			||||||
	Register(*T) (*T, error)
 | 
						Register(*T, error) (*T, error)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// unregister the specified task from this registry
 | 
						// unregister the specified task from this registry
 | 
				
			||||||
	Unregister(*T)
 | 
						Unregister(*T)
 | 
				
			||||||
@@ -103,19 +103,20 @@ func (k *inMemoryRegistry) ForPod(podID string) (task *T, currentState StateType
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// registers a pod task unless the spec'd error is not nil
 | 
					// registers a pod task unless the spec'd error is not nil
 | 
				
			||||||
func (k *inMemoryRegistry) Register(task *T) (*T, error) {
 | 
					func (k *inMemoryRegistry) Register(task *T, err error) (*T, error) {
 | 
				
			||||||
	k.rw.Lock()
 | 
						if err == nil {
 | 
				
			||||||
	defer k.rw.Unlock()
 | 
							k.rw.Lock()
 | 
				
			||||||
	if _, found := k.podToTask[task.podKey]; found {
 | 
							defer k.rw.Unlock()
 | 
				
			||||||
		return nil, fmt.Errorf("task already registered for pod key %q", task.podKey)
 | 
							if _, found := k.podToTask[task.podKey]; found {
 | 
				
			||||||
 | 
								return nil, fmt.Errorf("task already registered for pod key %q", task.podKey)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if _, found := k.taskRegistry[task.ID]; found {
 | 
				
			||||||
 | 
								return nil, fmt.Errorf("task already registered for id %q", task.ID)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							k.podToTask[task.podKey] = task.ID
 | 
				
			||||||
 | 
							k.taskRegistry[task.ID] = task
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if _, found := k.taskRegistry[task.ID]; found {
 | 
						return task.Clone(), err
 | 
				
			||||||
		return nil, fmt.Errorf("task already registered for id %q", task.ID)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	k.podToTask[task.podKey] = task.ID
 | 
					 | 
				
			||||||
	k.taskRegistry[task.ID] = task
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return task.Clone(), nil
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// updates internal task state. updates are limited to Spec, Flags, and Offer for
 | 
					// updates internal task state. updates are limited to Spec, Flags, and Offer for
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -38,14 +38,14 @@ func TestInMemoryRegistry_RegisterGetUnregister(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// add a task
 | 
						// add a task
 | 
				
			||||||
	a, _ := fakePodTask("a")
 | 
						a, _ := fakePodTask("a")
 | 
				
			||||||
	a_clone, err := registry.Register(a)
 | 
						a_clone, err := registry.Register(a, nil)
 | 
				
			||||||
	assert.NoError(err)
 | 
						assert.NoError(err)
 | 
				
			||||||
	assert.Equal(a_clone.ID, a.ID)
 | 
						assert.Equal(a_clone.ID, a.ID)
 | 
				
			||||||
	assert.Equal(a_clone.podKey, a.podKey)
 | 
						assert.Equal(a_clone.podKey, a.podKey)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// add another task
 | 
						// add another task
 | 
				
			||||||
	b, _ := fakePodTask("b")
 | 
						b, _ := fakePodTask("b")
 | 
				
			||||||
	b_clone, err := registry.Register(b)
 | 
						b_clone, err := registry.Register(b, nil)
 | 
				
			||||||
	assert.NoError(err)
 | 
						assert.NoError(err)
 | 
				
			||||||
	assert.Equal(b_clone.ID, b.ID)
 | 
						assert.Equal(b_clone.ID, b.ID)
 | 
				
			||||||
	assert.Equal(b_clone.podKey, b.podKey)
 | 
						assert.Equal(b_clone.podKey, b.podKey)
 | 
				
			||||||
@@ -79,21 +79,21 @@ func TestInMemoryRegistry_RegisterGetUnregister(t *testing.T) {
 | 
				
			|||||||
	assert.Nil(task)
 | 
						assert.Nil(task)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// re-add a task
 | 
						// re-add a task
 | 
				
			||||||
	a_clone, err = registry.Register(a)
 | 
						a_clone, err = registry.Register(a, nil)
 | 
				
			||||||
	assert.Error(err)
 | 
						assert.Error(err)
 | 
				
			||||||
	assert.Nil(a_clone)
 | 
						assert.Nil(a_clone)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// re-add a task with another podKey, but same task id
 | 
						// re-add a task with another podKey, but same task id
 | 
				
			||||||
	another_a := a.Clone()
 | 
						another_a := a.Clone()
 | 
				
			||||||
	another_a.podKey = "another-pod"
 | 
						another_a.podKey = "another-pod"
 | 
				
			||||||
	another_a_clone, err := registry.Register(another_a)
 | 
						another_a_clone, err := registry.Register(another_a, nil)
 | 
				
			||||||
	assert.Error(err)
 | 
						assert.Error(err)
 | 
				
			||||||
	assert.Nil(another_a_clone)
 | 
						assert.Nil(another_a_clone)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// re-add a task with another task ID, but same podKey
 | 
						// re-add a task with another task ID, but same podKey
 | 
				
			||||||
	another_b := b.Clone()
 | 
						another_b := b.Clone()
 | 
				
			||||||
	another_b.ID = "another-task-id"
 | 
						another_b.ID = "another-task-id"
 | 
				
			||||||
	another_b_clone, err := registry.Register(another_b)
 | 
						another_b_clone, err := registry.Register(another_b, nil)
 | 
				
			||||||
	assert.Error(err)
 | 
						assert.Error(err)
 | 
				
			||||||
	assert.Nil(another_b_clone)
 | 
						assert.Nil(another_b_clone)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -124,7 +124,7 @@ func TestInMemoryRegistry_State(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// add a task
 | 
						// add a task
 | 
				
			||||||
	a, _ := fakePodTask("a")
 | 
						a, _ := fakePodTask("a")
 | 
				
			||||||
	a_clone, err := registry.Register(a)
 | 
						a_clone, err := registry.Register(a, nil)
 | 
				
			||||||
	assert.NoError(err)
 | 
						assert.NoError(err)
 | 
				
			||||||
	assert.Equal(a.State, a_clone.State)
 | 
						assert.Equal(a.State, a_clone.State)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -167,7 +167,7 @@ func TestInMemoryRegistry_Update(t *testing.T) {
 | 
				
			|||||||
	// create registry
 | 
						// create registry
 | 
				
			||||||
	registry := NewInMemoryRegistry()
 | 
						registry := NewInMemoryRegistry()
 | 
				
			||||||
	a, _ := fakePodTask("a")
 | 
						a, _ := fakePodTask("a")
 | 
				
			||||||
	registry.Register(a.Clone()) // here clone a because we change it below
 | 
						registry.Register(a.Clone(), nil) // here clone a because we change it below
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// state changes are ignored
 | 
						// state changes are ignored
 | 
				
			||||||
	a.State = StateRunning
 | 
						a.State = StateRunning
 | 
				
			||||||
@@ -256,7 +256,7 @@ func testStateTrace(t *testing.T, transitions []transition) *Registry {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	registry := NewInMemoryRegistry()
 | 
						registry := NewInMemoryRegistry()
 | 
				
			||||||
	a, _ := fakePodTask("a")
 | 
						a, _ := fakePodTask("a")
 | 
				
			||||||
	a, _ = registry.Register(a)
 | 
						a, _ = registry.Register(a, nil)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// initial pending state
 | 
						// initial pending state
 | 
				
			||||||
	assert.Equal(a.State, StatePending)
 | 
						assert.Equal(a.State, StatePending)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -502,7 +502,7 @@ func (k *KubernetesScheduler) reconcileNonTerminalTask(driver bindings.Scheduler
 | 
				
			|||||||
	} else if pod, err := k.client.Pods(namespace).Get(name); err == nil {
 | 
						} else if pod, err := k.client.Pods(namespace).Get(name); err == nil {
 | 
				
			||||||
		if t, ok, err := podtask.RecoverFrom(*pod); ok {
 | 
							if t, ok, err := podtask.RecoverFrom(*pod); ok {
 | 
				
			||||||
			log.Infof("recovered task %v from metadata in pod %v/%v", taskId, namespace, name)
 | 
								log.Infof("recovered task %v from metadata in pod %v/%v", taskId, namespace, name)
 | 
				
			||||||
			_, err := k.taskRegistry.Register(t)
 | 
								_, err := k.taskRegistry.Register(t, nil)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				// someone beat us to it?!
 | 
									// someone beat us to it?!
 | 
				
			||||||
				log.Warningf("failed to register recovered task: %v", err)
 | 
									log.Warningf("failed to register recovered task: %v", err)
 | 
				
			||||||
@@ -912,7 +912,7 @@ func (ks *KubernetesScheduler) recoverTasks() error {
 | 
				
			|||||||
				log.Errorf("failed to delete pod '%v/%v': %v", pod.Namespace, pod.Name, err)
 | 
									log.Errorf("failed to delete pod '%v/%v': %v", pod.Namespace, pod.Name, err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		} else if ok {
 | 
							} else if ok {
 | 
				
			||||||
			ks.taskRegistry.Register(t)
 | 
								ks.taskRegistry.Register(t, nil)
 | 
				
			||||||
			recoverSlave(t)
 | 
								recoverSlave(t)
 | 
				
			||||||
			log.Infof("recovered task %v from pod %v/%v", t.ID, pod.Namespace, pod.Name)
 | 
								log.Infof("recovered task %v from pod %v/%v", t.ID, pod.Namespace, pod.Name)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user