Merge pull request #52421 from WIZARD-CXY/fixpredicate

Automatic merge from submit-queue (batch tested with PRs 53780, 55663, 55321, 52421, 55659). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

add hostip and protocol to the hostport predicates

**What this PR does / why we need it**:
This PR adds "hostIP and protocol" to scheduler hostport predicate procedure
**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #
fix #51950 
**Special notes for your reviewer**:
- [x] basic implementation, need review
- [x] e2e test
- [x] update doc (will be done in seperate PR)

**Release note**:

```release-note
add hostIP and protocol to the original hostport predicates procedure in scheduler.
```
This commit is contained in:
Kubernetes Submit Queue
2017-11-15 09:30:36 -08:00
committed by GitHub
12 changed files with 505 additions and 81 deletions

View File

@@ -46,7 +46,7 @@ type predicateMetadata struct {
pod *v1.Pod
podBestEffort bool
podRequest *schedulercache.Resource
podPorts map[int]bool
podPorts map[string]bool
//key is a pod full name with the anti-affinity rules.
matchingAntiAffinityTerms map[string][]matchingPodAntiAffinityTerm
serviceAffinityInUse bool
@@ -172,7 +172,7 @@ func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata {
podRequest: meta.podRequest,
serviceAffinityInUse: meta.serviceAffinityInUse,
}
newPredMeta.podPorts = map[int]bool{}
newPredMeta.podPorts = map[string]bool{}
for k, v := range meta.podPorts {
newPredMeta.podPorts[k] = v
}

View File

@@ -373,7 +373,7 @@ func TestPredicateMetadata_ShallowCopy(t *testing.T) {
Memory: 300,
AllowedPodNumber: 4,
},
podPorts: map[int]bool{1234: true, 456: false},
podPorts: map[string]bool{"1234": true, "456": false},
matchingAntiAffinityTerms: map[string][]matchingPodAntiAffinityTerm{
"term1": {
{

View File

@@ -890,7 +890,7 @@ func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta algorithm.Predi
// PodFitsHostPorts checks if a node has free ports for the requested pod ports.
func PodFitsHostPorts(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
var wantPorts map[int]bool
var wantPorts map[string]bool
if predicateMeta, ok := meta.(*predicateMetadata); ok {
wantPorts = predicateMeta.podPorts
} else {
@@ -902,11 +902,12 @@ func PodFitsHostPorts(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *s
}
existingPorts := nodeInfo.UsedPorts()
for wport := range wantPorts {
if wport != 0 && existingPorts[wport] {
return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
}
// try to see whether existingPorts and wantPorts will conflict or not
if portsConflict(existingPorts, wantPorts) {
return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
}
return true, nil, nil
}

View File

@@ -554,10 +554,17 @@ func TestPodFitsHost(t *testing.T) {
}
}
func newPod(host string, hostPorts ...int) *v1.Pod {
func newPod(host string, hostPortInfos ...string) *v1.Pod {
networkPorts := []v1.ContainerPort{}
for _, port := range hostPorts {
networkPorts = append(networkPorts, v1.ContainerPort{HostPort: int32(port)})
for _, portInfo := range hostPortInfos {
hostPortInfo := decode(portInfo)
hostPort, _ := strconv.Atoi(hostPortInfo.hostPort)
networkPorts = append(networkPorts, v1.ContainerPort{
HostIP: hostPortInfo.hostIP,
HostPort: int32(hostPort),
Protocol: v1.Protocol(hostPortInfo.protocol),
})
}
return &v1.Pod{
Spec: v1.PodSpec{
@@ -585,32 +592,88 @@ func TestPodFitsHostPorts(t *testing.T) {
test: "nothing running",
},
{
pod: newPod("m1", 8080),
pod: newPod("m1", "UDP/127.0.0.1/8080"),
nodeInfo: schedulercache.NewNodeInfo(
newPod("m1", 9090)),
newPod("m1", "UDP/127.0.0.1/9090")),
fits: true,
test: "other port",
},
{
pod: newPod("m1", 8080),
pod: newPod("m1", "UDP/127.0.0.1/8080"),
nodeInfo: schedulercache.NewNodeInfo(
newPod("m1", 8080)),
newPod("m1", "UDP/127.0.0.1/8080")),
fits: false,
test: "same port",
test: "same udp port",
},
{
pod: newPod("m1", 8000, 8080),
pod: newPod("m1", "TCP/127.0.0.1/8080"),
nodeInfo: schedulercache.NewNodeInfo(
newPod("m1", 8080)),
newPod("m1", "TCP/127.0.0.1/8080")),
fits: false,
test: "second port",
test: "same tcp port",
},
{
pod: newPod("m1", 8000, 8080),
pod: newPod("m1", "TCP/127.0.0.1/8080"),
nodeInfo: schedulercache.NewNodeInfo(
newPod("m1", 8001, 8080)),
newPod("m1", "TCP/127.0.0.2/8080")),
fits: true,
test: "different host ip",
},
{
pod: newPod("m1", "UDP/127.0.0.1/8080"),
nodeInfo: schedulercache.NewNodeInfo(
newPod("m1", "TCP/127.0.0.1/8080")),
fits: true,
test: "different protocol",
},
{
pod: newPod("m1", "UDP/127.0.0.1/8000", "UDP/127.0.0.1/8080"),
nodeInfo: schedulercache.NewNodeInfo(
newPod("m1", "UDP/127.0.0.1/8080")),
fits: false,
test: "second port",
test: "second udp port conflict",
},
{
pod: newPod("m1", "TCP/127.0.0.1/8001", "UDP/127.0.0.1/8080"),
nodeInfo: schedulercache.NewNodeInfo(
newPod("m1", "TCP/127.0.0.1/8001", "UDP/127.0.0.1/8081")),
fits: false,
test: "first tcp port conflict",
},
{
pod: newPod("m1", "TCP/0.0.0.0/8001"),
nodeInfo: schedulercache.NewNodeInfo(
newPod("m1", "TCP/127.0.0.1/8001")),
fits: false,
test: "first tcp port conflict due to 0.0.0.0 hostIP",
},
{
pod: newPod("m1", "TCP/10.0.10.10/8001", "TCP/0.0.0.0/8001"),
nodeInfo: schedulercache.NewNodeInfo(
newPod("m1", "TCP/127.0.0.1/8001")),
fits: false,
test: "TCP hostPort conflict due to 0.0.0.0 hostIP",
},
{
pod: newPod("m1", "TCP/127.0.0.1/8001"),
nodeInfo: schedulercache.NewNodeInfo(
newPod("m1", "TCP/0.0.0.0/8001")),
fits: false,
test: "second tcp port conflict to 0.0.0.0 hostIP",
},
{
pod: newPod("m1", "UDP/127.0.0.1/8001"),
nodeInfo: schedulercache.NewNodeInfo(
newPod("m1", "TCP/0.0.0.0/8001")),
fits: true,
test: "second different protocol",
},
{
pod: newPod("m1", "UDP/127.0.0.1/8001"),
nodeInfo: schedulercache.NewNodeInfo(
newPod("m1", "TCP/0.0.0.0/8001", "UDP/0.0.0.0/8001")),
fits: false,
test: "UDP hostPort conflict due to 0.0.0.0 hostIP",
},
}
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}
@@ -631,29 +694,28 @@ func TestPodFitsHostPorts(t *testing.T) {
func TestGetUsedPorts(t *testing.T) {
tests := []struct {
pods []*v1.Pod
ports map[int]bool
pods []*v1.Pod
ports map[string]bool
}{
{
[]*v1.Pod{
newPod("m1", 9090),
newPod("m1", "UDP/127.0.0.1/9090"),
},
map[int]bool{9090: true},
map[string]bool{"UDP/127.0.0.1/9090": true},
},
{
[]*v1.Pod{
newPod("m1", 9090),
newPod("m1", 9091),
newPod("m1", "UDP/127.0.0.1/9090"),
newPod("m1", "UDP/127.0.0.1/9091"),
},
map[int]bool{9090: true, 9091: true},
map[string]bool{"UDP/127.0.0.1/9090": true, "UDP/127.0.0.1/9091": true},
},
{
[]*v1.Pod{
newPod("m1", 9090),
newPod("m2", 9091),
newPod("m1", "TCP/0.0.0.0/9090"),
newPod("m2", "UDP/127.0.0.1/9091"),
},
map[int]bool{9090: true, 9091: true},
map[string]bool{"TCP/0.0.0.0/9090": true, "UDP/127.0.0.1/9091": true},
},
}

View File

@@ -17,9 +17,12 @@ limitations under the License.
package predicates
import (
"strings"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
schedutil "k8s.io/kubernetes/plugin/pkg/scheduler/util"
)
// FindLabelsInSet gets as many key/value pairs as possible out of a label set.
@@ -89,3 +92,69 @@ func GetEquivalencePod(pod *v1.Pod) interface{} {
type EquivalencePod struct {
ControllerRef metav1.OwnerReference
}
type hostPortInfo struct {
protocol string
hostIP string
hostPort string
}
// decode decodes string ("protocol/hostIP/hostPort") to *hostPortInfo object.
func decode(info string) *hostPortInfo {
hostPortInfoSlice := strings.Split(info, "/")
protocol := hostPortInfoSlice[0]
hostIP := hostPortInfoSlice[1]
hostPort := hostPortInfoSlice[2]
return &hostPortInfo{
protocol: protocol,
hostIP: hostIP,
hostPort: hostPort,
}
}
// specialPortConflictCheck detects whether specailHostPort(whose hostIP is 0.0.0.0) is conflict with otherHostPorts.
// return true if we have a conflict.
func specialPortConflictCheck(specialHostPort string, otherHostPorts map[string]bool) bool {
specialHostPortInfo := decode(specialHostPort)
if specialHostPortInfo.hostIP == schedutil.DefaultBindAllHostIP {
// loop through all the otherHostPorts to see if there exists a conflict
for hostPortItem := range otherHostPorts {
hostPortInfo := decode(hostPortItem)
// if there exists one hostPortItem which has the same hostPort and protocol with the specialHostPort, that will cause a conflict
if specialHostPortInfo.hostPort == hostPortInfo.hostPort && specialHostPortInfo.protocol == hostPortInfo.protocol {
return true
}
}
}
return false
}
// portsConflict check whether existingPorts and wantPorts conflict with each other
// return true if we have a conflict
func portsConflict(existingPorts, wantPorts map[string]bool) bool {
for existingPort := range existingPorts {
if specialPortConflictCheck(existingPort, wantPorts) {
return true
}
}
for wantPort := range wantPorts {
if specialPortConflictCheck(wantPort, existingPorts) {
return true
}
// general check hostPort conflict procedure for hostIP is not 0.0.0.0
if existingPorts[wantPort] {
return true
}
}
return false
}

View File

@@ -18,6 +18,8 @@ package predicates
import (
"fmt"
"reflect"
"testing"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -68,3 +70,194 @@ func ExampleFindLabelsInSet() {
// label1=value1,label2=value2,label3=will_see_this
// pod1,pod2,
}
func Test_decode(t *testing.T) {
tests := []struct {
name string
args string
want *hostPortInfo
}{
{
name: "test1",
args: "UDP/127.0.0.1/80",
want: &hostPortInfo{
protocol: "UDP",
hostIP: "127.0.0.1",
hostPort: "80",
},
},
{
name: "test2",
args: "TCP/127.0.0.1/80",
want: &hostPortInfo{
protocol: "TCP",
hostIP: "127.0.0.1",
hostPort: "80",
},
},
{
name: "test3",
args: "TCP/0.0.0.0/80",
want: &hostPortInfo{
protocol: "TCP",
hostIP: "0.0.0.0",
hostPort: "80",
},
},
}
for _, tt := range tests {
if got := decode(tt.args); !reflect.DeepEqual(got, tt.want) {
t.Errorf("test name = %v, decode() = %v, want %v", tt.name, got, tt.want)
}
}
}
func Test_specialPortConflictCheck(t *testing.T) {
type args struct {
specialHostPort string
otherHostPorts map[string]bool
}
tests := []struct {
name string
args args
want bool
}{
{
name: "test-1",
args: args{
specialHostPort: "TCP/0.0.0.0/80",
otherHostPorts: map[string]bool{
"TCP/127.0.0.2/8080": true,
"TCP/127.0.0.1/80": true,
"UDP/127.0.0.2/8080": true,
},
},
want: true,
},
{
name: "test-2",
args: args{
specialHostPort: "TCP/0.0.0.0/80",
otherHostPorts: map[string]bool{
"TCP/127.0.0.2/8080": true,
"UDP/127.0.0.1/80": true,
"UDP/127.0.0.2/8080": true,
},
},
want: false,
},
{
name: "test-3",
args: args{
specialHostPort: "TCP/0.0.0.0/80",
otherHostPorts: map[string]bool{
"TCP/127.0.0.2/8080": true,
"TCP/127.0.0.1/8090": true,
"UDP/127.0.0.2/8080": true,
},
},
want: false,
},
{
name: "test-4",
args: args{
specialHostPort: "TCP/0.0.0.0/80",
otherHostPorts: map[string]bool{
"UDP/127.0.0.2/8080": true,
"UDP/127.0.0.1/8090": true,
"TCP/127.0.0.2/8080": true,
},
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := specialPortConflictCheck(tt.args.specialHostPort, tt.args.otherHostPorts); got != tt.want {
t.Errorf("specialPortConflictCheck() = %v, want %v", got, tt.want)
}
})
}
}
func Test_portsConflict(t *testing.T) {
type args struct {
existingPorts map[string]bool
wantPorts map[string]bool
}
tests := []struct {
name string
args args
want bool
}{
{
name: "test1",
args: args{
existingPorts: map[string]bool{
"UDP/127.0.0.1/8080": true,
},
wantPorts: map[string]bool{
"UDP/127.0.0.1/8080": true,
},
},
want: true,
},
{
name: "test2",
args: args{
existingPorts: map[string]bool{
"UDP/127.0.0.2/8080": true,
},
wantPorts: map[string]bool{
"UDP/127.0.0.1/8080": true,
},
},
want: false,
},
{
name: "test3",
args: args{
existingPorts: map[string]bool{
"TCP/127.0.0.1/8080": true,
},
wantPorts: map[string]bool{
"UDP/127.0.0.1/8080": true,
},
},
want: false,
},
{
name: "test4",
args: args{
existingPorts: map[string]bool{
"TCP/0.0.0.0/8080": true,
},
wantPorts: map[string]bool{
"TCP/127.0.0.1/8080": true,
},
},
want: true,
},
{
name: "test5",
args: args{
existingPorts: map[string]bool{
"TCP/127.0.0.1/8080": true,
},
wantPorts: map[string]bool{
"TCP/0.0.0.0/8080": true,
},
},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := portsConflict(tt.args.existingPorts, tt.args.wantPorts); got != tt.want {
t.Errorf("portsConflict() = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -330,7 +330,6 @@ func (sched *Scheduler) scheduleOne() {
if err != nil {
return
}
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
err := sched.bind(&assumedPod, &v1.Binding{

View File

@@ -256,7 +256,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
case <-waitPodExpireChan:
case <-time.After(wait.ForeverTestTimeout):
close(timeout)
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
t.Fatalf("timeout timeout in waiting pod expire after %v", wait.ForeverTestTimeout)
}
// We use conflicted pod ports to incur fit predicate failure if first pod not removed.
@@ -273,7 +273,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
t.Errorf("binding want=%v, get=%v", expectBinding, b)
}
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
t.Fatalf("timeout in binding after %v", wait.ForeverTestTimeout)
}
}
@@ -307,7 +307,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
t.Errorf("err want=%v, get=%v", expectErr, err)
}
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
t.Fatalf("timeout in fitting after %v", wait.ForeverTestTimeout)
}
// We mimic the workflow of cache behavior when a pod is removed by user.
@@ -334,7 +334,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
t.Errorf("binding want=%v, get=%v", expectBinding, b)
}
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
t.Fatalf("timeout in binding after %v", wait.ForeverTestTimeout)
}
}

View File

@@ -49,12 +49,12 @@ func deepEqualWithoutGeneration(t *testing.T, testcase int, actual, expected *No
func TestAssumePodScheduled(t *testing.T) {
nodeName := "node"
testPods := []*v1.Pod{
makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}}),
makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}}),
makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostPort: 8080}}),
makeBasePod(t, nodeName, "test-nonzero", "", "", "", []v1.ContainerPort{{HostPort: 80}}),
makeBasePod(t, nodeName, "test", "100m", "500", "oir-foo:3", []v1.ContainerPort{{HostPort: 80}}),
makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "oir-foo:5", []v1.ContainerPort{{HostPort: 8080}}),
makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
makeBasePod(t, nodeName, "test-nonzero", "", "", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
makeBasePod(t, nodeName, "test", "100m", "500", "oir-foo:3", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "oir-foo:5", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
makeBasePod(t, nodeName, "test", "100m", "500", "random-invalid-oir-key:100", []v1.ContainerPort{{}}),
}
@@ -75,7 +75,7 @@ func TestAssumePodScheduled(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[0]},
usedPorts: map[int]bool{80: true},
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true},
},
}, {
pods: []*v1.Pod{testPods[1], testPods[2]},
@@ -90,7 +90,7 @@ func TestAssumePodScheduled(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[1], testPods[2]},
usedPorts: map[int]bool{80: true, 8080: true},
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true, "TCP/127.0.0.1/8080": true},
},
}, { // test non-zero request
pods: []*v1.Pod{testPods[3]},
@@ -105,7 +105,7 @@ func TestAssumePodScheduled(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[3]},
usedPorts: map[int]bool{80: true},
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true},
},
}, {
pods: []*v1.Pod{testPods[4]},
@@ -121,7 +121,7 @@ func TestAssumePodScheduled(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[4]},
usedPorts: map[int]bool{80: true},
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true},
},
}, {
pods: []*v1.Pod{testPods[4], testPods[5]},
@@ -137,7 +137,7 @@ func TestAssumePodScheduled(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[4], testPods[5]},
usedPorts: map[int]bool{80: true, 8080: true},
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true, "TCP/127.0.0.1/8080": true},
},
}, {
pods: []*v1.Pod{testPods[6]},
@@ -152,7 +152,7 @@ func TestAssumePodScheduled(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[6]},
usedPorts: map[int]bool{},
usedPorts: map[string]bool{},
},
},
}
@@ -195,8 +195,8 @@ func assumeAndFinishBinding(cache *schedulerCache, pod *v1.Pod, assumedTime time
func TestExpirePod(t *testing.T) {
nodeName := "node"
testPods := []*v1.Pod{
makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}}),
makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostPort: 8080}}),
makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
}
now := time.Now()
ttl := 10 * time.Second
@@ -228,7 +228,7 @@ func TestExpirePod(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[1]},
usedPorts: map[int]bool{8080: true},
usedPorts: map[string]bool{"TCP/127.0.0.1/8080": true},
},
}}
@@ -255,8 +255,8 @@ func TestAddPodWillConfirm(t *testing.T) {
ttl := 10 * time.Second
testPods := []*v1.Pod{
makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}}),
makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostPort: 8080}}),
makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
}
tests := []struct {
podsToAssume []*v1.Pod
@@ -277,7 +277,7 @@ func TestAddPodWillConfirm(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[0]},
usedPorts: map[int]bool{80: true},
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true},
},
}}
@@ -332,7 +332,7 @@ func TestAddPodWillReplaceAssumed(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{updatedPod.DeepCopy()},
usedPorts: map[int]bool{90: true},
usedPorts: map[string]bool{"TCP/0.0.0.0/90": true},
},
},
}}
@@ -366,7 +366,7 @@ func TestAddPodWillReplaceAssumed(t *testing.T) {
func TestAddPodAfterExpiration(t *testing.T) {
nodeName := "node"
ttl := 10 * time.Second
basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}})
basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
tests := []struct {
pod *v1.Pod
@@ -384,7 +384,7 @@ func TestAddPodAfterExpiration(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{basePod},
usedPorts: map[int]bool{80: true},
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true},
},
}}
@@ -414,8 +414,8 @@ func TestUpdatePod(t *testing.T) {
nodeName := "node"
ttl := 10 * time.Second
testPods := []*v1.Pod{
makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}}),
makeBasePod(t, nodeName, "test", "200m", "1Ki", "", []v1.ContainerPort{{HostPort: 8080}}),
makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
makeBasePod(t, nodeName, "test", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
}
tests := []struct {
podsToAssume []*v1.Pod
@@ -437,7 +437,7 @@ func TestUpdatePod(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[1]},
usedPorts: map[int]bool{8080: true},
usedPorts: map[string]bool{"TCP/127.0.0.1/8080": true},
}, {
requestedResource: &Resource{
MilliCPU: 100,
@@ -449,7 +449,7 @@ func TestUpdatePod(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[0]},
usedPorts: map[int]bool{80: true},
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true},
}},
}}
@@ -480,8 +480,8 @@ func TestExpireAddUpdatePod(t *testing.T) {
nodeName := "node"
ttl := 10 * time.Second
testPods := []*v1.Pod{
makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}}),
makeBasePod(t, nodeName, "test", "200m", "1Ki", "", []v1.ContainerPort{{HostPort: 8080}}),
makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
makeBasePod(t, nodeName, "test", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
}
tests := []struct {
podsToAssume []*v1.Pod
@@ -504,7 +504,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[1]},
usedPorts: map[int]bool{8080: true},
usedPorts: map[string]bool{"TCP/127.0.0.1/8080": true},
}, {
requestedResource: &Resource{
MilliCPU: 100,
@@ -516,7 +516,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[0]},
usedPorts: map[int]bool{80: true},
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true},
}},
}}
@@ -553,7 +553,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
// TestRemovePod tests after added pod is removed, its information should also be subtracted.
func TestRemovePod(t *testing.T) {
nodeName := "node"
basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}})
basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
tests := []struct {
pod *v1.Pod
wNodeInfo *NodeInfo
@@ -570,7 +570,7 @@ func TestRemovePod(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{basePod},
usedPorts: map[int]bool{80: true},
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true},
},
}}
@@ -595,7 +595,7 @@ func TestRemovePod(t *testing.T) {
func TestForgetPod(t *testing.T) {
nodeName := "node"
basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}})
basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
tests := []struct {
pods []*v1.Pod
}{{

View File

@@ -38,7 +38,7 @@ type NodeInfo struct {
pods []*v1.Pod
podsWithAffinity []*v1.Pod
usedPorts map[int]bool
usedPorts map[string]bool
// Total requested resource of all pods on this node.
// It includes assumed pods which scheduler sends binding to apiserver but
@@ -164,7 +164,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
nonzeroRequest: &Resource{},
allocatableResource: &Resource{},
generation: 0,
usedPorts: make(map[int]bool),
usedPorts: make(map[string]bool),
}
for _, pod := range pods {
ni.AddPod(pod)
@@ -188,7 +188,7 @@ func (n *NodeInfo) Pods() []*v1.Pod {
return n.pods
}
func (n *NodeInfo) UsedPorts() map[int]bool {
func (n *NodeInfo) UsedPorts() map[string]bool {
if n == nil {
return nil
}
@@ -269,7 +269,7 @@ func (n *NodeInfo) Clone() *NodeInfo {
taintsErr: n.taintsErr,
memoryPressureCondition: n.memoryPressureCondition,
diskPressureCondition: n.diskPressureCondition,
usedPorts: make(map[int]bool),
usedPorts: make(map[string]bool),
generation: n.generation,
}
if len(n.pods) > 0 {
@@ -408,11 +408,26 @@ func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, used bool) {
// "0" is explicitly ignored in PodFitsHostPorts,
// which is the only function that uses this value.
if podPort.HostPort != 0 {
if used {
n.usedPorts[int(podPort.HostPort)] = used
} else {
delete(n.usedPorts, int(podPort.HostPort))
// user does not explicitly set protocol, default is tcp
portProtocol := podPort.Protocol
if podPort.Protocol == "" {
portProtocol = v1.ProtocolTCP
}
// user does not explicitly set hostIP, default is 0.0.0.0
portHostIP := podPort.HostIP
if podPort.HostIP == "" {
portHostIP = util.DefaultBindAllHostIP
}
str := fmt.Sprintf("%s/%s/%d", portProtocol, portHostIP, podPort.HostPort)
if used {
n.usedPorts[str] = used
} else {
delete(n.usedPorts, str)
}
}
}
}

View File

@@ -17,16 +17,19 @@ limitations under the License.
package util
import (
"fmt"
"sort"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/apis/scheduling"
)
const DefaultBindAllHostIP = "0.0.0.0"
// GetUsedPorts returns the used host ports of Pods: if 'port' was used, a 'port:true' pair
// will be in the result; but it does not resolve port conflict.
func GetUsedPorts(pods ...*v1.Pod) map[int]bool {
ports := make(map[int]bool)
func GetUsedPorts(pods ...*v1.Pod) map[string]bool {
ports := make(map[string]bool)
for _, pod := range pods {
for j := range pod.Spec.Containers {
container := &pod.Spec.Containers[j]
@@ -35,7 +38,20 @@ func GetUsedPorts(pods ...*v1.Pod) map[int]bool {
// "0" is explicitly ignored in PodFitsHostPorts,
// which is the only function that uses this value.
if podPort.HostPort != 0 {
ports[int(podPort.HostPort)] = true
// user does not explicitly set protocol, default is tcp
portProtocol := podPort.Protocol
if podPort.Protocol == "" {
portProtocol = v1.ProtocolTCP
}
// user does not explicitly set hostIP, default is 0.0.0.0
portHostIP := podPort.HostIP
if podPort.HostIP == "" {
portHostIP = "0.0.0.0"
}
str := fmt.Sprintf("%s/%s/%d", portProtocol, portHostIP, podPort.HostPort)
ports[str] = true
}
}
}

View File

@@ -592,6 +592,54 @@ var _ = SIGDescribe("SchedulerPredicates [Serial]", func() {
WaitForSchedulerAfterAction(f, removeTaintFromNodeAction(cs, nodeName, testTaint), podNameNoTolerations, true)
verifyResult(cs, 1, 0, ns)
})
It("validates that there is no conflict between pods with same hostPort but different hostIP and protocol", func() {
nodeName := GetNodeThatCanRunPod(f)
// use nodeSelector to make sure the testing pods get assigned on the same node to explicitly verify there exists conflict or not
By("Trying to apply a random label on the found node.")
k := fmt.Sprintf("kubernetes.io/e2e-%s", string(uuid.NewUUID()))
v := "90"
nodeSelector := make(map[string]string)
nodeSelector[k] = v
framework.AddOrUpdateLabelOnNode(cs, nodeName, k, v)
framework.ExpectNodeHasLabel(cs, nodeName, k, v)
defer framework.RemoveLabelOffNode(cs, nodeName, k)
By("Trying to create a pod(pod1) with hostport 80 and hostIP 127.0.0.1 and expect scheduled")
creatHostPortPodOnNode(f, "pod1", ns, "127.0.0.1", v1.ProtocolTCP, nodeSelector, true)
By("Trying to create another pod(pod2) with hostport 80 but hostIP 127.0.0.2 on the node which pod1 resides and expect scheduled")
creatHostPortPodOnNode(f, "pod2", ns, "127.0.0.2", v1.ProtocolTCP, nodeSelector, true)
By("Trying to create a third pod(pod3) with hostport 80, hostIP 127.0.0.2 but use UDP protocol on the node which pod2 resides")
creatHostPortPodOnNode(f, "pod3", ns, "127.0.0.2", v1.ProtocolUDP, nodeSelector, true)
})
It("validates that there exists conflict between pods with same hostPort and protocol but one using 0.0.0.0 hostIP", func() {
nodeName := GetNodeThatCanRunPod(f)
// use nodeSelector to make sure the testing pods get assigned on the same node to explicitly verify there exists conflict or not
By("Trying to apply a random label on the found node.")
k := fmt.Sprintf("kubernetes.io/e2e-%s", string(uuid.NewUUID()))
v := "95"
nodeSelector := make(map[string]string)
nodeSelector[k] = v
framework.AddOrUpdateLabelOnNode(cs, nodeName, k, v)
framework.ExpectNodeHasLabel(cs, nodeName, k, v)
defer framework.RemoveLabelOffNode(cs, nodeName, k)
By("Trying to create a pod(pod4) with hostport 80 and hostIP 0.0.0.0(empty string here) and expect scheduled")
creatHostPortPodOnNode(f, "pod4", ns, "", v1.ProtocolTCP, nodeSelector, true)
By("Trying to create another pod(pod5) with hostport 80 but hostIP 127.0.0.1 on the node which pod4 resides and expect not scheduled")
creatHostPortPodOnNode(f, "pod5", ns, "127.0.0.1", v1.ProtocolTCP, nodeSelector, false)
})
})
func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
@@ -782,3 +830,24 @@ func CreateHostPortPods(f *framework.Framework, id string, replicas int, expectR
framework.ExpectNoError(err)
}
}
// create pod which using hostport on the specified node according to the nodeSelector
func creatHostPortPodOnNode(f *framework.Framework, podName, ns, hostIP string, protocol v1.Protocol, nodeSelector map[string]string, expectScheduled bool) {
createPausePod(f, pausePodConfig{
Name: podName,
Ports: []v1.ContainerPort{
{
HostPort: 80,
ContainerPort: 80,
Protocol: protocol,
HostIP: hostIP,
},
},
NodeSelector: nodeSelector,
})
err := framework.WaitForPodNotPending(f.ClientSet, ns, podName)
if expectScheduled {
framework.ExpectNoError(err)
}
}