schedulercache: remove bind() from AssumePod
This commit is contained in:
		@@ -106,7 +106,7 @@ func (s *Scheduler) scheduleOne() {
 | 
				
			|||||||
	// will self-repair.
 | 
						// will self-repair.
 | 
				
			||||||
	assumed := *pod
 | 
						assumed := *pod
 | 
				
			||||||
	assumed.Spec.NodeName = dest
 | 
						assumed.Spec.NodeName = dest
 | 
				
			||||||
	s.config.SchedulerCache.AssumePodIfBindSucceed(&assumed, func() bool { return true })
 | 
						s.config.SchedulerCache.AssumePod(&assumed)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
		defer metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
 | 
							defer metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -98,19 +98,15 @@ func (cache *schedulerCache) List(selector labels.Selector) ([]*api.Pod, error)
 | 
				
			|||||||
	return pods, nil
 | 
						return pods, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (cache *schedulerCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error {
 | 
					func (cache *schedulerCache) AssumePod(pod *api.Pod) error {
 | 
				
			||||||
	return cache.assumePodIfBindSucceed(pod, bind, time.Now())
 | 
						return cache.assumePod(pod, time.Now())
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// assumePodScheduled exists for making test deterministic by taking time as input argument.
 | 
					// assumePod exists for making test deterministic by taking time as input argument.
 | 
				
			||||||
func (cache *schedulerCache) assumePodIfBindSucceed(pod *api.Pod, bind func() bool, now time.Time) error {
 | 
					func (cache *schedulerCache) assumePod(pod *api.Pod, now time.Time) error {
 | 
				
			||||||
	cache.mu.Lock()
 | 
						cache.mu.Lock()
 | 
				
			||||||
	defer cache.mu.Unlock()
 | 
						defer cache.mu.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if !bind() {
 | 
					 | 
				
			||||||
		return nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	key, err := getPodKey(pod)
 | 
						key, err := getPodKey(pod)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -87,8 +87,8 @@ func TestAssumePodScheduled(t *testing.T) {
 | 
				
			|||||||
	for i, tt := range tests {
 | 
						for i, tt := range tests {
 | 
				
			||||||
		cache := newSchedulerCache(time.Second, time.Second, nil)
 | 
							cache := newSchedulerCache(time.Second, time.Second, nil)
 | 
				
			||||||
		for _, pod := range tt.pods {
 | 
							for _, pod := range tt.pods {
 | 
				
			||||||
			if err := cache.AssumePodIfBindSucceed(pod, alwaysTrue); err != nil {
 | 
								if err := cache.AssumePod(pod); err != nil {
 | 
				
			||||||
				t.Fatalf("AssumePodScheduled failed: %v", err)
 | 
									t.Fatalf("AssumePod failed: %v", err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		n := cache.nodes[nodeName]
 | 
							n := cache.nodes[nodeName]
 | 
				
			||||||
@@ -147,7 +147,7 @@ func TestExpirePod(t *testing.T) {
 | 
				
			|||||||
		cache := newSchedulerCache(ttl, time.Second, nil)
 | 
							cache := newSchedulerCache(ttl, time.Second, nil)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		for _, pod := range tt.pods {
 | 
							for _, pod := range tt.pods {
 | 
				
			||||||
			if err := cache.assumePodIfBindSucceed(pod.pod, alwaysTrue, pod.assumedTime); err != nil {
 | 
								if err := cache.assumePod(pod.pod, pod.assumedTime); err != nil {
 | 
				
			||||||
				t.Fatalf("assumePod failed: %v", err)
 | 
									t.Fatalf("assumePod failed: %v", err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@@ -195,7 +195,7 @@ func TestAddPodWillConfirm(t *testing.T) {
 | 
				
			|||||||
	for i, tt := range tests {
 | 
						for i, tt := range tests {
 | 
				
			||||||
		cache := newSchedulerCache(ttl, time.Second, nil)
 | 
							cache := newSchedulerCache(ttl, time.Second, nil)
 | 
				
			||||||
		for _, podToAssume := range tt.podsToAssume {
 | 
							for _, podToAssume := range tt.podsToAssume {
 | 
				
			||||||
			if err := cache.assumePodIfBindSucceed(podToAssume, alwaysTrue, now); err != nil {
 | 
								if err := cache.assumePod(podToAssume, now); err != nil {
 | 
				
			||||||
				t.Fatalf("assumePod failed: %v", err)
 | 
									t.Fatalf("assumePod failed: %v", err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@@ -240,7 +240,7 @@ func TestAddPodAfterExpiration(t *testing.T) {
 | 
				
			|||||||
	now := time.Now()
 | 
						now := time.Now()
 | 
				
			||||||
	for i, tt := range tests {
 | 
						for i, tt := range tests {
 | 
				
			||||||
		cache := newSchedulerCache(ttl, time.Second, nil)
 | 
							cache := newSchedulerCache(ttl, time.Second, nil)
 | 
				
			||||||
		if err := cache.assumePodIfBindSucceed(tt.pod, alwaysTrue, now); err != nil {
 | 
							if err := cache.assumePod(tt.pod, now); err != nil {
 | 
				
			||||||
			t.Fatalf("assumePod failed: %v", err)
 | 
								t.Fatalf("assumePod failed: %v", err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		cache.cleanupAssumedPods(now.Add(2 * ttl))
 | 
							cache.cleanupAssumedPods(now.Add(2 * ttl))
 | 
				
			||||||
@@ -369,7 +369,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
 | 
				
			|||||||
	for _, tt := range tests {
 | 
						for _, tt := range tests {
 | 
				
			||||||
		cache := newSchedulerCache(ttl, time.Second, nil)
 | 
							cache := newSchedulerCache(ttl, time.Second, nil)
 | 
				
			||||||
		for _, podToAssume := range tt.podsToAssume {
 | 
							for _, podToAssume := range tt.podsToAssume {
 | 
				
			||||||
			if err := cache.assumePodIfBindSucceed(podToAssume, alwaysTrue, now); err != nil {
 | 
								if err := cache.assumePod(podToAssume, now); err != nil {
 | 
				
			||||||
				t.Fatalf("assumePod failed: %v", err)
 | 
									t.Fatalf("assumePod failed: %v", err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@@ -527,14 +527,10 @@ func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time)
 | 
				
			|||||||
		objName := fmt.Sprintf("%s-pod-%d", nodeName, i%10)
 | 
							objName := fmt.Sprintf("%s-pod-%d", nodeName, i%10)
 | 
				
			||||||
		pod := makeBasePod(nodeName, objName, "0", "0", nil)
 | 
							pod := makeBasePod(nodeName, objName, "0", "0", nil)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		err := cache.assumePodIfBindSucceed(pod, alwaysTrue, assumedTime)
 | 
							err := cache.assumePod(pod, assumedTime)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			b.Fatalf("assumePodIfBindSucceed failed: %v", err)
 | 
								b.Fatalf("assumePod failed: %v", err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return cache
 | 
						return cache
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					 | 
				
			||||||
func alwaysTrue() bool {
 | 
					 | 
				
			||||||
	return true
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 
 | 
				
			|||||||
@@ -56,14 +56,10 @@ import (
 | 
				
			|||||||
// - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
 | 
					// - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
 | 
				
			||||||
//   a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
 | 
					//   a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
 | 
				
			||||||
type Cache interface {
 | 
					type Cache interface {
 | 
				
			||||||
	// AssumePodIfBindSucceed assumes a pod to be scheduled if binding the pod succeeded.
 | 
						// AssumePod assumes a pod scheduled and aggregates the pod's information into its node.
 | 
				
			||||||
	// If binding return true, the pod's information is aggregated into designated node.
 | 
					 | 
				
			||||||
	// Note that both binding and assuming are done as one atomic operation from cache's view.
 | 
					 | 
				
			||||||
	// No other events like Add would happen in between binding and assuming.
 | 
					 | 
				
			||||||
	// We are passing the binding function and let implementation take care of concurrency control details.
 | 
					 | 
				
			||||||
	// The implementation also decides the policy to expire pod before being confirmed (receiving Add event).
 | 
						// The implementation also decides the policy to expire pod before being confirmed (receiving Add event).
 | 
				
			||||||
	// After expiration, its information would be subtracted.
 | 
						// After expiration, its information would be subtracted.
 | 
				
			||||||
	AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error
 | 
						AssumePod(pod *api.Pod) error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// AddPod either confirms a pod if it's assumed, or adds it back if it's expired.
 | 
						// AddPod either confirms a pod if it's assumed, or adds it back if it's expired.
 | 
				
			||||||
	// If added back, the pod's information would be added again.
 | 
						// If added back, the pod's information would be added again.
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -27,10 +27,7 @@ type FakeCache struct {
 | 
				
			|||||||
	AssumeFunc func(*api.Pod)
 | 
						AssumeFunc func(*api.Pod)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (f *FakeCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error {
 | 
					func (f *FakeCache) AssumePod(pod *api.Pod) error {
 | 
				
			||||||
	if !bind() {
 | 
					 | 
				
			||||||
		return nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	f.AssumeFunc(pod)
 | 
						f.AssumeFunc(pod)
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -25,10 +25,7 @@ import (
 | 
				
			|||||||
// PodsToCache is used for testing
 | 
					// PodsToCache is used for testing
 | 
				
			||||||
type PodsToCache []*api.Pod
 | 
					type PodsToCache []*api.Pod
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (p PodsToCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error {
 | 
					func (p PodsToCache) AssumePod(pod *api.Pod) error {
 | 
				
			||||||
	if !bind() {
 | 
					 | 
				
			||||||
		return nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user