Merge pull request #111344 from aojea/kproxy_node_cidr
kube-proxy react on Node PodCIDR changes
pkg/proxy/node.go (new file, 81 lines)
@@ -0,0 +1,81 @@
/*
Copyright 2022 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package proxy

import (
    "reflect"
    "sync"

    v1 "k8s.io/api/core/v1"
    "k8s.io/klog/v2"
    "k8s.io/kubernetes/pkg/proxy/config"
)

// NodePodCIDRHandler handles the life cycle of kube-proxy based on the node PodCIDR assigned.
// Implements the config.NodeHandler interface.
// https://issues.k8s.io/111321
type NodePodCIDRHandler struct {
    mu       sync.Mutex
    podCIDRs []string
}

var _ config.NodeHandler = &NodePodCIDRHandler{}

// OnNodeAdd is a handler for Node creates.
func (n *NodePodCIDRHandler) OnNodeAdd(node *v1.Node) {
    n.mu.Lock()
    defer n.mu.Unlock()

    podCIDRs := node.Spec.PodCIDRs
    // initialize podCIDRs
    if len(n.podCIDRs) == 0 && len(podCIDRs) > 0 {
        klog.InfoS("Setting current PodCIDRs", "PodCIDRs", podCIDRs)
        n.podCIDRs = podCIDRs
        return
    }
    if !reflect.DeepEqual(n.podCIDRs, podCIDRs) {
        klog.ErrorS(nil, "Using NodeCIDR LocalDetector mode, current PodCIDRs are different than previous PodCIDRs, restarting",
            "node", klog.KObj(node), "New Node PodCIDRs", podCIDRs, "Old Node PodCIDRs", n.podCIDRs)
        panic("Current Node PodCIDRs are different than previous PodCIDRs, restarting")
    }
}

// OnNodeUpdate is a handler for Node updates.
func (n *NodePodCIDRHandler) OnNodeUpdate(_, node *v1.Node) {
    n.mu.Lock()
    defer n.mu.Unlock()
    podCIDRs := node.Spec.PodCIDRs
    // initialize podCIDRs
    if len(n.podCIDRs) == 0 && len(podCIDRs) > 0 {
        klog.InfoS("Setting current PodCIDRs", "PodCIDRs", podCIDRs)
        n.podCIDRs = podCIDRs
        return
    }
    if !reflect.DeepEqual(n.podCIDRs, podCIDRs) {
        klog.ErrorS(nil, "Using NodeCIDR LocalDetector mode, current PodCIDRs are different than previous PodCIDRs, restarting",
            "node", klog.KObj(node), "New Node PodCIDRs", podCIDRs, "Old Node PodCIDRs", n.podCIDRs)
        panic("Current Node PodCIDRs are different than previous PodCIDRs, restarting")
    }
}

// OnNodeDelete is a handler for Node deletes.
func (n *NodePodCIDRHandler) OnNodeDelete(node *v1.Node) {
    klog.ErrorS(nil, "Current Node is being deleted", "node", klog.KObj(node))
}

// OnNodeSynced is a handler for Node syncs.
func (n *NodePodCIDRHandler) OnNodeSynced() {}
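For context, the sketch below shows one way a handler like this could be registered with kube-proxy's node event machinery in pkg/proxy/config. It is an illustration only, not the wiring added by this PR: the kubeconfig path and resync period are assumptions, and it presumes the NodeConfig helpers (NewNodeConfig, RegisterEventHandler, Run) keep their current signatures.

package main

import (
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"

    "k8s.io/kubernetes/pkg/proxy"
    proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
)

func main() {
    // Build a client from a kubeconfig; the path is an assumption for this sketch.
    cfg, err := clientcmd.BuildConfigFromFlags("", "/var/lib/kube-proxy/kubeconfig")
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)

    // Shared informer factory; the 15-minute resync is arbitrary for the sketch.
    factory := informers.NewSharedInformerFactory(client, 15*time.Minute)

    // NodeConfig dispatches Node add/update/delete events to registered handlers,
    // so the PodCIDR handler sees every change to this node's spec.podCIDRs.
    nodeConfig := proxyconfig.NewNodeConfig(factory.Core().V1().Nodes(), 15*time.Minute)
    nodeConfig.RegisterEventHandler(&proxy.NodePodCIDRHandler{})

    factory.Start(wait.NeverStop)
    go nodeConfig.Run(wait.NeverStop)

    // Block forever; the handler panics (forcing a process restart by the
    // supervisor) if the node's PodCIDRs change after initialization.
    select {}
}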
pkg/proxy/node_test.go (new file, 126 lines)
@@ -0,0 +1,126 @@
/*
Copyright 2022 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package proxy

import (
    "testing"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestNodePodCIDRHandlerAdd(t *testing.T) {
    tests := []struct {
        name            string
        oldNodePodCIDRs []string
        newNodePodCIDRs []string
        expectPanic     bool
    }{
        {
            name: "both empty",
        },
        {
            name:            "initialized correctly",
            newNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"},
        },
        {
            name:            "already initialized and different node",
            oldNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"},
            newNodePodCIDRs: []string{"10.0.0.0/24", "fd00:3:2:1::/64"},
            expectPanic:     true,
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            n := &NodePodCIDRHandler{
                podCIDRs: tt.oldNodePodCIDRs,
            }
            node := &v1.Node{
                ObjectMeta: metav1.ObjectMeta{
                    Name:            "test-node",
                    ResourceVersion: "1",
                },
                Spec: v1.NodeSpec{
                    PodCIDRs: tt.newNodePodCIDRs,
                },
            }
            defer func() {
                r := recover()
                if r == nil && tt.expectPanic {
                    t.Errorf("The code did not panic")
                } else if r != nil && !tt.expectPanic {
                    t.Errorf("The code panicked unexpectedly: %v", r)
                }
            }()
            n.OnNodeAdd(node)
        })
    }
}

func TestNodePodCIDRHandlerUpdate(t *testing.T) {
    tests := []struct {
        name            string
        oldNodePodCIDRs []string
        newNodePodCIDRs []string
        expectPanic     bool
    }{
        {
            name: "both empty",
        },
        {
            name:            "initialize",
            newNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"},
        },
        {
            name:            "same node",
            oldNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"},
            newNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"},
        },
        {
            name:            "different nodes",
            oldNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"},
            newNodePodCIDRs: []string{"10.0.0.0/24", "fd00:3:2:1::/64"},
            expectPanic:     true,
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            n := &NodePodCIDRHandler{
                podCIDRs: tt.oldNodePodCIDRs,
            }
            oldNode := &v1.Node{}
            node := &v1.Node{
                ObjectMeta: metav1.ObjectMeta{
                    Name:            "test-node",
                    ResourceVersion: "1",
                },
                Spec: v1.NodeSpec{
                    PodCIDRs: tt.newNodePodCIDRs,
                },
            }
            defer func() {
                r := recover()
                if r == nil && tt.expectPanic {
                    t.Errorf("The code did not panic")
                } else if r != nil && !tt.expectPanic {
                    t.Errorf("The code panicked unexpectedly: %v", r)
                }
            }()
            n.OnNodeUpdate(oldNode, node)
        })
    }
}
@@ -145,6 +145,9 @@ func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels m
// * All of the endpoints for this Service have a topology hint
// * At least one endpoint for this Service is hinted for this node's zone.
func canUseTopology(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) bool {
    if !utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
        return false
    }
    hintsAnnotation := svcInfo.HintsAnnotation()
    if hintsAnnotation != "Auto" && hintsAnnotation != "auto" {
        if hintsAnnotation != "" && hintsAnnotation != "Disabled" && hintsAnnotation != "disabled" {
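For readers skimming the hunk above (the diff cuts off mid-function), here is a minimal, self-contained sketch of how the hints annotation values checked there are interpreted. The helper name is hypothetical, not part of kube-proxy, and it covers only the annotation check, not the other conditions canUseTopology evaluates.

package main

import "fmt"

// interpretHintsAnnotation mirrors the checks in the hunk above:
// "Auto"/"auto" opt a Service in to topology-aware routing; "", "Disabled",
// and "disabled" opt it out silently; any other value opts it out and is
// worth a warning about an unrecognized annotation.
func interpretHintsAnnotation(value string) (useTopology, warn bool) {
    switch value {
    case "Auto", "auto":
        return true, false
    case "", "Disabled", "disabled":
        return false, false
    default:
        return false, true
    }
}

func main() {
    for _, v := range []string{"auto", "Auto", "", "disabled", "aUto"} {
        use, warn := interpretHintsAnnotation(v)
        fmt.Printf("annotation %q -> useTopology=%v warn=%v\n", v, use, warn)
    }
}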
@@ -91,6 +91,20 @@ func TestCategorizeEndpoints(t *testing.T) {
        clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
        localEndpoints:   nil,
    }, {
        name:         "hints disabled, hints annotation == auto",
        hintsEnabled: false,
        nodeLabels:   map[string]string{v1.LabelTopologyZone: "zone-a"},
        serviceInfo:  &BaseServiceInfo{hintsAnnotation: "auto"},
        endpoints: []Endpoint{
            &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
            &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
            &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
            &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
        },
        clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
        localEndpoints:   nil,
    }, {
        name:         "hints, hints annotation == aUto (wrong capitalization), hints ignored",
        hintsEnabled: true,
        nodeLabels:   map[string]string{v1.LabelTopologyZone: "zone-a"},