Make CIDR allocation retries back off exponentially
This also sets the retry time to be less aggressive. Fixes #67348
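
For reference, the resulting schedule: the delay starts at the 250ms base, doubles on each failed attempt, and is capped at 5s, so with updateMaxRetries = 10 a node is retried for roughly 37.5 seconds in total before being dropped (assuming its retry counter starts at zero and every attempt fails). A minimal standalone sketch; the constants and the nodeUpdateRetryTimeout helper are copied from the diff below, and the enclosing main is illustrative only:

package main

import (
	"fmt"
	"time"
)

const (
	updateRetryTimeout    = 250 * time.Millisecond // new base delay
	maxUpdateRetryTimeout = 5 * time.Second        // new cap on the delay
	updateMaxRetries      = 10
)

// nodeUpdateRetryTimeout mirrors the helper added in this commit: it doubles
// the base delay once per failed attempt and clamps the result at the cap.
func nodeUpdateRetryTimeout(count int) time.Duration {
	timeout := updateRetryTimeout
	for i := 0; i < count && timeout < maxUpdateRetryTimeout; i++ {
		timeout *= 2
	}
	if timeout > maxUpdateRetryTimeout {
		return maxUpdateRetryTimeout
	}
	return timeout
}

func main() {
	// Prints 500ms, 1s, 2s, 4s, then 5s for every later attempt.
	for count := 1; count <= updateMaxRetries; count++ {
		fmt.Printf("retry %2d waits %v\n", count, nodeUpdateRetryTimeout(count))
	}
}
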
@@ -71,7 +71,10 @@ const (
 	cidrUpdateRetries = 3
 
 	// updateRetryTimeout is the time to wait before requeing a failed node for retry
-	updateRetryTimeout = 100 * time.Millisecond
+	updateRetryTimeout = 250 * time.Millisecond
 
+	// maxUpdateRetryTimeout is the maximum amount of time between timeouts.
+	maxUpdateRetryTimeout = 5 * time.Second
+
 	// updateMaxRetries is the max retries for a failed node
 	updateMaxRetries = 10

@@ -24,6 +24,7 @@ import (
 
 	"github.com/golang/glog"
 
+	"k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -33,7 +34,6 @@ import (
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 
-	"k8s.io/api/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -155,16 +155,21 @@ func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) {
 				glog.Warning("Channel nodeCIDRUpdateChannel was unexpectedly closed")
 				return
 			}
-			if err := ca.updateCIDRAllocation(workItem); err != nil {
-				if ca.canRetry(workItem) {
-					time.AfterFunc(updateRetryTimeout, func() {
+			if err := ca.updateCIDRAllocation(workItem); err == nil {
+				glog.V(3).Infof("Updated CIDR for %q", workItem)
+				ca.removeNodeFromProcessing(workItem)
+			} else {
+				glog.Errorf("Error updating CIDR for %q: %v", workItem, err)
+				if canRetry, timeout := ca.retryParams(workItem); canRetry {
+					glog.V(2).Infof("Retrying update for %q after %v", workItem, timeout)
+					time.AfterFunc(timeout, func() {
 						// Requeue the failed node for update again.
 						ca.nodeUpdateChannel <- workItem
 					})
 					continue
 				}
+				glog.Errorf("Exceeded retry count for %q, dropping from queue", workItem)
 			}
-			ca.removeNodeFromProcessing(workItem)
 		case <-stopChan:
 			return
 		}
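
A note on the worker change above: a failed node is handed back to nodeUpdateChannel by time.AfterFunc after the backoff returned by retryParams; because the node's nodesInProcessing entry is kept across these retries, retryParams can increment its retries counter on each subsequent failure, and the entry is removed once the update succeeds.
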
@@ -181,15 +186,34 @@ func (ca *cloudCIDRAllocator) insertNodeToProcessing(nodeName string) bool {
 	return true
 }
 
-func (ca *cloudCIDRAllocator) canRetry(nodeName string) bool {
+func (ca *cloudCIDRAllocator) retryParams(nodeName string) (bool, time.Duration) {
 	ca.lock.Lock()
 	defer ca.lock.Unlock()
-	count := ca.nodesInProcessing[nodeName].retries + 1
+
+	entry, ok := ca.nodesInProcessing[nodeName]
+	if !ok {
+		glog.Errorf("Cannot get retryParams for %q as entry does not exist", nodeName)
+		return false, 0
+	}
+
+	count := entry.retries + 1
 	if count > updateMaxRetries {
-		return false
+		return false, 0
 	}
 	ca.nodesInProcessing[nodeName].retries = count
-	return true
+
+	return true, nodeUpdateRetryTimeout(count)
+}
+
+func nodeUpdateRetryTimeout(count int) time.Duration {
+	timeout := updateRetryTimeout
+	for i := 0; i < count && timeout < maxUpdateRetryTimeout; i++ {
+		timeout *= 2
+	}
+	if timeout > maxUpdateRetryTimeout {
+		return maxUpdateRetryTimeout
+	}
+	return timeout
 }
 
 func (ca *cloudCIDRAllocator) removeNodeFromProcessing(nodeName string) {

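As a quick worked trace of the helper above: nodeUpdateRetryTimeout(3) doubles 250ms three times (500ms, 1s, 2s, all below the 5s cap) and returns 2s, while nodeUpdateRetryTimeout(0) never enters the loop and returns the 250ms base; these are the fourth and first cases in the new test below.
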
@@ -17,6 +17,7 @@ limitations under the License.
 package ipam
 
 import (
+	"fmt"
 	"testing"
 	"time"
 
@@ -57,3 +58,22 @@ func TestBoundedRetries(t *testing.T) {
 		// wait for node to finish processing (should terminate and not time out)
 	}
 }
+
+func TestNodeUpdateRetryTimeout(t *testing.T) {
+	for _, tc := range []struct {
+		count int
+		want  time.Duration
+	}{
+		{count: 0, want: 250 * time.Millisecond},
+		{count: 1, want: 500 * time.Millisecond},
+		{count: 2, want: 1000 * time.Millisecond},
+		{count: 3, want: 2000 * time.Millisecond},
+		{count: 50, want: 5000 * time.Millisecond},
+	} {
+		t.Run(fmt.Sprintf("count %d", tc.count), func(t *testing.T) {
+			if got := nodeUpdateRetryTimeout(tc.count); got != tc.want {
+				t.Errorf("nodeUpdateRetryTimeout(tc.count) = %v; want %v", got, tc.want)
+			}
+		})
+	}
+}
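
The new table-driven test can be run in isolation with the standard Go tooling; assuming the usual Kubernetes layout where these files live under pkg/controller/nodeipam/ipam, something like:

	go test ./pkg/controller/nodeipam/ipam/ -run 'TestNodeUpdateRetryTimeout|TestBoundedRetries' -v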