Revert "Merge pull request #9165 from smarterclayton/graceful"

This reverts commit 4f856b595d, reversing
changes made to d78525a83b.

Conflicts:
	pkg/kubelet/status_manager.go
Robert Bailey
2015-08-18 17:34:49 -07:00
parent 301bf16792
commit 08e6a43c1d
78 changed files with 452 additions and 1494 deletions


@@ -322,8 +322,7 @@ func FilterActivePods(pods []api.Pod) []*api.Pod {
 	var result []*api.Pod
 	for i := range pods {
 		if api.PodSucceeded != pods[i].Status.Phase &&
-			api.PodFailed != pods[i].Status.Phase &&
-			pods[i].DeletionTimestamp == nil {
+			api.PodFailed != pods[i].Status.Phase {
 			result = append(result, &pods[i])
 		}
 	}
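
The net effect of this hunk is that FilterActivePods falls back to filtering on phase alone; the graceful-deletion check on DeletionTimestamp is gone. A rough self-contained sketch of the restored behavior, with simplified hypothetical types rather than the real api package:

```go
package main

import "fmt"

// Hypothetical, simplified stand-ins for the api types referenced above.
type PodPhase string

const (
	PodRunning   PodPhase = "Running"
	PodSucceeded PodPhase = "Succeeded"
	PodFailed    PodPhase = "Failed"
)

type Pod struct {
	Name  string
	Phase PodPhase
}

// filterActivePods mirrors the restored logic: a pod counts as active
// based on phase alone, with no deletion-timestamp check.
func filterActivePods(pods []Pod) []*Pod {
	var result []*Pod
	for i := range pods {
		if pods[i].Phase != PodSucceeded && pods[i].Phase != PodFailed {
			result = append(result, &pods[i])
		}
	}
	return result
}

func main() {
	pods := []Pod{{"a", PodRunning}, {"b", PodSucceeded}, {"c", PodFailed}}
	for _, p := range filterActivePods(pods) {
		fmt.Println(p.Name) // prints only "a"
	}
}
```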


@@ -310,11 +310,7 @@ func (e *EndpointController) syncService(key string) {
 				continue
 			}
 			if len(pod.Status.PodIP) == 0 {
-				glog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
-				continue
-			}
-			if pod.DeletionTimestamp != nil {
-				glog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
+				glog.V(4).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
 				continue
 			}


@@ -89,11 +89,8 @@ type NodeController struct {
 	nodeStatusMap      map[string]nodeStatusData
 	now                func() util.Time
 	// worker that evicts pods from unresponsive nodes.
-	podEvictor         *RateLimitedTimedQueue
-	terminationEvictor *RateLimitedTimedQueue
+	podEvictor         *PodEvictor
 	podEvictionTimeout time.Duration
-	// The maximum duration before a pod evicted from a node can be forcefully terminated.
-	maximumGracePeriod time.Duration
 	recorder           record.EventRecorder
 }
@@ -102,7 +99,7 @@ func NewNodeController(
 	cloud cloudprovider.Interface,
 	kubeClient client.Interface,
 	podEvictionTimeout time.Duration,
-	podEvictionLimiter util.RateLimiter,
+	podEvictor *PodEvictor,
 	nodeMonitorGracePeriod time.Duration,
 	nodeStartupGracePeriod time.Duration,
 	nodeMonitorPeriod time.Duration,
@@ -126,9 +123,7 @@ func NewNodeController(
 		kubeClient:         kubeClient,
 		recorder:           recorder,
 		podEvictionTimeout: podEvictionTimeout,
-		maximumGracePeriod: 5 * time.Minute,
-		podEvictor:         NewRateLimitedTimedQueue(podEvictionLimiter, false),
-		terminationEvictor: NewRateLimitedTimedQueue(podEvictionLimiter, false),
+		podEvictor:         podEvictor,
 		nodeStatusMap:      make(map[string]nodeStatusData),
 		nodeMonitorGracePeriod: nodeMonitorGracePeriod,
 		nodeMonitorPeriod:      nodeMonitorPeriod,
@@ -150,36 +145,38 @@ func (nc *NodeController) Run(period time.Duration) {
 	}, nc.nodeMonitorPeriod)
 
 	go util.Forever(func() {
-		nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
-			remaining, err := nc.deletePods(value.Value)
-			if err != nil {
-				util.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
-				return false, 0
-			}
-			if remaining {
-				glog.V(2).Infof("Pods terminating on %q", value.Value)
-				nc.terminationEvictor.Add(value.Value)
-			}
-			return true, 0
-		})
+		nc.podEvictor.TryEvict(func(nodeName string) { nc.deletePods(nodeName) })
 	}, nodeEvictionPeriod)
-
-	// TODO: replace with a controller that ensures pods that are terminating complete
-	// in a particular time period
-	go util.Forever(func() {
-		nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
-			remaining, err := nc.terminatePods(value.Value, value.Added)
-			if err != nil {
-				util.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
-				return false, 0
-			}
-			if remaining != 0 {
-				glog.V(2).Infof("Pods still terminating on %q, estimated completion %s", value.Value, remaining)
-				return false, remaining
-			}
-			return true, 0
-		})
-	}, nodeEvictionPeriod)
 }
 
+// We observed a Node deletion in etcd. Currently we only need to remove Pods that
+// were assigned to it.
+func (nc *NodeController) deleteNode(nodeID string) error {
+	return nc.deletePods(nodeID)
+}
+
+// deletePods will delete all pods from master running on given node.
+func (nc *NodeController) deletePods(nodeID string) error {
+	glog.V(2).Infof("Delete all pods from %v", nodeID)
+	pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(),
+		fields.OneTermEqualSelector(client.PodHost, nodeID))
+	if err != nil {
+		return err
+	}
+
+	nc.recordNodeEvent(nodeID, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeID))
+
+	for _, pod := range pods.Items {
+		// Defensive check, also needed for tests.
+		if pod.Spec.NodeName != nodeID {
+			continue
+		}
+		glog.V(2).Infof("Delete pod %v", pod.Name)
+		nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeID)
+		if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
+			glog.Errorf("Error deleting pod %v: %v", pod.Name, err)
+		}
+	}
+
+	return nil
+}
+
 // Generates num pod CIDRs that could be assigned to nodes.
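
The restored Run wires eviction into a single periodic loop: wake up on each period, drain whatever the rate limiter allows, and call deletePods for each queued node. A toy version of that loop shape, with no Kubernetes dependencies and purely hypothetical names:

```go
package main

import (
	"fmt"
	"time"
)

// A ticker stands in for util.Forever's period, a buffered channel for the
// evictor queue, and a fixed per-tick budget for the rate limiter.
func main() {
	queue := make(chan string, 10)
	queue <- "node0"
	queue <- "node1"
	queue <- "node2"

	const perTick = 1 // pretend the limiter grants one eviction per tick
	tick := time.NewTicker(50 * time.Millisecond)
	defer tick.Stop()
	deadline := time.After(300 * time.Millisecond)

	for {
		select {
		case <-tick.C:
			for i := 0; i < perTick; i++ {
				select {
				case node := <-queue:
					fmt.Println("evicting pods on", node) // stands in for nc.deletePods(node)
				default: // queue drained; wait for the next tick
				}
			}
		case <-deadline:
			return
		}
	}
}
```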
@@ -274,18 +271,18 @@ func (nc *NodeController) monitorNodeStatus() error {
 			// Check eviction timeout against decisionTimestamp
 			if lastReadyCondition.Status == api.ConditionFalse &&
 				decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
-				if nc.podEvictor.Add(node.Name) {
+				if nc.podEvictor.AddNodeToEvict(node.Name) {
 					glog.Infof("Adding pods to evict: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
 				}
 			}
 			if lastReadyCondition.Status == api.ConditionUnknown &&
 				decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout-gracePeriod)) {
-				if nc.podEvictor.Add(node.Name) {
+				if nc.podEvictor.AddNodeToEvict(node.Name) {
 					glog.Infof("Adding pods to evict2: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
 				}
 			}
 			if lastReadyCondition.Status == api.ConditionTrue {
-				if nc.podEvictor.Remove(node.Name) {
+				if nc.podEvictor.RemoveNodeToEvict(node.Name) {
 					glog.Infof("Pods on %v won't be evicted", node.Name)
 				}
 			}
@@ -305,8 +302,8 @@ func (nc *NodeController) monitorNodeStatus() error {
 			}
 			if _, err := instances.ExternalID(node.Name); err != nil && err == cloudprovider.InstanceNotFound {
 				glog.Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
-				nc.recordNodeEvent(node.Name, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
-				if _, err := nc.deletePods(node.Name); err != nil {
+				nc.recordNodeEvent(node.Name, "DeleteingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
+				if err := nc.deletePods(node.Name); err != nil {
 					glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err)
 					continue
 				}
@@ -314,7 +311,6 @@ func (nc *NodeController) monitorNodeStatus() error {
 					glog.Errorf("Unable to delete node %s: %v", node.Name, err)
 					continue
 				}
-				nc.podEvictor.Add(node.Name)
 			}
 		}
 	}
@@ -506,88 +502,3 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 	return gracePeriod, lastReadyCondition, readyCondition, err
 }
-
-// We observed a Node deletion in etcd. Currently we only need to remove Pods that
-// were assigned to it.
-func (nc *NodeController) deleteNode(nodeID string) error {
-	nc.podEvictor.Add(nodeID)
-	return nil
-}
-
-// deletePods will delete all pods from master running on given node, and return true
-// if any pods were deleted.
-func (nc *NodeController) deletePods(nodeID string) (bool, error) {
-	remaining := false
-	glog.V(2).Infof("Delete all pods from %s", nodeID)
-	pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(),
-		fields.OneTermEqualSelector(client.PodHost, nodeID))
-	if err != nil {
-		return remaining, err
-	}
-
-	nc.recordNodeEvent(nodeID, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeID))
-
-	for _, pod := range pods.Items {
-		// Defensive check, also needed for tests.
-		if pod.Spec.NodeName != nodeID {
-			continue
-		}
-		// if the pod has already been deleted, ignore it
-		if pod.DeletionGracePeriodSeconds != nil {
-			continue
-		}
-		glog.V(2).Infof("Delete pod %v", pod.Name)
-		nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeID)
-		if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
-			return false, err
-		}
-		remaining = true
-	}
-	return remaining, nil
-}
-
-// terminatePods will ensure all pods on the given node that are in terminating state are eventually
-// cleaned up
-func (nc *NodeController) terminatePods(nodeID string, since time.Time) (time.Duration, error) {
-	remaining := time.Duration(0)
-	glog.V(2).Infof("Terminating all pods on %s", nodeID)
-	pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(),
-		fields.OneTermEqualSelector(client.PodHost, nodeID))
-	if err != nil {
-		return remaining, err
-	}
-
-	nc.recordNodeEvent(nodeID, "TerminatingAllPods", fmt.Sprintf("Terminating all Pods on Node %s.", nodeID))
-
-	now := time.Now()
-	elapsed := now.Sub(since)
-	for _, pod := range pods.Items {
-		// Defensive check, also needed for tests.
-		if pod.Spec.NodeName != nodeID {
-			continue
-		}
-		// only clean terminated pods
-		if pod.DeletionGracePeriodSeconds == nil {
-			continue
-		}
-		grace := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
-		if grace > nc.maximumGracePeriod {
-			grace = nc.maximumGracePeriod
-		}
-		next := grace - elapsed
-
-		if next < 0 {
-			glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace)
-			nc.recordNodeEvent(nodeID, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeID))
-			if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil {
-				glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err)
-				next = 1
-			}
-		}
-		if remaining < next {
-			remaining = next
-		}
-	}
-	return remaining, nil
-}
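
The deleted terminatePods above is the second phase of graceful eviction: a pod already carrying a deletion grace period is force-killed once the elapsed time since eviction exceeds that grace, capped at maximumGracePeriod. A minimal sketch of just that decision, with simplified hypothetical types:

```go
package main

import (
	"fmt"
	"time"
)

// maximumGracePeriod mirrors the 5-minute cap the struct carried before this revert.
const maximumGracePeriod = 5 * time.Minute

type pod struct {
	Name         string
	GraceSeconds *int64 // nil: the pod is not terminating yet
}

// decide mirrors the shape of the deleted loop body: skip pods that aren't
// terminating, cap the grace period, and force-kill once elapsed exceeds it.
func decide(p pod, elapsed time.Duration) (forceKill bool, wait time.Duration) {
	if p.GraceSeconds == nil {
		return false, 0 // like the removed "continue": only clean terminated pods
	}
	grace := time.Duration(*p.GraceSeconds) * time.Second
	if grace > maximumGracePeriod {
		grace = maximumGracePeriod
	}
	if remaining := grace - elapsed; remaining > 0 {
		return false, remaining // check again once the grace period is up
	}
	return true, 0
}

func main() {
	g := int64(30)
	fmt.Println(decide(pod{"pod0", &g}, 10*time.Second)) // false 20s
	fmt.Println(decide(pod{"pod0", &g}, 40*time.Second)) // true 0s
}
```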


@@ -324,8 +324,9 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 	}
 
 	for _, item := range table {
+		podEvictor := NewPodEvictor(util.NewFakeRateLimiter())
 		nodeController := NewNodeController(nil, item.fakeNodeHandler,
-			evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod,
+			evictionTimeout, podEvictor, testNodeMonitorGracePeriod,
 			testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
 		nodeController.now = func() util.Time { return fakeNow }
 		if err := nodeController.monitorNodeStatus(); err != nil {
@@ -339,17 +340,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 			t.Errorf("unexpected error: %v", err)
 		}
-		nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
-			remaining, _ := nodeController.deletePods(value.Value)
-			if remaining {
-				nodeController.terminationEvictor.Add(value.Value)
-			}
-			return true, 0
-		})
-
-		nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
-			nodeController.terminatePods(value.Value, value.Added)
-			return true, 0
-		})
+		podEvictor.TryEvict(func(nodeName string) { nodeController.deletePods(nodeName) })
+
 		podEvicted := false
 		for _, action := range item.fakeNodeHandler.Actions() {
 			if action.GetVerb() == "delete" && action.GetResource() == "pods" {
@@ -540,7 +531,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 	}
 
 	for _, item := range table {
-		nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(),
+		nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, NewPodEvictor(util.NewFakeRateLimiter()),
 			testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
 		nodeController.now = func() util.Time { return fakeNow }
 		if err := nodeController.monitorNodeStatus(); err != nil {
@@ -619,7 +610,7 @@ func TestNodeDeletion(t *testing.T) {
 		Fake: testclient.NewSimpleFake(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
 	}
 
-	nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(),
+	nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, NewPodEvictor(util.NewFakeRateLimiter()),
 		testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
 	nodeController.now = func() util.Time { return fakeNow }
 	if err := nodeController.monitorNodeStatus(); err != nil {
@@ -629,10 +620,6 @@ func TestNodeDeletion(t *testing.T) {
 	if err := nodeController.monitorNodeStatus(); err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
-	nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
-		nodeController.deletePods(value.Value)
-		return true, 0
-	})
 	podEvicted := false
 	for _, action := range fakeNodeHandler.Actions() {
 		if action.GetVerb() == "delete" && action.GetResource() == "pods" {
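
These tests lean on util.NewFakeRateLimiter(), which always admits, so TryEvict drains the whole queue and the assertions never depend on timing. A self-contained illustration of the same pattern, using hypothetical simplified types:

```go
package main

import "fmt"

// fakeLimiter plays the role util.NewFakeRateLimiter() plays above:
// CanAccept always succeeds, making the drain deterministic.
type fakeLimiter struct{}

func (fakeLimiter) CanAccept() bool { return true }

type evictor struct {
	queue   []string
	limiter interface{ CanAccept() bool }
}

// tryEvict pops values while the limiter permits, as TryEvict does.
func (e *evictor) tryEvict(del func(string)) {
	for len(e.queue) > 0 && e.limiter.CanAccept() {
		del(e.queue[0])
		e.queue = e.queue[1:]
	}
}

func main() {
	e := &evictor{queue: []string{"node0", "node1"}, limiter: fakeLimiter{}}
	deleted := map[string]bool{}
	e.tryEvict(func(n string) { deleted[n] = true })
	fmt.Println(deleted) // map[node0:true node1:true]
}
```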


@@ -0,0 +1,129 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package nodecontroller
+
+import (
+	"sync"
+
+	"k8s.io/kubernetes/pkg/util"
+
+	"github.com/golang/glog"
+)
+
+// A FIFO queue which additionally guarantees that any element can be added only once until
+// it is removed.
+type UniqueQueue struct {
+	lock  sync.Mutex
+	queue []string
+	set   util.StringSet
+}
+
+// Entity responsible for evicting Pods from inserted Nodes. It uses RateLimiter to avoid
+// evicting everything at once. Note that we rate limit eviction of Nodes not individual Pods.
+type PodEvictor struct {
+	queue                   UniqueQueue
+	deletingPodsRateLimiter util.RateLimiter
+}
+
+// Adds a new value to the queue if it wasn't added before, or was explicitly removed by the
+// Remove call. Returns true if new value was added.
+func (q *UniqueQueue) Add(value string) bool {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	if !q.set.Has(value) {
+		q.queue = append(q.queue, value)
+		q.set.Insert(value)
+		return true
+	} else {
+		return false
+	}
+}
+
+// Removes the value from the queue, so Get() call won't return it, and allow subsequent addition
+// of the given value. If the value is not present does nothing and returns false.
+func (q *UniqueQueue) Remove(value string) bool {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	q.set.Delete(value)
+	for i, val := range q.queue {
+		if val == value {
+			if i > 0 && i < len(q.queue)-1 {
+				q.queue = append(q.queue[0:i], q.queue[i+1:len(q.queue)]...)
+			} else if i > 0 {
+				q.queue = q.queue[0 : len(q.queue)-1]
+			} else {
+				q.queue = q.queue[1:len(q.queue)]
+			}
+			return true
+		}
+	}
+	return false
+}
+
+// Returns the oldest added value that wasn't returned yet.
+func (q *UniqueQueue) Get() (string, bool) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	if len(q.queue) == 0 {
+		return "", false
+	}
+	result := q.queue[0]
+	q.queue = q.queue[1:len(q.queue)]
+	return result, true
+}
+
+// Creates new PodEvictor which will use given RateLimiter to oversee eviction.
+func NewPodEvictor(deletingPodsRateLimiter util.RateLimiter) *PodEvictor {
+	return &PodEvictor{
+		queue: UniqueQueue{
+			queue: make([]string, 0),
+			set:   util.NewStringSet(),
+		},
+		deletingPodsRateLimiter: deletingPodsRateLimiter,
+	}
+}
+
+// Tries to evict all Pods from previously inserted Nodes. Ends prematurely if RateLimiter forbids any eviction.
+// Each Node is processed only once, as long as it's not Removed, i.e. calling AddNodeToEvict multiple times does
+// not result in multiple evictions as long as RemoveNodeToEvict is not called.
+func (pe *PodEvictor) TryEvict(delFunc func(string)) {
+	val, ok := pe.queue.Get()
+	for ok {
+		if pe.deletingPodsRateLimiter.CanAccept() {
+			glog.Infof("PodEvictor is evicting Pods on Node: %v", val)
+			delFunc(val)
+		} else {
+			glog.V(1).Info("PodEvictor is rate limited.")
+			break
+		}
+		val, ok = pe.queue.Get()
+	}
+}
+
+// Adds Node to the Evictor to be processed later. Won't add the same Node second time if it was already
+// added and not removed.
+func (pe *PodEvictor) AddNodeToEvict(nodeName string) bool {
+	return pe.queue.Add(nodeName)
+}
+
+// Removes Node from the Evictor. The Node won't be processed until added again.
+func (pe *PodEvictor) RemoveNodeToEvict(nodeName string) bool {
+	return pe.queue.Remove(nodeName)
+}
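
The core of this restored file is UniqueQueue: FIFO order plus the guarantee that a value queued once can't be queued again until it is explicitly removed. A compact, runnable reimplementation of the idea, with a plain map standing in for util.StringSet:

```go
package main

import (
	"fmt"
	"sync"
)

// uniqueQueue: FIFO order; a value can be queued at most once until removed.
type uniqueQueue struct {
	mu    sync.Mutex
	queue []string
	set   map[string]bool
}

func newUniqueQueue() *uniqueQueue { return &uniqueQueue{set: map[string]bool{}} }

// Add enqueues v unless it is already tracked in the set.
func (q *uniqueQueue) Add(v string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.set[v] {
		return false
	}
	q.set[v] = true
	q.queue = append(q.queue, v)
	return true
}

// Remove drops v from the set and, if still queued, splices it out.
func (q *uniqueQueue) Remove(v string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	delete(q.set, v)
	for i := range q.queue {
		if q.queue[i] == v {
			q.queue = append(q.queue[:i], q.queue[i+1:]...)
			return true
		}
	}
	return false
}

// Get pops the head but deliberately leaves v in the set, so a processed
// value can't re-enter until Remove is called; that is the "processed only
// once" guarantee TryEvict's comment describes.
func (q *uniqueQueue) Get() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.queue) == 0 {
		return "", false
	}
	v := q.queue[0]
	q.queue = q.queue[1:]
	return v, true
}

func main() {
	q := newUniqueQueue()
	fmt.Println(q.Add("node0")) // true
	fmt.Println(q.Add("node0")) // false: already queued
	q.Remove("node0")
	fmt.Println(q.Add("node0")) // true again after Remove
}
```

Note that the three slicing branches in the original Remove collapse into the single append(q.queue[:i], q.queue[i+1:]...) used here; the behavior is identical, including the i == 0 and i == len-1 edge cases.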


@@ -17,16 +17,14 @@ limitations under the License.
 package nodecontroller
 
 import (
-	"reflect"
 	"testing"
-	"time"
 
 	"k8s.io/kubernetes/pkg/util"
 )
 
-func CheckQueueEq(lhs []string, rhs TimedQueue) bool {
+func CheckQueueEq(lhs, rhs []string) bool {
 	for i := 0; i < len(lhs); i++ {
-		if rhs[i].Value != lhs[i] {
+		if rhs[i] != lhs[i] {
 			return false
 		}
 	}
@@ -38,10 +36,10 @@ func CheckSetEq(lhs, rhs util.StringSet) bool {
 }
 
 func TestAddNode(t *testing.T) {
-	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
-	evictor.Add("first")
-	evictor.Add("second")
-	evictor.Add("third")
+	evictor := NewPodEvictor(util.NewFakeRateLimiter())
+	evictor.AddNodeToEvict("first")
+	evictor.AddNodeToEvict("second")
+	evictor.AddNodeToEvict("third")
 
 	queuePattern := []string{"first", "second", "third"}
 	if len(evictor.queue.queue) != len(queuePattern) {
@@ -61,11 +59,11 @@ func TestAddNode(t *testing.T) {
 }
 
 func TestDelNode(t *testing.T) {
-	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
-	evictor.Add("first")
-	evictor.Add("second")
-	evictor.Add("third")
-	evictor.Remove("first")
+	evictor := NewPodEvictor(util.NewFakeRateLimiter())
+	evictor.AddNodeToEvict("first")
+	evictor.AddNodeToEvict("second")
+	evictor.AddNodeToEvict("third")
+	evictor.RemoveNodeToEvict("first")
 
 	queuePattern := []string{"second", "third"}
 	if len(evictor.queue.queue) != len(queuePattern) {
@@ -83,11 +81,11 @@ func TestDelNode(t *testing.T) {
 		t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
 	}
 
-	evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
-	evictor.Add("first")
-	evictor.Add("second")
-	evictor.Add("third")
-	evictor.Remove("second")
+	evictor = NewPodEvictor(util.NewFakeRateLimiter())
+	evictor.AddNodeToEvict("first")
+	evictor.AddNodeToEvict("second")
+	evictor.AddNodeToEvict("third")
+	evictor.RemoveNodeToEvict("second")
 
 	queuePattern = []string{"first", "third"}
 	if len(evictor.queue.queue) != len(queuePattern) {
@@ -105,11 +103,11 @@ func TestDelNode(t *testing.T) {
 		t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
 	}
 
-	evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
-	evictor.Add("first")
-	evictor.Add("second")
-	evictor.Add("third")
-	evictor.Remove("third")
+	evictor = NewPodEvictor(util.NewFakeRateLimiter())
+	evictor.AddNodeToEvict("first")
+	evictor.AddNodeToEvict("second")
+	evictor.AddNodeToEvict("third")
+	evictor.RemoveNodeToEvict("third")
 
 	queuePattern = []string{"first", "second"}
 	if len(evictor.queue.queue) != len(queuePattern) {
@@ -128,18 +126,15 @@ func TestDelNode(t *testing.T) {
 	}
 }
 
-func TestTry(t *testing.T) {
-	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
-	evictor.Add("first")
-	evictor.Add("second")
-	evictor.Add("third")
-	evictor.Remove("second")
+func TestEvictNode(t *testing.T) {
+	evictor := NewPodEvictor(util.NewFakeRateLimiter())
+	evictor.AddNodeToEvict("first")
+	evictor.AddNodeToEvict("second")
+	evictor.AddNodeToEvict("third")
+	evictor.RemoveNodeToEvict("second")
 
 	deletedMap := util.NewStringSet()
-	evictor.Try(func(value TimedValue) (bool, time.Duration) {
-		deletedMap.Insert(value.Value)
-		return true, 0
-	})
+	evictor.TryEvict(func(nodeName string) { deletedMap.Insert(nodeName) })
 
 	setPattern := util.NewStringSet("first", "third")
 	if len(deletedMap) != len(setPattern) {
@@ -149,35 +144,3 @@ func TestTry(t *testing.T) {
 		t.Errorf("Invalid map. Got %v, expected %v", deletedMap, setPattern)
 	}
 }
-
-func TestTryOrdering(t *testing.T) {
-	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), false)
-	evictor.Add("first")
-	evictor.Add("second")
-	evictor.Add("third")
-
-	order := []string{}
-	count := 0
-	queued := false
-	evictor.Try(func(value TimedValue) (bool, time.Duration) {
-		count++
-		if value.Added.IsZero() {
-			t.Fatalf("added should not be zero")
-		}
-		if value.Next.IsZero() {
-			t.Fatalf("next should not be zero")
-		}
-		if !queued && value.Value == "second" {
-			queued = true
-			return false, time.Millisecond
-		}
-		order = append(order, value.Value)
-		return true, 0
-	})
-	if !reflect.DeepEqual(order, []string{"first", "third", "second"}) {
-		t.Fatalf("order was wrong: %v", order)
-	}
-	if count != 4 {
-		t.Fatalf("unexpected iterations: %d", count)
-	}
-}


@@ -1,183 +0,0 @@
-/*
-Copyright 2015 The Kubernetes Authors All rights reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package nodecontroller
-
-import (
-	"container/heap"
-	"sync"
-	"time"
-
-	"k8s.io/kubernetes/pkg/util"
-)
-
-// TimedValue is a value that should be processed at a designated time.
-type TimedValue struct {
-	Value string
-	Added time.Time
-	Next  time.Time
-}
-
-// now is used to test time
-var now func() time.Time = time.Now
-
-// TimedQueue is a priority heap where the lowest Next is at the front of the queue
-type TimedQueue []*TimedValue
-
-func (h TimedQueue) Len() int           { return len(h) }
-func (h TimedQueue) Less(i, j int) bool { return h[i].Next.Before(h[j].Next) }
-func (h TimedQueue) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
-
-func (h *TimedQueue) Push(x interface{}) {
-	*h = append(*h, x.(*TimedValue))
-}
-
-func (h *TimedQueue) Pop() interface{} {
-	old := *h
-	n := len(old)
-	x := old[n-1]
-	*h = old[0 : n-1]
-	return x
-}
-
-// A FIFO queue which additionally guarantees that any element can be added only once until
-// it is removed.
-type UniqueQueue struct {
-	lock  sync.Mutex
-	queue TimedQueue
-	set   util.StringSet
-}
-
-// Adds a new value to the queue if it wasn't added before, or was explicitly removed by the
-// Remove call. Returns true if new value was added.
-func (q *UniqueQueue) Add(value TimedValue) bool {
-	q.lock.Lock()
-	defer q.lock.Unlock()
-
-	if q.set.Has(value.Value) {
-		return false
-	}
-	heap.Push(&q.queue, &value)
-	q.set.Insert(value.Value)
-	return true
-}
-
-// Removes the value from the queue, so Get() call won't return it, and allow subsequent addition
-// of the given value. If the value is not present does nothing and returns false.
-func (q *UniqueQueue) Remove(value string) bool {
-	q.lock.Lock()
-	defer q.lock.Unlock()
-
-	q.set.Delete(value)
-	for i, val := range q.queue {
-		if val.Value == value {
-			if i > 0 && i < len(q.queue)-1 {
-				q.queue = append(q.queue[0:i], q.queue[i+1:len(q.queue)]...)
-			} else if i > 0 {
-				q.queue = q.queue[0 : len(q.queue)-1]
-			} else {
-				q.queue = q.queue[1:len(q.queue)]
-			}
-			return true
-		}
-	}
-	return false
-}
-
-// Returns the oldest added value that wasn't returned yet.
-func (q *UniqueQueue) Get() (TimedValue, bool) {
-	q.lock.Lock()
-	defer q.lock.Unlock()
-
-	if len(q.queue) == 0 {
-		return TimedValue{}, false
-	}
-	result := q.queue.Pop().(*TimedValue)
-	q.set.Delete(result.Value)
-	return *result, true
-}
-
-// RateLimitedTimedQueue is a unique item priority queue ordered by the expected next time
-// of execution. It is also rate limited.
-type RateLimitedTimedQueue struct {
-	queue   UniqueQueue
-	limiter util.RateLimiter
-	leak    bool
-}
-
-// Creates new queue which will use given RateLimiter to oversee execution. If leak is true,
-// items which are rate limited will be leaked. Otherwise, rate limited items will be requeued.
-func NewRateLimitedTimedQueue(limiter util.RateLimiter, leak bool) *RateLimitedTimedQueue {
-	return &RateLimitedTimedQueue{
-		queue: UniqueQueue{
-			queue: TimedQueue{},
-			set:   util.NewStringSet(),
-		},
-		limiter: limiter,
-		leak:    leak,
-	}
-}
-
-// ActionFunc takes a timed value and returns false if the item must be retried, with an optional
-// time.Duration if some minimum wait interval should be used.
-type ActionFunc func(TimedValue) (bool, time.Duration)
-
-// Try processes the queue. Ends prematurely if RateLimiter forbids an action and leak is true.
-// Otherwise, requeues the item to be processed. Each value is processed once if fn returns true,
-// otherwise it is added back to the queue. The returned remaining is used to identify the minimum
-// time to execute the next item in the queue.
-func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
-	val, ok := q.queue.Get()
-	for ok {
-		// rate limit the queue checking
-		if q.leak {
-			if !q.limiter.CanAccept() {
-				break
-			}
-		} else {
-			q.limiter.Accept()
-		}
-
-		now := now()
-		if now.Before(val.Next) {
-			q.queue.Add(val)
-			val, ok = q.queue.Get()
-			// we do not sleep here because other values may be added at the front of the queue
-			continue
-		}
-
-		if ok, wait := fn(val); !ok {
-			val.Next = now.Add(wait + 1)
-			q.queue.Add(val)
-		}
-		val, ok = q.queue.Get()
-	}
-}
-
-// Adds value to the queue to be processed. Won't add the same value a second time if it was already
-// added and not removed.
-func (q *RateLimitedTimedQueue) Add(value string) bool {
-	now := now()
-	return q.queue.Add(TimedValue{
-		Value: value,
-		Added: now,
-		Next:  now,
-	})
-}
-
-// Removes Node from the Evictor. The Node won't be processed until added again.
-func (q *RateLimitedTimedQueue) Remove(value string) bool {
-	return q.queue.Remove(value)
-}
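
What this deleted file implemented is a min-heap keyed by each item's Next time, with failed actions requeued at a later Next. A stripped-down sketch of just that ordering mechanism, omitting locking, the rate limiter, and the uniqueness set:

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// item and timedHeap sketch the TimedValue/TimedQueue pair above.
type item struct {
	value string
	next  time.Time
}

type timedHeap []item

func (h timedHeap) Len() int            { return len(h) }
func (h timedHeap) Less(i, j int) bool  { return h[i].next.Before(h[j].next) }
func (h timedHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *timedHeap) Push(x interface{}) { *h = append(*h, x.(item)) }
func (h *timedHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

func main() {
	h := &timedHeap{}
	now := time.Now()
	// "second" stands in for a retried item: it re-enters with a later Next.
	heap.Push(h, item{"second", now.Add(time.Millisecond)})
	heap.Push(h, item{"first", now})
	heap.Push(h, item{"third", now.Add(time.Microsecond)})

	for h.Len() > 0 {
		fmt.Println(heap.Pop(h).(item).value) // first, third, second
	}
}
```

This is exactly the ordering the deleted TestTryOrdering asserted: an item that asks to be retried ("second") is requeued with a later Next and comes out after the others.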


@@ -213,12 +213,6 @@ func (rm *ReplicationManager) getPodController(pod *api.Pod) *api.ReplicationCo
 // When a pod is created, enqueue the controller that manages it and update it's expectations.
 func (rm *ReplicationManager) addPod(obj interface{}) {
 	pod := obj.(*api.Pod)
-	if pod.DeletionTimestamp != nil {
-		// on a restart of the controller manager, it's possible a new pod shows up in a state that
-		// is already pending deletion. Prevent the pod from being a creation observation.
-		rm.deletePod(pod)
-		return
-	}
 	if rc := rm.getPodController(pod); rc != nil {
 		rcKey, err := controller.KeyFunc(rc)
 		if err != nil {
@@ -240,15 +234,6 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) {
 	}
 	// TODO: Write a unittest for this case
 	curPod := cur.(*api.Pod)
-	if curPod.DeletionTimestamp != nil {
-		// when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period,
-		// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
-		// for modification of the deletion timestamp and expect an rc to create more replicas asap, not wait
-		// until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
-		// an rc never initiates a phase change, and so is never asleep waiting for the same.
-		rm.deletePod(curPod)
-		return
-	}
 	if rc := rm.getPodController(curPod); rc != nil {
 		rm.enqueueController(rc)
 	}
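
For context, the removed branches treated any pod whose DeletionTimestamp became non-nil as already deleted, so the replication controller could start a replacement immediately rather than waiting out the kubelet's graceful delete. A tiny sketch of that classification, with hypothetical simplified types:

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical simplified pod carrying only the field the removed branches inspected.
type pod struct {
	Name              string
	DeletionTimestamp *time.Time
}

// classifyUpdate mirrors the removed logic: once DeletionTimestamp is set,
// the pod is as good as gone for replica-counting purposes, so the update
// should be handled as a delete.
func classifyUpdate(cur pod) string {
	if cur.DeletionTimestamp != nil {
		return "treat as delete (removed graceful-termination handling)"
	}
	return "ordinary update"
}

func main() {
	now := time.Now()
	fmt.Println(classifyUpdate(pod{Name: "pod0"}))
	fmt.Println(classifyUpdate(pod{Name: "pod0", DeletionTimestamp: &now}))
}
```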