Move makeCompositeReconciler into taskreconciler package
This commit is contained in:
		| @@ -283,7 +283,7 @@ func (k *framework) onInitialRegistration(driver bindings.SchedulerDriver) { | ||||
| 	r1 := k.makeTaskRegistryReconciler() | ||||
| 	r2 := k.makePodRegistryReconciler() | ||||
|  | ||||
| 	k.tasksReconciler = taskreconciler.New(k.asRegisteredMaster, k.makeCompositeReconciler(r1, r2), | ||||
| 	k.tasksReconciler = taskreconciler.New(k.asRegisteredMaster, taskreconciler.MakeComposite(k.terminate, r1, r2), | ||||
| 		k.reconcileCooldown, k.schedulerConfig.ExplicitReconciliationAbortTimeout.Duration, k.terminate) | ||||
| 	go k.tasksReconciler.Run(driver, k.terminate) | ||||
|  | ||||
| @@ -569,58 +569,6 @@ func explicitTaskFilter(t *podtask.T) bool { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // invoke the given ReconcilerAction funcs in sequence, aborting the sequence if reconciliation | ||||
| // is cancelled. if any other errors occur the composite reconciler will attempt to complete the | ||||
| // sequence, reporting only the last generated error. | ||||
| func (k *framework) makeCompositeReconciler(actions ...taskreconciler.Action) taskreconciler.Action { | ||||
| 	if x := len(actions); x == 0 { | ||||
| 		// programming error | ||||
| 		panic("no actions specified for composite reconciler") | ||||
| 	} else if x == 1 { | ||||
| 		return actions[0] | ||||
| 	} | ||||
| 	chained := func(d bindings.SchedulerDriver, c <-chan struct{}, a, b taskreconciler.Action) <-chan error { | ||||
| 		ech := a(d, c) | ||||
| 		ch := make(chan error, 1) | ||||
| 		go func() { | ||||
| 			select { | ||||
| 			case <-k.terminate: | ||||
| 			case <-c: | ||||
| 			case e := <-ech: | ||||
| 				if e != nil { | ||||
| 					ch <- e | ||||
| 					return | ||||
| 				} | ||||
| 				ech = b(d, c) | ||||
| 				select { | ||||
| 				case <-k.terminate: | ||||
| 				case <-c: | ||||
| 				case e := <-ech: | ||||
| 					if e != nil { | ||||
| 						ch <- e | ||||
| 						return | ||||
| 					} | ||||
| 					close(ch) | ||||
| 					return | ||||
| 				} | ||||
| 			} | ||||
| 			ch <- fmt.Errorf("aborting composite reconciler action") | ||||
| 		}() | ||||
| 		return ch | ||||
| 	} | ||||
| 	result := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error { | ||||
| 		return chained(d, c, actions[0], actions[1]) | ||||
| 	} | ||||
| 	for i := 2; i < len(actions); i++ { | ||||
| 		i := i | ||||
| 		next := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error { | ||||
| 			return chained(d, c, taskreconciler.Action(result), actions[i]) | ||||
| 		} | ||||
| 		result = next | ||||
| 	} | ||||
| 	return taskreconciler.Action(result) | ||||
| } | ||||
|  | ||||
| // reconciler action factory, performs explicit task reconciliation for non-terminal | ||||
| // tasks listed in the scheduler's internal taskRegistry. | ||||
| func (k *framework) makeTaskRegistryReconciler() taskreconciler.Action { | ||||
|   | ||||
| @@ -17,6 +17,7 @@ limitations under the License. | ||||
| package taskreconciler | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"time" | ||||
|  | ||||
| 	log "github.com/golang/glog" | ||||
| @@ -180,3 +181,55 @@ requestLoop: | ||||
| 		} | ||||
| 	} // for | ||||
| } | ||||
|  | ||||
| // MakeComposite invokes the given ReconcilerAction funcs in sequence, aborting the sequence if reconciliation | ||||
| // is cancelled. if any other errors occur the composite reconciler will attempt to complete the | ||||
| // sequence, reporting only the last generated error. | ||||
| func MakeComposite(done <-chan struct{}, actions ...Action) Action { | ||||
| 	if x := len(actions); x == 0 { | ||||
| 		// programming error | ||||
| 		panic("no actions specified for composite reconciler") | ||||
| 	} else if x == 1 { | ||||
| 		return actions[0] | ||||
| 	} | ||||
| 	chained := func(d bindings.SchedulerDriver, c <-chan struct{}, a, b Action) <-chan error { | ||||
| 		ech := a(d, c) | ||||
| 		ch := make(chan error, 1) | ||||
| 		go func() { | ||||
| 			select { | ||||
| 			case <-done: | ||||
| 			case <-c: | ||||
| 			case e := <-ech: | ||||
| 				if e != nil { | ||||
| 					ch <- e | ||||
| 					return | ||||
| 				} | ||||
| 				ech = b(d, c) | ||||
| 				select { | ||||
| 				case <-done: | ||||
| 				case <-c: | ||||
| 				case e := <-ech: | ||||
| 					if e != nil { | ||||
| 						ch <- e | ||||
| 						return | ||||
| 					} | ||||
| 					close(ch) | ||||
| 					return | ||||
| 				} | ||||
| 			} | ||||
| 			ch <- fmt.Errorf("aborting composite reconciler action") | ||||
| 		}() | ||||
| 		return ch | ||||
| 	} | ||||
| 	result := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error { | ||||
| 		return chained(d, c, actions[0], actions[1]) | ||||
| 	} | ||||
| 	for i := 2; i < len(actions); i++ { | ||||
| 		i := i | ||||
| 		next := func(d bindings.SchedulerDriver, c <-chan struct{}) <-chan error { | ||||
| 			return chained(d, c, Action(result), actions[i]) | ||||
| 		} | ||||
| 		result = next | ||||
| 	} | ||||
| 	return Action(result) | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Dr. Stefan Schimanski
					Dr. Stefan Schimanski