Added requestSloLatencies metric
This commit is contained in:
parent
7b9f4f18fe
commit
0afa569499
@ -43,6 +43,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/admission/plugin/webhook/generic"
|
||||
webhookrequest "k8s.io/apiserver/pkg/admission/plugin/webhook/request"
|
||||
auditinternal "k8s.io/apiserver/pkg/apis/audit"
|
||||
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
webhookutil "k8s.io/apiserver/pkg/util/webhook"
|
||||
"k8s.io/apiserver/pkg/warning"
|
||||
utiltrace "k8s.io/utils/trace"
|
||||
@ -263,7 +264,13 @@ func (a *mutatingDispatcher) callAttrMutatingHook(ctx context.Context, h *admiss
|
||||
}
|
||||
}
|
||||
|
||||
if err := r.Do(ctx).Into(response); err != nil {
|
||||
do := func() { err = r.Do(ctx).Into(response) }
|
||||
if wd, ok := endpointsrequest.WebhookDurationFrom(ctx); ok {
|
||||
tmp := do
|
||||
do = func() { wd.AdmitTracker.Track(tmp) }
|
||||
}
|
||||
do()
|
||||
if err != nil {
|
||||
var status *apierrors.StatusError
|
||||
if se, ok := err.(*apierrors.StatusError); ok {
|
||||
status = se
|
||||
|
@ -26,6 +26,8 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
clocktesting "k8s.io/utils/clock/testing"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@ -251,3 +253,69 @@ func TestAdmitCachedClient(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestWebhookDuration tests that MutatingWebhook#Admit sets webhook duration in context correctly
|
||||
func TestWebhookDuration(ts *testing.T) {
|
||||
clk := clocktesting.FakeClock{}
|
||||
testServer := webhooktesting.NewTestServerWithHandler(ts, webhooktesting.ClockSteppingWebhookHandler(ts, &clk))
|
||||
testServer.StartTLS()
|
||||
defer testServer.Close()
|
||||
serverURL, err := url.ParseRequestURI(testServer.URL)
|
||||
if err != nil {
|
||||
ts.Fatalf("this should never happen? %v", err)
|
||||
}
|
||||
|
||||
objectInterfaces := webhooktesting.NewObjectInterfacesForTest()
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
for _, test := range webhooktesting.NewValidationDurationTestCases(serverURL) {
|
||||
ts.Run(test.Name, func(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
if test.InitContext {
|
||||
ctx = request.WithWebhookDurationAndCustomClock(ctx, &clk)
|
||||
}
|
||||
wh, err := NewMutatingWebhook(nil)
|
||||
if err != nil {
|
||||
t.Errorf("failed to create mutating webhook: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
ns := "webhook-test"
|
||||
client, informer := webhooktesting.NewFakeMutatingDataSource(ns, webhooktesting.ConvertToMutatingWebhooks(test.Webhooks), stopCh)
|
||||
|
||||
wh.SetAuthenticationInfoResolverWrapper(webhooktesting.Wrapper(webhooktesting.NewAuthenticationInfoResolver(new(int32))))
|
||||
wh.SetServiceResolver(webhooktesting.NewServiceResolver(*serverURL))
|
||||
wh.SetExternalKubeClientSet(client)
|
||||
wh.SetExternalKubeInformerFactory(informer)
|
||||
|
||||
informer.Start(stopCh)
|
||||
informer.WaitForCacheSync(stopCh)
|
||||
|
||||
if err = wh.ValidateInitialization(); err != nil {
|
||||
t.Errorf("failed to validate initialization: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
_ = wh.Admit(ctx, webhooktesting.NewAttribute(ns, nil, test.IsDryRun), objectInterfaces)
|
||||
wd, ok := request.WebhookDurationFrom(ctx)
|
||||
if !ok {
|
||||
if test.InitContext {
|
||||
t.Errorf("expected webhook duration to be initialized")
|
||||
}
|
||||
return
|
||||
}
|
||||
if !test.InitContext {
|
||||
t.Errorf("expected webhook duration to not be initialized")
|
||||
return
|
||||
}
|
||||
if wd.AdmitTracker.GetLatency() != test.ExpectedDurationSum {
|
||||
t.Errorf("expected admit duration %q got %q", test.ExpectedDurationSum, wd.AdmitTracker.GetLatency())
|
||||
}
|
||||
if wd.ValidateTracker.GetLatency() != 0 {
|
||||
t.Errorf("expected validate duraion to be equal to 0 got %q", wd.ValidateTracker.GetLatency())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
registrationv1 "k8s.io/api/admissionregistration/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
@ -243,6 +244,17 @@ type MutatingTest struct {
|
||||
ExpectReinvokeWebhooks map[string]bool
|
||||
}
|
||||
|
||||
// DurationTest is webhook duration test case, used both in mutating and
|
||||
// validating plugin test cases.
|
||||
type DurationTest struct {
|
||||
Name string
|
||||
Webhooks []registrationv1.ValidatingWebhook
|
||||
InitContext bool
|
||||
IsDryRun bool
|
||||
ExpectedDurationSum time.Duration
|
||||
ExpectedDurationMax time.Duration
|
||||
}
|
||||
|
||||
// ConvertToMutatingTestCases converts a validating test case to a mutating one for test purposes.
|
||||
func ConvertToMutatingTestCases(tests []ValidatingTest, configurationName string) []MutatingTest {
|
||||
r := make([]MutatingTest, len(tests))
|
||||
@ -1069,3 +1081,67 @@ func NewObjectInterfacesForTest() admission.ObjectInterfaces {
|
||||
corev1.AddToScheme(scheme)
|
||||
return admission.NewObjectInterfacesFromScheme(scheme)
|
||||
}
|
||||
|
||||
// NewValidationDurationTestCases returns test cases for webhook duration test
|
||||
func NewValidationDurationTestCases(url *url.URL) []DurationTest {
|
||||
ccfgURL := urlConfigGenerator{url}.ccfgURL
|
||||
webhooks := []registrationv1.ValidatingWebhook{
|
||||
{
|
||||
Name: "allow match",
|
||||
ClientConfig: ccfgURL("allow/100"),
|
||||
Rules: matchEverythingRules,
|
||||
NamespaceSelector: &metav1.LabelSelector{},
|
||||
ObjectSelector: &metav1.LabelSelector{},
|
||||
AdmissionReviewVersions: []string{"v1beta1"},
|
||||
},
|
||||
{
|
||||
Name: "allow no match",
|
||||
ClientConfig: ccfgURL("allow/200"),
|
||||
NamespaceSelector: &metav1.LabelSelector{},
|
||||
ObjectSelector: &metav1.LabelSelector{},
|
||||
AdmissionReviewVersions: []string{"v1beta1"},
|
||||
},
|
||||
{
|
||||
Name: "disallow match",
|
||||
ClientConfig: ccfgURL("disallow/400"),
|
||||
Rules: matchEverythingRules,
|
||||
NamespaceSelector: &metav1.LabelSelector{},
|
||||
ObjectSelector: &metav1.LabelSelector{},
|
||||
AdmissionReviewVersions: []string{"v1beta1"},
|
||||
},
|
||||
{
|
||||
Name: "disallow no match",
|
||||
ClientConfig: ccfgURL("disallow/800"),
|
||||
NamespaceSelector: &metav1.LabelSelector{},
|
||||
ObjectSelector: &metav1.LabelSelector{},
|
||||
AdmissionReviewVersions: []string{"v1beta1"},
|
||||
},
|
||||
}
|
||||
|
||||
return []DurationTest{
|
||||
{
|
||||
Name: "duration test",
|
||||
IsDryRun: false,
|
||||
InitContext: true,
|
||||
Webhooks: webhooks,
|
||||
ExpectedDurationSum: 500,
|
||||
ExpectedDurationMax: 400,
|
||||
},
|
||||
{
|
||||
Name: "duration dry run",
|
||||
IsDryRun: true,
|
||||
InitContext: true,
|
||||
Webhooks: webhooks,
|
||||
ExpectedDurationSum: 0,
|
||||
ExpectedDurationMax: 0,
|
||||
},
|
||||
{
|
||||
Name: "duration no init",
|
||||
IsDryRun: false,
|
||||
InitContext: false,
|
||||
Webhooks: webhooks,
|
||||
ExpectedDurationSum: 0,
|
||||
ExpectedDurationMax: 0,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -22,15 +22,21 @@ import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/admission/v1beta1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apiserver/pkg/admission/plugin/webhook/testcerts"
|
||||
testingclock "k8s.io/utils/clock/testing"
|
||||
)
|
||||
|
||||
// NewTestServer returns a webhook test HTTPS server with fixed webhook test certs.
|
||||
func NewTestServer(t testing.TB) *httptest.Server {
|
||||
// NewTestServerWithHandler returns a webhook test HTTPS server
|
||||
// which uses given handler function to handle requests
|
||||
func NewTestServerWithHandler(t testing.TB, handler func(http.ResponseWriter, *http.Request)) *httptest.Server {
|
||||
// Create the test webhook server
|
||||
sCert, err := tls.X509KeyPair(testcerts.ServerCert, testcerts.ServerKey)
|
||||
if err != nil {
|
||||
@ -39,7 +45,7 @@ func NewTestServer(t testing.TB) *httptest.Server {
|
||||
}
|
||||
rootCAs := x509.NewCertPool()
|
||||
rootCAs.AppendCertsFromPEM(testcerts.CACert)
|
||||
testServer := httptest.NewUnstartedServer(http.HandlerFunc(webhookHandler))
|
||||
testServer := httptest.NewUnstartedServer(http.HandlerFunc(handler))
|
||||
testServer.TLS = &tls.Config{
|
||||
Certificates: []tls.Certificate{sCert},
|
||||
ClientCAs: rootCAs,
|
||||
@ -48,6 +54,11 @@ func NewTestServer(t testing.TB) *httptest.Server {
|
||||
return testServer
|
||||
}
|
||||
|
||||
// NewTestServer returns a webhook test HTTPS server with fixed webhook test certs.
|
||||
func NewTestServer(t testing.TB) *httptest.Server {
|
||||
return NewTestServerWithHandler(t, webhookHandler)
|
||||
}
|
||||
|
||||
func webhookHandler(w http.ResponseWriter, r *http.Request) {
|
||||
// fmt.Printf("got req: %v\n", r.URL.Path)
|
||||
switch r.URL.Path {
|
||||
@ -160,3 +171,41 @@ func webhookHandler(w http.ResponseWriter, r *http.Request) {
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
// ClockSteppingWebhookHandler given a fakeClock returns a request handler
|
||||
// that moves time in given clock by an amount specified in the webhook request
|
||||
func ClockSteppingWebhookHandler(t testing.TB, fakeClock *testingclock.FakeClock) func(http.ResponseWriter, *http.Request) {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
path := r.URL.Path
|
||||
validPath := regexp.MustCompile(`^/(?:allow|disallow)/(\d{1,10})$`)
|
||||
|
||||
if !validPath.MatchString(path) {
|
||||
t.Errorf("error in test case, wrong webhook url path: '%q' expected to match: '%q'", path, validPath.String())
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
delay, _ := strconv.ParseInt(validPath.FindStringSubmatch(path)[1], 0, 64)
|
||||
fakeClock.Step(time.Duration(delay))
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
if strings.HasPrefix(path, "/allow/") {
|
||||
json.NewEncoder(w).Encode(&v1beta1.AdmissionReview{
|
||||
Response: &v1beta1.AdmissionResponse{
|
||||
Allowed: true,
|
||||
AuditAnnotations: map[string]string{
|
||||
"key1": "value1",
|
||||
},
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
json.NewEncoder(w).Encode(&v1beta1.AdmissionReview{
|
||||
Response: &v1beta1.AdmissionResponse{
|
||||
Allowed: false,
|
||||
Result: &metav1.Status{
|
||||
Code: http.StatusForbidden,
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -32,6 +32,7 @@ import (
|
||||
webhookerrors "k8s.io/apiserver/pkg/admission/plugin/webhook/errors"
|
||||
"k8s.io/apiserver/pkg/admission/plugin/webhook/generic"
|
||||
webhookrequest "k8s.io/apiserver/pkg/admission/plugin/webhook/request"
|
||||
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
webhookutil "k8s.io/apiserver/pkg/util/webhook"
|
||||
"k8s.io/apiserver/pkg/warning"
|
||||
"k8s.io/klog/v2"
|
||||
@ -230,7 +231,13 @@ func (d *validatingDispatcher) callHook(ctx context.Context, h *v1.ValidatingWeb
|
||||
}
|
||||
}
|
||||
|
||||
if err := r.Do(ctx).Into(response); err != nil {
|
||||
do := func() { err = r.Do(ctx).Into(response) }
|
||||
if wd, ok := endpointsrequest.WebhookDurationFrom(ctx); ok {
|
||||
tmp := do
|
||||
do = func() { wd.ValidateTracker.Track(tmp) }
|
||||
}
|
||||
do()
|
||||
if err != nil {
|
||||
var status *apierrors.StatusError
|
||||
if se, ok := err.(*apierrors.StatusError); ok {
|
||||
status = se
|
||||
|
@ -24,6 +24,8 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
clocktesting "k8s.io/utils/clock/testing"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
webhooktesting "k8s.io/apiserver/pkg/admission/plugin/webhook/testing"
|
||||
@ -213,3 +215,69 @@ func TestValidateCachedClient(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestValidateWebhookDuration tests that ValidatingWebhook#Validate sets webhook duration in context correctly
|
||||
func TestValidateWebhookDuration(ts *testing.T) {
|
||||
clk := clocktesting.FakeClock{}
|
||||
testServer := webhooktesting.NewTestServerWithHandler(ts, webhooktesting.ClockSteppingWebhookHandler(ts, &clk))
|
||||
testServer.StartTLS()
|
||||
defer testServer.Close()
|
||||
serverURL, err := url.ParseRequestURI(testServer.URL)
|
||||
if err != nil {
|
||||
ts.Fatalf("this should never happen? %v", err)
|
||||
}
|
||||
|
||||
objectInterfaces := webhooktesting.NewObjectInterfacesForTest()
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
for _, test := range webhooktesting.NewValidationDurationTestCases(serverURL) {
|
||||
ts.Run(test.Name, func(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
if test.InitContext {
|
||||
ctx = request.WithWebhookDurationAndCustomClock(ctx, &clk)
|
||||
}
|
||||
wh, err := NewValidatingAdmissionWebhook(nil)
|
||||
if err != nil {
|
||||
t.Errorf("failed to create mutating webhook: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
ns := "webhook-test"
|
||||
client, informer := webhooktesting.NewFakeValidatingDataSource(ns, test.Webhooks, stopCh)
|
||||
|
||||
wh.SetAuthenticationInfoResolverWrapper(webhooktesting.Wrapper(webhooktesting.NewAuthenticationInfoResolver(new(int32))))
|
||||
wh.SetServiceResolver(webhooktesting.NewServiceResolver(*serverURL))
|
||||
wh.SetExternalKubeClientSet(client)
|
||||
wh.SetExternalKubeInformerFactory(informer)
|
||||
|
||||
informer.Start(stopCh)
|
||||
informer.WaitForCacheSync(stopCh)
|
||||
|
||||
if err = wh.ValidateInitialization(); err != nil {
|
||||
t.Errorf("failed to validate initialization: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
_ = wh.Validate(ctx, webhooktesting.NewAttribute(ns, nil, test.IsDryRun), objectInterfaces)
|
||||
wd, ok := request.WebhookDurationFrom(ctx)
|
||||
if !ok {
|
||||
if test.InitContext {
|
||||
t.Errorf("expected webhook duration to be initialized")
|
||||
}
|
||||
return
|
||||
}
|
||||
if !test.InitContext {
|
||||
t.Errorf("expected webhook duration to not be initialized")
|
||||
return
|
||||
}
|
||||
if wd.AdmitTracker.GetLatency() != 0 {
|
||||
t.Errorf("expected admit duration to be equal to 0 got %q", wd.AdmitTracker.GetLatency())
|
||||
}
|
||||
if wd.ValidateTracker.GetLatency() < test.ExpectedDurationMax {
|
||||
t.Errorf("expected validate duraion to be greater or equal to %q got %q", test.ExpectedDurationMax, wd.ValidateTracker.GetLatency())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,33 @@
|
||||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package filters
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
)
|
||||
|
||||
// WithWebhookDuration adds WebhookDuration trackers to the
|
||||
// context associated with a request.
|
||||
func WithWebhookDuration(handler http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
ctx := req.Context()
|
||||
req = req.WithContext(request.WithWebhookDuration(ctx))
|
||||
handler.ServeHTTP(w, req)
|
||||
})
|
||||
}
|
@ -109,6 +109,19 @@ var (
|
||||
},
|
||||
[]string{"verb", "dry_run", "group", "version", "resource", "subresource", "scope", "component"},
|
||||
)
|
||||
requestSloLatencies = compbasemetrics.NewHistogramVec(
|
||||
&compbasemetrics.HistogramOpts{
|
||||
Name: "apiserver_request_slo_duration_seconds",
|
||||
Help: "Response latency distribution (not counting webhook duration) in seconds for each verb, group, version, resource, subresource, scope and component.",
|
||||
// This metric is supplementary to the requestLatencies metric.
|
||||
// It measures request duration excluding webhooks as they are mostly
|
||||
// dependant on user configuration.
|
||||
Buckets: []float64{0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3,
|
||||
4, 5, 6, 8, 10, 15, 20, 30, 45, 60},
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
},
|
||||
[]string{"verb", "group", "version", "resource", "subresource", "scope", "component"},
|
||||
)
|
||||
responseSizes = compbasemetrics.NewHistogramVec(
|
||||
&compbasemetrics.HistogramOpts{
|
||||
Name: "apiserver_response_sizes",
|
||||
@ -246,6 +259,7 @@ var (
|
||||
longRunningRequestsGauge,
|
||||
longRunningRequestGauge,
|
||||
requestLatencies,
|
||||
requestSloLatencies,
|
||||
responseSizes,
|
||||
DroppedRequests,
|
||||
TLSHandshakeErrors,
|
||||
@ -468,6 +482,10 @@ func MonitorRequest(req *http.Request, verb, group, version, resource, subresour
|
||||
}
|
||||
}
|
||||
requestLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component).Observe(elapsedSeconds)
|
||||
if wd, ok := request.WebhookDurationFrom(req.Context()); ok {
|
||||
sloLatency := elapsedSeconds - (wd.AdmitTracker.GetLatency() + wd.ValidateTracker.GetLatency()).Seconds()
|
||||
requestSloLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(sloLatency)
|
||||
}
|
||||
// We are only interested in response sizes of read requests.
|
||||
if verb == "GET" || verb == "LIST" {
|
||||
responseSizes.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(float64(respSize))
|
||||
|
@ -0,0 +1,122 @@
|
||||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package request
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
func sumDuration(d1 time.Duration, d2 time.Duration) time.Duration {
|
||||
return d1 + d2
|
||||
}
|
||||
|
||||
func maxDuration(d1 time.Duration, d2 time.Duration) time.Duration {
|
||||
if d1 > d2 {
|
||||
return d1
|
||||
}
|
||||
return d2
|
||||
}
|
||||
|
||||
// DurationTracker is a simple interface for tracking functions duration
|
||||
type DurationTracker interface {
|
||||
Track(func())
|
||||
GetLatency() time.Duration
|
||||
}
|
||||
|
||||
// durationTracker implements DurationTracker by measuring function time
|
||||
// using given clock and aggregates the duration using given aggregate function
|
||||
type durationTracker struct {
|
||||
clock clock.Clock
|
||||
latency time.Duration
|
||||
mu sync.Mutex
|
||||
aggregateFunction func(time.Duration, time.Duration) time.Duration
|
||||
}
|
||||
|
||||
// Track measures time spent in given function and aggregates measured
|
||||
// duration using aggregateFunction
|
||||
func (t *durationTracker) Track(f func()) {
|
||||
startedAt := t.clock.Now()
|
||||
defer func() {
|
||||
duration := t.clock.Since(startedAt)
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.latency = t.aggregateFunction(t.latency, duration)
|
||||
}()
|
||||
|
||||
f()
|
||||
}
|
||||
|
||||
// GetLatency returns aggregated latency tracked by a tracker
|
||||
func (t *durationTracker) GetLatency() time.Duration {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
return t.latency
|
||||
}
|
||||
|
||||
func newSumLatencyTracker(c clock.Clock) DurationTracker {
|
||||
return &durationTracker{
|
||||
clock: c,
|
||||
aggregateFunction: sumDuration,
|
||||
}
|
||||
}
|
||||
|
||||
func newMaxLatencyTracker(c clock.Clock) DurationTracker {
|
||||
return &durationTracker{
|
||||
clock: c,
|
||||
aggregateFunction: maxDuration,
|
||||
}
|
||||
}
|
||||
|
||||
// WebhookDuration stores trackers used to measure webhook request durations.
|
||||
// Since admit webhooks are done sequentially duration is aggregated using
|
||||
// sum function. Validate webhooks are done in parallel so max function
|
||||
// is used.
|
||||
type WebhookDuration struct {
|
||||
AdmitTracker DurationTracker
|
||||
ValidateTracker DurationTracker
|
||||
}
|
||||
|
||||
type webhookDurationKeyType int
|
||||
|
||||
// webhookDurationKey is the WebhookDuration (the time the request spent waiting
|
||||
// for the webhooks to finish) key for the context.
|
||||
const webhookDurationKey webhookDurationKeyType = iota
|
||||
|
||||
// WithWebhookDuration returns a copy of parent context to which the
|
||||
// WebhookDuration trackers are added.
|
||||
func WithWebhookDuration(parent context.Context) context.Context {
|
||||
return WithWebhookDurationAndCustomClock(parent, clock.RealClock{})
|
||||
}
|
||||
|
||||
// WithWebhookDurationAndCustomClock returns a copy of parent context to which
|
||||
// the WebhookDuration trackers are added. Tracers use given clock.
|
||||
func WithWebhookDurationAndCustomClock(parent context.Context, c clock.Clock) context.Context {
|
||||
return WithValue(parent, webhookDurationKey, &WebhookDuration{
|
||||
AdmitTracker: newSumLatencyTracker(c),
|
||||
ValidateTracker: newMaxLatencyTracker(c),
|
||||
})
|
||||
}
|
||||
|
||||
// WebhookDurationFrom returns the value of the WebhookDuration key from the specified context.
|
||||
func WebhookDurationFrom(ctx context.Context) (*WebhookDuration, bool) {
|
||||
wd, ok := ctx.Value(webhookDurationKey).(*WebhookDuration)
|
||||
return wd, ok && wd != nil
|
||||
}
|
@ -0,0 +1,73 @@
|
||||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package request
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
clocktesting "k8s.io/utils/clock/testing"
|
||||
)
|
||||
|
||||
func TestWebhookDurationFrom(t *testing.T) {
|
||||
type testCase struct {
|
||||
Durations []time.Duration
|
||||
SumDurations time.Duration
|
||||
MaxDuration time.Duration
|
||||
}
|
||||
tc := testCase{
|
||||
Durations: []time.Duration{100, 200, 300, 200, 400, 300, 100},
|
||||
SumDurations: 1600,
|
||||
MaxDuration: 400,
|
||||
}
|
||||
t.Run("TestWebhookDurationFrom", func(t *testing.T) {
|
||||
parent := context.TODO()
|
||||
_, ok := WebhookDurationFrom(parent)
|
||||
if ok {
|
||||
t.Error("expected WebhookDurationFrom to not be initialized")
|
||||
}
|
||||
|
||||
clk := clocktesting.FakeClock{}
|
||||
ctx := WithWebhookDurationAndCustomClock(parent, &clk)
|
||||
wd, ok := WebhookDurationFrom(ctx)
|
||||
if !ok {
|
||||
t.Error("expected webhook duration to be initialized")
|
||||
}
|
||||
if wd.AdmitTracker.GetLatency() != 0 || wd.ValidateTracker.GetLatency() != 0 {
|
||||
t.Error("expected values to be initialized to 0")
|
||||
}
|
||||
|
||||
for _, d := range tc.Durations {
|
||||
wd.AdmitTracker.Track(func() { clk.Step(d) })
|
||||
wd.ValidateTracker.Track(func() { clk.Step(d) })
|
||||
}
|
||||
|
||||
wd, ok = WebhookDurationFrom(ctx)
|
||||
if !ok {
|
||||
t.Errorf("expected webhook duration to be initialized")
|
||||
}
|
||||
|
||||
if wd.AdmitTracker.GetLatency() != tc.SumDurations {
|
||||
t.Errorf("expected admit duration: %q, but got: %q", tc.SumDurations, wd.AdmitTracker.GetLatency())
|
||||
}
|
||||
|
||||
if wd.ValidateTracker.GetLatency() != tc.MaxDuration {
|
||||
t.Errorf("expected validate duration: %q, but got: %q", tc.MaxDuration, wd.ValidateTracker.GetLatency())
|
||||
}
|
||||
})
|
||||
}
|
@ -762,7 +762,8 @@ func BuildHandlerChainWithStorageVersionPrecondition(apiHandler http.Handler, c
|
||||
}
|
||||
|
||||
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
|
||||
handler := filterlatency.TrackCompleted(apiHandler)
|
||||
handler := genericapifilters.WithWebhookDuration(apiHandler)
|
||||
handler = filterlatency.TrackCompleted(handler)
|
||||
handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
|
||||
handler = filterlatency.TrackStarted(handler, "authorization")
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user