Merge pull request #15976 from mesosphere/sur-k8sm-475-error-checking
Rebased previously reverted merge, just before this scheduler refactoring. Auto commit by PR queue bot
This commit is contained in:

committed by
Dr. Stefan Schimanski

parent
d7964de230
commit
e71f43de93
@@ -79,12 +79,19 @@ func (k *schedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
|
||||
log.Infof("aborting Schedule, pod has been deleted %+v", pod)
|
||||
return "", errors.NoSuchPodErr
|
||||
}
|
||||
|
||||
podTask, err := podtask.New(ctx, "", pod)
|
||||
if err != nil {
|
||||
log.Warningf("aborting Schedule, unable to create podtask object %+v: %v", pod, err)
|
||||
return "", err
|
||||
}
|
||||
return k.doSchedule(k.sched.Tasks().Register(podTask, nil))
|
||||
|
||||
podTask, err = k.sched.Tasks().Register(podTask)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return k.doSchedule(podTask)
|
||||
|
||||
//TODO(jdef) it's possible that the pod state has diverged from what
|
||||
//we knew previously, we should probably update the task.Pod state here
|
||||
@@ -100,7 +107,7 @@ func (k *schedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
|
||||
// but we're going to let someone else handle it, probably the mesos task error handler
|
||||
return "", fmt.Errorf("task %s has already been launched, aborting schedule", task.ID)
|
||||
} else {
|
||||
return k.doSchedule(task, nil)
|
||||
return k.doSchedule(task)
|
||||
}
|
||||
|
||||
default:
|
||||
@@ -109,15 +116,16 @@ func (k *schedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
|
||||
}
|
||||
|
||||
// Call ScheduleFunc and subtract some resources, returning the name of the machine the task is scheduled on
|
||||
func (k *schedulerAlgorithm) doSchedule(task *podtask.T, err error) (string, error) {
|
||||
func (k *schedulerAlgorithm) doSchedule(task *podtask.T) (string, error) {
|
||||
var offer offers.Perishable
|
||||
var err error
|
||||
|
||||
if task.HasAcceptedOffer() {
|
||||
// verify that the offer is still on the table
|
||||
offerId := task.GetOfferId()
|
||||
if offer, ok := k.sched.Offers().Get(offerId); ok && !offer.HasExpired() {
|
||||
// skip tasks that have already have assigned offers
|
||||
offer = task.Offer
|
||||
} else {
|
||||
var ok bool
|
||||
offer, ok = k.sched.Offers().Get(task.GetOfferId())
|
||||
|
||||
if !ok || offer.HasExpired() {
|
||||
task.Offer.Release()
|
||||
task.Reset()
|
||||
if err = k.sched.Tasks().Update(task); err != nil {
|
||||
@@ -125,26 +133,35 @@ func (k *schedulerAlgorithm) doSchedule(task *podtask.T, err error) (string, err
|
||||
}
|
||||
}
|
||||
}
|
||||
if err == nil && offer == nil {
|
||||
|
||||
if offer == nil {
|
||||
offer, err = k.podScheduler.SchedulePod(k.sched.Offers(), task)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
details := offer.Details()
|
||||
if details == nil {
|
||||
return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID)
|
||||
}
|
||||
|
||||
if task.Offer != nil && task.Offer != offer {
|
||||
return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer)
|
||||
}
|
||||
|
||||
task.Offer = offer
|
||||
k.podScheduler.Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here?
|
||||
if err := k.podScheduler.Procurement()(task, details); err != nil {
|
||||
offer.Release()
|
||||
task.Reset()
|
||||
return "", err
|
||||
}
|
||||
|
||||
if err := k.sched.Tasks().Update(task); err != nil {
|
||||
offer.Release()
|
||||
return "", err
|
||||
}
|
||||
|
||||
return details.GetHostname(), nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user