(ALPHA GCP FEATURE) Add IPAM controller
IPAM controller unifies handling of node pod CIDR range allocation. It is intended to supersede the logic that is currently in range_allocator and cloud_cidr_allocator. Note: for this change, the other allocators still exist and are the default.

It supports two modes:

* CIDR range allocations done within the cluster that are then propagated out to the cloud provider.
* Cloud provider managed IPAM that is then reflected into the cluster.
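For orientation, here is a minimal sketch of how the two modes map onto the code added in this patch. The `ipam.Config` fields, the `nodesync.SyncFromCluster` / `SyncFromCloud` modes, and the `ipam.NewController` / `Start` calls all appear in the diff below; the helper name `startIPAMController`, its parameter types, and the import aliases are assumptions made for illustration and are not part of the commit.

```go
package node

import (
	"net"
	"time"

	coreinformers "k8s.io/client-go/informers/core/v1" // assumed informer package
	clientset "k8s.io/client-go/kubernetes"            // assumed client package

	"k8s.io/kubernetes/pkg/cloudprovider"
	"k8s.io/kubernetes/pkg/controller/node/ipam"
	nodesync "k8s.io/kubernetes/pkg/controller/node/ipam/sync"
)

// startIPAMController (hypothetical helper) wires up the new IPAM controller
// the same way NewNodeController does in this patch.
func startIPAMController(
	kubeClient clientset.Interface,
	cloud cloudprovider.Interface,
	nodeInformer coreinformers.NodeInformer,
	clusterCIDR, serviceCIDR *net.IPNet,
	nodeCIDRMaskSize int,
	allocatorType ipam.CIDRAllocatorType,
) error {
	cfg := &ipam.Config{
		Resync:       30 * time.Second,       // mirrors ipamResyncInterval: cloud <-> cluster sync period
		MaxBackoff:   10 * time.Second,       // mirrors ipamMaxBackoff: cap on retry backoff
		InitialRetry: 250 * time.Millisecond, // mirrors ipamInitialBackoff: first retry after an error
	}
	switch allocatorType {
	case ipam.IPAMFromClusterAllocatorType:
		// Mode 1: ranges are allocated in-cluster and propagated out to the cloud provider.
		cfg.Mode = nodesync.SyncFromCluster
	case ipam.IPAMFromCloudAllocatorType:
		// Mode 2: the cloud provider owns IPAM and assignments are reflected into the cluster.
		cfg.Mode = nodesync.SyncFromCloud
	}
	ipamc, err := ipam.NewController(cfg, kubeClient, cloud, clusterCIDR, serviceCIDR, nodeCIDRMaskSize)
	if err != nil {
		return err
	}
	// Start hooks the controller up to node informer events.
	return ipamc.Start(nodeInformer)
}
```

In the default case (the existing `RangeAllocatorType` / `CloudAllocatorType`), the patch instead constructs a per-node CIDR allocator via `ipam.New` and registers it with the node informer, as the `else` branch in the diff shows.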
@@ -27,7 +27,6 @@ import (
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -49,6 +48,7 @@ import (
	"k8s.io/kubernetes/pkg/cloudprovider"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/node/ipam"
	nodesync "k8s.io/kubernetes/pkg/controller/node/ipam/sync"
	"k8s.io/kubernetes/pkg/controller/node/scheduler"
	"k8s.io/kubernetes/pkg/controller/node/util"
	"k8s.io/kubernetes/pkg/util/metrics"
@@ -66,12 +66,13 @@ func init() {

var (
	gracefulDeletionVersion = utilversion.MustParseSemantic("v1.1.0")

	// UnreachableTaintTemplate is the taint for when a node becomes unreachable.
	UnreachableTaintTemplate = &v1.Taint{
		Key:    algorithm.TaintNodeUnreachable,
		Effect: v1.TaintEffectNoExecute,
	}

	// NotReadyTaintTemplate is the taint for when a node is not ready for
	// executing pods
	NotReadyTaintTemplate = &v1.Taint{
		Key:    algorithm.TaintNodeNotReady,
		Effect: v1.TaintEffectNoExecute,
@@ -97,15 +98,26 @@ const (
	apiserverStartupGracePeriod = 10 * time.Minute
	// The amount of time the nodecontroller should sleep between retrying NodeStatus updates
	retrySleepTime = 20 * time.Millisecond

	// ipamResyncInterval is the amount of time between when the cloud and node
	// CIDR range assignments are synchronized.
	ipamResyncInterval = 30 * time.Second
	// ipamMaxBackoff is the maximum backoff for retrying synchronization of a
	// given node in the error state.
	ipamMaxBackoff = 10 * time.Second
	// ipamInitialBackoff is the initial retry interval for retrying synchronization of a
	// given node in the error state.
	ipamInitialBackoff = 250 * time.Millisecond
)

type zoneState string
// ZoneState is the state of a given zone.
type ZoneState string

const (
	stateInitial = zoneState("Initial")
	stateNormal = zoneState("Normal")
	stateFullDisruption = zoneState("FullDisruption")
	statePartialDisruption = zoneState("PartialDisruption")
	stateInitial = ZoneState("Initial")
	stateNormal = ZoneState("Normal")
	stateFullDisruption = ZoneState("FullDisruption")
	statePartialDisruption = ZoneState("PartialDisruption")
)

type nodeStatusData struct {
@@ -114,7 +126,8 @@ type nodeStatusData struct {
	status v1.NodeStatus
}

type NodeController struct {
// Controller is the controller that manages node related cluster state.
type Controller struct {
	allocateNodeCIDRs bool
	allocatorType ipam.CIDRAllocatorType

@@ -125,10 +138,10 @@ type NodeController struct {
	kubeClient clientset.Interface
	// Method for easy mocking in unittest.
	lookupIP func(host string) ([]net.IP, error)
	// Value used if sync_nodes_status=False. NodeController will not proactively
	// Value used if sync_nodes_status=False. Controller will not proactively
	// sync node status in this case, but will monitor node status updated from kubelet. If
	// it doesn't receive update for this amount of time, it will start posting "NodeReady==
	// ConditionUnknown". The amount of time before which NodeController start evicting pods
	// ConditionUnknown". The amount of time before which Controller start evicting pods
	// is controlled via flag 'pod-eviction-timeout'.
	// Note: be cautious when changing the constant, it must work with nodeStatusUpdateFrequency
	// in kubelet. There are several constraints:
@@ -140,7 +153,7 @@ type NodeController struct {
	// 2. nodeMonitorGracePeriod can't be too large for user experience - larger value takes
	// longer for user to see up-to-date node status.
	nodeMonitorGracePeriod time.Duration
	// Value controlling NodeController monitoring period, i.e. how often does NodeController
	// Value controlling Controller monitoring period, i.e. how often does Controller
	// check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod.
	// TODO: Change node status monitor to watch based.
	nodeMonitorPeriod time.Duration
@@ -170,28 +183,26 @@ type NodeController struct {
	daemonSetInformerSynced cache.InformerSynced

	podInformerSynced cache.InformerSynced

	cidrAllocator ipam.CIDRAllocator

	taintManager *scheduler.NoExecuteTaintManager
	cidrAllocator ipam.CIDRAllocator
	taintManager *scheduler.NoExecuteTaintManager

	forcefullyDeletePod func(*v1.Pod) error
	nodeExistsInCloudProvider func(types.NodeName) (bool, error)
	computeZoneStateFunc func(nodeConditions []*v1.NodeCondition) (int, zoneState)
	computeZoneStateFunc func(nodeConditions []*v1.NodeCondition) (int, ZoneState)
	enterPartialDisruptionFunc func(nodeNum int) float32
	enterFullDisruptionFunc func(nodeNum int) float32

	zoneStates map[string]zoneState
	zoneStates map[string]ZoneState
	evictionLimiterQPS float32
	secondaryEvictionLimiterQPS float32
	largeClusterThreshold int32
	unhealthyZoneThreshold float32

	// if set to true NodeController will start TaintManager that will evict Pods from
	// if set to true Controller will start TaintManager that will evict Pods from
	// tainted nodes, if they're not tolerated.
	runTaintManager bool

	// if set to true NodeController will taint Nodes with 'TaintNodeNotReady' and 'TaintNodeUnreachable'
	// if set to true Controller will taint Nodes with 'TaintNodeNotReady' and 'TaintNodeUnreachable'
	// taints instead of evicting Pods itself.
	useTaintBasedEvictions bool

@@ -225,17 +236,21 @@ func NewNodeController(
	allocatorType ipam.CIDRAllocatorType,
	runTaintManager bool,
	useTaintBasedEvictions bool,
	taintNodeByCondition bool,
) (*NodeController, error) {
	taintNodeByCondition bool) (*Controller, error) {

	if kubeClient == nil {
		glog.Fatalf("kubeClient is nil when starting Controller")
	}

	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "controllermanager"})
	eventBroadcaster.StartLogging(glog.Infof)
	if kubeClient != nil {
		glog.V(0).Infof("Sending events to api server.")
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
	} else {
		glog.Fatalf("kubeClient is nil when starting NodeController")
	}

	glog.V(0).Infof("Sending events to api server.")
	eventBroadcaster.StartRecordingToSink(
		&v1core.EventSinkImpl{
			Interface: v1core.New(kubeClient.Core().RESTClient()).Events(""),
		})

	if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("node_controller", kubeClient.Core().RESTClient().GetRateLimiter())
@@ -243,45 +258,49 @@ func NewNodeController(

	if allocateNodeCIDRs {
		if clusterCIDR == nil {
			glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.")
			glog.Fatal("Controller: Must specify clusterCIDR if allocateNodeCIDRs == true.")
		}
		mask := clusterCIDR.Mask
		if maskSize, _ := mask.Size(); maskSize > nodeCIDRMaskSize {
			glog.Fatal("NodeController: Invalid clusterCIDR, mask size of clusterCIDR must be less than nodeCIDRMaskSize.")
			glog.Fatal("Controller: Invalid clusterCIDR, mask size of clusterCIDR must be less than nodeCIDRMaskSize.")
		}
	}

	nc := &NodeController{
		cloud: cloud,
		knownNodeSet: make(map[string]*v1.Node),
		kubeClient: kubeClient,
		recorder: recorder,
		podEvictionTimeout: podEvictionTimeout,
		maximumGracePeriod: 5 * time.Minute,
		zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue),
		zoneNoExecuteTainer: make(map[string]*scheduler.RateLimitedTimedQueue),
		nodeStatusMap: make(map[string]nodeStatusData),
		nodeMonitorGracePeriod: nodeMonitorGracePeriod,
		nodeMonitorPeriod: nodeMonitorPeriod,
		nodeStartupGracePeriod: nodeStartupGracePeriod,
		lookupIP: net.LookupIP,
		now: metav1.Now,
		clusterCIDR: clusterCIDR,
		serviceCIDR: serviceCIDR,
		allocateNodeCIDRs: allocateNodeCIDRs,
		allocatorType: allocatorType,
		forcefullyDeletePod: func(p *v1.Pod) error { return util.ForcefullyDeletePod(kubeClient, p) },
		nodeExistsInCloudProvider: func(nodeName types.NodeName) (bool, error) { return util.NodeExistsInCloudProvider(cloud, nodeName) },
	nc := &Controller{
		cloud: cloud,
		knownNodeSet: make(map[string]*v1.Node),
		kubeClient: kubeClient,
		recorder: recorder,
		podEvictionTimeout: podEvictionTimeout,
		maximumGracePeriod: 5 * time.Minute,
		zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue),
		zoneNoExecuteTainer: make(map[string]*scheduler.RateLimitedTimedQueue),
		nodeStatusMap: make(map[string]nodeStatusData),
		nodeMonitorGracePeriod: nodeMonitorGracePeriod,
		nodeMonitorPeriod: nodeMonitorPeriod,
		nodeStartupGracePeriod: nodeStartupGracePeriod,
		lookupIP: net.LookupIP,
		now: metav1.Now,
		clusterCIDR: clusterCIDR,
		serviceCIDR: serviceCIDR,
		allocateNodeCIDRs: allocateNodeCIDRs,
		allocatorType: allocatorType,
		forcefullyDeletePod: func(p *v1.Pod) error {
			return util.ForcefullyDeletePod(kubeClient, p)
		},
		nodeExistsInCloudProvider: func(nodeName types.NodeName) (bool, error) {
			return util.NodeExistsInCloudProvider(cloud, nodeName)
		},
		evictionLimiterQPS: evictionLimiterQPS,
		secondaryEvictionLimiterQPS: secondaryEvictionLimiterQPS,
		largeClusterThreshold: largeClusterThreshold,
		unhealthyZoneThreshold: unhealthyZoneThreshold,
		zoneStates: make(map[string]zoneState),
		zoneStates: make(map[string]ZoneState),
		runTaintManager: runTaintManager,
		useTaintBasedEvictions: useTaintBasedEvictions && runTaintManager,
	}
	if useTaintBasedEvictions {
		glog.Infof("NodeController is using taint based evictions.")
		glog.Infof("Controller is using taint based evictions.")
	}
	nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
	nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
@@ -326,67 +345,34 @@ func NewNodeController(
	nc.podInformerSynced = podInformer.Informer().HasSynced

	if nc.allocateNodeCIDRs {
		var nodeList *v1.NodeList
		var err error
		// We must poll because apiserver might not be up. This error causes
		// controller manager to restart.
		if pollErr := wait.Poll(10*time.Second, apiserverStartupGracePeriod, func() (bool, error) {
			nodeList, err = kubeClient.Core().Nodes().List(metav1.ListOptions{
				FieldSelector: fields.Everything().String(),
				LabelSelector: labels.Everything().String(),
			})
			if err != nil {
				glog.Errorf("Failed to list all nodes: %v", err)
				return false, nil
		if nc.allocatorType == ipam.IPAMFromClusterAllocatorType || nc.allocatorType == ipam.IPAMFromCloudAllocatorType {
			cfg := &ipam.Config{
				Resync: ipamResyncInterval,
				MaxBackoff: ipamMaxBackoff,
				InitialRetry: ipamInitialBackoff,
			}
			return true, nil
		}); pollErr != nil {
			return nil, fmt.Errorf("Failed to list all nodes in %v, cannot proceed without updating CIDR map", apiserverStartupGracePeriod)
			switch nc.allocatorType {
			case ipam.IPAMFromClusterAllocatorType:
				cfg.Mode = nodesync.SyncFromCluster
			case ipam.IPAMFromCloudAllocatorType:
				cfg.Mode = nodesync.SyncFromCloud
			}
			ipamc, err := ipam.NewController(cfg, kubeClient, cloud, clusterCIDR, serviceCIDR, nodeCIDRMaskSize)
			if err != nil {
				glog.Fatalf("Error creating ipam controller: %v", err)
			}
			if err := ipamc.Start(nodeInformer); err != nil {
				glog.Fatalf("Error trying to Init(): %v", err)
			}
		} else {
			var err error
			nc.cidrAllocator, err = ipam.New(
				kubeClient, cloud, nc.allocatorType, nc.clusterCIDR, nc.serviceCIDR, nodeCIDRMaskSize)
			if err != nil {
				return nil, err
			}
			nc.cidrAllocator.Register(nodeInformer)
		}

		switch nc.allocatorType {
		case ipam.RangeAllocatorType:
			nc.cidrAllocator, err = ipam.NewCIDRRangeAllocator(
				kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, nodeList)
		case ipam.CloudAllocatorType:
			nc.cidrAllocator, err = ipam.NewCloudCIDRAllocator(kubeClient, cloud)
		default:
			return nil, fmt.Errorf("Invalid CIDR allocator type: %v", nc.allocatorType)
		}

		if err != nil {
			return nil, err
		}

		nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc: util.CreateAddNodeHandler(nc.cidrAllocator.AllocateOrOccupyCIDR),
			UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
				// If the PodCIDR is not empty we either:
				// - already processed a Node that already had a CIDR after NC restarted
				//   (cidr is marked as used),
				// - already processed a Node successfully and allocated a CIDR for it
				//   (cidr is marked as used),
				// - already processed a Node but we did saw a "timeout" response and
				//   request eventually got through in this case we haven't released
				//   the allocated CIDR (cidr is still marked as used).
				// There's a possible error here:
				// - NC sees a new Node and assigns a CIDR X to it,
				// - Update Node call fails with a timeout,
				// - Node is updated by some other component, NC sees an update and
				//   assigns CIDR Y to the Node,
				// - Both CIDR X and CIDR Y are marked as used in the local cache,
				//   even though Node sees only CIDR Y
				// The problem here is that in in-memory cache we see CIDR X as marked,
				// which prevents it from being assigned to any new node. The cluster
				// state is correct.
				// Restart of NC fixes the issue.
				if newNode.Spec.PodCIDR == "" {
					return nc.cidrAllocator.AllocateOrOccupyCIDR(newNode)
				}
				return nil
			}),
			DeleteFunc: util.CreateDeleteNodeHandler(nc.cidrAllocator.ReleaseCIDR),
		})
	}

	if nc.runTaintManager {
@@ -427,7 +413,7 @@ func NewNodeController(
	return nc, nil
}

func (nc *NodeController) doEvictionPass() {
func (nc *Controller) doEvictionPass() {
	nc.evictorLock.Lock()
	defer nc.evictorLock.Unlock()
	for k := range nc.zonePodEvictor {
@@ -440,23 +426,23 @@ func (nc *NodeController) doEvictionPass() {
				glog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
			} else {
				zone := utilnode.GetZoneKey(node)
				EvictionsNumber.WithLabelValues(zone).Inc()
				evictionsNumber.WithLabelValues(zone).Inc()
			}
			nodeUid, _ := value.UID.(string)
			remaining, err := util.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore)
			nodeUID, _ := value.UID.(string)
			remaining, err := util.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
				return false, 0
			}
			if remaining {
				glog.Infof("Pods awaiting deletion due to NodeController eviction")
				glog.Infof("Pods awaiting deletion due to Controller eviction")
			}
			return true, 0
		})
	}
}

func (nc *NodeController) doNoScheduleTaintingPass(node *v1.Node) error {
func (nc *Controller) doNoScheduleTaintingPass(node *v1.Node) error {
	// Map node's condition to Taints.
	taints := []v1.Taint{}
	for _, condition := range node.Status.Conditions {
@@ -484,7 +470,7 @@ func (nc *NodeController) doNoScheduleTaintingPass(node *v1.Node) error {
	return nil
}

func (nc *NodeController) doNoExecuteTaintingPass() {
func (nc *Controller) doNoExecuteTaintingPass() {
	nc.evictorLock.Lock()
	defer nc.evictorLock.Unlock()
	for k := range nc.zoneNoExecuteTainer {
@@ -500,7 +486,7 @@ func (nc *NodeController) doNoExecuteTaintingPass() {
				return false, 50 * time.Millisecond
			} else {
				zone := utilnode.GetZoneKey(node)
				EvictionsNumber.WithLabelValues(zone).Inc()
				evictionsNumber.WithLabelValues(zone).Inc()
			}
			_, condition := v1node.GetNodeCondition(&node.Status, v1.NodeReady)
			// Because we want to mimic NodeStatus.Condition["Ready"] we make "unreachable" and "not ready" taints mutually exclusive.
@@ -524,7 +510,7 @@ func (nc *NodeController) doNoExecuteTaintingPass() {
}

// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run(stopCh <-chan struct{}) {
func (nc *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	glog.Infof("Starting node controller")
@@ -560,7 +546,7 @@ func (nc *NodeController) Run(stopCh <-chan struct{}) {
	}

// addPodEvictorForNewZone checks if new zone appeared, and if so add new evictor.
func (nc *NodeController) addPodEvictorForNewZone(node *v1.Node) {
func (nc *Controller) addPodEvictorForNewZone(node *v1.Node) {
	zone := utilnode.GetZoneKey(node)
	if _, found := nc.zoneStates[zone]; !found {
		nc.zoneStates[zone] = stateInitial
@@ -575,14 +561,14 @@ func (nc *NodeController) addPodEvictorForNewZone(node *v1.Node) {
		}
		// Init the metric for the new zone.
		glog.Infof("Initializing eviction metric for zone: %v", zone)
		EvictionsNumber.WithLabelValues(zone).Add(0)
		evictionsNumber.WithLabelValues(zone).Add(0)
	}
}

// monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
// post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
// not reachable for a long period of time.
func (nc *NodeController) monitorNodeStatus() error {
func (nc *Controller) monitorNodeStatus() error {
	// We are listing nodes from local cache as we can tolerate some small delays
	// comparing to state from etcd and there is eventual consistency anyway.
	nodes, err := nc.nodeLister.List(labels.Everything())
@@ -596,8 +582,8 @@ func (nc *NodeController) monitorNodeStatus() error {
	}

	for i := range added {
		glog.V(1).Infof("NodeController observed a new Node: %#v", added[i].Name)
		util.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name))
		glog.V(1).Infof("Controller observed a new Node: %#v", added[i].Name)
		util.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name))
		nc.knownNodeSet[added[i].Name] = added[i]
		nc.addPodEvictorForNewZone(added[i])
		if nc.useTaintBasedEvictions {
@@ -608,8 +594,8 @@ func (nc *NodeController) monitorNodeStatus() error {
	}

	for i := range deleted {
		glog.V(1).Infof("NodeController observed a Node deletion: %v", deleted[i].Name)
		util.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name))
		glog.V(1).Infof("Controller observed a Node deletion: %v", deleted[i].Name)
		util.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name))
		delete(nc.knownNodeSet, deleted[i].Name)
	}

@@ -632,7 +618,7 @@ func (nc *NodeController) monitorNodeStatus() error {
			}
			return false, nil
		}); err != nil {
			glog.Errorf("Update status of Node %v from NodeController error : %v. "+
			glog.Errorf("Update status of Node %v from Controller error : %v. "+
				"Skipping - no pods will be evicted.", node.Name, err)
			continue
		}
@@ -752,14 +738,14 @@ func (nc *NodeController) monitorNodeStatus() error {
	return nil
}

func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
	newZoneStates := map[string]zoneState{}
func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
	newZoneStates := map[string]ZoneState{}
	allAreFullyDisrupted := true
	for k, v := range zoneToNodeConditions {
		ZoneSize.WithLabelValues(k).Set(float64(len(v)))
		zoneSize.WithLabelValues(k).Set(float64(len(v)))
		unhealthy, newState := nc.computeZoneStateFunc(v)
		ZoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
		UnhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
		zoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
		unhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
		if newState != stateFullDisruption {
			allAreFullyDisrupted = false
		}
@@ -773,9 +759,9 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1
	allWasFullyDisrupted := true
	for k, v := range nc.zoneStates {
		if _, have := zoneToNodeConditions[k]; !have {
			ZoneSize.WithLabelValues(k).Set(0)
			ZoneHealth.WithLabelValues(k).Set(100)
			UnhealthyNodes.WithLabelValues(k).Set(0)
			zoneSize.WithLabelValues(k).Set(0)
			zoneHealth.WithLabelValues(k).Set(100)
			unhealthyNodes.WithLabelValues(k).Set(0)
			delete(nc.zoneStates, k)
			continue
		}
@@ -793,7 +779,7 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1
	if !allAreFullyDisrupted || !allWasFullyDisrupted {
		// We're switching to full disruption mode
		if allAreFullyDisrupted {
			glog.V(0).Info("NodeController detected that all Nodes are not-Ready. Entering master disruption mode.")
			glog.V(0).Info("Controller detected that all Nodes are not-Ready. Entering master disruption mode.")
			for i := range nodes {
				if nc.useTaintBasedEvictions {
					_, err := nc.markNodeAsReachable(nodes[i])
@@ -820,7 +806,7 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1
		}
		// We're exiting full disruption mode
		if allWasFullyDisrupted {
			glog.V(0).Info("NodeController detected that some Nodes are Ready. Exiting master disruption mode.")
			glog.V(0).Info("Controller detected that some Nodes are Ready. Exiting master disruption mode.")
			// When exiting disruption mode update probe timestamps on all Nodes.
			now := nc.now()
			for i := range nodes {
@@ -843,14 +829,14 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1
			if v == newState {
				continue
			}
			glog.V(0).Infof("NodeController detected that zone %v is now in state %v.", k, newState)
			glog.V(0).Infof("Controller detected that zone %v is now in state %v.", k, newState)
			nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState)
			nc.zoneStates[k] = newState
		}
	}
}

func (nc *NodeController) setLimiterInZone(zone string, zoneSize int, state zoneState) {
func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneState) {
	switch state {
	case stateNormal:
		if nc.useTaintBasedEvictions {
@@ -879,7 +865,7 @@ func (nc *NodeController) setLimiterInZone(zone string, zoneSize int, state zone

// tryUpdateNodeStatus checks a given node's conditions and tries to update it. Returns grace period to
// which given node is entitled, state of current and last observed Ready Condition, and an error if it occurred.
func (nc *NodeController) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.NodeCondition, *v1.NodeCondition, error) {
func (nc *Controller) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.NodeCondition, *v1.NodeCondition, error) {
	var err error
	var gracePeriod time.Duration
	var observedReadyCondition v1.NodeCondition
@@ -909,7 +895,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.
	savedNodeStatus, found := nc.nodeStatusMap[node.Name]
	// There are following cases to check:
	// - both saved and new status have no Ready Condition set - we leave everything as it is,
	// - saved status have no Ready Condition, but current one does - NodeController was restarted with Node data already present in etcd,
	// - saved status have no Ready Condition, but current one does - Controller was restarted with Node data already present in etcd,
	// - saved status have some Ready Condition, but current one does not - it's an error, but we fill it up because that's probably a good thing to do,
	// - both saved and current statuses have Ready Conditions and they have the same LastProbeTime - nothing happened on that Node, it may be
	//   unresponsive, so we leave it as it is,
@@ -1036,14 +1022,13 @@ func (nc *NodeController) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.
		if _, err = nc.kubeClient.Core().Nodes().UpdateStatus(node); err != nil {
			glog.Errorf("Error updating node %s: %v", node.Name, err)
			return gracePeriod, observedReadyCondition, currentReadyCondition, err
		} else {
			nc.nodeStatusMap[node.Name] = nodeStatusData{
				status: node.Status,
				probeTimestamp: nc.nodeStatusMap[node.Name].probeTimestamp,
				readyTransitionTimestamp: nc.now(),
			}
			return gracePeriod, observedReadyCondition, currentReadyCondition, nil
		}
		nc.nodeStatusMap[node.Name] = nodeStatusData{
			status: node.Status,
			probeTimestamp: nc.nodeStatusMap[node.Name].probeTimestamp,
			readyTransitionTimestamp: nc.now(),
		}
		return gracePeriod, observedReadyCondition, currentReadyCondition, nil
	}
}

@@ -1054,7 +1039,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.
// 1. added: the nodes that in 'allNodes', but not in 'knownNodeSet'
// 2. deleted: the nodes that in 'knownNodeSet', but not in 'allNodes'
// 3. newZoneRepresentatives: the nodes that in both 'knownNodeSet' and 'allNodes', but no zone states
func (nc *NodeController) classifyNodes(allNodes []*v1.Node) (added, deleted, newZoneRepresentatives []*v1.Node) {
func (nc *Controller) classifyNodes(allNodes []*v1.Node) (added, deleted, newZoneRepresentatives []*v1.Node) {
	for i := range allNodes {
		if _, has := nc.knownNodeSet[allNodes[i].Name]; !has {
			added = append(added, allNodes[i])
@@ -1086,7 +1071,7 @@ func (nc *NodeController) classifyNodes(allNodes []*v1.Node) (added, deleted, ne

// cancelPodEviction removes any queued evictions, typically because the node is available again. It
// returns true if an eviction was queued.
func (nc *NodeController) cancelPodEviction(node *v1.Node) bool {
func (nc *Controller) cancelPodEviction(node *v1.Node) bool {
	zone := utilnode.GetZoneKey(node)
	nc.evictorLock.Lock()
	defer nc.evictorLock.Unlock()
@@ -1100,19 +1085,19 @@ func (nc *NodeController) cancelPodEviction(node *v1.Node) bool {

// evictPods queues an eviction for the provided node name, and returns false if the node is already
// queued for eviction.
func (nc *NodeController) evictPods(node *v1.Node) bool {
func (nc *Controller) evictPods(node *v1.Node) bool {
	nc.evictorLock.Lock()
	defer nc.evictorLock.Unlock()
	return nc.zonePodEvictor[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID))
}

func (nc *NodeController) markNodeForTainting(node *v1.Node) bool {
func (nc *Controller) markNodeForTainting(node *v1.Node) bool {
	nc.evictorLock.Lock()
	defer nc.evictorLock.Unlock()
	return nc.zoneNoExecuteTainer[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID))
}

func (nc *NodeController) markNodeAsReachable(node *v1.Node) (bool, error) {
func (nc *Controller) markNodeAsReachable(node *v1.Node) (bool, error) {
	nc.evictorLock.Lock()
	defer nc.evictorLock.Unlock()
	err := controller.RemoveTaintOffNode(nc.kubeClient, node.Name, node, UnreachableTaintTemplate)
@@ -1128,13 +1113,15 @@ func (nc *NodeController) markNodeAsReachable(node *v1.Node) (bool, error) {
	return nc.zoneNoExecuteTainer[utilnode.GetZoneKey(node)].Remove(node.Name), nil
}

// Default value for cluster eviction rate - we take nodeNum for consistency with ReducedQPSFunc.
func (nc *NodeController) HealthyQPSFunc(nodeNum int) float32 {
// HealthyQPSFunc returns the default value for cluster eviction rate - we take
// nodeNum for consistency with ReducedQPSFunc.
func (nc *Controller) HealthyQPSFunc(nodeNum int) float32 {
	return nc.evictionLimiterQPS
}

// If the cluster is large make evictions slower, if they're small stop evictions altogether.
func (nc *NodeController) ReducedQPSFunc(nodeNum int) float32 {
// ReducedQPSFunc returns the QPS to use when the cluster is large: make
// evictions slower; if the cluster is small, stop evictions altogether.
func (nc *Controller) ReducedQPSFunc(nodeNum int) float32 {
	if int32(nodeNum) > nc.largeClusterThreshold {
		return nc.secondaryEvictionLimiterQPS
	}
@@ -1146,7 +1133,7 @@ func (nc *NodeController) ReducedQPSFunc(nodeNum int) float32 {
// - fullyDisrupted if there're no Ready Nodes,
// - partiallyDisrupted if at least nc.unhealthyZoneThreshold percent of Nodes are not Ready,
// - normal otherwise
func (nc *NodeController) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition) (int, zoneState) {
func (nc *Controller) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition) (int, ZoneState) {
	readyNodes := 0
	notReadyNodes := 0
	for i := range nodeReadyConditions {
@@ -1168,7 +1155,7 @@ func (nc *NodeController) ComputeZoneState(nodeReadyConditions []*v1.NodeConditi

// maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating
// that should not be gracefully terminated.
func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
func (nc *Controller) maybeDeleteTerminatingPod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)