kube-proxy network programming latency on restarts
kube-proxy expose the metric network_programming_duration_seconds, that is defined as the time it takes to program the network since a a service or pod has changed. It uses an annotation on the endpoints /endpointslices to calculate when the endpoint was created, however, on restarts, kube-proxy process all the endpoints again, no matter when those were generated, polluting the metrics. To be safe, kube-proxy will estimate the latency only for those endpoints that were generated after it started.
This commit is contained in:
@@ -164,6 +164,11 @@ type EndpointChangeTracker struct {
|
|||||||
// Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints
|
// Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints
|
||||||
// object to change. Used to calculate the network-programming-latency.
|
// object to change. Used to calculate the network-programming-latency.
|
||||||
lastChangeTriggerTimes map[types.NamespacedName][]time.Time
|
lastChangeTriggerTimes map[types.NamespacedName][]time.Time
|
||||||
|
// record the time when the endpointChangeTracker was created so we can ignore the endpoints
|
||||||
|
// that were generated before, because we can't estimate the network-programming-latency on those.
|
||||||
|
// This is specially problematic on restarts, because we process all the endpoints that may have been
|
||||||
|
// created hours or days before.
|
||||||
|
trackerStartTime time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEndpointChangeTracker initializes an EndpointsChangeMap
|
// NewEndpointChangeTracker initializes an EndpointsChangeMap
|
||||||
@@ -175,6 +180,7 @@ func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc
|
|||||||
ipFamily: ipFamily,
|
ipFamily: ipFamily,
|
||||||
recorder: recorder,
|
recorder: recorder,
|
||||||
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
|
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
|
||||||
|
trackerStartTime: time.Now(),
|
||||||
processEndpointsMapChange: processEndpointsMapChange,
|
processEndpointsMapChange: processEndpointsMapChange,
|
||||||
}
|
}
|
||||||
if endpointSlicesEnabled {
|
if endpointSlicesEnabled {
|
||||||
@@ -216,7 +222,7 @@ func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool {
|
|||||||
// In case of Endpoints deletion, the LastChangeTriggerTime annotation is
|
// In case of Endpoints deletion, the LastChangeTriggerTime annotation is
|
||||||
// by-definition coming from the time of last update, which is not what
|
// by-definition coming from the time of last update, which is not what
|
||||||
// we want to measure. So we simply ignore it in this cases.
|
// we want to measure. So we simply ignore it in this cases.
|
||||||
if t := getLastChangeTriggerTime(endpoints.Annotations); !t.IsZero() && current != nil {
|
if t := getLastChangeTriggerTime(endpoints.Annotations); !t.IsZero() && current != nil && t.After(ect.trackerStartTime) {
|
||||||
ect.lastChangeTriggerTimes[namespacedName] = append(ect.lastChangeTriggerTimes[namespacedName], t)
|
ect.lastChangeTriggerTimes[namespacedName] = append(ect.lastChangeTriggerTimes[namespacedName], t)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -276,7 +282,7 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E
|
|||||||
// we want to measure. So we simply ignore it in this cases.
|
// we want to measure. So we simply ignore it in this cases.
|
||||||
// TODO(wojtek-t, robscott): Address the problem for EndpointSlice deletion
|
// TODO(wojtek-t, robscott): Address the problem for EndpointSlice deletion
|
||||||
// when other EndpointSlice for that service still exist.
|
// when other EndpointSlice for that service still exist.
|
||||||
if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() && !removeSlice {
|
if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() && !removeSlice && t.After(ect.trackerStartTime) {
|
||||||
ect.lastChangeTriggerTimes[namespacedName] =
|
ect.lastChangeTriggerTimes[namespacedName] =
|
||||||
append(ect.lastChangeTriggerTimes[namespacedName], t)
|
append(ect.lastChangeTriggerTimes[namespacedName], t)
|
||||||
}
|
}
|
||||||
|
@@ -23,7 +23,7 @@ import (
|
|||||||
|
|
||||||
"github.com/davecgh/go-spew/spew"
|
"github.com/davecgh/go-spew/spew"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
discovery "k8s.io/api/discovery/v1beta1"
|
discovery "k8s.io/api/discovery/v1beta1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
@@ -1294,7 +1294,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
|
|
||||||
for tci, tc := range testCases {
|
for tci, tc := range testCases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
fp := newFakeProxier(v1.IPv4Protocol)
|
fp := newFakeProxier(v1.IPv4Protocol, time.Time{})
|
||||||
fp.hostname = nodeName
|
fp.hostname = nodeName
|
||||||
|
|
||||||
// First check that after adding all previous versions of endpoints,
|
// First check that after adding all previous versions of endpoints,
|
||||||
@@ -1364,7 +1364,9 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestLastChangeTriggerTime(t *testing.T) {
|
func TestLastChangeTriggerTime(t *testing.T) {
|
||||||
t0 := time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
|
startTime := time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
|
||||||
|
t_1 := startTime.Add(-time.Second)
|
||||||
|
t0 := startTime.Add(time.Second)
|
||||||
t1 := t0.Add(time.Second)
|
t1 := t0.Add(time.Second)
|
||||||
t2 := t1.Add(time.Second)
|
t2 := t1.Add(time.Second)
|
||||||
t3 := t2.Add(time.Second)
|
t3 := t2.Add(time.Second)
|
||||||
@@ -1438,6 +1440,14 @@ func TestLastChangeTriggerTime(t *testing.T) {
|
|||||||
},
|
},
|
||||||
expected: map[types.NamespacedName][]time.Time{},
|
expected: map[types.NamespacedName][]time.Time{},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "Endpoints create before tracker started",
|
||||||
|
scenario: func(fp *FakeProxier) {
|
||||||
|
e := createEndpoints("ns", "ep1", t_1)
|
||||||
|
fp.addEndpoints(e)
|
||||||
|
},
|
||||||
|
expected: map[types.NamespacedName][]time.Time{},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "addEndpoints then deleteEndpoints",
|
name: "addEndpoints then deleteEndpoints",
|
||||||
scenario: func(fp *FakeProxier) {
|
scenario: func(fp *FakeProxier) {
|
||||||
@@ -1469,7 +1479,7 @@ func TestLastChangeTriggerTime(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
fp := newFakeProxier(v1.IPv4Protocol)
|
fp := newFakeProxier(v1.IPv4Protocol, startTime)
|
||||||
|
|
||||||
tc.scenario(fp)
|
tc.scenario(fp)
|
||||||
|
|
||||||
|
@@ -20,10 +20,11 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/davecgh/go-spew/spew"
|
"github.com/davecgh/go-spew/spew"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/intstr"
|
"k8s.io/apimachinery/pkg/util/intstr"
|
||||||
@@ -511,12 +512,21 @@ type FakeProxier struct {
|
|||||||
hostname string
|
hostname string
|
||||||
}
|
}
|
||||||
|
|
||||||
func newFakeProxier(ipFamily v1.IPFamily) *FakeProxier {
|
func newFakeProxier(ipFamily v1.IPFamily, t time.Time) *FakeProxier {
|
||||||
return &FakeProxier{
|
return &FakeProxier{
|
||||||
serviceMap: make(ServiceMap),
|
serviceMap: make(ServiceMap),
|
||||||
serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil),
|
serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil),
|
||||||
endpointsMap: make(EndpointsMap),
|
endpointsMap: make(EndpointsMap),
|
||||||
endpointsChanges: NewEndpointChangeTracker(testHostname, nil, ipFamily, nil, false, nil),
|
endpointsChanges: &EndpointChangeTracker{
|
||||||
|
hostname: testHostname,
|
||||||
|
items: make(map[types.NamespacedName]*endpointsChange),
|
||||||
|
makeEndpointInfo: nil,
|
||||||
|
ipFamily: ipFamily,
|
||||||
|
recorder: nil,
|
||||||
|
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
|
||||||
|
trackerStartTime: t,
|
||||||
|
processEndpointsMapChange: nil,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -539,7 +549,7 @@ func (fake *FakeProxier) deleteService(service *v1.Service) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestServiceMapUpdateHeadless(t *testing.T) {
|
func TestServiceMapUpdateHeadless(t *testing.T) {
|
||||||
fp := newFakeProxier(v1.IPv4Protocol)
|
fp := newFakeProxier(v1.IPv4Protocol, time.Time{})
|
||||||
|
|
||||||
makeServiceMap(fp,
|
makeServiceMap(fp,
|
||||||
makeTestService("ns2", "headless", func(svc *v1.Service) {
|
makeTestService("ns2", "headless", func(svc *v1.Service) {
|
||||||
@@ -570,7 +580,7 @@ func TestServiceMapUpdateHeadless(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestUpdateServiceTypeExternalName(t *testing.T) {
|
func TestUpdateServiceTypeExternalName(t *testing.T) {
|
||||||
fp := newFakeProxier(v1.IPv4Protocol)
|
fp := newFakeProxier(v1.IPv4Protocol, time.Time{})
|
||||||
|
|
||||||
makeServiceMap(fp,
|
makeServiceMap(fp,
|
||||||
makeTestService("ns2", "external-name", func(svc *v1.Service) {
|
makeTestService("ns2", "external-name", func(svc *v1.Service) {
|
||||||
@@ -595,7 +605,7 @@ func TestUpdateServiceTypeExternalName(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestBuildServiceMapAddRemove(t *testing.T) {
|
func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||||
fp := newFakeProxier(v1.IPv4Protocol)
|
fp := newFakeProxier(v1.IPv4Protocol, time.Time{})
|
||||||
|
|
||||||
services := []*v1.Service{
|
services := []*v1.Service{
|
||||||
makeTestService("ns2", "cluster-ip", func(svc *v1.Service) {
|
makeTestService("ns2", "cluster-ip", func(svc *v1.Service) {
|
||||||
@@ -698,7 +708,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||||
fp := newFakeProxier(v1.IPv4Protocol)
|
fp := newFakeProxier(v1.IPv4Protocol, time.Time{})
|
||||||
|
|
||||||
servicev1 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
|
servicev1 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
|
||||||
svc.Spec.Type = v1.ServiceTypeClusterIP
|
svc.Spec.Type = v1.ServiceTypeClusterIP
|
||||||
|
Reference in New Issue
Block a user