Remove pods from the assumed pod list when they are deleted

This commit is contained in:
Daniel Smith
2015-03-27 15:33:03 -07:00
parent 31324a0ba5
commit fd952862c3
4 changed files with 156 additions and 24 deletions

View File

@@ -103,6 +103,21 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
return f.CreateFromKeys(predicateKeys, priorityKeys)
}
// ReflectorDeletionHook passes all operations through to Store, but calls
// OnDelete in a goroutine if there is a deletion.
type ReflectorDeletionHook struct {
cache.Store
OnDelete func(obj interface{})
}
func (r ReflectorDeletionHook) Delete(obj interface{}) error {
go func() {
defer util.HandleCrash()
r.OnDelete(obj)
}()
return r.Store.Delete(obj)
}
// Creates a scheduler from a set of registered fit predicate keys and priority keys.
func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSet) (*scheduler.Config, error) {
glog.V(2).Infof("creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys)
@@ -125,9 +140,22 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
// Watch and queue pods that need scheduling.
cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue, 0).Run()
// Pass through all events to the scheduled pod store, but on a deletion,
// also remove from the assumed pods.
assumedPodDeleter := ReflectorDeletionHook{
Store: f.ScheduledPodLister.Store,
OnDelete: func(obj interface{}) {
if pod, ok := obj.(*api.Pod); ok {
f.modeler.LockedAction(func() {
f.modeler.ForgetPod(pod)
})
}
},
}
// Watch and cache all running pods. Scheduler needs to find all pods
// so it knows where it's safe to place a pod. Cache this locally.
cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, f.ScheduledPodLister.Store, 0).Run()
cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, assumedPodDeleter, 0).Run()
// Watch minions.
// Minions may be listed frequently, so provide a local up-to-date cache.

View File

@@ -19,6 +19,7 @@ package scheduler
import (
"fmt"
"strings"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
@@ -40,9 +41,24 @@ type ExtendedPodLister interface {
Exists(pod *api.Pod) (bool, error)
}
// actionLocker implements lockedAction (so the fake and SimpleModeler can both
// use it)
type actionLocker struct {
sync.Mutex
}
// LockedAction serializes calls of whatever is passed as 'do'.
func (a *actionLocker) LockedAction(do func()) {
a.Lock()
defer a.Unlock()
do()
}
// FakeModeler implements the SystemModeler interface.
type FakeModeler struct {
AssumePodFunc func(pod *api.Pod)
ForgetPodFunc func(pod *api.Pod)
actionLocker
}
// AssumePod calls the function variable if it is not nil.
@@ -52,6 +68,13 @@ func (f *FakeModeler) AssumePod(pod *api.Pod) {
}
}
// ForgetPod calls the function variable if it is not nil.
func (f *FakeModeler) ForgetPod(pod *api.Pod) {
if f.ForgetPodFunc != nil {
f.ForgetPodFunc(pod)
}
}
// SimpleModeler implements the SystemModeler interface with a timed pod cache.
type SimpleModeler struct {
queuedPods ExtendedPodLister
@@ -61,6 +84,8 @@ type SimpleModeler struct {
// haven't yet shown up in the scheduledPods variable.
// TODO: periodically clear this.
assumedPods *cache.StoreToPodLister
actionLocker
}
// NewSimpleModeler returns a new SimpleModeler.
@@ -78,6 +103,10 @@ func (s *SimpleModeler) AssumePod(pod *api.Pod) {
s.assumedPods.Add(pod)
}
func (s *SimpleModeler) ForgetPod(pod *api.Pod) {
s.assumedPods.Delete(pod)
}
// Extract names for readable logging.
func podNames(pods []api.Pod) []string {
out := make([]string, len(pods))

View File

@@ -44,6 +44,16 @@ type SystemModeler interface {
// The assumtion should last until the system confirms the
// assumtion or disconfirms it.
AssumePod(pod *api.Pod)
// ForgetPod removes a pod assumtion. (It won't make the model
// show the absence of the given pod if the pod is in the scheduled
// pods list!)
ForgetPod(pod *api.Pod)
// For serializing calls to Assume/ForgetPod: imagine you want to add
// a pod iff a bind succeeds, but also remove a pod if it is deleted.
// TODO: if SystemModeler begins modeling things other than pods, this
// should probably be parameterized or specialized for pods.
LockedAction(f func())
}
// Scheduler watches for new unscheduled pods. It attempts to find
@@ -104,16 +114,21 @@ func (s *Scheduler) scheduleOne() {
Name: dest,
},
}
if err := s.config.Binder.Bind(b); err != nil {
glog.V(1).Infof("Failed to bind pod: %v", err)
s.config.Recorder.Eventf(pod, "failedScheduling", "Binding rejected: %v", err)
s.config.Error(pod, err)
return
}
s.config.Recorder.Eventf(pod, "scheduled", "Successfully assigned %v to %v", pod.Name, dest)
// tell the model to assume that this binding took effect.
assumed := *pod
assumed.Spec.Host = dest
assumed.Status.Host = dest
s.config.Modeler.AssumePod(&assumed)
// We want to add the pod to the model iff the bind succeeds, but we don't want to race
// with any deletions, which happen asyncronously.
s.config.Modeler.LockedAction(func() {
if err := s.config.Binder.Bind(b); err != nil {
glog.V(1).Infof("Failed to bind pod: %v", err)
s.config.Recorder.Eventf(pod, "failedScheduling", "Binding rejected: %v", err)
s.config.Error(pod, err)
return
}
s.config.Recorder.Eventf(pod, "scheduled", "Successfully assigned %v to %v", pod.Name, dest)
// tell the model to assume that this binding took effect.
assumed := *pod
assumed.Spec.Host = dest
assumed.Status.Host = dest
s.config.Modeler.AssumePod(&assumed)
})
}