daemonset: apply predicates when placing daemon pods
@@ -19,10 +19,12 @@ package daemon
 import (
 	"reflect"
 	"sort"
+	"sync"
 	"time"
 
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/validation"
 	"k8s.io/kubernetes/pkg/apis/extensions"
 	"k8s.io/kubernetes/pkg/client/cache"
 	"k8s.io/kubernetes/pkg/client/record"
@@ -32,9 +34,11 @@ import (
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/sets"
+	"k8s.io/kubernetes/pkg/util/validation/field"
 	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/pkg/watch"
-	"sync"
+	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
 )
 
 const (
@@ -325,7 +329,7 @@ func (dsc *DaemonSetsController) addNode(obj interface{}) {
 	node := obj.(*api.Node)
 	for i := range dsList.Items {
 		ds := &dsList.Items[i]
-		shouldEnqueue := nodeShouldRunDaemonPod(node, ds)
+		shouldEnqueue := dsc.nodeShouldRunDaemonPod(node, ds)
 		if shouldEnqueue {
 			dsc.enqueueDaemonSet(ds)
 		}
@@ -346,7 +350,7 @@ func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
 	}
 	for i := range dsList.Items {
 		ds := &dsList.Items[i]
-		shouldEnqueue := (nodeShouldRunDaemonPod(oldNode, ds) != nodeShouldRunDaemonPod(curNode, ds))
+		shouldEnqueue := (dsc.nodeShouldRunDaemonPod(oldNode, ds) != dsc.nodeShouldRunDaemonPod(curNode, ds))
 		if shouldEnqueue {
 			dsc.enqueueDaemonSet(ds)
 		}
@@ -387,7 +391,7 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
 	}
 	var nodesNeedingDaemonPods, podsToDelete []string
 	for _, node := range nodeList.Items {
-		shouldRun := nodeShouldRunDaemonPod(&node, ds)
+		shouldRun := dsc.nodeShouldRunDaemonPod(&node, ds)
 
 		daemonPods, isRunning := nodeToDaemonPods[node.Name]
 
@@ -498,7 +502,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet)
 
 	var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int
 	for _, node := range nodeList.Items {
-		shouldRun := nodeShouldRunDaemonPod(&node, ds)
+		shouldRun := dsc.nodeShouldRunDaemonPod(&node, ds)
 
 		numDaemonPods := len(nodeToDaemonPods[node.Name])
 
@@ -563,21 +567,50 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
 	return nil
 }
 
-func nodeShouldRunDaemonPod(node *api.Node, ds *extensions.DaemonSet) bool {
+func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *api.Node, ds *extensions.DaemonSet) bool {
 	// Check if the node satisfies the daemon set's node selector.
 	nodeSelector := labels.Set(ds.Spec.Template.Spec.NodeSelector).AsSelector()
-	shouldRun := nodeSelector.Matches(labels.Set(node.Labels))
+	if !nodeSelector.Matches(labels.Set(node.Labels)) {
+		return false
+	}
 	// If the daemon set specifies a node name, check that it matches with node.Name.
-	shouldRun = shouldRun && (ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name)
+	if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
+		return false
+	}
 	// If the node is not ready, don't run on it.
 	// TODO(mikedanese): remove this once daemonpods forgive nodes
-	shouldRun = shouldRun && api.IsNodeReady(node)
+	if !api.IsNodeReady(node) {
+		return false
+	}
 
-	// If the node is unschedulable, don't run it
-	// TODO(mikedanese): remove this once we have the right node admitance levels.
-	// See https://github.com/kubernetes/kubernetes/issues/17297#issuecomment-156857375.
-	shouldRun = shouldRun && !node.Spec.Unschedulable
-	return shouldRun
+	for _, c := range node.Status.Conditions {
+		if c.Type == api.NodeOutOfDisk && c.Status == api.ConditionTrue {
+			return false
+		}
+	}
+
+	newPod := &api.Pod{Spec: ds.Spec.Template.Spec}
+	newPod.Spec.NodeName = node.Name
+	pods := []*api.Pod{newPod}
+
+	for _, m := range dsc.podStore.Store.List() {
+		pod := m.(*api.Pod)
+		if pod.Spec.NodeName != node.Name {
+			continue
+		}
+		pods = append(pods, pod)
+	}
+	_, notFittingCPU, notFittingMemory := predicates.CheckPodsExceedingFreeResources(pods, node.Status.Allocatable)
+	if len(notFittingCPU)+len(notFittingMemory) != 0 {
+		return false
+	}
+	ports := sets.String{}
+	for _, pod := range pods {
+		if errs := validation.AccumulateUniqueHostPorts(pod.Spec.Containers, &ports, field.NewPath("spec", "containers")); len(errs) > 0 {
+			return false
+		}
+	}
+	return true
 }
 
 // byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.
|   | |||||||
@@ -22,6 +22,7 @@ import (
 	"testing"
 
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/resource"
 	"k8s.io/kubernetes/pkg/api/testapi"
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/apis/extensions"
@@ -200,17 +201,127 @@ func TestNotReadNodeDaemonDoesNotLaunchPod(t *testing.T) {
 	syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
 }
 
-// DaemonSets should not place onto Unschedulable nodes
-func TestUnschedulableNodeDaemonDoesNotLaunchPod(t *testing.T) {
+// DaemonSets should not place onto OutOfDisk nodes
+func TestOutOfDiskNodeDaemonDoesNotLaunchPod(t *testing.T) {
 	manager, podControl := newTestController()
-	node := newNode("not-ready", nil)
-	node.Spec.Unschedulable = true
+	node := newNode("not-enough-disk", nil)
+	node.Status.Conditions = []api.NodeCondition{{Type: api.NodeOutOfDisk, Status: api.ConditionTrue}}
 	manager.nodeStore.Add(node)
 	ds := newDaemonSet("foo")
 	manager.dsStore.Add(ds)
 	syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
 }
 
+// DaemonSets should not place onto nodes with insufficient free resource
+func TestInsufficentCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) {
+	podSpec := api.PodSpec{
+		NodeName: "too-much-mem",
+		Containers: []api.Container{{
+			Resources: api.ResourceRequirements{
+				Requests: api.ResourceList{
+					api.ResourceMemory: resource.MustParse("75M"),
+					api.ResourceCPU:    resource.MustParse("75m"),
+				},
+			},
+		}},
+	}
+	manager, podControl := newTestController()
+	node := newNode("too-much-mem", nil)
+	node.Status.Allocatable = api.ResourceList{
+		api.ResourceMemory: resource.MustParse("100M"),
+		api.ResourceCPU:    resource.MustParse("200m"),
+	}
+	manager.nodeStore.Add(node)
+	manager.podStore.Add(&api.Pod{
+		Spec: podSpec,
+	})
+	ds := newDaemonSet("foo")
+	ds.Spec.Template.Spec = podSpec
+	manager.dsStore.Add(ds)
+	syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
+}
+
+// DaemonSets should place onto nodes with sufficient free resource
+func TestSufficentCapacityNodeDaemonLaunchesPod(t *testing.T) {
+	podSpec := api.PodSpec{
+		NodeName: "not-too-much-mem",
+		Containers: []api.Container{{
+			Resources: api.ResourceRequirements{
+				Requests: api.ResourceList{
+					api.ResourceMemory: resource.MustParse("75M"),
+					api.ResourceCPU:    resource.MustParse("75m"),
+				},
+			},
+		}},
+	}
+	manager, podControl := newTestController()
+	node := newNode("not-too-much-mem", nil)
+	node.Status.Allocatable = api.ResourceList{
+		api.ResourceMemory: resource.MustParse("200M"),
+		api.ResourceCPU:    resource.MustParse("200m"),
+	}
+	manager.nodeStore.Add(node)
+	manager.podStore.Add(&api.Pod{
+		Spec: podSpec,
+	})
+	ds := newDaemonSet("foo")
+	ds.Spec.Template.Spec = podSpec
+	manager.dsStore.Add(ds)
+	syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
+}
+
+// DaemonSets should not place onto nodes that would cause port conflicts
+func TestPortConflictNodeDaemonDoesNotLaunchPod(t *testing.T) {
+	podSpec := api.PodSpec{
+		NodeName: "port-conflict",
+		Containers: []api.Container{{
+			Ports: []api.ContainerPort{{
+				HostPort: 666,
+			}},
+		}},
+	}
+	manager, podControl := newTestController()
+	node := newNode("port-conflict", nil)
+	manager.nodeStore.Add(node)
+	manager.podStore.Add(&api.Pod{
+		Spec: podSpec,
+	})
+	ds := newDaemonSet("foo")
+	ds.Spec.Template.Spec = podSpec
+	manager.dsStore.Add(ds)
+	syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
+}
+
+// DaemonSets should place onto nodes that would not cause port conflicts
+func TestNoPortConflictNodeDaemonLaunchesPod(t *testing.T) {
+	podSpec1 := api.PodSpec{
+		NodeName: "no-port-conflict",
+		Containers: []api.Container{{
+			Ports: []api.ContainerPort{{
+				HostPort: 6661,
+			}},
+		}},
+	}
+	podSpec2 := api.PodSpec{
+		NodeName: "no-port-conflict",
+		Containers: []api.Container{{
+			Ports: []api.ContainerPort{{
+				HostPort: 6662,
+			}},
+		}},
+	}
+	manager, podControl := newTestController()
+	node := newNode("no-port-conflict", nil)
+	manager.nodeStore.Add(node)
+	manager.podStore.Add(&api.Pod{
+		Spec: podSpec1,
+	})
+	ds := newDaemonSet("foo")
+	ds.Spec.Template.Spec = podSpec2
+	manager.dsStore.Add(ds)
+	syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
+}
+
 // Controller should not create pods on nodes which have daemon pods, and should remove excess pods from nodes that have extra pods.
 func TestDealsWithExistingPods(t *testing.T) {
 	manager, podControl := newTestController()
@@ -2223,6 +2223,10 @@ func (kl *Kubelet) rejectPod(pod *api.Pod, reason, message string) {
 // new pod. The function returns a boolean value indicating whether the pod
 // can be admitted, a brief single-word reason and a message explaining why
 // the pod cannot be admitted.
+//
+// This needs to be kept in sync with the scheduler's and daemonset's fit predicates,
+// otherwise there will inevitably be pod delete create loops. This will be fixed
+// once we can extract these predicates into a common library. (#12744)
 func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, string) {
 	if hasHostPortConflicts(pods) {
 		return false, "HostPortConflict", "cannot start the pod due to host port conflict."
|   | |||||||