Merge branch 'master' into node_ipam_branch
@@ -144,7 +144,7 @@ func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInfo
 		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
 		podControl: controller.RealPodControl{
 			KubeClient: kubeClient,
-			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemon-set"}),
+			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
 		},
 		crControl: controller.RealControllerRevisionControl{
 			KubeClient: kubeClient,
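
The recorder hunks in this merge all make the same one-line change: each controller's EventSource component is renamed to the consistent `<name>-controller` form, so events can be filtered by a predictable source. A minimal sketch of what the component string controls (the node object and reason are illustrative assumptions, not part of this change):

	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"})
	// Every event sent through this recorder reports "daemonset-controller" as its source.
	recorder.Eventf(node, v1.EventTypeNormal, "ExampleReason", "processed node %s", node.Name)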
@@ -243,7 +243,7 @@ func NewNodeController(
 	}
 
 	eventBroadcaster := record.NewBroadcaster()
-	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "controllermanager"})
+	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "node-controller"})
 	eventBroadcaster.StartLogging(glog.Infof)
 
 	glog.V(0).Infof("Sending events to api server.")
@@ -406,6 +406,17 @@ func NewNodeController(
 		})
 	}
+
+	// NOTE(resouer): nodeInformer to substitute deprecated taint key (notReady -> not-ready).
+	// Remove this logic when we don't need this backwards compatibility
+	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: util.CreateAddNodeHandler(func(node *v1.Node) error {
+			return nc.doFixDeprecatedTaintKeyPass(node)
+		}),
+		UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
+			return nc.doFixDeprecatedTaintKeyPass(newNode)
+		}),
+	})
 
 	nc.nodeLister = nodeInformer.Lister()
 	nc.nodeInformerSynced = nodeInformer.Informer().HasSynced
 
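
CreateAddNodeHandler and CreateUpdateNodeHandler adapt the node-typed callbacks above to the untyped cache.ResourceEventHandlerFuncs signature. A rough sketch of the adapter's shape, assuming it simply type-asserts and forwards (the real helper in the node controller's util package also deep-copies the node and routes errors to the runtime error handler):

	func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
		return func(obj interface{}) {
			node := obj.(*v1.Node).DeepCopy() // work on a copy; informer cache objects must not be mutated
			if err := f(node); err != nil {
				utilruntime.HandleError(fmt.Errorf("error while processing Node add: %v", err))
			}
		}
	}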
@@ -444,6 +455,38 @@ func (nc *Controller) doEvictionPass() {
 	}
 }
 
+// doFixDeprecatedTaintKeyPass checks and replaces deprecated taint key with proper key name if needed.
+func (nc *Controller) doFixDeprecatedTaintKeyPass(node *v1.Node) error {
+	taintsToAdd := []*v1.Taint{}
+	taintsToDel := []*v1.Taint{}
+
+	for _, taint := range node.Spec.Taints {
+		if taint.Key == algorithm.DeprecatedTaintNodeNotReady {
+			// delete old taint
+			tDel := taint
+			taintsToDel = append(taintsToDel, &tDel)
+
+			// add right taint
+			tAdd := taint
+			tAdd.Key = algorithm.TaintNodeNotReady
+			taintsToAdd = append(taintsToAdd, &tAdd)
+
+			glog.Warningf("Detected deprecated taint key: %v on node: %v, will substitute it with %v",
+				algorithm.DeprecatedTaintNodeNotReady, node.GetName(), algorithm.TaintNodeNotReady)
+
+			break
+		}
+	}
+
+	if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
+		return nil
+	}
+	if !util.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
+		return fmt.Errorf("failed to swap taints of node %+v", node)
+	}
+	return nil
+}
+
 func (nc *Controller) doNoScheduleTaintingPass(node *v1.Node) error {
 	// Map node's condition to Taints.
 	taints := []v1.Taint{}
 
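
Two details in doFixDeprecatedTaintKeyPass are easy to miss. First, `tDel := taint` and `tAdd := taint` copy the loop variable before its address is taken; appending `&taint` directly would leave every slice element pointing at the single loop variable (a classic Go range pitfall before Go 1.22). Second, SwapNodeControllerTaint applies the add and the delete together, so the node never observes a window with neither the deprecated nor the new taint. A sketch of the aliasing bug the copies avoid:

	bad := []*v1.Taint{}
	for _, taint := range node.Spec.Taints {
		bad = append(bad, &taint) // bug: every element aliases the same loop variable
	}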
@@ -2445,3 +2445,109 @@ func TestCheckNodeKubeletVersionParsing(t *testing.T) {
 		}
 	}
 }
+
+func TestFixDeprecatedTaintKey(t *testing.T) {
+	fakeNow := metav1.Date(2017, 1, 1, 12, 0, 0, 0, time.UTC)
+	evictionTimeout := 10 * time.Minute
+
+	fakeNodeHandler := &testutil.FakeNodeHandler{
+		Existing: []*v1.Node{
+			{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:              "node0",
+					CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+					Labels: map[string]string{
+						kubeletapis.LabelZoneRegion:        "region1",
+						kubeletapis.LabelZoneFailureDomain: "zone1",
+					},
+				},
+			},
+		},
+		Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
+	}
+
+	nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, evictionTimeout,
+		testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod,
+		testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, true)
+	nodeController.now = func() metav1.Time { return fakeNow }
+	nodeController.recorder = testutil.NewFakeRecorder()
+
+	deprecatedTaint := &v1.Taint{
+		Key:    algorithm.DeprecatedTaintNodeNotReady,
+		Effect: v1.TaintEffectNoExecute,
+	}
+	nodeNotReadyTaint := &v1.Taint{
+		Key:    algorithm.TaintNodeNotReady,
+		Effect: v1.TaintEffectNoExecute,
+	}
+
+	tests := []struct {
+		Name           string
+		Node           *v1.Node
+		ExpectedTaints []*v1.Taint
+	}{
+		{
+			Name: "Node with deprecated taint key",
+			Node: &v1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:              "node0",
+					CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+					Labels: map[string]string{
+						kubeletapis.LabelZoneRegion:        "region1",
+						kubeletapis.LabelZoneFailureDomain: "zone1",
+					},
+				},
+				Spec: v1.NodeSpec{
+					Taints: []v1.Taint{
+						*deprecatedTaint,
+					},
+				},
+			},
+			ExpectedTaints: []*v1.Taint{nodeNotReadyTaint},
+		},
+		{
+			Name: "Node with not-ready taint key",
+			Node: &v1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:              "node0",
+					CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+					Labels: map[string]string{
+						kubeletapis.LabelZoneRegion:        "region1",
+						kubeletapis.LabelZoneFailureDomain: "zone1",
+					},
+				},
+				Spec: v1.NodeSpec{
+					Taints: []v1.Taint{
+						*nodeNotReadyTaint,
+					},
+				},
+			},
+			ExpectedTaints: []*v1.Taint{nodeNotReadyTaint},
+		},
+	}
+
+	for _, test := range tests {
+		fakeNodeHandler.Update(test.Node)
+		if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
+		nodeController.doFixDeprecatedTaintKeyPass(test.Node)
+		if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
+		node0, err := nodeController.nodeLister.Get("node0")
+		if err != nil {
+			t.Errorf("Can't get current node0...")
+			return
+		}
+		if len(node0.Spec.Taints) != len(test.ExpectedTaints) {
+			t.Errorf("%s: Unexpected number of taints: expected %d, got %d",
+				test.Name, len(test.ExpectedTaints), len(node0.Spec.Taints))
+		}
+		for _, taint := range test.ExpectedTaints {
+			if !taintutils.TaintExists(node0.Spec.Taints, taint) {
+				t.Errorf("%s: Can't find taint %v in %v", test.Name, taint, node0.Spec.Taints)
+			}
+		}
+	}
+}
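
Note that the test discards the error returned by doFixDeprecatedTaintKeyPass. A stricter variant (an assumption about intent, not part of this commit) would assert on it:

	if err := nodeController.doFixDeprecatedTaintKeyPass(test.Node); err != nil {
		t.Errorf("%s: unexpected error: %v", test.Name, err)
	}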
@@ -152,7 +152,7 @@ func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
 // communicate with the API server.
 func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager {
 	eventBroadcaster := record.NewBroadcaster()
-	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "controllermanager"})
+	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"})
 	eventBroadcaster.StartLogging(glog.Infof)
 	if c != nil {
 		glog.V(0).Infof("Sending events to api server.")
@@ -75,7 +75,7 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInform
 	}
 
 	eventBroadcaster := record.NewBroadcaster()
-	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "route_controller"})
+	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "route-controller"})
 
 	rc := &RouteController{
 		routes: routes,
@@ -86,7 +86,7 @@ func NewStatefulSetController(
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
 	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
-	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset"})
+	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})
 
 	ssc := &StatefulSetController{
 		kubeClient: kubeClient,
@@ -135,7 +135,7 @@ func NewAttachDetachController(
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
 	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
-	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "attachdetach"})
+	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "attachdetach-controller"})
 
 	adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr)
 	adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr)
@@ -101,6 +101,10 @@ type DesiredStateOfWorld interface {
 	// GetKeepTerminatedPodVolumesForNode determines if node wants volumes to be
 	// mounted and attached for terminated pods
 	GetKeepTerminatedPodVolumesForNode(k8stypes.NodeName) bool
+
+	// Mark multiattach error as reported to prevent spamming multiple
+	// events for same error
+	SetMultiAttachError(v1.UniqueVolumeName, k8stypes.NodeName)
 }
 
 // VolumeToAttach represents a volume that should be attached to a node.
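
The new interface method records the "already reported" marker inside the desired state of world itself, rather than on a value copy of VolumeToAttach. A caller passes the conflicting volume/node pair, mirroring the reconciler change further down:

	dsw.SetMultiAttachError(volumeToAttach.VolumeName, volumeToAttach.NodeName)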
@@ -329,6 +333,21 @@ func (dsw *desiredStateOfWorld) VolumeExists(
 	return false
 }
 
+func (dsw *desiredStateOfWorld) SetMultiAttachError(
+	volumeName v1.UniqueVolumeName,
+	nodeName k8stypes.NodeName) {
+	dsw.Lock()
+	defer dsw.Unlock()
+
+	nodeObj, nodeExists := dsw.nodesManaged[nodeName]
+	if nodeExists {
+		if volumeObj, volumeExists := nodeObj.volumesToAttach[volumeName]; volumeExists {
+			volumeObj.multiAttachErrorReported = true
+			dsw.nodesManaged[nodeName].volumesToAttach[volumeName] = volumeObj
+		}
+	}
+}
+
 // GetKeepTerminatedPodVolumesForNode determines if node wants volumes to be
 // mounted and attached for terminated pods
 func (dsw *desiredStateOfWorld) GetKeepTerminatedPodVolumesForNode(nodeName k8stypes.NodeName) bool {
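
Two implementation details here: the method mutates shared state, so it must hold the write lock (Lock/Unlock) rather than the read lock, or concurrent callers would race on nodesManaged; and volumesToAttach maps keys to struct values, not pointers, so the mutated copy has to be stored back for the flag to stick. The read-modify-write pattern for a value-typed map, in miniature:

	obj := m[key]   // copy the struct out of the map
	obj.flag = true // mutate the copy
	m[key] = obj    // write the copy back, or the change is lost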
@@ -241,48 +241,7 @@ func (rc *reconciler) reconcile() {
 	}
 }
 
-	// Ensure volumes that should be attached are attached.
-	for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() {
-		if rc.actualStateOfWorld.VolumeNodeExists(
-			volumeToAttach.VolumeName, volumeToAttach.NodeName) {
-			// Volume/Node exists, touch it to reset detachRequestedTime
-			glog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Volume attached--touching", ""))
-			rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
-		} else {
-			// Don't even try to start an operation if there is already one running
-			if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "") {
-				glog.V(10).Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
-				continue
-			}
-
-			if rc.isMultiAttachForbidden(volumeToAttach.VolumeSpec) {
-				nodes := rc.actualStateOfWorld.GetNodesForVolume(volumeToAttach.VolumeName)
-				if len(nodes) > 0 {
-					if !volumeToAttach.MultiAttachErrorReported {
-						simpleMsg, detailedMsg := volumeToAttach.GenerateMsg("Multi-Attach error", "Volume is already exclusively attached to one node and can't be attached to another")
-						for _, pod := range volumeToAttach.ScheduledPods {
-							rc.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
-						}
-						volumeToAttach.MultiAttachErrorReported = true
-						glog.Warningf(detailedMsg)
-					}
-					continue
-				}
-			}
-
-			// Volume/Node doesn't exist, spawn a goroutine to attach it
-			glog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting attacherDetacher.AttachVolume", ""))
-			err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
-			if err == nil {
-				glog.Infof(volumeToAttach.GenerateMsgDetailed("attacherDetacher.AttachVolume started", ""))
-			}
-			if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
-				// Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
-				// Log all other errors.
-				glog.Errorf(volumeToAttach.GenerateErrorDetailed("attacherDetacher.AttachVolume failed to start", err).Error())
-			}
-		}
-	}
+	rc.attachDesiredVolumes()
 
 	// Update Node Status
 	err := rc.nodeStatusUpdater.UpdateNodeStatuses()
@@ -290,3 +249,48 @@ func (rc *reconciler) reconcile() {
 		glog.Warningf("UpdateNodeStatuses failed with: %v", err)
 	}
 }
+
+func (rc *reconciler) attachDesiredVolumes() {
+	// Ensure volumes that should be attached are attached.
+	for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() {
+		if rc.actualStateOfWorld.VolumeNodeExists(volumeToAttach.VolumeName, volumeToAttach.NodeName) {
+			// Volume/Node exists, touch it to reset detachRequestedTime
+			glog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Volume attached--touching", ""))
+			rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
+			continue
+		}
+		// Don't even try to start an operation if there is already one running
+		if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "") {
+			glog.V(10).Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
+			continue
+		}
+
+		if rc.isMultiAttachForbidden(volumeToAttach.VolumeSpec) {
+			nodes := rc.actualStateOfWorld.GetNodesForVolume(volumeToAttach.VolumeName)
+			if len(nodes) > 0 {
+				if !volumeToAttach.MultiAttachErrorReported {
+					simpleMsg, detailedMsg := volumeToAttach.GenerateMsg("Multi-Attach error", "Volume is already exclusively attached to one node and can't be attached to another")
+					for _, pod := range volumeToAttach.ScheduledPods {
+						rc.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
+					}
+					rc.desiredStateOfWorld.SetMultiAttachError(volumeToAttach.VolumeName, volumeToAttach.NodeName)
+					glog.Warningf(detailedMsg)
+				}
+				continue
+			}
+		}
+
+		// Volume/Node doesn't exist, spawn a goroutine to attach it
+		glog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting attacherDetacher.AttachVolume", ""))
+		err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
+		if err == nil {
+			glog.Infof(volumeToAttach.GenerateMsgDetailed("attacherDetacher.AttachVolume started", ""))
+		}
+		if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
+			// Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
+			// Log all other errors.
+			glog.Errorf(volumeToAttach.GenerateErrorDetailed("attacherDetacher.AttachVolume failed to start", err).Error())
+		}
+	}
+
+}
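
Beyond extracting the attach loop into attachDesiredVolumes (with early `continue`s replacing the old `else` nesting), there is one behavioral fix: the old loop set MultiAttachErrorReported on volumeToAttach, a value copy returned by GetVolumesToAttach, so the flag was lost on the next reconcile pass and the Multi-Attach warning events repeated every cycle. The new code persists the marker through desiredStateOfWorld.SetMultiAttachError, so each conflict is reported once. In miniature:

	// before: mutates a copy that is rebuilt on the next reconcile, so events repeat
	volumeToAttach.MultiAttachErrorReported = true
	// after: persists the marker in the desired state of world
	rc.desiredStateOfWorld.SetMultiAttachError(volumeToAttach.VolumeName, volumeToAttach.NodeName)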
@@ -455,6 +455,17 @@ func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteOnce(t *testing.
 
 	nodesForVolume := asw.GetNodesForVolume(generatedVolumeName)
 
+	// check if multiattach is marked
+	// at least one volume should be marked with multiattach error
+	nodeAttachedTo := nodesForVolume[0]
+	for _, volumeToAttach := range dsw.GetVolumesToAttach() {
+		if volumeToAttach.NodeName != nodeAttachedTo {
+			if !volumeToAttach.MultiAttachErrorReported {
+				t.Fatalf("Expected volume %q on node %q to have multiattach error", volumeToAttach.VolumeName, volumeToAttach.NodeName)
+			}
+		}
+	}
+
 	// Act
 	podToDelete := ""
 	if nodesForVolume[0] == nodeName1 {