Merge pull request #122979 from fatsheep9146/pkg-proxy-support-contextual-logging

Migrate `pkg/proxy` to contextual logging: Part 1
This commit is contained in:
Kubernetes Prow Robot
2024-04-23 02:43:04 -07:00
committed by GitHub
19 changed files with 542 additions and 420 deletions

View File

@@ -30,10 +30,12 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
ktesting "k8s.io/client-go/testing"
klogtesting "k8s.io/klog/v2/ktesting"
"k8s.io/utils/ptr"
)
func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
_, ctx := klogtesting.NewTestContext(t)
service1v1 := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "s1"},
Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}}}
@@ -56,7 +58,7 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
serviceConfig := NewServiceConfig(sharedInformers.Core().V1().Services(), time.Minute)
serviceConfig := NewServiceConfig(ctx, sharedInformers.Core().V1().Services(), time.Minute)
serviceConfig.RegisterEventHandler(handler)
go sharedInformers.Start(stopCh)
go serviceConfig.Run(stopCh)
@@ -83,6 +85,7 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
}
func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
_, ctx := klogtesting.NewTestContext(t)
endpoints1v1 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"},
AddressType: discoveryv1.AddressTypeIPv4,
@@ -136,7 +139,7 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
endpointsliceConfig := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute)
endpointsliceConfig := NewEndpointSliceConfig(ctx, sharedInformers.Discovery().V1().EndpointSlices(), time.Minute)
endpointsliceConfig.RegisterEventHandler(handler)
go sharedInformers.Start(stopCh)
go endpointsliceConfig.Run(stopCh)
@@ -163,6 +166,7 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
}
func TestInitialSync(t *testing.T) {
_, ctx := klogtesting.NewTestContext(t)
svc1 := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
@@ -191,11 +195,11 @@ func TestInitialSync(t *testing.T) {
client := fake.NewSimpleClientset(svc1, svc2, eps2, eps1)
sharedInformers := informers.NewSharedInformerFactory(client, 0)
svcConfig := NewServiceConfig(sharedInformers.Core().V1().Services(), 0)
svcConfig := NewServiceConfig(ctx, sharedInformers.Core().V1().Services(), 0)
svcHandler := NewServiceHandlerMock()
svcConfig.RegisterEventHandler(svcHandler)
epsConfig := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), 0)
epsConfig := NewEndpointSliceConfig(ctx, sharedInformers.Discovery().V1().EndpointSlices(), 0)
epsHandler := NewEndpointSliceHandlerMock()
epsConfig.RegisterEventHandler(epsHandler)

View File

@@ -17,6 +17,7 @@ limitations under the License.
package config
import (
"context"
"fmt"
"sync"
"time"
@@ -71,12 +72,14 @@ type EndpointSliceHandler interface {
type EndpointSliceConfig struct {
listerSynced cache.InformerSynced
eventHandlers []EndpointSliceHandler
logger klog.Logger
}
// NewEndpointSliceConfig creates a new EndpointSliceConfig.
func NewEndpointSliceConfig(endpointSliceInformer discoveryv1informers.EndpointSliceInformer, resyncPeriod time.Duration) *EndpointSliceConfig {
func NewEndpointSliceConfig(ctx context.Context, endpointSliceInformer discoveryv1informers.EndpointSliceInformer, resyncPeriod time.Duration) *EndpointSliceConfig {
result := &EndpointSliceConfig{
listerSynced: endpointSliceInformer.Informer().HasSynced,
logger: klog.FromContext(ctx),
}
_, _ = endpointSliceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -98,14 +101,14 @@ func (c *EndpointSliceConfig) RegisterEventHandler(handler EndpointSliceHandler)
// Run waits for cache synced and invokes handlers after syncing.
func (c *EndpointSliceConfig) Run(stopCh <-chan struct{}) {
klog.InfoS("Starting endpoint slice config controller")
c.logger.Info("Starting endpoint slice config controller")
if !cache.WaitForNamedCacheSync("endpoint slice config", stopCh, c.listerSynced) {
return
}
for _, h := range c.eventHandlers {
klog.V(3).InfoS("Calling handler.OnEndpointSlicesSynced()")
c.logger.V(3).Info("Calling handler.OnEndpointSlicesSynced()")
h.OnEndpointSlicesSynced()
}
}
@@ -117,7 +120,7 @@ func (c *EndpointSliceConfig) handleAddEndpointSlice(obj interface{}) {
return
}
for _, h := range c.eventHandlers {
klog.V(4).InfoS("Calling handler.OnEndpointSliceAdd", "endpoints", klog.KObj(endpointSlice))
c.logger.V(4).Info("Calling handler.OnEndpointSliceAdd", "endpoints", klog.KObj(endpointSlice))
h.OnEndpointSliceAdd(endpointSlice)
}
}
@@ -134,7 +137,7 @@ func (c *EndpointSliceConfig) handleUpdateEndpointSlice(oldObj, newObj interface
return
}
for _, h := range c.eventHandlers {
klog.V(4).InfoS("Calling handler.OnEndpointSliceUpdate")
c.logger.V(4).Info("Calling handler.OnEndpointSliceUpdate")
h.OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice)
}
}
@@ -153,7 +156,7 @@ func (c *EndpointSliceConfig) handleDeleteEndpointSlice(obj interface{}) {
}
}
for _, h := range c.eventHandlers {
klog.V(4).InfoS("Calling handler.OnEndpointsDelete")
c.logger.V(4).Info("Calling handler.OnEndpointsDelete")
h.OnEndpointSliceDelete(endpointSlice)
}
}
@@ -162,12 +165,14 @@ func (c *EndpointSliceConfig) handleDeleteEndpointSlice(obj interface{}) {
type ServiceConfig struct {
listerSynced cache.InformerSynced
eventHandlers []ServiceHandler
logger klog.Logger
}
// NewServiceConfig creates a new ServiceConfig.
func NewServiceConfig(serviceInformer v1informers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
func NewServiceConfig(ctx context.Context, serviceInformer v1informers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
result := &ServiceConfig{
listerSynced: serviceInformer.Informer().HasSynced,
logger: klog.FromContext(ctx),
}
_, _ = serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -189,14 +194,14 @@ func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) {
// Run waits for cache synced and invokes handlers after syncing.
func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
klog.InfoS("Starting service config controller")
c.logger.Info("Starting service config controller")
if !cache.WaitForNamedCacheSync("service config", stopCh, c.listerSynced) {
return
}
for i := range c.eventHandlers {
klog.V(3).InfoS("Calling handler.OnServiceSynced()")
c.logger.V(3).Info("Calling handler.OnServiceSynced()")
c.eventHandlers[i].OnServiceSynced()
}
}
@@ -208,7 +213,7 @@ func (c *ServiceConfig) handleAddService(obj interface{}) {
return
}
for i := range c.eventHandlers {
klog.V(4).InfoS("Calling handler.OnServiceAdd")
c.logger.V(4).Info("Calling handler.OnServiceAdd")
c.eventHandlers[i].OnServiceAdd(service)
}
}
@@ -225,7 +230,7 @@ func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
return
}
for i := range c.eventHandlers {
klog.V(4).InfoS("Calling handler.OnServiceUpdate")
c.logger.V(4).Info("Calling handler.OnServiceUpdate")
c.eventHandlers[i].OnServiceUpdate(oldService, service)
}
}
@@ -244,7 +249,7 @@ func (c *ServiceConfig) handleDeleteService(obj interface{}) {
}
}
for i := range c.eventHandlers {
klog.V(4).InfoS("Calling handler.OnServiceDelete")
c.logger.V(4).Info("Calling handler.OnServiceDelete")
c.eventHandlers[i].OnServiceDelete(service)
}
}
@@ -289,12 +294,14 @@ var _ NodeHandler = &NoopNodeHandler{}
type NodeConfig struct {
listerSynced cache.InformerSynced
eventHandlers []NodeHandler
logger klog.Logger
}
// NewNodeConfig creates a new NodeConfig.
func NewNodeConfig(nodeInformer v1informers.NodeInformer, resyncPeriod time.Duration) *NodeConfig {
func NewNodeConfig(ctx context.Context, nodeInformer v1informers.NodeInformer, resyncPeriod time.Duration) *NodeConfig {
result := &NodeConfig{
listerSynced: nodeInformer.Informer().HasSynced,
logger: klog.FromContext(ctx),
}
_, _ = nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -316,14 +323,14 @@ func (c *NodeConfig) RegisterEventHandler(handler NodeHandler) {
// Run starts the goroutine responsible for calling registered handlers.
func (c *NodeConfig) Run(stopCh <-chan struct{}) {
klog.InfoS("Starting node config controller")
c.logger.Info("Starting node config controller")
if !cache.WaitForNamedCacheSync("node config", stopCh, c.listerSynced) {
return
}
for i := range c.eventHandlers {
klog.V(3).InfoS("Calling handler.OnNodeSynced()")
c.logger.V(3).Info("Calling handler.OnNodeSynced()")
c.eventHandlers[i].OnNodeSynced()
}
}
@@ -335,7 +342,7 @@ func (c *NodeConfig) handleAddNode(obj interface{}) {
return
}
for i := range c.eventHandlers {
klog.V(4).InfoS("Calling handler.OnNodeAdd")
c.logger.V(4).Info("Calling handler.OnNodeAdd")
c.eventHandlers[i].OnNodeAdd(node)
}
}
@@ -352,7 +359,7 @@ func (c *NodeConfig) handleUpdateNode(oldObj, newObj interface{}) {
return
}
for i := range c.eventHandlers {
klog.V(5).InfoS("Calling handler.OnNodeUpdate")
c.logger.V(5).Info("Calling handler.OnNodeUpdate")
c.eventHandlers[i].OnNodeUpdate(oldNode, node)
}
}
@@ -371,7 +378,7 @@ func (c *NodeConfig) handleDeleteNode(obj interface{}) {
}
}
for i := range c.eventHandlers {
klog.V(4).InfoS("Calling handler.OnNodeDelete")
c.logger.V(4).Info("Calling handler.OnNodeDelete")
c.eventHandlers[i].OnNodeDelete(node)
}
}
@@ -390,13 +397,15 @@ type ServiceCIDRConfig struct {
eventHandlers []ServiceCIDRHandler
mu sync.Mutex
cidrs sets.Set[string]
logger klog.Logger
}
// NewServiceCIDRConfig creates a new ServiceCIDRConfig.
func NewServiceCIDRConfig(serviceCIDRInformer networkingv1alpha1informers.ServiceCIDRInformer, resyncPeriod time.Duration) *ServiceCIDRConfig {
func NewServiceCIDRConfig(ctx context.Context, serviceCIDRInformer networkingv1alpha1informers.ServiceCIDRInformer, resyncPeriod time.Duration) *ServiceCIDRConfig {
result := &ServiceCIDRConfig{
listerSynced: serviceCIDRInformer.Informer().HasSynced,
cidrs: sets.New[string](),
logger: klog.FromContext(ctx),
}
_, _ = serviceCIDRInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -423,7 +432,7 @@ func (c *ServiceCIDRConfig) RegisterEventHandler(handler ServiceCIDRHandler) {
// Run waits for cache synced and invokes handlers after syncing.
func (c *ServiceCIDRConfig) Run(stopCh <-chan struct{}) {
klog.InfoS("Starting serviceCIDR config controller")
c.logger.Info("Starting serviceCIDR config controller")
if !cache.WaitForNamedCacheSync("serviceCIDR config", stopCh, c.listerSynced) {
return
@@ -465,7 +474,7 @@ func (c *ServiceCIDRConfig) handleServiceCIDREvent(oldObj, newObj interface{}) {
}
for i := range c.eventHandlers {
klog.V(4).InfoS("Calling handler.OnServiceCIDRsChanged")
c.logger.V(4).Info("Calling handler.OnServiceCIDRsChanged")
c.eventHandlers[i].OnServiceCIDRsChanged(c.cidrs.UnsortedList())
}
}

View File

@@ -32,6 +32,7 @@ import (
informers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
ktesting "k8s.io/client-go/testing"
klogtesting "k8s.io/klog/v2/ktesting"
"k8s.io/utils/ptr"
)
@@ -226,6 +227,7 @@ func (h *EndpointSliceHandlerMock) ValidateEndpointSlices(t *testing.T, expected
}
func TestNewServiceAddedAndNotified(t *testing.T) {
_, ctx := klogtesting.NewTestContext(t)
client := fake.NewSimpleClientset()
fakeWatch := watch.NewFake()
client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
@@ -235,7 +237,7 @@ func TestNewServiceAddedAndNotified(t *testing.T) {
sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
config := NewServiceConfig(sharedInformers.Core().V1().Services(), time.Minute)
config := NewServiceConfig(ctx, sharedInformers.Core().V1().Services(), time.Minute)
handler := NewServiceHandlerMock()
config.RegisterEventHandler(handler)
go sharedInformers.Start(stopCh)
@@ -250,6 +252,7 @@ func TestNewServiceAddedAndNotified(t *testing.T) {
}
func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
_, ctx := klogtesting.NewTestContext(t)
client := fake.NewSimpleClientset()
fakeWatch := watch.NewFake()
client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
@@ -259,7 +262,7 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
config := NewServiceConfig(sharedInformers.Core().V1().Services(), time.Minute)
config := NewServiceConfig(ctx, sharedInformers.Core().V1().Services(), time.Minute)
handler := NewServiceHandlerMock()
config.RegisterEventHandler(handler)
go sharedInformers.Start(stopCh)
@@ -286,6 +289,7 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
}
func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) {
_, ctx := klogtesting.NewTestContext(t)
client := fake.NewSimpleClientset()
fakeWatch := watch.NewFake()
client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
@@ -295,7 +299,7 @@ func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) {
sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
config := NewServiceConfig(sharedInformers.Core().V1().Services(), time.Minute)
config := NewServiceConfig(ctx, sharedInformers.Core().V1().Services(), time.Minute)
handler := NewServiceHandlerMock()
handler2 := NewServiceHandlerMock()
config.RegisterEventHandler(handler)
@@ -320,6 +324,7 @@ func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) {
}
func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) {
_, ctx := klogtesting.NewTestContext(t)
client := fake.NewSimpleClientset()
fakeWatch := watch.NewFake()
client.PrependWatchReactor("endpointslices", ktesting.DefaultWatchReactor(fakeWatch, nil))
@@ -329,7 +334,7 @@ func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) {
sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
config := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute)
config := NewEndpointSliceConfig(ctx, sharedInformers.Discovery().V1().EndpointSlices(), time.Minute)
handler := NewEndpointSliceHandlerMock()
handler2 := NewEndpointSliceHandlerMock()
config.RegisterEventHandler(handler)
@@ -366,6 +371,7 @@ func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) {
}
func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) {
_, ctx := klogtesting.NewTestContext(t)
client := fake.NewSimpleClientset()
fakeWatch := watch.NewFake()
client.PrependWatchReactor("endpointslices", ktesting.DefaultWatchReactor(fakeWatch, nil))
@@ -375,7 +381,7 @@ func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) {
sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
config := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute)
config := NewEndpointSliceConfig(ctx, sharedInformers.Discovery().V1().EndpointSlices(), time.Minute)
handler := NewEndpointSliceHandlerMock()
handler2 := NewEndpointSliceHandlerMock()
config.RegisterEventHandler(handler)

View File

@@ -25,6 +25,7 @@ package iptables
import (
"bytes"
"context"
"crypto/sha256"
"encoding/base32"
"fmt"
@@ -97,6 +98,7 @@ const sysctlNFConntrackTCPBeLiberal = "net/netfilter/nf_conntrack_tcp_be_liberal
// NewDualStackProxier creates a MetaProxier instance, with IPv4 and IPv6 proxies.
func NewDualStackProxier(
ctx context.Context,
ipt [2]utiliptables.Interface,
sysctl utilsysctl.Interface,
exec utilexec.Interface,
@@ -114,14 +116,14 @@ func NewDualStackProxier(
initOnly bool,
) (proxy.Provider, error) {
// Create an ipv4 instance of the single-stack proxier
ipv4Proxier, err := NewProxier(v1.IPv4Protocol, ipt[0], sysctl,
ipv4Proxier, err := NewProxier(ctx, v1.IPv4Protocol, ipt[0], sysctl,
exec, syncPeriod, minSyncPeriod, masqueradeAll, localhostNodePorts, masqueradeBit, localDetectors[0], hostname,
nodeIPs[v1.IPv4Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
if err != nil {
return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
}
ipv6Proxier, err := NewProxier(v1.IPv6Protocol, ipt[1], sysctl,
ipv6Proxier, err := NewProxier(ctx, v1.IPv6Protocol, ipt[1], sysctl,
exec, syncPeriod, minSyncPeriod, masqueradeAll, false, masqueradeBit, localDetectors[1], hostname,
nodeIPs[v1.IPv6Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
if err != nil {
@@ -205,6 +207,8 @@ type Proxier struct {
// networkInterfacer defines an interface for several net library functions.
// Inject for test purpose.
networkInterfacer proxyutil.NetworkInterfacer
logger klog.Logger
}
// Proxier implements proxy.Provider
@@ -215,7 +219,8 @@ var _ proxy.Provider = &Proxier{}
// An error will be returned if iptables fails to update or acquire the initial lock.
// Once a proxier is created, it will keep iptables up to date in the background and
// will not terminate if a particular iptables call fails.
func NewProxier(ipFamily v1.IPFamily,
func NewProxier(ctx context.Context,
ipFamily v1.IPFamily,
ipt utiliptables.Interface,
sysctl utilsysctl.Interface,
exec utilexec.Interface,
@@ -232,6 +237,7 @@ func NewProxier(ipFamily v1.IPFamily,
nodePortAddressStrings []string,
initOnly bool,
) (*Proxier, error) {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "ipFamily", ipFamily)
nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings)
if !nodePortAddresses.ContainsIPv4Loopback() {
@@ -240,7 +246,7 @@ func NewProxier(ipFamily v1.IPFamily,
if localhostNodePorts {
// Set the route_localnet sysctl we need for exposing NodePorts on loopback addresses
// Refer to https://issues.k8s.io/90259
klog.InfoS("Setting route_localnet=1 to allow node-ports on localhost; to change this either disable iptables.localhostNodePorts (--iptables-localhost-nodeports) or set nodePortAddresses (--nodeport-addresses) to filter loopback addresses")
logger.Info("Setting route_localnet=1 to allow node-ports on localhost; to change this either disable iptables.localhostNodePorts (--iptables-localhost-nodeports) or set nodePortAddresses (--nodeport-addresses) to filter loopback addresses")
if err := proxyutil.EnsureSysctl(sysctl, sysctlRouteLocalnet, 1); err != nil {
return nil, err
}
@@ -252,18 +258,18 @@ func NewProxier(ipFamily v1.IPFamily,
conntrackTCPLiberal := false
if val, err := sysctl.GetSysctl(sysctlNFConntrackTCPBeLiberal); err == nil && val != 0 {
conntrackTCPLiberal = true
klog.InfoS("nf_conntrack_tcp_be_liberal set, not installing DROP rules for INVALID packets")
logger.Info("nf_conntrack_tcp_be_liberal set, not installing DROP rules for INVALID packets")
}
if initOnly {
klog.InfoS("System initialized and --init-only specified")
logger.Info("System initialized and --init-only specified")
return nil, nil
}
// Generate the masquerade mark to use for SNAT rules.
masqueradeValue := 1 << uint(masqueradeBit)
masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
klog.V(2).InfoS("Using iptables mark for masquerade", "ipFamily", ipt.Protocol(), "mark", masqueradeMark)
logger.V(2).Info("Using iptables mark for masquerade", "mark", masqueradeMark)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
@@ -296,10 +302,11 @@ func NewProxier(ipFamily v1.IPFamily,
nodePortAddresses: nodePortAddresses,
networkInterfacer: proxyutil.RealNetwork{},
conntrackTCPLiberal: conntrackTCPLiberal,
logger: logger,
}
burstSyncs := 2
klog.V(2).InfoS("Iptables sync params", "ipFamily", ipt.Protocol(), "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
logger.V(2).Info("Iptables sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
// We pass syncPeriod to ipt.Monitor, which will call us only if it needs to.
// We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though.
// time.Hour is arbitrary.
@@ -309,9 +316,9 @@ func NewProxier(ipFamily v1.IPFamily,
proxier.forceSyncProxyRules, syncPeriod, wait.NeverStop)
if ipt.HasRandomFully() {
klog.V(2).InfoS("Iptables supports --random-fully", "ipFamily", ipt.Protocol())
logger.V(2).Info("Iptables supports --random-fully")
} else {
klog.V(2).InfoS("Iptables does not support --random-fully", "ipFamily", ipt.Protocol())
logger.V(2).Info("Iptables does not support --random-fully")
}
return proxier, nil
@@ -396,7 +403,8 @@ var iptablesCleanupOnlyChains = []iptablesJumpChain{}
// CleanupLeftovers removes all iptables rules and chains created by the Proxier
// It returns true if an error was encountered. Errors are logged.
func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
func CleanupLeftovers(ctx context.Context, ipt utiliptables.Interface) (encounteredError bool) {
logger := klog.FromContext(ctx)
// Unlink our chains
for _, jump := range append(iptablesJumpChains, iptablesCleanupOnlyChains...) {
args := append(jump.extraArgs,
@@ -405,7 +413,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
)
if err := ipt.DeleteRule(jump.table, jump.srcChain, args...); err != nil {
if !utiliptables.IsNotFoundError(err) {
klog.ErrorS(err, "Error removing pure-iptables proxy rule")
logger.Error(err, "Error removing pure-iptables proxy rule")
encounteredError = true
}
}
@@ -414,7 +422,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
// Flush and remove all of our "-t nat" chains.
iptablesData := bytes.NewBuffer(nil)
if err := ipt.SaveInto(utiliptables.TableNAT, iptablesData); err != nil {
klog.ErrorS(err, "Failed to execute iptables-save", "table", utiliptables.TableNAT)
logger.Error(err, "Failed to execute iptables-save", "table", utiliptables.TableNAT)
encounteredError = true
} else {
existingNATChains := utiliptables.GetChainsFromTable(iptablesData.Bytes())
@@ -442,7 +450,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
// Write it.
err = ipt.Restore(utiliptables.TableNAT, natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
klog.ErrorS(err, "Failed to execute iptables-restore", "table", utiliptables.TableNAT)
logger.Error(err, "Failed to execute iptables-restore", "table", utiliptables.TableNAT)
metrics.IptablesRestoreFailuresTotal.Inc()
encounteredError = true
}
@@ -451,7 +459,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
// Flush and remove all of our "-t filter" chains.
iptablesData.Reset()
if err := ipt.SaveInto(utiliptables.TableFilter, iptablesData); err != nil {
klog.ErrorS(err, "Failed to execute iptables-save", "table", utiliptables.TableFilter)
logger.Error(err, "Failed to execute iptables-save", "table", utiliptables.TableFilter)
encounteredError = true
} else {
existingFilterChains := utiliptables.GetChainsFromTable(iptablesData.Bytes())
@@ -469,7 +477,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
filterLines := append(filterChains.Bytes(), filterRules.Bytes()...)
// Write it.
if err := ipt.Restore(utiliptables.TableFilter, filterLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters); err != nil {
klog.ErrorS(err, "Failed to execute iptables-restore", "table", utiliptables.TableFilter)
logger.Error(err, "Failed to execute iptables-restore", "table", utiliptables.TableFilter)
metrics.IptablesRestoreFailuresTotal.Inc()
encounteredError = true
}
@@ -605,7 +613,7 @@ func (proxier *Proxier) OnEndpointSlicesSynced() {
// is observed.
func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
if node.Name != proxier.hostname {
klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node",
"eventNode", node.Name, "currentNode", proxier.hostname)
return
}
@@ -621,7 +629,7 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
}
proxier.needFullSync = true
proxier.mu.Unlock()
klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)
proxier.Sync()
}
@@ -630,7 +638,7 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
// node object is observed.
func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
if node.Name != proxier.hostname {
klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node",
"eventNode", node.Name, "currentNode", proxier.hostname)
return
}
@@ -646,7 +654,7 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
}
proxier.needFullSync = true
proxier.mu.Unlock()
klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)
proxier.Sync()
}
@@ -655,7 +663,7 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
// object is observed.
func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
if node.Name != proxier.hostname {
klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node",
"eventNode", node.Name, "currentNode", proxier.hostname)
return
}
@@ -779,7 +787,7 @@ func (proxier *Proxier) syncProxyRules() {
// don't sync rules till we've received services and endpoints
if !proxier.isInitialized() {
klog.V(2).InfoS("Not syncing iptables until Services and Endpoints have been received from master")
proxier.logger.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
return
}
@@ -796,18 +804,18 @@ func (proxier *Proxier) syncProxyRules() {
} else {
metrics.SyncFullProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
}
klog.V(2).InfoS("SyncProxyRules complete", "elapsed", time.Since(start))
proxier.logger.V(2).Info("SyncProxyRules complete", "elapsed", time.Since(start))
}()
serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
klog.V(2).InfoS("Syncing iptables rules")
proxier.logger.V(2).Info("Syncing iptables rules")
success := false
defer func() {
if !success {
klog.InfoS("Sync failed", "retryingTime", proxier.syncPeriod)
proxier.logger.Info("Sync failed", "retryingTime", proxier.syncPeriod)
proxier.syncRunner.RetryAfter(proxier.syncPeriod)
if tryPartialSync {
metrics.IptablesPartialRestoreFailuresTotal.Inc()
@@ -833,7 +841,7 @@ func (proxier *Proxier) syncProxyRules() {
// (which will be very slow on hosts with lots of iptables rules).
for _, jump := range append(iptablesJumpChains, iptablesKubeletJumpChains...) {
if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil {
klog.ErrorS(err, "Failed to ensure chain exists", "table", jump.table, "chain", jump.dstChain)
proxier.logger.Error(err, "Failed to ensure chain exists", "table", jump.table, "chain", jump.dstChain)
return
}
args := jump.extraArgs
@@ -842,7 +850,7 @@ func (proxier *Proxier) syncProxyRules() {
}
args = append(args, "-j", string(jump.dstChain))
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jump.table, jump.srcChain, args...); err != nil {
klog.ErrorS(err, "Failed to ensure chain jumps", "table", jump.table, "srcChain", jump.srcChain, "dstChain", jump.dstChain)
proxier.logger.Error(err, "Failed to ensure chain jumps", "table", jump.table, "srcChain", jump.srcChain, "dstChain", jump.dstChain)
return
}
}
@@ -952,7 +960,7 @@ func (proxier *Proxier) syncProxyRules() {
for svcName, svc := range proxier.svcPortMap {
svcInfo, ok := svc.(*servicePortInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
proxier.logger.Error(nil, "Failed to cast serviceInfo", "serviceName", svcName)
continue
}
protocol := strings.ToLower(string(svcInfo.Protocol()))
@@ -1345,7 +1353,7 @@ func (proxier *Proxier) syncProxyRules() {
for _, ep := range allLocallyReachableEndpoints {
epInfo, ok := ep.(*endpointInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast endpointInfo", "endpointInfo", ep)
proxier.logger.Error(nil, "Failed to cast endpointInfo", "endpointInfo", ep)
continue
}
@@ -1396,7 +1404,7 @@ func (proxier *Proxier) syncProxyRules() {
}
proxier.lastIPTablesCleanup = time.Now()
} else {
klog.ErrorS(err, "Failed to execute iptables-save: stale chains will not be deleted")
proxier.logger.Error(err, "Failed to execute iptables-save: stale chains will not be deleted")
}
}
@@ -1420,15 +1428,15 @@ func (proxier *Proxier) syncProxyRules() {
} else {
nodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer)
if err != nil {
klog.ErrorS(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses)
proxier.logger.Error(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses)
}
for _, ip := range nodeIPs {
if ip.IsLoopback() {
if isIPv6 {
klog.ErrorS(nil, "--nodeport-addresses includes localhost but localhost NodePorts are not supported on IPv6", "address", ip.String())
proxier.logger.Error(nil, "--nodeport-addresses includes localhost but localhost NodePorts are not supported on IPv6", "address", ip.String())
continue
} else if !proxier.localhostNodePorts {
klog.ErrorS(nil, "--nodeport-addresses includes localhost but --iptables-localhost-nodeports=false was passed", "address", ip.String())
proxier.logger.Error(nil, "--nodeport-addresses includes localhost but --iptables-localhost-nodeports=false was passed", "address", ip.String())
continue
}
}
@@ -1491,7 +1499,7 @@ func (proxier *Proxier) syncProxyRules() {
proxier.iptablesData.Write(proxier.natRules.Bytes())
proxier.iptablesData.WriteString("COMMIT\n")
klog.V(2).InfoS("Reloading service iptables data",
proxier.logger.V(2).Info("Reloading service iptables data",
"numServices", len(proxier.svcPortMap),
"numEndpoints", totalEndpoints,
"numFilterChains", proxier.filterChains.Lines(),
@@ -1499,16 +1507,16 @@ func (proxier *Proxier) syncProxyRules() {
"numNATChains", proxier.natChains.Lines(),
"numNATRules", proxier.natRules.Lines(),
)
klog.V(9).InfoS("Restoring iptables", "rules", proxier.iptablesData.Bytes())
proxier.logger.V(9).Info("Restoring iptables", "rules", proxier.iptablesData.Bytes())
// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table
err := proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
if pErr, ok := err.(utiliptables.ParseError); ok {
lines := utiliptables.ExtractLines(proxier.iptablesData.Bytes(), pErr.Line(), 3)
klog.ErrorS(pErr, "Failed to execute iptables-restore", "rules", lines)
proxier.logger.Error(pErr, "Failed to execute iptables-restore", "rules", lines)
} else {
klog.ErrorS(err, "Failed to execute iptables-restore")
proxier.logger.Error(err, "Failed to execute iptables-restore")
}
metrics.IptablesRestoreFailuresTotal.Inc()
return
@@ -1520,7 +1528,7 @@ func (proxier *Proxier) syncProxyRules() {
for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
latency := metrics.SinceInSeconds(lastChangeTriggerTime)
metrics.NetworkProgrammingLatency.Observe(latency)
klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
proxier.logger.V(4).Info("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
}
}
@@ -1535,10 +1543,10 @@ func (proxier *Proxier) syncProxyRules() {
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
// will just drop those endpoints.
if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
klog.ErrorS(err, "Error syncing healthcheck services")
proxier.logger.Error(err, "Error syncing healthcheck services")
}
if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
klog.ErrorS(err, "Error syncing healthcheck endpoints")
proxier.logger.Error(err, "Error syncing healthcheck endpoints")
}
// Finish housekeeping, clear stale conntrack entries for UDP Services

View File

@@ -46,6 +46,7 @@ import (
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
"k8s.io/klog/v2"
klogtesting "k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/conntrack"
@@ -387,17 +388,17 @@ func TestParseIPTablesData(t *testing.T) {
}
}
func countRules(tableName utiliptables.Table, ruleData string) int {
func countRules(logger klog.Logger, tableName utiliptables.Table, ruleData string) int {
dump, err := iptablestest.ParseIPTablesDump(ruleData)
if err != nil {
klog.ErrorS(err, "error parsing iptables rules")
logger.Error(err, "error parsing iptables rules")
return -1
}
rules := 0
table, err := dump.GetTable(tableName)
if err != nil {
klog.ErrorS(err, "can't find table", "table", tableName)
logger.Error(err, "can't find table", "table", tableName)
return -1
}
@@ -407,19 +408,19 @@ func countRules(tableName utiliptables.Table, ruleData string) int {
return rules
}
func countRulesFromMetric(tableName utiliptables.Table) int {
func countRulesFromMetric(logger klog.Logger, tableName utiliptables.Table) int {
numRulesFloat, err := testutil.GetGaugeMetricValue(metrics.IptablesRulesTotal.WithLabelValues(string(tableName)))
if err != nil {
klog.ErrorS(err, "metrics are not registered?")
logger.Error(err, "metrics are not registered?")
return -1
}
return int(numRulesFloat)
}
func countRulesFromLastSyncMetric(tableName utiliptables.Table) int {
func countRulesFromLastSyncMetric(logger klog.Logger, tableName utiliptables.Table) int {
numRulesFloat, err := testutil.GetGaugeMetricValue(metrics.IptablesRulesLastSync.WithLabelValues(string(tableName)))
if err != nil {
klog.ErrorS(err, "metrics are not registered?")
logger.Error(err, "metrics are not registered?")
return -1
}
return int(numRulesFloat)
@@ -1540,6 +1541,7 @@ func TestTracePacket(t *testing.T) {
// TestOverallIPTablesRules creates a variety of services and verifies that the generated
// rules are exactly as expected.
func TestOverallIPTablesRules(t *testing.T) {
logger, _ := klogtesting.NewTestContext(t)
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
metrics.RegisterMetrics()
@@ -1799,8 +1801,8 @@ func TestOverallIPTablesRules(t *testing.T) {
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
nNatRules := countRulesFromMetric(utiliptables.TableNAT)
expectedNatRules := countRules(utiliptables.TableNAT, fp.iptablesData.String())
nNatRules := countRulesFromMetric(logger, utiliptables.TableNAT)
expectedNatRules := countRules(logger, utiliptables.TableNAT, fp.iptablesData.String())
if nNatRules != expectedNatRules {
t.Fatalf("Wrong number of nat rules: expected %d received %d", expectedNatRules, nNatRules)
@@ -4142,6 +4144,7 @@ func TestHealthCheckNodePortWhenTerminating(t *testing.T) {
}
func TestProxierMetricsIptablesTotalRules(t *testing.T) {
logger, _ := klogtesting.NewTestContext(t)
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
@@ -4170,15 +4173,15 @@ func TestProxierMetricsIptablesTotalRules(t *testing.T) {
fp.syncProxyRules()
iptablesData := fp.iptablesData.String()
nFilterRules := countRulesFromMetric(utiliptables.TableFilter)
expectedFilterRules := countRules(utiliptables.TableFilter, iptablesData)
nFilterRules := countRulesFromMetric(logger, utiliptables.TableFilter)
expectedFilterRules := countRules(logger, utiliptables.TableFilter, iptablesData)
if nFilterRules != expectedFilterRules {
t.Fatalf("Wrong number of filter rule: expected %d got %d\n%s", expectedFilterRules, nFilterRules, iptablesData)
}
nNatRules := countRulesFromMetric(utiliptables.TableNAT)
expectedNatRules := countRules(utiliptables.TableNAT, iptablesData)
nNatRules := countRulesFromMetric(logger, utiliptables.TableNAT)
expectedNatRules := countRules(logger, utiliptables.TableNAT, iptablesData)
if nNatRules != expectedNatRules {
t.Fatalf("Wrong number of nat rules: expected %d got %d\n%s", expectedNatRules, nNatRules, iptablesData)
@@ -4203,15 +4206,15 @@ func TestProxierMetricsIptablesTotalRules(t *testing.T) {
fp.syncProxyRules()
iptablesData = fp.iptablesData.String()
nFilterRules = countRulesFromMetric(utiliptables.TableFilter)
expectedFilterRules = countRules(utiliptables.TableFilter, iptablesData)
nFilterRules = countRulesFromMetric(logger, utiliptables.TableFilter)
expectedFilterRules = countRules(logger, utiliptables.TableFilter, iptablesData)
if nFilterRules != expectedFilterRules {
t.Fatalf("Wrong number of filter rule: expected %d got %d\n%s", expectedFilterRules, nFilterRules, iptablesData)
}
nNatRules = countRulesFromMetric(utiliptables.TableNAT)
expectedNatRules = countRules(utiliptables.TableNAT, iptablesData)
nNatRules = countRulesFromMetric(logger, utiliptables.TableNAT)
expectedNatRules = countRules(logger, utiliptables.TableNAT, iptablesData)
if nNatRules != expectedNatRules {
t.Fatalf("Wrong number of nat rules: expected %d got %d\n%s", expectedNatRules, nNatRules, iptablesData)
@@ -5822,6 +5825,7 @@ func TestSyncProxyRulesLargeClusterMode(t *testing.T) {
// Test calling syncProxyRules() multiple times with various changes
func TestSyncProxyRulesRepeated(t *testing.T) {
logger, _ := klogtesting.NewTestContext(t)
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
metrics.RegisterMetrics()
@@ -5920,14 +5924,14 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
`)
assertIPTablesRulesEqual(t, getLine(), true, expected, fp.iptablesData.String())
rulesSynced := countRules(utiliptables.TableNAT, expected)
rulesSyncedMetric := countRulesFromLastSyncMetric(utiliptables.TableNAT)
rulesSynced := countRules(logger, utiliptables.TableNAT, expected)
rulesSyncedMetric := countRulesFromLastSyncMetric(logger, utiliptables.TableNAT)
if rulesSyncedMetric != rulesSynced {
t.Errorf("metric shows %d rules synced but iptables data shows %d", rulesSyncedMetric, rulesSynced)
}
rulesTotal := rulesSynced
rulesTotalMetric := countRulesFromMetric(utiliptables.TableNAT)
rulesTotalMetric := countRulesFromMetric(logger, utiliptables.TableNAT)
if rulesTotalMetric != rulesTotal {
t.Errorf("metric shows %d rules total but expected %d", rulesTotalMetric, rulesTotal)
}
@@ -5998,8 +6002,8 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
`)
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
rulesSynced = countRules(utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(utiliptables.TableNAT)
rulesSynced = countRules(logger, utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(logger, utiliptables.TableNAT)
if rulesSyncedMetric != rulesSynced {
t.Errorf("metric shows %d rules synced but iptables data shows %d", rulesSyncedMetric, rulesSynced)
}
@@ -6007,7 +6011,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
// We added 1 KUBE-SERVICES rule, 2 KUBE-SVC-X27LE4BHSL4DOUIK rules, and 2
// KUBE-SEP-BSWRHOQ77KEXZLNL rules.
rulesTotal += 5
rulesTotalMetric = countRulesFromMetric(utiliptables.TableNAT)
rulesTotalMetric = countRulesFromMetric(logger, utiliptables.TableNAT)
if rulesTotalMetric != rulesTotal {
t.Errorf("metric shows %d rules total but expected %d", rulesTotalMetric, rulesTotal)
}
@@ -6049,8 +6053,8 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
`)
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
rulesSynced = countRules(utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(utiliptables.TableNAT)
rulesSynced = countRules(logger, utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(logger, utiliptables.TableNAT)
if rulesSyncedMetric != rulesSynced {
t.Errorf("metric shows %d rules synced but iptables data shows %d", rulesSyncedMetric, rulesSynced)
}
@@ -6058,7 +6062,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
// We deleted 1 KUBE-SERVICES rule, 2 KUBE-SVC-2VJB64SDSIJUP5T6 rules, and 2
// KUBE-SEP-UHEGFW77JX3KXTOV rules
rulesTotal -= 5
rulesTotalMetric = countRulesFromMetric(utiliptables.TableNAT)
rulesTotalMetric = countRulesFromMetric(logger, utiliptables.TableNAT)
if rulesTotalMetric != rulesTotal {
t.Errorf("metric shows %d rules total but expected %d", rulesTotalMetric, rulesTotal)
}
@@ -6109,15 +6113,15 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
`)
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
rulesSynced = countRules(utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(utiliptables.TableNAT)
rulesSynced = countRules(logger, utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(logger, utiliptables.TableNAT)
if rulesSyncedMetric != rulesSynced {
t.Errorf("metric shows %d rules synced but iptables data shows %d", rulesSyncedMetric, rulesSynced)
}
// The REJECT rule is in "filter", not NAT, so the number of NAT rules hasn't
// changed.
rulesTotalMetric = countRulesFromMetric(utiliptables.TableNAT)
rulesTotalMetric = countRulesFromMetric(logger, utiliptables.TableNAT)
if rulesTotalMetric != rulesTotal {
t.Errorf("metric shows %d rules total but expected %d", rulesTotalMetric, rulesTotal)
}
@@ -6172,8 +6176,8 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
`)
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
rulesSynced = countRules(utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(utiliptables.TableNAT)
rulesSynced = countRules(logger, utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(logger, utiliptables.TableNAT)
if rulesSyncedMetric != rulesSynced {
t.Errorf("metric shows %d rules synced but iptables data shows %d", rulesSyncedMetric, rulesSynced)
}
@@ -6181,7 +6185,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
// We added 1 KUBE-SERVICES rule, 2 KUBE-SVC-4SW47YFZTEDKD3PK rules, and
// 2 KUBE-SEP-AYCN5HPXMIRJNJXU rules
rulesTotal += 5
rulesTotalMetric = countRulesFromMetric(utiliptables.TableNAT)
rulesTotalMetric = countRulesFromMetric(logger, utiliptables.TableNAT)
if rulesTotalMetric != rulesTotal {
t.Errorf("metric shows %d rules total but expected %d", rulesTotalMetric, rulesTotal)
}
@@ -6231,14 +6235,14 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
`)
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
rulesSynced = countRules(utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(utiliptables.TableNAT)
rulesSynced = countRules(logger, utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(logger, utiliptables.TableNAT)
if rulesSyncedMetric != rulesSynced {
t.Errorf("metric shows %d rules synced but iptables data shows %d", rulesSyncedMetric, rulesSynced)
}
// We rewrote existing rules but did not change the overall number of rules.
rulesTotalMetric = countRulesFromMetric(utiliptables.TableNAT)
rulesTotalMetric = countRulesFromMetric(logger, utiliptables.TableNAT)
if rulesTotalMetric != rulesTotal {
t.Errorf("metric shows %d rules total but expected %d", rulesTotalMetric, rulesTotal)
}
@@ -6289,8 +6293,8 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
`)
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
rulesSynced = countRules(utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(utiliptables.TableNAT)
rulesSynced = countRules(logger, utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(logger, utiliptables.TableNAT)
if rulesSyncedMetric != rulesSynced {
t.Errorf("metric shows %d rules synced but iptables data shows %d", rulesSyncedMetric, rulesSynced)
}
@@ -6299,7 +6303,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
// jumping to the new SEP chain. The other rules related to svc3 got rewritten,
// but that does not change the count of rules.
rulesTotal += 3
rulesTotalMetric = countRulesFromMetric(utiliptables.TableNAT)
rulesTotalMetric = countRulesFromMetric(logger, utiliptables.TableNAT)
if rulesTotalMetric != rulesTotal {
t.Errorf("metric shows %d rules total but expected %d", rulesTotalMetric, rulesTotal)
}
@@ -6337,14 +6341,14 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
`)
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
rulesSynced = countRules(utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(utiliptables.TableNAT)
rulesSynced = countRules(logger, utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(logger, utiliptables.TableNAT)
if rulesSyncedMetric != rulesSynced {
t.Errorf("metric shows %d rules synced but iptables data shows %d", rulesSyncedMetric, rulesSynced)
}
// (No changes)
rulesTotalMetric = countRulesFromMetric(utiliptables.TableNAT)
rulesTotalMetric = countRulesFromMetric(logger, utiliptables.TableNAT)
if rulesTotalMetric != rulesTotal {
t.Errorf("metric shows %d rules total but expected %d", rulesTotalMetric, rulesTotal)
}
@@ -6447,8 +6451,8 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
`)
assertIPTablesRulesEqual(t, getLine(), false, expected, fp.iptablesData.String())
rulesSynced = countRules(utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(utiliptables.TableNAT)
rulesSynced = countRules(logger, utiliptables.TableNAT, expected)
rulesSyncedMetric = countRulesFromLastSyncMetric(logger, utiliptables.TableNAT)
if rulesSyncedMetric != rulesSynced {
t.Errorf("metric shows %d rules synced but iptables data shows %d", rulesSyncedMetric, rulesSynced)
}
@@ -6456,7 +6460,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
// We deleted 1 KUBE-SERVICES rule, 2 KUBE-SVC-4SW47YFZTEDKD3PK rules, and 2
// KUBE-SEP-AYCN5HPXMIRJNJXU rules
rulesTotal -= 5
rulesTotalMetric = countRulesFromMetric(utiliptables.TableNAT)
rulesTotalMetric = countRulesFromMetric(logger, utiliptables.TableNAT)
if rulesTotalMetric != rulesTotal {
t.Errorf("metric shows %d rules total but expected %d", rulesTotalMetric, rulesTotal)
}

View File

@@ -21,6 +21,7 @@ package ipvs
import (
"bytes"
"context"
"errors"
"fmt"
"io"
@@ -111,6 +112,7 @@ const (
// NewDualStackProxier returns a new Proxier for dual-stack operation
func NewDualStackProxier(
ctx context.Context,
ipt [2]utiliptables.Interface,
ipvs utilipvs.Interface,
ipset utilipset.Interface,
@@ -135,7 +137,7 @@ func NewDualStackProxier(
initOnly bool,
) (proxy.Provider, error) {
// Create an ipv4 instance of the single-stack proxier
ipv4Proxier, err := NewProxier(v1.IPv4Protocol, ipt[0], ipvs, ipset, sysctl,
ipv4Proxier, err := NewProxier(ctx, v1.IPv4Protocol, ipt[0], ipvs, ipset, sysctl,
exec, syncPeriod, minSyncPeriod, filterCIDRs(false, excludeCIDRs), strictARP,
tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit,
localDetectors[0], hostname, nodeIPs[v1.IPv4Protocol], recorder,
@@ -144,7 +146,7 @@ func NewDualStackProxier(
return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
}
ipv6Proxier, err := NewProxier(v1.IPv6Protocol, ipt[1], ipvs, ipset, sysctl,
ipv6Proxier, err := NewProxier(ctx, v1.IPv6Protocol, ipt[1], ipvs, ipset, sysctl,
exec, syncPeriod, minSyncPeriod, filterCIDRs(true, excludeCIDRs), strictARP,
tcpTimeout, tcpFinTimeout, udpTimeout, masqueradeAll, masqueradeBit,
localDetectors[1], hostname, nodeIPs[v1.IPv6Protocol], recorder,
@@ -251,6 +253,8 @@ type Proxier struct {
// additional iptables rules.
// (ref: https://github.com/kubernetes/kubernetes/issues/119656)
lbNoNodeAccessIPPortProtocolEntries []*utilipset.Entry
logger klog.Logger
}
// Proxier implements proxy.Provider
@@ -261,7 +265,9 @@ var _ proxy.Provider = &Proxier{}
// An error will be returned if it fails to update or acquire the initial lock.
// Once a proxier is created, it will keep iptables and ipvs rules up to date in the background and
// will not terminate if a particular iptables or ipvs call fails.
func NewProxier(ipFamily v1.IPFamily,
func NewProxier(
ctx context.Context,
ipFamily v1.IPFamily,
ipt utiliptables.Interface,
ipvs utilipvs.Interface,
ipset utilipset.Interface,
@@ -285,6 +291,7 @@ func NewProxier(ipFamily v1.IPFamily,
nodePortAddressStrings []string,
initOnly bool,
) (*Proxier, error) {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "ipFamily", ipFamily)
// Set the conntrack sysctl we need for
if err := proxyutil.EnsureSysctl(sysctl, sysctlVSConnTrack, 1); err != nil {
return nil, err
@@ -296,10 +303,10 @@ func NewProxier(ipFamily v1.IPFamily,
}
if kernelVersion.LessThan(version.MustParseGeneric(utilkernel.IPVSConnReuseModeMinSupportedKernelVersion)) {
klog.ErrorS(nil, "Can't set sysctl, kernel version doesn't satisfy minimum version requirements", "sysctl", sysctlConnReuse, "minimumKernelVersion", utilkernel.IPVSConnReuseModeMinSupportedKernelVersion)
logger.Error(nil, "Can't set sysctl, kernel version doesn't satisfy minimum version requirements", "sysctl", sysctlConnReuse, "minimumKernelVersion", utilkernel.IPVSConnReuseModeMinSupportedKernelVersion)
} else if kernelVersion.AtLeast(version.MustParseGeneric(utilkernel.IPVSConnReuseModeFixedKernelVersion)) {
// https://github.com/kubernetes/kubernetes/issues/93297
klog.V(2).InfoS("Left as-is", "sysctl", sysctlConnReuse)
logger.V(2).Info("Left as-is", "sysctl", sysctlConnReuse)
} else {
// Set the connection reuse mode
if err := proxyutil.EnsureSysctl(sysctl, sysctlConnReuse, 0); err != nil {
@@ -339,12 +346,12 @@ func NewProxier(ipFamily v1.IPFamily,
// current system timeout should be preserved
if tcpTimeout > 0 || tcpFinTimeout > 0 || udpTimeout > 0 {
if err := ipvs.ConfigureTimeouts(tcpTimeout, tcpFinTimeout, udpTimeout); err != nil {
klog.ErrorS(err, "Failed to configure IPVS timeouts")
logger.Error(err, "Failed to configure IPVS timeouts")
}
}
if initOnly {
klog.InfoS("System initialized and --init-only specified")
logger.Info("System initialized and --init-only specified")
return nil, nil
}
@@ -352,10 +359,10 @@ func NewProxier(ipFamily v1.IPFamily,
masqueradeValue := 1 << uint(masqueradeBit)
masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
klog.V(2).InfoS("Record nodeIP and family", "nodeIP", nodeIP, "family", ipFamily)
logger.V(2).Info("Record nodeIP and family", "nodeIP", nodeIP, "family", ipFamily)
if len(scheduler) == 0 {
klog.InfoS("IPVS scheduler not specified, use rr by default")
logger.Info("IPVS scheduler not specified, use rr by default")
scheduler = defaultScheduler
}
@@ -399,6 +406,7 @@ func NewProxier(ipFamily v1.IPFamily,
nodePortAddresses: nodePortAddresses,
networkInterfacer: proxyutil.RealNetwork{},
gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
logger: logger,
}
// initialize ipsetList with all sets we needed
proxier.ipsetList = make(map[string]*IPSet)
@@ -406,7 +414,7 @@ func NewProxier(ipFamily v1.IPFamily,
proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, (ipFamily == v1.IPv6Protocol), is.comment)
}
burstSyncs := 2
klog.V(2).InfoS("ipvs sync params", "ipFamily", ipt.Protocol(), "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
logger.V(2).Info("ipvs sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
proxier.gracefuldeleteManager.Run()
return proxier, nil
@@ -571,7 +579,8 @@ func getFirstColumn(r io.Reader) ([]string, error) {
// already exist with the configured scheduler, we just return. Otherwise
// we check if a dummy VS can be configured with the configured scheduler.
// Kernel modules will be loaded automatically if necessary.
func CanUseIPVSProxier(ipvs utilipvs.Interface, ipsetver IPSetVersioner, scheduler string) error {
func CanUseIPVSProxier(ctx context.Context, ipvs utilipvs.Interface, ipsetver IPSetVersioner, scheduler string) error {
logger := klog.FromContext(ctx)
// BUG: https://github.com/moby/ipvs/issues/27
// If ipvs is not compiled into the kernel no error is returned and handle==nil.
// This in turn causes ipvs.GetVirtualServers and ipvs.AddVirtualServer
@@ -597,20 +606,20 @@ func CanUseIPVSProxier(ipvs utilipvs.Interface, ipsetver IPSetVersioner, schedul
// If any virtual server (VS) using the scheduler exist we skip the checks.
vservers, err := ipvs.GetVirtualServers()
if err != nil {
klog.ErrorS(err, "Can't read the ipvs")
logger.Error(err, "Can't read the ipvs")
return err
}
klog.V(5).InfoS("Virtual Servers", "count", len(vservers))
logger.V(5).Info("Virtual Servers", "count", len(vservers))
if len(vservers) > 0 {
// This is most likely a kube-proxy re-start. We know that ipvs works
// and if any VS uses the configured scheduler, we are done.
for _, vs := range vservers {
if vs.Scheduler == scheduler {
klog.V(5).InfoS("VS exist, Skipping checks")
logger.V(5).Info("VS exist, Skipping checks")
return nil
}
}
klog.V(5).InfoS("No existing VS uses the configured scheduler", "scheduler", scheduler)
logger.V(5).Info("No existing VS uses the configured scheduler", "scheduler", scheduler)
}
// Try to insert a dummy VS with the passed scheduler.
@@ -631,25 +640,25 @@ func CanUseIPVSProxier(ipvs utilipvs.Interface, ipsetver IPSetVersioner, schedul
Scheduler: scheduler,
}
if err := ipvs.AddVirtualServer(&vs); err != nil {
klog.ErrorS(err, "Could not create dummy VS", "scheduler", scheduler)
logger.Error(err, "Could not create dummy VS", "scheduler", scheduler)
return err
}
// To overcome the BUG described above we check that the VS is *really* added.
vservers, err = ipvs.GetVirtualServers()
if err != nil {
klog.ErrorS(err, "ipvs.GetVirtualServers")
logger.Error(err, "ipvs.GetVirtualServers")
return err
}
klog.V(5).InfoS("Virtual Servers after adding dummy", "count", len(vservers))
logger.V(5).Info("Virtual Servers after adding dummy", "count", len(vservers))
if len(vservers) == 0 {
klog.InfoS("Dummy VS not created", "scheduler", scheduler)
logger.Info("Dummy VS not created", "scheduler", scheduler)
return fmt.Errorf("Ipvs not supported") // This is a BUG work-around
}
klog.V(5).InfoS("Dummy VS created", "vs", vs)
logger.V(5).Info("Dummy VS created", "vs", vs)
if err := ipvs.DeleteVirtualServer(&vs); err != nil {
klog.ErrorS(err, "Could not delete dummy VS")
logger.Error(err, "Could not delete dummy VS")
return err
}
@@ -658,7 +667,8 @@ func CanUseIPVSProxier(ipvs utilipvs.Interface, ipsetver IPSetVersioner, schedul
// CleanupIptablesLeftovers removes all iptables rules and chains created by the Proxier
// It returns true if an error was encountered. Errors are logged.
func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
func cleanupIptablesLeftovers(ctx context.Context, ipt utiliptables.Interface) (encounteredError bool) {
logger := klog.FromContext(ctx)
// Unlink the iptables chains created by ipvs Proxier
for _, jc := range iptablesJumpChain {
args := []string{
@@ -667,7 +677,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool
}
if err := ipt.DeleteRule(jc.table, jc.from, args...); err != nil {
if !utiliptables.IsNotFoundError(err) {
klog.ErrorS(err, "Error removing iptables rules in ipvs proxier")
logger.Error(err, "Error removing iptables rules in ipvs proxier")
encounteredError = true
}
}
@@ -677,7 +687,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool
for _, ch := range iptablesCleanupChains {
if err := ipt.FlushChain(ch.table, ch.chain); err != nil {
if !utiliptables.IsNotFoundError(err) {
klog.ErrorS(err, "Error removing iptables rules in ipvs proxier")
logger.Error(err, "Error removing iptables rules in ipvs proxier")
encounteredError = true
}
}
@@ -687,7 +697,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool
for _, ch := range iptablesCleanupChains {
if err := ipt.DeleteChain(ch.table, ch.chain); err != nil {
if !utiliptables.IsNotFoundError(err) {
klog.ErrorS(err, "Error removing iptables rules in ipvs proxier")
logger.Error(err, "Error removing iptables rules in ipvs proxier")
encounteredError = true
}
}
@@ -697,12 +707,13 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool
}
// CleanupLeftovers clean up all ipvs and iptables rules created by ipvs Proxier.
func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset utilipset.Interface) (encounteredError bool) {
func CleanupLeftovers(ctx context.Context, ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset utilipset.Interface) (encounteredError bool) {
logger := klog.FromContext(ctx)
// Clear all ipvs rules
if ipvs != nil {
err := ipvs.Flush()
if err != nil {
klog.ErrorS(err, "Error flushing ipvs rules")
logger.Error(err, "Error flushing ipvs rules")
encounteredError = true
}
}
@@ -710,18 +721,18 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset
nl := NewNetLinkHandle(false)
err := nl.DeleteDummyDevice(defaultDummyDevice)
if err != nil {
klog.ErrorS(err, "Error deleting dummy device created by ipvs proxier", "device", defaultDummyDevice)
logger.Error(err, "Error deleting dummy device created by ipvs proxier", "device", defaultDummyDevice)
encounteredError = true
}
// Clear iptables created by ipvs Proxier.
encounteredError = cleanupIptablesLeftovers(ipt) || encounteredError
encounteredError = cleanupIptablesLeftovers(ctx, ipt) || encounteredError
// Destroy ip sets created by ipvs Proxier. We should call it after cleaning up
// iptables since we can NOT delete ip set which is still referenced by iptables.
for _, set := range ipsetInfo {
err = ipset.DestroySet(set.name)
if err != nil {
if !utilipset.IsNotFoundError(err) {
klog.ErrorS(err, "Error removing ipset", "ipset", set.name)
logger.Error(err, "Error removing ipset", "ipset", set.name)
encounteredError = true
}
}
@@ -829,7 +840,7 @@ func (proxier *Proxier) OnEndpointSlicesSynced() {
// is observed.
func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
if node.Name != proxier.hostname {
klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
return
}
@@ -843,7 +854,7 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
proxier.nodeLabels[k] = v
}
proxier.mu.Unlock()
klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)
proxier.Sync()
}
@@ -852,7 +863,7 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
// node object is observed.
func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
if node.Name != proxier.hostname {
klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
return
}
@@ -866,7 +877,7 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
proxier.nodeLabels[k] = v
}
proxier.mu.Unlock()
klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)
proxier.Sync()
}
@@ -875,7 +886,7 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
// object is observed.
func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
if node.Name != proxier.hostname {
klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
return
}
@@ -902,7 +913,7 @@ func (proxier *Proxier) syncProxyRules() {
// don't sync rules till we've received services and endpoints
if !proxier.isInitialized() {
klog.V(2).InfoS("Not syncing ipvs rules until Services and Endpoints have been received from master")
proxier.logger.V(2).Info("Not syncing ipvs rules until Services and Endpoints have been received from master")
return
}
@@ -916,7 +927,7 @@ func (proxier *Proxier) syncProxyRules() {
start := time.Now()
defer func() {
metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
klog.V(4).InfoS("syncProxyRules complete", "elapsed", time.Since(start))
proxier.logger.V(4).Info("syncProxyRules complete", "elapsed", time.Since(start))
}()
// We assume that if this was called, we really want to sync them,
@@ -925,7 +936,7 @@ func (proxier *Proxier) syncProxyRules() {
serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
klog.V(3).InfoS("Syncing ipvs proxier rules")
proxier.logger.V(3).Info("Syncing ipvs proxier rules")
proxier.serviceNoLocalEndpointsInternal = sets.New[string]()
proxier.serviceNoLocalEndpointsExternal = sets.New[string]()
@@ -950,7 +961,7 @@ func (proxier *Proxier) syncProxyRules() {
// make sure dummy interface exists in the system where ipvs Proxier will bind service address on it
_, err := proxier.netlinkHandle.EnsureDummyDevice(defaultDummyDevice)
if err != nil {
klog.ErrorS(err, "Failed to create dummy interface", "interface", defaultDummyDevice)
proxier.logger.Error(err, "Failed to create dummy interface", "interface", defaultDummyDevice)
return
}
@@ -969,12 +980,12 @@ func (proxier *Proxier) syncProxyRules() {
// alreadyBoundAddrs Represents addresses currently assigned to the dummy interface
alreadyBoundAddrs, err := proxier.netlinkHandle.GetLocalAddresses(defaultDummyDevice)
if err != nil {
klog.ErrorS(err, "Error listing addresses binded to dummy interface")
proxier.logger.Error(err, "Error listing addresses binded to dummy interface")
}
// nodeAddressSet All addresses *except* those on the dummy interface
nodeAddressSet, err := proxier.netlinkHandle.GetAllLocalAddressesExcept(defaultDummyDevice)
if err != nil {
klog.ErrorS(err, "Error listing node addresses")
proxier.logger.Error(err, "Error listing node addresses")
}
hasNodePort := false
@@ -997,7 +1008,7 @@ func (proxier *Proxier) syncProxyRules() {
} else {
allNodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer)
if err != nil {
klog.ErrorS(err, "Failed to get node IP address matching nodeport cidr")
proxier.logger.Error(err, "Failed to get node IP address matching nodeport cidr")
} else {
for _, ip := range allNodeIPs {
if !ip.IsLoopback() {
@@ -1012,7 +1023,7 @@ func (proxier *Proxier) syncProxyRules() {
for svcPortName, svcPort := range proxier.svcPortMap {
svcInfo, ok := svcPort.(*servicePortInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast serviceInfo", "servicePortName", svcPortName)
proxier.logger.Error(nil, "Failed to cast serviceInfo", "servicePortName", svcPortName)
continue
}
@@ -1025,7 +1036,7 @@ func (proxier *Proxier) syncProxyRules() {
for _, e := range proxier.endpointsMap[svcPortName] {
ep, ok := e.(*proxy.BaseEndpointInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast BaseEndpointInfo", "endpoint", e)
proxier.logger.Error(nil, "Failed to cast BaseEndpointInfo", "endpoint", e)
continue
}
if !ep.IsLocal() {
@@ -1045,7 +1056,7 @@ func (proxier *Proxier) syncProxyRules() {
SetType: utilipset.HashIPPortIP,
}
if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoopBackIPSet].Name)
proxier.logger.Error(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoopBackIPSet].Name)
continue
}
proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String())
@@ -1062,7 +1073,7 @@ func (proxier *Proxier) syncProxyRules() {
// add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeClusterIPSet].Name)
proxier.logger.Error(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeClusterIPSet].Name)
continue
}
proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
@@ -1093,10 +1104,10 @@ func (proxier *Proxier) syncProxyRules() {
internalNodeLocal = true
}
if err := proxier.syncEndpoint(svcPortName, internalNodeLocal, serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
proxier.logger.Error(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
}
} else {
klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
proxier.logger.Error(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
}
// Capture externalIPs.
@@ -1111,14 +1122,14 @@ func (proxier *Proxier) syncProxyRules() {
if svcInfo.ExternalPolicyLocal() {
if valid := proxier.ipsetList[kubeExternalIPLocalSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeExternalIPLocalSet].Name)
proxier.logger.Error(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeExternalIPLocalSet].Name)
continue
}
proxier.ipsetList[kubeExternalIPLocalSet].activeEntries.Insert(entry.String())
} else {
// We have to SNAT packets to external IPs.
if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeExternalIPSet].Name)
proxier.logger.Error(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeExternalIPSet].Name)
continue
}
proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())
@@ -1147,10 +1158,10 @@ func (proxier *Proxier) syncProxyRules() {
activeBindAddrs.Insert(serv.Address.String())
}
if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
proxier.logger.Error(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
}
} else {
klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
proxier.logger.Error(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
}
}
@@ -1168,14 +1179,14 @@ func (proxier *Proxier) syncProxyRules() {
// If we are proxying globally, we need to masquerade in case we cross nodes.
// If we are proxying only locally, we can retain the source IP.
if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerSet].Name)
proxier.logger.Error(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerSet].Name)
continue
}
proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
// insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local
if svcInfo.ExternalPolicyLocal() {
if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerLocalSet].Name)
proxier.logger.Error(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerLocalSet].Name)
continue
}
proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String())
@@ -1185,7 +1196,7 @@ func (proxier *Proxier) syncProxyRules() {
// This currently works for loadbalancers that preserves source ips.
// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
if valid := proxier.ipsetList[kubeLoadBalancerFWSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerFWSet].Name)
proxier.logger.Error(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerFWSet].Name)
continue
}
proxier.ipsetList[kubeLoadBalancerFWSet].activeEntries.Insert(entry.String())
@@ -1201,7 +1212,7 @@ func (proxier *Proxier) syncProxyRules() {
}
// enumerate all white list source cidr
if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name)
proxier.logger.Error(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name)
continue
}
proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String())
@@ -1223,7 +1234,7 @@ func (proxier *Proxier) syncProxyRules() {
}
// enumerate all white list source ip
if valid := proxier.ipsetList[kubeLoadBalancerSourceIPSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name)
proxier.logger.Error(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name)
continue
}
proxier.ipsetList[kubeLoadBalancerSourceIPSet].activeEntries.Insert(entry.String())
@@ -1256,10 +1267,10 @@ func (proxier *Proxier) syncProxyRules() {
activeBindAddrs.Insert(serv.Address.String())
}
if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
proxier.logger.Error(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
}
} else {
klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
proxier.logger.Error(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
}
}
@@ -1309,13 +1320,13 @@ func (proxier *Proxier) syncProxyRules() {
}
default:
// It should never hit
klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
proxier.logger.Error(nil, "Unsupported protocol type", "protocol", protocol)
}
if nodePortSet != nil {
entryInvalidErr := false
for _, entry := range entries {
if valid := nodePortSet.validateEntry(entry); !valid {
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", nodePortSet.Name)
proxier.logger.Error(nil, "Error adding entry to ipset", "entry", entry, "ipset", nodePortSet.Name)
entryInvalidErr = true
break
}
@@ -1338,13 +1349,13 @@ func (proxier *Proxier) syncProxyRules() {
nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetSCTP]
default:
// It should never hit
klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
proxier.logger.Error(nil, "Unsupported protocol type", "protocol", protocol)
}
if nodePortLocalSet != nil {
entryInvalidErr := false
for _, entry := range entries {
if valid := nodePortLocalSet.validateEntry(entry); !valid {
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", nodePortLocalSet.Name)
proxier.logger.Error(nil, "Error adding entry to ipset", "entry", entry, "ipset", nodePortLocalSet.Name)
entryInvalidErr = true
break
}
@@ -1377,10 +1388,10 @@ func (proxier *Proxier) syncProxyRules() {
if err := proxier.syncService(svcPortNameString, serv, false, alreadyBoundAddrs); err == nil {
activeIPVSServices.Insert(serv.String())
if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
proxier.logger.Error(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
}
} else {
klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
proxier.logger.Error(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
}
}
}
@@ -1395,7 +1406,7 @@ func (proxier *Proxier) syncProxyRules() {
}
if valid := nodePortSet.validateEntry(entry); !valid {
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", nodePortSet.Name)
proxier.logger.Error(nil, "Error adding entry to ipset", "entry", entry, "ipset", nodePortSet.Name)
continue
}
nodePortSet.activeEntries.Insert(entry.String())
@@ -1422,14 +1433,17 @@ func (proxier *Proxier) syncProxyRules() {
proxier.iptablesData.Write(proxier.filterChains.Bytes())
proxier.iptablesData.Write(proxier.filterRules.Bytes())
klog.V(5).InfoS("Restoring iptables", "rules", proxier.iptablesData.Bytes())
proxier.logger.V(5).Info(
"Restoring iptables", "natChains", proxier.natChains,
"natRules", proxier.natRules, "filterChains", proxier.filterChains,
"filterRules", proxier.filterRules)
err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
if pErr, ok := err.(utiliptables.ParseError); ok {
lines := utiliptables.ExtractLines(proxier.iptablesData.Bytes(), pErr.Line(), 3)
klog.ErrorS(pErr, "Failed to execute iptables-restore", "rules", lines)
proxier.logger.Error(pErr, "Failed to execute iptables-restore", "rules", lines)
} else {
klog.ErrorS(err, "Failed to execute iptables-restore", "rules", proxier.iptablesData.Bytes())
proxier.logger.Error(err, "Failed to execute iptables-restore", "rules", proxier.iptablesData.Bytes())
}
metrics.IptablesRestoreFailuresTotal.Inc()
return
@@ -1438,17 +1452,17 @@ func (proxier *Proxier) syncProxyRules() {
for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
latency := metrics.SinceInSeconds(lastChangeTriggerTime)
metrics.NetworkProgrammingLatency.Observe(latency)
klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
proxier.logger.V(4).Info("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
}
}
// Remove superfluous addresses from the dummy device
superfluousAddresses := alreadyBoundAddrs.Difference(activeBindAddrs)
if superfluousAddresses.Len() > 0 {
klog.V(2).InfoS("Removing addresses", "interface", defaultDummyDevice, "addresses", superfluousAddresses)
proxier.logger.V(2).Info("Removing addresses", "interface", defaultDummyDevice, "addresses", superfluousAddresses)
for adr := range superfluousAddresses {
if err := proxier.netlinkHandle.UnbindAddress(adr, defaultDummyDevice); err != nil {
klog.ErrorS(err, "UnbindAddress", "interface", defaultDummyDevice, "address", adr)
proxier.logger.Error(err, "UnbindAddress", "interface", defaultDummyDevice, "address", adr)
}
}
}
@@ -1462,7 +1476,7 @@ func (proxier *Proxier) syncProxyRules() {
currentIPVSServices[appliedSvc.String()] = appliedSvc
}
} else {
klog.ErrorS(err, "Failed to get ipvs service")
proxier.logger.Error(err, "Failed to get ipvs service")
}
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices)
@@ -1475,10 +1489,10 @@ func (proxier *Proxier) syncProxyRules() {
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
// will just drop those endpoints.
if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
klog.ErrorS(err, "Error syncing healthcheck services")
proxier.logger.Error(err, "Error syncing healthcheck services")
}
if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
klog.ErrorS(err, "Error syncing healthcheck endpoints")
proxier.logger.Error(err, "Error syncing healthcheck endpoints")
}
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(proxier.serviceNoLocalEndpointsInternal.Len()))
@@ -1750,7 +1764,7 @@ func (proxier *Proxier) acceptIPVSTraffic() {
func (proxier *Proxier) createAndLinkKubeChain() {
for _, ch := range iptablesChains {
if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil {
klog.ErrorS(err, "Failed to ensure chain exists", "table", ch.table, "chain", ch.chain)
proxier.logger.Error(err, "Failed to ensure chain exists", "table", ch.table, "chain", ch.chain)
return
}
if ch.table == utiliptables.TableNAT {
@@ -1763,7 +1777,7 @@ func (proxier *Proxier) createAndLinkKubeChain() {
for _, jc := range iptablesJumpChain {
args := []string{"-m", "comment", "--comment", jc.comment, "-j", string(jc.to)}
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jc.table, jc.from, args...); err != nil {
klog.ErrorS(err, "Failed to ensure chain jumps", "table", jc.table, "srcChain", jc.from, "dstChain", jc.to)
proxier.logger.Error(err, "Failed to ensure chain jumps", "table", jc.table, "srcChain", jc.from, "dstChain", jc.to)
}
}
@@ -1774,17 +1788,17 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer,
if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
if appliedVirtualServer == nil {
// IPVS service is not found, create a new service
klog.V(3).InfoS("Adding new service", "serviceName", svcName, "virtualServer", vs)
proxier.logger.V(3).Info("Adding new service", "serviceName", svcName, "virtualServer", vs)
if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
klog.ErrorS(err, "Failed to add IPVS service", "serviceName", svcName)
proxier.logger.Error(err, "Failed to add IPVS service", "serviceName", svcName)
return err
}
} else {
// IPVS service was changed, update the existing one
// During updates, service VIP will not go down
klog.V(3).InfoS("IPVS service was changed", "serviceName", svcName)
proxier.logger.V(3).Info("IPVS service was changed", "serviceName", svcName)
if err := proxier.ipvs.UpdateVirtualServer(vs); err != nil {
klog.ErrorS(err, "Failed to update IPVS service")
proxier.logger.Error(err, "Failed to update IPVS service")
return err
}
}
@@ -1798,10 +1812,10 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer,
return nil
}
klog.V(4).InfoS("Bind address", "address", vs.Address)
proxier.logger.V(4).Info("Bind address", "address", vs.Address)
_, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), defaultDummyDevice)
if err != nil {
klog.ErrorS(err, "Failed to bind service address to dummy device", "serviceName", svcName)
proxier.logger.Error(err, "Failed to bind service address to dummy device", "serviceName", svcName)
return err
}
}
@@ -1812,7 +1826,7 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer,
func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, vs *utilipvs.VirtualServer) error {
appliedVirtualServer, err := proxier.ipvs.GetVirtualServer(vs)
if err != nil {
klog.ErrorS(err, "Failed to get IPVS service")
proxier.logger.Error(err, "Failed to get IPVS service")
return err
}
if appliedVirtualServer == nil {
@@ -1823,7 +1837,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
curEndpoints := sets.New[string]()
curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer)
if err != nil {
klog.ErrorS(err, "Failed to list IPVS destinations")
proxier.logger.Error(err, "Failed to list IPVS destinations")
return err
}
for _, des := range curDests {
@@ -1838,7 +1852,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
// externalTrafficPolicy=Local.
svcInfo, ok := proxier.svcPortMap[svcPortName]
if !ok {
klog.InfoS("Unable to filter endpoints due to missing service info", "servicePortName", svcPortName)
proxier.logger.Info("Unable to filter endpoints due to missing service info", "servicePortName", svcPortName)
} else {
clusterEndpoints, localEndpoints, _, hasAnyEndpoints := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeLabels)
if onlyNodeLocalEndpoints {
@@ -1873,12 +1887,12 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
for _, ep := range newEndpoints.UnsortedList() {
ip, port, err := net.SplitHostPort(ep)
if err != nil {
klog.ErrorS(err, "Failed to parse endpoint", "endpoint", ep)
proxier.logger.Error(err, "Failed to parse endpoint", "endpoint", ep)
continue
}
portNum, err := strconv.Atoi(port)
if err != nil {
klog.ErrorS(err, "Failed to parse endpoint port", "port", port)
proxier.logger.Error(err, "Failed to parse endpoint port", "port", port)
continue
}
@@ -1896,7 +1910,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
if dest.Weight != newDest.Weight {
err = proxier.ipvs.UpdateRealServer(appliedVirtualServer, newDest)
if err != nil {
klog.ErrorS(err, "Failed to update destination", "newDest", newDest)
proxier.logger.Error(err, "Failed to update destination", "newDest", newDest)
continue
}
}
@@ -1907,16 +1921,16 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
continue
}
klog.V(5).InfoS("new ep is in graceful delete list", "uniqueRealServer", uniqueRS)
proxier.logger.V(5).Info("new ep is in graceful delete list", "uniqueRealServer", uniqueRS)
err := proxier.gracefuldeleteManager.MoveRSOutofGracefulDeleteList(uniqueRS)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint in gracefulDeleteQueue", "endpoint", ep)
proxier.logger.Error(err, "Failed to delete endpoint in gracefulDeleteQueue", "endpoint", ep)
continue
}
}
err = proxier.ipvs.AddRealServer(appliedVirtualServer, newDest)
if err != nil {
klog.ErrorS(err, "Failed to add destination", "newDest", newDest)
proxier.logger.Error(err, "Failed to add destination", "newDest", newDest)
continue
}
}
@@ -1930,12 +1944,12 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
}
ip, port, err := net.SplitHostPort(ep)
if err != nil {
klog.ErrorS(err, "Failed to parse endpoint", "endpoint", ep)
proxier.logger.Error(err, "Failed to parse endpoint", "endpoint", ep)
continue
}
portNum, err := strconv.Atoi(port)
if err != nil {
klog.ErrorS(err, "Failed to parse endpoint port", "port", port)
proxier.logger.Error(err, "Failed to parse endpoint port", "port", port)
continue
}
@@ -1944,10 +1958,10 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
Port: uint16(portNum),
}
klog.V(5).InfoS("Using graceful delete", "uniqueRealServer", uniqueRS)
proxier.logger.V(5).Info("Using graceful delete", "uniqueRealServer", uniqueRS)
err = proxier.gracefuldeleteManager.GracefulDeleteRS(appliedVirtualServer, delDest)
if err != nil {
klog.ErrorS(err, "Failed to delete destination", "uniqueRealServer", uniqueRS)
proxier.logger.Error(err, "Failed to delete destination", "uniqueRealServer", uniqueRS)
continue
}
}
@@ -1964,9 +1978,9 @@ func (proxier *Proxier) cleanLegacyService(activeServices sets.Set[string], curr
continue
}
if !activeServices.Has(cs) {
klog.V(4).InfoS("Delete service", "virtualServer", svc)
proxier.logger.V(4).Info("Delete service", "virtualServer", svc)
if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil {
klog.ErrorS(err, "Failed to delete service", "virtualServer", svc)
proxier.logger.Error(err, "Failed to delete service", "virtualServer", svc)
}
}
}

View File

@@ -21,6 +21,7 @@ package ipvs
import (
"bytes"
"context"
"fmt"
"net"
"reflect"
@@ -55,6 +56,7 @@ import (
"k8s.io/kubernetes/pkg/util/async"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
"k8s.io/kubernetes/test/utils/ktesting"
netutils "k8s.io/utils/net"
"k8s.io/utils/ptr"
)
@@ -125,7 +127,7 @@ func (fake *fakeIPSetVersioner) GetVersion() (string, error) {
return fake.version, fake.err
}
func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface, nodeIPs []string, excludeCIDRs []*net.IPNet, ipFamily v1.IPFamily) *Proxier {
func NewFakeProxier(ctx context.Context, ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface, nodeIPs []string, excludeCIDRs []*net.IPNet, ipFamily v1.IPFamily) *Proxier {
netlinkHandle := netlinktest.NewFakeNetlinkHandle(ipFamily == v1.IPv6Protocol)
netlinkHandle.SetLocalAddresses("eth0", nodeIPs...)
@@ -224,10 +226,11 @@ func makeTestEndpointSlice(namespace, name string, sliceNum int, epsFunc func(*d
}
func TestCleanupLeftovers(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@@ -266,12 +269,13 @@ func TestCleanupLeftovers(t *testing.T) {
fp.syncProxyRules()
// test cleanup left over
if CleanupLeftovers(ipvs, ipt, ipset) {
if CleanupLeftovers(ctx, ipvs, ipt, ipset) {
t.Errorf("Cleanup leftovers failed")
}
}
func TestCanUseIPVSProxier(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
testCases := []struct {
name string
scheduler string
@@ -321,7 +325,7 @@ func TestCanUseIPVSProxier(t *testing.T) {
for _, tc := range testCases {
ipvs := &fakeIpvs{tc.ipvsErr, false}
versioner := &fakeIPSetVersioner{version: tc.ipsetVersion, err: tc.ipsetErr}
err := CanUseIPVSProxier(ipvs, versioner, tc.scheduler)
err := CanUseIPVSProxier(ctx, ipvs, versioner, tc.scheduler)
if (err == nil) != tc.ok {
t.Errorf("Case [%s], expect %v, got err: %v", tc.name, tc.ok, err)
}
@@ -941,10 +945,11 @@ func TestNodePortIPv4(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, test.nodeIPs, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, test.nodeIPs, nil, v1.IPv4Protocol)
fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv4Protocol, test.nodePortAddresses)
makeServiceMap(fp, test.services...)
@@ -1283,10 +1288,11 @@ func TestNodePortIPv6(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, test.nodeIPs, nil, v1.IPv6Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, test.nodeIPs, nil, v1.IPv6Protocol)
fp.nodePortAddresses = proxyutil.NewNodePortAddresses(v1.IPv6Protocol, test.nodePortAddresses)
makeServiceMap(fp, test.services...)
@@ -1312,10 +1318,11 @@ func TestNodePortIPv6(t *testing.T) {
}
func Test_syncEndpoint_updateWeightsOnRestart(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
svc1 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
svc.Spec.ClusterIP = "10.20.30.41"
@@ -1509,10 +1516,11 @@ func TestIPv4Proxier(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
makeServiceMap(fp, test.services...)
populateEndpointSlices(fp, test.endpoints...)
@@ -1646,10 +1654,11 @@ func TestIPv6Proxier(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv6Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv6Protocol)
makeServiceMap(fp, test.services...)
populateEndpointSlices(fp, test.endpoints...)
@@ -1667,10 +1676,11 @@ func TestIPv6Proxier(t *testing.T) {
func TestMasqueradeRule(t *testing.T) {
for _, testcase := range []bool{false, true} {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake().SetHasRandomFully(testcase)
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
makeServiceMap(fp)
fp.syncProxyRules()
@@ -1700,10 +1710,11 @@ func TestMasqueradeRule(t *testing.T) {
}
func TestExternalIPsNoEndpoint(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
svcIP := "10.20.30.41"
svcPort := 80
svcExternalIPs := "50.60.70.81"
@@ -1752,10 +1763,11 @@ func TestExternalIPsNoEndpoint(t *testing.T) {
}
func TestExternalIPs(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
svcIP := "10.20.30.41"
svcPort := 80
svcExternalIPs := sets.New[string]("50.60.70.81", "2012::51", "127.0.0.1")
@@ -1822,10 +1834,11 @@ func TestExternalIPs(t *testing.T) {
}
func TestOnlyLocalExternalIPs(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
svcIP := "10.20.30.41"
svcPort := 80
svcExternalIPs := sets.New[string]("50.60.70.81", "2012::51", "127.0.0.1")
@@ -1903,7 +1916,7 @@ func TestOnlyLocalExternalIPs(t *testing.T) {
}
func TestLoadBalancer(t *testing.T) {
ipt, fp := buildFakeProxier()
ipt, fp := buildFakeProxier(t)
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@@ -1989,7 +2002,7 @@ func TestLoadBalancer(t *testing.T) {
func TestOnlyLocalNodePorts(t *testing.T) {
nodeIP := netutils.ParseIPSloppy("100.101.102.103")
ipt, fp := buildFakeProxier()
ipt, fp := buildFakeProxier(t)
svcIP := "10.20.30.41"
svcPort := 80
@@ -2087,7 +2100,7 @@ func TestOnlyLocalNodePorts(t *testing.T) {
}
func TestHealthCheckNodePort(t *testing.T) {
ipt, fp := buildFakeProxier()
ipt, fp := buildFakeProxier(t)
svcIP := "10.20.30.41"
svcPort := 80
@@ -2160,7 +2173,7 @@ func TestHealthCheckNodePort(t *testing.T) {
}
func TestLoadBalancerSourceRanges(t *testing.T) {
ipt, fp := buildFakeProxier()
ipt, fp := buildFakeProxier(t)
svcIP := "10.20.30.41"
svcPort := 80
@@ -2265,7 +2278,7 @@ func TestLoadBalancerSourceRanges(t *testing.T) {
}
func TestAcceptIPVSTraffic(t *testing.T) {
ipt, fp := buildFakeProxier()
ipt, fp := buildFakeProxier(t)
ingressIP := "1.2.3.4"
externalIP := []string{"5.6.7.8"}
@@ -2335,7 +2348,7 @@ func TestAcceptIPVSTraffic(t *testing.T) {
}
func TestOnlyLocalLoadBalancing(t *testing.T) {
ipt, fp := buildFakeProxier()
ipt, fp := buildFakeProxier(t)
svcIP := "10.20.30.41"
svcPort := 80
@@ -2449,10 +2462,11 @@ func addTestPort(array []v1.ServicePort, name string, protocol v1.Protocol, port
}
func TestBuildServiceMapAddRemove(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
services := []*v1.Service{
makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
@@ -2561,10 +2575,11 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
}
func TestBuildServiceMapServiceHeadless(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
makeServiceMap(fp,
makeTestService("somewhere-else", "headless", func(svc *v1.Service) {
@@ -2601,10 +2616,11 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
}
func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
makeServiceMap(fp,
makeTestService("somewhere-else", "external-name", func(svc *v1.Service) {
@@ -2631,10 +2647,11 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
}
func TestBuildServiceMapServiceUpdate(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
servicev1 := makeTestService("somewhere", "some-service", func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP
@@ -2722,11 +2739,12 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
}
func TestSessionAffinity(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
nodeIP := "100.101.102.103"
fp := NewFakeProxier(ipt, ipvs, ipset, []string{nodeIP}, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, []string{nodeIP}, nil, v1.IPv4Protocol)
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@@ -3537,10 +3555,11 @@ func Test_updateEndpointsMap(t *testing.T) {
for tci, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp.hostname = testHostname
// First check that after adding all previous versions of endpoints,
@@ -3813,10 +3832,11 @@ func Test_syncService(t *testing.T) {
}
for i := range testCases {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
proxier := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
proxier := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
proxier.netlinkHandle.EnsureDummyDevice(defaultDummyDevice)
if testCases[i].oldVirtualServer != nil {
@@ -3842,11 +3862,12 @@ func Test_syncService(t *testing.T) {
}
}
func buildFakeProxier() (*iptablestest.FakeIPTables, *Proxier) {
func buildFakeProxier(t *testing.T) (*iptablestest.FakeIPTables, *Proxier) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
return ipt, NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
return ipt, NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
}
func getRules(ipt *iptablestest.FakeIPTables, chain utiliptables.Chain) []*iptablestest.Rule {
@@ -3935,11 +3956,12 @@ func checkIPVS(t *testing.T, fp *Proxier, vs *netlinktest.ExpectedVirtualServer)
}
func TestCleanLegacyService(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
excludeCIDRs, _ := netutils.ParseCIDRs([]string{"3.3.3.0/24", "4.4.4.0/24"})
fp := NewFakeProxier(ipt, ipvs, ipset, nil, excludeCIDRs, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, excludeCIDRs, v1.IPv4Protocol)
// All ipvs services that were processed in the latest sync loop.
activeServices := sets.New("ipvs0", "ipvs1")
@@ -4016,10 +4038,11 @@ func TestCleanLegacyService(t *testing.T) {
}
func TestCleanLegacyServiceWithRealServers(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
// all deleted expect ipvs2
activeServices := sets.New("ipvs2")
@@ -4085,12 +4108,13 @@ func TestCleanLegacyServiceWithRealServers(t *testing.T) {
}
func TestCleanLegacyRealServersExcludeCIDRs(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
gtm := NewGracefulTerminationManager(ipvs)
excludeCIDRs, _ := netutils.ParseCIDRs([]string{"4.4.4.4/32"})
fp := NewFakeProxier(ipt, ipvs, ipset, nil, excludeCIDRs, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, excludeCIDRs, v1.IPv4Protocol)
fp.gracefuldeleteManager = gtm
vs := &utilipvs.VirtualServer{
@@ -4137,11 +4161,12 @@ func TestCleanLegacyRealServersExcludeCIDRs(t *testing.T) {
}
func TestCleanLegacyService6(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
excludeCIDRs, _ := netutils.ParseCIDRs([]string{"3000::/64", "4000::/64"})
fp := NewFakeProxier(ipt, ipvs, ipset, nil, excludeCIDRs, v1.IPv6Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, excludeCIDRs, v1.IPv6Protocol)
fp.nodeIP = netutils.ParseIPSloppy("::1")
// All ipvs services that were processed in the latest sync loop.
@@ -4219,10 +4244,11 @@ func TestCleanLegacyService6(t *testing.T) {
}
func TestMultiPortServiceBindAddr(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
service1 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP
@@ -4323,10 +4349,11 @@ raid10 57344 0 - Live 0xffffffffc0597000`,
// the shared EndpointsChangeTracker and EndpointSliceCache. This test ensures that the
// ipvs proxier supports translating EndpointSlices to ipvs output.
func TestEndpointSliceE2E(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp.servicesSynced = true
fp.endpointSlicesSynced = true
@@ -4408,10 +4435,11 @@ func TestEndpointSliceE2E(t *testing.T) {
}
func TestHealthCheckNodePortE2E(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp.servicesSynced = true
fp.endpointSlicesSynced = true
@@ -4462,10 +4490,11 @@ func TestHealthCheckNodePortE2E(t *testing.T) {
// Test_HealthCheckNodePortWhenTerminating tests that health check node ports are not enabled when all local endpoints are terminating
func Test_HealthCheckNodePortWhenTerminating(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp.servicesSynced = true
// fp.endpointsSynced = true
fp.endpointSlicesSynced = true
@@ -4606,10 +4635,11 @@ func TestFilterCIDRs(t *testing.T) {
}
func TestCreateAndLinkKubeChain(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp.createAndLinkKubeChain()
expectedNATChains := `:KUBE-SERVICES - [0:0]
:KUBE-POSTROUTING - [0:0]
@@ -4709,10 +4739,11 @@ func TestTestInternalTrafficPolicyE2E(t *testing.T) {
},
}
for _, tc := range testCases {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp.servicesSynced = true
// fp.endpointsSynced = true
fp.endpointSlicesSynced = true
@@ -4805,11 +4836,11 @@ func TestTestInternalTrafficPolicyE2E(t *testing.T) {
// Test_EndpointSliceReadyAndTerminatingCluster tests that when there are ready and ready + terminating
// endpoints and the traffic policy is "Cluster", only the ready endpoints are used.
func Test_EndpointSliceReadyAndTerminatingCluster(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp.servicesSynced = true
// fp.endpointsSynced = true
fp.endpointSlicesSynced = true
@@ -4978,11 +5009,11 @@ func Test_EndpointSliceReadyAndTerminatingCluster(t *testing.T) {
// Test_EndpointSliceReadyAndTerminatingLocal tests that when there are local ready and ready + terminating
// endpoints, only the ready endpoints are used.
func Test_EndpointSliceReadyAndTerminatingLocal(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp.servicesSynced = true
// fp.endpointsSynced = true
fp.endpointSlicesSynced = true
@@ -5150,11 +5181,11 @@ func Test_EndpointSliceReadyAndTerminatingLocal(t *testing.T) {
// Test_EndpointSliceOnlyReadyTerminatingCluster tests that when there are only ready terminating
// endpoints and the traffic policy is "Cluster", we fall back to terminating endpoints.
func Test_EndpointSliceOnlyReadyAndTerminatingCluster(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp.servicesSynced = true
// fp.endpointsSynced = true
fp.endpointSlicesSynced = true
@@ -5322,11 +5353,11 @@ func Test_EndpointSliceOnlyReadyAndTerminatingCluster(t *testing.T) {
// Test_EndpointSliceOnlyReadyTerminatingLocal tests that when there are only local ready terminating
// endpoints, we fall back to those endpoints.
func Test_EndpointSliceOnlyReadyAndTerminatingLocal(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
fp.servicesSynced = true
// fp.endpointsSynced = true
fp.endpointSlicesSynced = true
@@ -5665,10 +5696,11 @@ func TestNoEndpointsMetric(t *testing.T) {
},
}
for _, tc := range testCases {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, []string{"10.0.0.1"}, nil, v1.IPv4Protocol)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, []string{"10.0.0.1"}, nil, v1.IPv4Protocol)
fp.servicesSynced = true
// fp.endpointsSynced = true
fp.endpointSlicesSynced = true
@@ -5760,13 +5792,14 @@ func TestDismissLocalhostRuleExist(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ipt := iptablestest.NewFake()
if test.ipFamily == v1.IPv6Protocol {
ipt = iptablestest.NewIPv6Fake()
}
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, test.ipFamily)
fp := NewFakeProxier(ctx, ipt, ipvs, ipset, nil, nil, test.ipFamily)
fp.syncProxyRules()
@@ -5853,7 +5886,7 @@ func TestLoadBalancerIngressRouteTypeProxy(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.LoadBalancerIPMode, testCase.ipModeEnabled)()
_, fp := buildFakeProxier()
_, fp := buildFakeProxier(t)
makeServiceMap(fp,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = "LoadBalancer"

View File

@@ -20,13 +20,13 @@ limitations under the License.
package ipvs
import (
"errors"
"fmt"
"net"
"strings"
"sync"
"time"
"errors"
libipvs "github.com/moby/ipvs"
"golang.org/x/sys/unix"

View File

@@ -17,6 +17,7 @@ limitations under the License.
package kubemark
import (
"context"
"fmt"
"time"
@@ -85,7 +86,8 @@ func NewHollowProxy(
}
func (hp *HollowProxy) Run() error {
if err := hp.ProxyServer.Run(); err != nil {
if err := hp.ProxyServer.Run(context.TODO()); err != nil {
return fmt.Errorf("Error while running proxy: %w", err)
}
return nil

View File

@@ -105,6 +105,7 @@ const (
// NewDualStackProxier creates a MetaProxier instance, with IPv4 and IPv6 proxies.
func NewDualStackProxier(
ctx context.Context,
sysctl utilsysctl.Interface,
syncPeriod time.Duration,
minSyncPeriod time.Duration,
@@ -119,14 +120,14 @@ func NewDualStackProxier(
initOnly bool,
) (proxy.Provider, error) {
// Create an ipv4 instance of the single-stack proxier
ipv4Proxier, err := NewProxier(v1.IPv4Protocol, sysctl,
ipv4Proxier, err := NewProxier(ctx, v1.IPv4Protocol, sysctl,
syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit, localDetectors[0], hostname,
nodeIPs[v1.IPv4Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
if err != nil {
return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
}
ipv6Proxier, err := NewProxier(v1.IPv6Protocol, sysctl,
ipv6Proxier, err := NewProxier(ctx, v1.IPv6Protocol, sysctl,
syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit, localDetectors[1], hostname,
nodeIPs[v1.IPv6Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
if err != nil {
@@ -189,6 +190,8 @@ type Proxier struct {
// serviceCIDRs is a comma separated list of ServiceCIDRs belonging to the IPFamily
// which proxier is operating on, can be directly consumed by knftables.
serviceCIDRs string
logger klog.Logger
}
// Proxier implements proxy.Provider
@@ -197,7 +200,8 @@ var _ proxy.Provider = &Proxier{}
// NewProxier returns a new nftables Proxier. Once a proxier is created, it will keep
// nftables up to date in the background and will not terminate if a particular nftables
// call fails.
func NewProxier(ipFamily v1.IPFamily,
func NewProxier(ctx context.Context,
ipFamily v1.IPFamily,
sysctl utilsysctl.Interface,
syncPeriod time.Duration,
minSyncPeriod time.Duration,
@@ -211,15 +215,17 @@ func NewProxier(ipFamily v1.IPFamily,
nodePortAddressStrings []string,
initOnly bool,
) (*Proxier, error) {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "ipFamily", ipFamily)
if initOnly {
klog.InfoS("System initialized and --init-only specified")
logger.Info("System initialized and --init-only specified")
return nil, nil
}
// Generate the masquerade mark to use for SNAT rules.
masqueradeValue := 1 << uint(masqueradeBit)
masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
klog.V(2).InfoS("Using nftables mark for masquerade", "ipFamily", ipFamily, "mark", masqueradeMark)
logger.V(2).Info("Using nftables mark for masquerade", "mark", masqueradeMark)
nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nodePortAddressStrings)
@@ -256,10 +262,11 @@ func NewProxier(ipFamily v1.IPFamily,
nodePortAddresses: nodePortAddresses,
networkInterfacer: proxyutil.RealNetwork{},
staleChains: make(map[string]time.Time),
logger: logger,
}
burstSyncs := 2
klog.V(2).InfoS("NFTables sync params", "ipFamily", ipFamily, "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
logger.V(2).Info("NFTables sync params", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
return proxier, nil
@@ -516,11 +523,11 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
})
nodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer)
if err != nil {
klog.ErrorS(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses)
proxier.logger.Error(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses)
}
for _, ip := range nodeIPs {
if ip.IsLoopback() {
klog.ErrorS(nil, "--nodeport-addresses includes localhost but localhost NodePorts are not supported", "address", ip.String())
proxier.logger.Error(nil, "--nodeport-addresses includes localhost but localhost NodePorts are not supported", "address", ip.String())
continue
}
tx.Add(&knftables.Element{
@@ -642,7 +649,8 @@ func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) {
// CleanupLeftovers removes all nftables rules and chains created by the Proxier
// It returns true if an error was encountered. Errors are logged.
func CleanupLeftovers() bool {
func CleanupLeftovers(ctx context.Context) bool {
logger := klog.FromContext(ctx)
var encounteredError bool
for _, family := range []knftables.Family{knftables.IPv4Family, knftables.IPv6Family} {
@@ -650,10 +658,10 @@ func CleanupLeftovers() bool {
if err == nil {
tx := nft.NewTransaction()
tx.Delete(&knftables.Table{})
err = nft.Run(context.TODO(), tx)
err = nft.Run(ctx, tx)
}
if err != nil && !knftables.IsNotFound(err) {
klog.ErrorS(err, "Error cleaning up nftables rules")
logger.Error(err, "Error cleaning up nftables rules")
encounteredError = true
}
}
@@ -767,7 +775,7 @@ func (proxier *Proxier) OnEndpointSlicesSynced() {
// is observed.
func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
if node.Name != proxier.hostname {
klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node",
"eventNode", node.Name, "currentNode", proxier.hostname)
return
}
@@ -782,7 +790,7 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
proxier.nodeLabels[k] = v
}
proxier.mu.Unlock()
klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)
proxier.Sync()
}
@@ -791,7 +799,7 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
// node object is observed.
func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
if node.Name != proxier.hostname {
klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node",
"eventNode", node.Name, "currentNode", proxier.hostname)
return
}
@@ -806,7 +814,7 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
proxier.nodeLabels[k] = v
}
proxier.mu.Unlock()
klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)
proxier.Sync()
}
@@ -815,7 +823,7 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
// object is observed.
func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
if node.Name != proxier.hostname {
klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node",
"eventNode", node.Name, "currentNode", proxier.hostname)
return
}
@@ -974,7 +982,7 @@ func (proxier *Proxier) syncProxyRules() {
// don't sync rules till we've received services and endpoints
if !proxier.isInitialized() {
klog.V(2).InfoS("Not syncing nftables until Services and Endpoints have been received from master")
proxier.logger.V(2).Info("Not syncing nftables until Services and Endpoints have been received from master")
return
}
@@ -986,18 +994,18 @@ func (proxier *Proxier) syncProxyRules() {
start := time.Now()
defer func() {
metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
klog.V(2).InfoS("SyncProxyRules complete", "elapsed", time.Since(start))
proxier.logger.V(2).Info("SyncProxyRules complete", "elapsed", time.Since(start))
}()
serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
klog.V(2).InfoS("Syncing nftables rules")
proxier.logger.V(2).Info("Syncing nftables rules")
success := false
defer func() {
if !success {
klog.InfoS("Sync failed", "retryingTime", proxier.syncPeriod)
proxier.logger.Info("Sync failed", "retryingTime", proxier.syncPeriod)
proxier.syncRunner.RetryAfter(proxier.syncPeriod)
}
}()
@@ -1018,13 +1026,13 @@ func (proxier *Proxier) syncProxyRules() {
}
}
if deleted > 0 {
klog.InfoS("Deleting stale nftables chains", "numChains", deleted)
proxier.logger.Info("Deleting stale nftables chains", "numChains", deleted)
err := proxier.nftables.Run(context.TODO(), tx)
if err != nil {
// We already deleted the entries from staleChains, but if
// the chains still exist, they'll just get added back
// (with a later timestamp) at the end of the sync.
klog.ErrorS(err, "Unable to delete stale chains; will retry later")
proxier.logger.Error(err, "Unable to delete stale chains; will retry later")
// FIXME: metric
}
}
@@ -1082,7 +1090,7 @@ func (proxier *Proxier) syncProxyRules() {
for svcName, svc := range proxier.svcPortMap {
svcInfo, ok := svc.(*servicePortInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
proxier.logger.Error(nil, "Failed to cast serviceInfo", "serviceName", svcName)
continue
}
protocol := strings.ToLower(string(svcInfo.Protocol()))
@@ -1477,7 +1485,7 @@ func (proxier *Proxier) syncProxyRules() {
for _, ep := range allLocallyReachableEndpoints {
epInfo, ok := ep.(*endpointInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast endpointsInfo", "endpointsInfo", ep)
proxier.logger.Error(nil, "Failed to cast endpointsInfo", "endpointsInfo", ep)
continue
}
@@ -1525,7 +1533,7 @@ func (proxier *Proxier) syncProxyRules() {
for _, ep := range allLocallyReachableEndpoints {
epInfo, ok := ep.(*endpointInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast endpointInfo", "endpointInfo", ep)
proxier.logger.Error(nil, "Failed to cast endpointInfo", "endpointInfo", ep)
continue
}
@@ -1583,7 +1591,7 @@ func (proxier *Proxier) syncProxyRules() {
}
}
} else if !knftables.IsNotFound(err) {
klog.ErrorS(err, "Failed to list nftables chains: stale chains will not be deleted")
proxier.logger.Error(err, "Failed to list nftables chains: stale chains will not be deleted")
}
// OTOH, we can immediately delete any stale affinity sets
@@ -1597,11 +1605,11 @@ func (proxier *Proxier) syncProxyRules() {
}
}
} else if !knftables.IsNotFound(err) {
klog.ErrorS(err, "Failed to list nftables sets: stale affinity sets will not be deleted")
proxier.logger.Error(err, "Failed to list nftables sets: stale affinity sets will not be deleted")
}
// Sync rules.
klog.V(2).InfoS("Reloading service nftables data",
proxier.logger.V(2).Info("Reloading service nftables data",
"numServices", len(proxier.svcPortMap),
"numEndpoints", totalEndpoints,
)
@@ -1611,7 +1619,7 @@ func (proxier *Proxier) syncProxyRules() {
err = proxier.nftables.Run(context.TODO(), tx)
if err != nil {
klog.ErrorS(err, "nftables sync failed")
proxier.logger.Error(err, "nftables sync failed")
metrics.IptablesRestoreFailuresTotal.Inc()
return
}
@@ -1621,7 +1629,7 @@ func (proxier *Proxier) syncProxyRules() {
for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
latency := metrics.SinceInSeconds(lastChangeTriggerTime)
metrics.NetworkProgrammingLatency.Observe(latency)
klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
proxier.logger.V(4).Info("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
}
}
@@ -1636,10 +1644,10 @@ func (proxier *Proxier) syncProxyRules() {
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
// will just drop those endpoints.
if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
klog.ErrorS(err, "Error syncing healthcheck services")
proxier.logger.Error(err, "Error syncing healthcheck services")
}
if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
klog.ErrorS(err, "Error syncing healthcheck endpoints")
proxier.logger.Error(err, "Error syncing healthcheck endpoints")
}
// Finish housekeeping, clear stale conntrack entries for UDP Services

View File

@@ -17,6 +17,7 @@ limitations under the License.
package proxy
import (
"context"
"reflect"
"sync"
@@ -32,11 +33,13 @@ import (
type NodePodCIDRHandler struct {
mu sync.Mutex
podCIDRs []string
logger klog.Logger
}
func NewNodePodCIDRHandler(podCIDRs []string) *NodePodCIDRHandler {
func NewNodePodCIDRHandler(ctx context.Context, podCIDRs []string) *NodePodCIDRHandler {
return &NodePodCIDRHandler{
podCIDRs: podCIDRs,
logger: klog.FromContext(ctx),
}
}
@@ -50,12 +53,12 @@ func (n *NodePodCIDRHandler) OnNodeAdd(node *v1.Node) {
podCIDRs := node.Spec.PodCIDRs
// initialize podCIDRs
if len(n.podCIDRs) == 0 && len(podCIDRs) > 0 {
klog.InfoS("Setting current PodCIDRs", "podCIDRs", podCIDRs)
n.logger.Info("Setting current PodCIDRs", "podCIDRs", podCIDRs)
n.podCIDRs = podCIDRs
return
}
if !reflect.DeepEqual(n.podCIDRs, podCIDRs) {
klog.ErrorS(nil, "Using NodeCIDR LocalDetector mode, current PodCIDRs are different than previous PodCIDRs, restarting",
n.logger.Error(nil, "Using NodeCIDR LocalDetector mode, current PodCIDRs are different than previous PodCIDRs, restarting",
"node", klog.KObj(node), "newPodCIDRs", podCIDRs, "oldPodCIDRs", n.podCIDRs)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
@@ -68,12 +71,12 @@ func (n *NodePodCIDRHandler) OnNodeUpdate(_, node *v1.Node) {
podCIDRs := node.Spec.PodCIDRs
// initialize podCIDRs
if len(n.podCIDRs) == 0 && len(podCIDRs) > 0 {
klog.InfoS("Setting current PodCIDRs", "podCIDRs", podCIDRs)
n.logger.Info("Setting current PodCIDRs", "podCIDRs", podCIDRs)
n.podCIDRs = podCIDRs
return
}
if !reflect.DeepEqual(n.podCIDRs, podCIDRs) {
klog.ErrorS(nil, "Using NodeCIDR LocalDetector mode, current PodCIDRs are different than previous PodCIDRs, restarting",
n.logger.Error(nil, "Using NodeCIDR LocalDetector mode, current PodCIDRs are different than previous PodCIDRs, restarting",
"node", klog.KObj(node), "newPodCIDRs", podCIDRs, "oldPODCIDRs", n.podCIDRs)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
@@ -81,7 +84,7 @@ func (n *NodePodCIDRHandler) OnNodeUpdate(_, node *v1.Node) {
// OnNodeDelete is a handler for Node deletes.
func (n *NodePodCIDRHandler) OnNodeDelete(node *v1.Node) {
klog.ErrorS(nil, "Current Node is being deleted", "node", klog.KObj(node))
n.logger.Error(nil, "Current Node is being deleted", "node", klog.KObj(node))
}
// OnNodeSynced is a handler for Node syncs.

View File

@@ -19,6 +19,9 @@ package util
import (
"bytes"
"fmt"
"strings"
"github.com/go-logr/logr"
)
// LineBuffer is an interface for writing lines of input to a bytes.Buffer
@@ -46,6 +49,8 @@ type LineBuffer interface {
Lines() int
}
var _ logr.Marshaler = &realLineBuffer{}
type realLineBuffer struct {
b bytes.Buffer
lines int
@@ -108,6 +113,11 @@ func (buf *realLineBuffer) Lines() int {
return buf.lines
}
// Implements the logs.Marshaler interface
func (buf *realLineBuffer) MarshalLog() any {
return strings.Split(buf.b.String(), "\n")
}
type discardLineBuffer struct {
lines int
}