Merge pull request #105890 from pawbana/added_request_slo_latency_metric

Added requestSloLatencies metric
This commit is contained in:
Kubernetes Prow Robot 2021-11-15 04:10:47 -08:00 committed by GitHub
commit 1e6f3b5cd6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 528 additions and 6 deletions

View File

@ -43,6 +43,7 @@ import (
"k8s.io/apiserver/pkg/admission/plugin/webhook/generic"
webhookrequest "k8s.io/apiserver/pkg/admission/plugin/webhook/request"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
webhookutil "k8s.io/apiserver/pkg/util/webhook"
"k8s.io/apiserver/pkg/warning"
utiltrace "k8s.io/utils/trace"
@ -263,7 +264,13 @@ func (a *mutatingDispatcher) callAttrMutatingHook(ctx context.Context, h *admiss
}
}
if err := r.Do(ctx).Into(response); err != nil {
do := func() { err = r.Do(ctx).Into(response) }
if wd, ok := endpointsrequest.WebhookDurationFrom(ctx); ok {
tmp := do
do = func() { wd.AdmitTracker.Track(tmp) }
}
do()
if err != nil {
var status *apierrors.StatusError
if se, ok := err.(*apierrors.StatusError); ok {
status = se

View File

@ -26,6 +26,8 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/apiserver/pkg/endpoints/request"
clocktesting "k8s.io/utils/clock/testing"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -251,3 +253,69 @@ func TestAdmitCachedClient(t *testing.T) {
}
}
}
// TestWebhookDuration tests that MutatingWebhook#Admit sets webhook duration in context correctly.
//
// The test server steps a shared fake clock instead of sleeping, so the
// latencies recorded by the trackers are deterministic.
func TestWebhookDuration(ts *testing.T) {
	clk := clocktesting.FakeClock{}
	testServer := webhooktesting.NewTestServerWithHandler(ts, webhooktesting.ClockSteppingWebhookHandler(ts, &clk))
	testServer.StartTLS()
	defer testServer.Close()
	serverURL, err := url.ParseRequestURI(testServer.URL)
	if err != nil {
		ts.Fatalf("this should never happen? %v", err)
	}

	objectInterfaces := webhooktesting.NewObjectInterfacesForTest()

	stopCh := make(chan struct{})
	defer close(stopCh)

	for _, test := range webhooktesting.NewValidationDurationTestCases(serverURL) {
		ts.Run(test.Name, func(t *testing.T) {
			ctx := context.TODO()
			if test.InitContext {
				// Each initialized case gets fresh trackers bound to the shared fake clock.
				ctx = request.WithWebhookDurationAndCustomClock(ctx, &clk)
			}

			wh, err := NewMutatingWebhook(nil)
			if err != nil {
				t.Fatalf("failed to create mutating webhook: %v", err)
			}

			ns := "webhook-test"
			client, informer := webhooktesting.NewFakeMutatingDataSource(ns, webhooktesting.ConvertToMutatingWebhooks(test.Webhooks), stopCh)

			wh.SetAuthenticationInfoResolverWrapper(webhooktesting.Wrapper(webhooktesting.NewAuthenticationInfoResolver(new(int32))))
			wh.SetServiceResolver(webhooktesting.NewServiceResolver(*serverURL))
			wh.SetExternalKubeClientSet(client)
			wh.SetExternalKubeInformerFactory(informer)

			informer.Start(stopCh)
			informer.WaitForCacheSync(stopCh)

			if err = wh.ValidateInitialization(); err != nil {
				t.Fatalf("failed to validate initialization: %v", err)
			}

			// The admission verdict is deliberately ignored: this test only
			// inspects the durations recorded in the context.
			_ = wh.Admit(ctx, webhooktesting.NewAttribute(ns, nil, test.IsDryRun), objectInterfaces)

			wd, ok := request.WebhookDurationFrom(ctx)
			if !ok {
				if test.InitContext {
					t.Errorf("expected webhook duration to be initialized")
				}
				return
			}
			if !test.InitContext {
				t.Errorf("expected webhook duration to not be initialized")
				return
			}

			if wd.AdmitTracker.GetLatency() != test.ExpectedDurationSum {
				t.Errorf("expected admit duration %q got %q", test.ExpectedDurationSum, wd.AdmitTracker.GetLatency())
			}
			// Admit must never touch the validate tracker.
			if wd.ValidateTracker.GetLatency() != 0 {
				t.Errorf("expected validate duration to be equal to 0 got %q", wd.ValidateTracker.GetLatency())
			}
		})
	}
}

View File

@ -23,6 +23,7 @@ import (
"reflect"
"strings"
"sync"
"time"
registrationv1 "k8s.io/api/admissionregistration/v1"
corev1 "k8s.io/api/core/v1"
@ -243,6 +244,17 @@ type MutatingTest struct {
ExpectReinvokeWebhooks map[string]bool
}
// DurationTest is webhook duration test case, used both in mutating and
// validating plugin test cases.
type DurationTest struct {
	// Name is used as the subtest name.
	Name string
	// Webhooks are the webhook configurations registered for the test case.
	Webhooks []registrationv1.ValidatingWebhook
	// InitContext controls whether the request context is initialized with
	// WebhookDuration trackers before the plugin is invoked.
	InitContext bool
	// IsDryRun is forwarded to NewAttribute when building the admission attributes.
	IsDryRun bool
	// ExpectedDurationSum is the latency the mutating plugin test expects
	// the admit tracker to report.
	ExpectedDurationSum time.Duration
	// ExpectedDurationMax is the minimum latency the validating plugin test
	// expects the validate tracker to report.
	ExpectedDurationMax time.Duration
}
// ConvertToMutatingTestCases converts a validating test case to a mutating one for test purposes.
func ConvertToMutatingTestCases(tests []ValidatingTest, configurationName string) []MutatingTest {
r := make([]MutatingTest, len(tests))
@ -1069,3 +1081,67 @@ func NewObjectInterfacesForTest() admission.ObjectInterfaces {
corev1.AddToScheme(scheme)
return admission.NewObjectInterfacesFromScheme(scheme)
}
// NewValidationDurationTestCases returns test cases for webhook duration test.
//
// The number at the end of each client config path is the amount by which
// ClockSteppingWebhookHandler steps the fake clock when that webhook is called.
// NOTE(review): the expected values (sum 500 = 100 + 400, max 400) presumably
// rely on only the two webhooks that carry Rules being invoked — confirm
// against the matcher.
func NewValidationDurationTestCases(url *url.URL) []DurationTest {
	clientConfigFor := urlConfigGenerator{url}.ccfgURL

	allowMatch := registrationv1.ValidatingWebhook{
		Name:                    "allow match",
		ClientConfig:            clientConfigFor("allow/100"),
		Rules:                   matchEverythingRules,
		NamespaceSelector:       &metav1.LabelSelector{},
		ObjectSelector:          &metav1.LabelSelector{},
		AdmissionReviewVersions: []string{"v1beta1"},
	}
	allowNoMatch := registrationv1.ValidatingWebhook{
		Name:                    "allow no match",
		ClientConfig:            clientConfigFor("allow/200"),
		NamespaceSelector:       &metav1.LabelSelector{},
		ObjectSelector:          &metav1.LabelSelector{},
		AdmissionReviewVersions: []string{"v1beta1"},
	}
	disallowMatch := registrationv1.ValidatingWebhook{
		Name:                    "disallow match",
		ClientConfig:            clientConfigFor("disallow/400"),
		Rules:                   matchEverythingRules,
		NamespaceSelector:       &metav1.LabelSelector{},
		ObjectSelector:          &metav1.LabelSelector{},
		AdmissionReviewVersions: []string{"v1beta1"},
	}
	disallowNoMatch := registrationv1.ValidatingWebhook{
		Name:                    "disallow no match",
		ClientConfig:            clientConfigFor("disallow/800"),
		NamespaceSelector:       &metav1.LabelSelector{},
		ObjectSelector:          &metav1.LabelSelector{},
		AdmissionReviewVersions: []string{"v1beta1"},
	}

	// Every test case shares the same webhook configuration slice.
	hooks := []registrationv1.ValidatingWebhook{
		allowMatch,
		allowNoMatch,
		disallowMatch,
		disallowNoMatch,
	}

	cases := make([]DurationTest, 0, 3)
	cases = append(cases, DurationTest{
		Name:                "duration test",
		Webhooks:            hooks,
		InitContext:         true,
		IsDryRun:            false,
		ExpectedDurationSum: 500,
		ExpectedDurationMax: 400,
	})
	cases = append(cases, DurationTest{
		Name:                "duration dry run",
		Webhooks:            hooks,
		InitContext:         true,
		IsDryRun:            true,
		ExpectedDurationSum: 0,
		ExpectedDurationMax: 0,
	})
	cases = append(cases, DurationTest{
		Name:                "duration no init",
		Webhooks:            hooks,
		InitContext:         false,
		IsDryRun:            false,
		ExpectedDurationSum: 0,
		ExpectedDurationMax: 0,
	})
	return cases
}

View File

@ -22,15 +22,21 @@ import (
"encoding/json"
"net/http"
"net/http/httptest"
"regexp"
"strconv"
"strings"
"testing"
"time"
"k8s.io/api/admission/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/admission/plugin/webhook/testcerts"
testingclock "k8s.io/utils/clock/testing"
)
// NewTestServer returns a webhook test HTTPS server with fixed webhook test certs.
func NewTestServer(t testing.TB) *httptest.Server {
// NewTestServerWithHandler returns a webhook test HTTPS server
// which uses given handler function to handle requests
func NewTestServerWithHandler(t testing.TB, handler func(http.ResponseWriter, *http.Request)) *httptest.Server {
// Create the test webhook server
sCert, err := tls.X509KeyPair(testcerts.ServerCert, testcerts.ServerKey)
if err != nil {
@ -39,7 +45,7 @@ func NewTestServer(t testing.TB) *httptest.Server {
}
rootCAs := x509.NewCertPool()
rootCAs.AppendCertsFromPEM(testcerts.CACert)
testServer := httptest.NewUnstartedServer(http.HandlerFunc(webhookHandler))
testServer := httptest.NewUnstartedServer(http.HandlerFunc(handler))
testServer.TLS = &tls.Config{
Certificates: []tls.Certificate{sCert},
ClientCAs: rootCAs,
@ -48,6 +54,11 @@ func NewTestServer(t testing.TB) *httptest.Server {
return testServer
}
// NewTestServer returns a webhook test HTTPS server with fixed webhook test certs.
// It is NewTestServerWithHandler wired to the package's default webhookHandler.
func NewTestServer(t testing.TB) *httptest.Server {
	return NewTestServerWithHandler(t, webhookHandler)
}
func webhookHandler(w http.ResponseWriter, r *http.Request) {
// fmt.Printf("got req: %v\n", r.URL.Path)
switch r.URL.Path {
@ -160,3 +171,41 @@ func webhookHandler(w http.ResponseWriter, r *http.Request) {
http.NotFound(w, r)
}
}
// ClockSteppingWebhookHandler given a fakeClock returns a request handler
// that moves time in given clock by an amount specified in the webhook request.
//
// The request path must have the form /allow/<n> or /disallow/<n>, where <n>
// is a decimal number of at most ten digits; the clock is stepped by
// time.Duration(n). Requests under /allow/ are answered with an allowing
// AdmissionReview, all others with a denying one. A request with any other
// path is reported through t.Errorf and answered with 400 Bad Request.
func ClockSteppingWebhookHandler(t testing.TB, fakeClock *testingclock.FakeClock) func(http.ResponseWriter, *http.Request) {
	// Compile the pattern once per handler instead of once per request.
	validPath := regexp.MustCompile(`^/(?:allow|disallow)/(\d{1,10})$`)

	return func(w http.ResponseWriter, r *http.Request) {
		path := r.URL.Path
		match := validPath.FindStringSubmatch(path)
		if match == nil {
			// This runs on an HTTP server goroutine, where FailNow must not be
			// called; mark the test as failed and reject the request instead.
			t.Errorf("error in test case, wrong webhook url path: '%q' expected to match: '%q'", path, validPath.String())
			http.Error(w, "unexpected webhook url path", http.StatusBadRequest)
			return
		}

		// Parse as base 10 so a leading zero is not interpreted as octal.
		delay, err := strconv.ParseInt(match[1], 10, 64)
		if err != nil {
			t.Errorf("error in test case, cannot parse delay %q: %v", match[1], err)
			http.Error(w, "invalid delay", http.StatusBadRequest)
			return
		}
		fakeClock.Step(time.Duration(delay))

		w.Header().Set("Content-Type", "application/json")

		var review *v1beta1.AdmissionReview
		if strings.HasPrefix(path, "/allow/") {
			review = &v1beta1.AdmissionReview{
				Response: &v1beta1.AdmissionResponse{
					Allowed: true,
					AuditAnnotations: map[string]string{
						"key1": "value1",
					},
				},
			}
		} else {
			review = &v1beta1.AdmissionReview{
				Response: &v1beta1.AdmissionResponse{
					Allowed: false,
					Result: &metav1.Status{
						Code: http.StatusForbidden,
					},
				},
			}
		}
		if err := json.NewEncoder(w).Encode(review); err != nil {
			t.Errorf("failed to write admission review response: %v", err)
		}
	}
}

View File

@ -32,6 +32,7 @@ import (
webhookerrors "k8s.io/apiserver/pkg/admission/plugin/webhook/errors"
"k8s.io/apiserver/pkg/admission/plugin/webhook/generic"
webhookrequest "k8s.io/apiserver/pkg/admission/plugin/webhook/request"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
webhookutil "k8s.io/apiserver/pkg/util/webhook"
"k8s.io/apiserver/pkg/warning"
"k8s.io/klog/v2"
@ -230,7 +231,13 @@ func (d *validatingDispatcher) callHook(ctx context.Context, h *v1.ValidatingWeb
}
}
if err := r.Do(ctx).Into(response); err != nil {
do := func() { err = r.Do(ctx).Into(response) }
if wd, ok := endpointsrequest.WebhookDurationFrom(ctx); ok {
tmp := do
do = func() { wd.ValidateTracker.Track(tmp) }
}
do()
if err != nil {
var status *apierrors.StatusError
if se, ok := err.(*apierrors.StatusError); ok {
status = se

View File

@ -24,6 +24,8 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/apiserver/pkg/endpoints/request"
clocktesting "k8s.io/utils/clock/testing"
"k8s.io/apimachinery/pkg/api/errors"
webhooktesting "k8s.io/apiserver/pkg/admission/plugin/webhook/testing"
@ -213,3 +215,69 @@ func TestValidateCachedClient(t *testing.T) {
}
}
}
// TestValidateWebhookDuration tests that ValidatingWebhook#Validate sets webhook duration in context correctly.
//
// The test server steps a shared fake clock instead of sleeping. The validate
// latency is only checked against a lower bound (ExpectedDurationMax).
func TestValidateWebhookDuration(ts *testing.T) {
	clk := clocktesting.FakeClock{}
	testServer := webhooktesting.NewTestServerWithHandler(ts, webhooktesting.ClockSteppingWebhookHandler(ts, &clk))
	testServer.StartTLS()
	defer testServer.Close()
	serverURL, err := url.ParseRequestURI(testServer.URL)
	if err != nil {
		ts.Fatalf("this should never happen? %v", err)
	}

	objectInterfaces := webhooktesting.NewObjectInterfacesForTest()

	stopCh := make(chan struct{})
	defer close(stopCh)

	for _, test := range webhooktesting.NewValidationDurationTestCases(serverURL) {
		ts.Run(test.Name, func(t *testing.T) {
			ctx := context.TODO()
			if test.InitContext {
				// Each initialized case gets fresh trackers bound to the shared fake clock.
				ctx = request.WithWebhookDurationAndCustomClock(ctx, &clk)
			}

			wh, err := NewValidatingAdmissionWebhook(nil)
			if err != nil {
				t.Fatalf("failed to create validating webhook: %v", err)
			}

			ns := "webhook-test"
			client, informer := webhooktesting.NewFakeValidatingDataSource(ns, test.Webhooks, stopCh)

			wh.SetAuthenticationInfoResolverWrapper(webhooktesting.Wrapper(webhooktesting.NewAuthenticationInfoResolver(new(int32))))
			wh.SetServiceResolver(webhooktesting.NewServiceResolver(*serverURL))
			wh.SetExternalKubeClientSet(client)
			wh.SetExternalKubeInformerFactory(informer)

			informer.Start(stopCh)
			informer.WaitForCacheSync(stopCh)

			if err = wh.ValidateInitialization(); err != nil {
				t.Fatalf("failed to validate initialization: %v", err)
			}

			// The admission verdict is deliberately ignored: this test only
			// inspects the durations recorded in the context.
			_ = wh.Validate(ctx, webhooktesting.NewAttribute(ns, nil, test.IsDryRun), objectInterfaces)

			wd, ok := request.WebhookDurationFrom(ctx)
			if !ok {
				if test.InitContext {
					t.Errorf("expected webhook duration to be initialized")
				}
				return
			}
			if !test.InitContext {
				t.Errorf("expected webhook duration to not be initialized")
				return
			}

			// Validate must never touch the admit tracker.
			if wd.AdmitTracker.GetLatency() != 0 {
				t.Errorf("expected admit duration to be equal to 0 got %q", wd.AdmitTracker.GetLatency())
			}
			if wd.ValidateTracker.GetLatency() < test.ExpectedDurationMax {
				t.Errorf("expected validate duration to be greater or equal to %q got %q", test.ExpectedDurationMax, wd.ValidateTracker.GetLatency())
			}
		})
	}
}

View File

@ -0,0 +1,33 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"net/http"
"k8s.io/apiserver/pkg/endpoints/request"
)
// WithWebhookDuration wraps handler so that every request it serves carries
// WebhookDuration trackers in its context.
func WithWebhookDuration(handler http.Handler) http.Handler {
	serve := func(w http.ResponseWriter, req *http.Request) {
		tracked := request.WithWebhookDuration(req.Context())
		req = req.WithContext(tracked)
		handler.ServeHTTP(w, req)
	}
	return http.HandlerFunc(serve)
}

View File

@ -109,6 +109,19 @@ var (
},
[]string{"verb", "dry_run", "group", "version", "resource", "subresource", "scope", "component"},
)
requestSloLatencies = compbasemetrics.NewHistogramVec(
&compbasemetrics.HistogramOpts{
Name: "apiserver_request_slo_duration_seconds",
Help: "Response latency distribution (not counting webhook duration) in seconds for each verb, group, version, resource, subresource, scope and component.",
// This metric is supplementary to the requestLatencies metric.
// It measures request duration excluding webhooks as they are mostly
// dependent on user configuration.
Buckets: []float64{0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3,
4, 5, 6, 8, 10, 15, 20, 30, 45, 60},
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"verb", "group", "version", "resource", "subresource", "scope", "component"},
)
responseSizes = compbasemetrics.NewHistogramVec(
&compbasemetrics.HistogramOpts{
Name: "apiserver_response_sizes",
@ -246,6 +259,7 @@ var (
longRunningRequestsGauge,
longRunningRequestGauge,
requestLatencies,
requestSloLatencies,
responseSizes,
DroppedRequests,
TLSHandshakeErrors,
@ -468,6 +482,10 @@ func MonitorRequest(req *http.Request, verb, group, version, resource, subresour
}
}
requestLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component).Observe(elapsedSeconds)
if wd, ok := request.WebhookDurationFrom(req.Context()); ok {
sloLatency := elapsedSeconds - (wd.AdmitTracker.GetLatency() + wd.ValidateTracker.GetLatency()).Seconds()
requestSloLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(sloLatency)
}
// We are only interested in response sizes of read requests.
if verb == "GET" || verb == "LIST" {
responseSizes.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(float64(respSize))

View File

@ -0,0 +1,122 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package request
import (
"context"
"sync"
"time"
"k8s.io/utils/clock"
)
// sumDuration aggregates two durations by adding them together.
func sumDuration(d1, d2 time.Duration) time.Duration {
	total := d1
	total += d2
	return total
}
// maxDuration aggregates two durations by keeping the larger one.
func maxDuration(d1, d2 time.Duration) time.Duration {
	if d2 >= d1 {
		return d2
	}
	return d1
}
// DurationTracker is a simple interface for tracking functions duration
type DurationTracker interface {
	// Track runs the given function; implementations are expected to
	// measure how long it took and fold that into the tracked latency.
	Track(func())
	// GetLatency returns the latency aggregated by the tracker so far.
	GetLatency() time.Duration
}
// durationTracker implements DurationTracker by measuring function time
// using given clock and aggregates the duration using given aggregate function
type durationTracker struct {
	// clock supplies the start time and the elapsed time of each tracked call.
	clock clock.Clock
	// latency is the aggregated result of all tracked calls so far.
	latency time.Duration
	// mu is held while latency is read or updated.
	mu sync.Mutex
	// aggregateFunction combines the current latency (first argument) with a
	// newly measured duration (second argument) into the new latency.
	aggregateFunction func(time.Duration, time.Duration) time.Duration
}
// Track runs f and folds the time it took, as measured by the tracker's
// clock, into the tracked latency using aggregateFunction. The measurement
// is recorded from a deferred call, so it is taken even if f panics.
func (t *durationTracker) Track(f func()) {
	// The start time is captured here, when the defer statement is evaluated.
	defer func(begin time.Time) {
		elapsed := t.clock.Since(begin)

		t.mu.Lock()
		defer t.mu.Unlock()
		t.latency = t.aggregateFunction(t.latency, elapsed)
	}(t.clock.Now())

	f()
}
// GetLatency reports the latency aggregated so far, reading it under the
// tracker's mutex.
func (t *durationTracker) GetLatency() time.Duration {
	t.mu.Lock()
	current := t.latency
	t.mu.Unlock()
	return current
}
// newSumLatencyTracker returns a tracker on clock c whose latency is the sum
// of all tracked durations.
func newSumLatencyTracker(c clock.Clock) DurationTracker {
	tracker := new(durationTracker)
	tracker.clock = c
	tracker.aggregateFunction = sumDuration
	return tracker
}
// newMaxLatencyTracker returns a tracker on clock c whose latency is the
// largest of all tracked durations.
func newMaxLatencyTracker(c clock.Clock) DurationTracker {
	tracker := new(durationTracker)
	tracker.clock = c
	tracker.aggregateFunction = maxDuration
	return tracker
}
// WebhookDuration stores trackers used to measure webhook request durations.
// Since admit webhooks are done sequentially duration is aggregated using
// sum function. Validate webhooks are done in parallel so max function
// is used.
type WebhookDuration struct {
	// AdmitTracker tracks time spent in admit webhook calls;
	// WithWebhookDurationAndCustomClock sets it to a summing tracker.
	AdmitTracker DurationTracker
	// ValidateTracker tracks time spent in validate webhook calls;
	// WithWebhookDurationAndCustomClock sets it to a max-keeping tracker.
	ValidateTracker DurationTracker
}
// webhookDurationKeyType is the unexported type of the context key under
// which the *WebhookDuration is stored.
type webhookDurationKeyType int
// webhookDurationKey is the WebhookDuration (the time the request spent waiting
// for the webhooks to finish) key for the context.
const webhookDurationKey webhookDurationKeyType = iota
// WithWebhookDuration returns a copy of parent context to which the
// WebhookDuration trackers are added.
func WithWebhookDuration(parent context.Context) context.Context {
return WithWebhookDurationAndCustomClock(parent, clock.RealClock{})
}
// WithWebhookDurationAndCustomClock returns a copy of parent context carrying
// fresh WebhookDuration trackers. Both trackers measure time with the given
// clock: the admit tracker sums durations, the validate tracker keeps the
// maximum.
func WithWebhookDurationAndCustomClock(parent context.Context, c clock.Clock) context.Context {
	trackers := &WebhookDuration{
		AdmitTracker:    newSumLatencyTracker(c),
		ValidateTracker: newMaxLatencyTracker(c),
	}
	return WithValue(parent, webhookDurationKey, trackers)
}
// WebhookDurationFrom looks up the WebhookDuration stored in ctx. The boolean
// is true only when a non-nil *WebhookDuration was found.
func WebhookDurationFrom(ctx context.Context) (*WebhookDuration, bool) {
	wd, found := ctx.Value(webhookDurationKey).(*WebhookDuration)
	if !found || wd == nil {
		return wd, false
	}
	return wd, true
}

View File

@ -0,0 +1,73 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package request
import (
"context"
"testing"
"time"
clocktesting "k8s.io/utils/clock/testing"
)
// TestWebhookDurationFrom verifies that a plain context carries no
// WebhookDuration, that WithWebhookDurationAndCustomClock installs zeroed
// trackers, and that the admit tracker sums tracked durations while the
// validate tracker keeps the maximum.
func TestWebhookDurationFrom(t *testing.T) {
	type testCase struct {
		Durations    []time.Duration
		SumDurations time.Duration
		MaxDuration  time.Duration
	}
	tc := testCase{
		Durations:    []time.Duration{100, 200, 300, 200, 400, 300, 100},
		SumDurations: 1600,
		MaxDuration:  400,
	}
	t.Run("TestWebhookDurationFrom", func(t *testing.T) {
		parent := context.TODO()
		_, ok := WebhookDurationFrom(parent)
		if ok {
			t.Error("expected WebhookDurationFrom to not be initialized")
		}

		clk := clocktesting.FakeClock{}
		ctx := WithWebhookDurationAndCustomClock(parent, &clk)
		wd, ok := WebhookDurationFrom(ctx)
		if !ok {
			// wd is nil in this case; stop before it is dereferenced below.
			t.Fatal("expected webhook duration to be initialized")
		}
		if wd.AdmitTracker.GetLatency() != 0 || wd.ValidateTracker.GetLatency() != 0 {
			t.Error("expected values to be initialized to 0")
		}

		// Each tracked function steps the fake clock by d, so every Track
		// call measures exactly d.
		for _, d := range tc.Durations {
			wd.AdmitTracker.Track(func() { clk.Step(d) })
			wd.ValidateTracker.Track(func() { clk.Step(d) })
		}

		wd, ok = WebhookDurationFrom(ctx)
		if !ok {
			// wd is nil in this case; stop before it is dereferenced below.
			t.Fatal("expected webhook duration to be initialized")
		}

		if wd.AdmitTracker.GetLatency() != tc.SumDurations {
			t.Errorf("expected admit duration: %q, but got: %q", tc.SumDurations, wd.AdmitTracker.GetLatency())
		}

		if wd.ValidateTracker.GetLatency() != tc.MaxDuration {
			t.Errorf("expected validate duration: %q, but got: %q", tc.MaxDuration, wd.ValidateTracker.GetLatency())
		}
	})
}

View File

@ -762,7 +762,8 @@ func BuildHandlerChainWithStorageVersionPrecondition(apiHandler http.Handler, c
}
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := filterlatency.TrackCompleted(apiHandler)
handler := genericapifilters.WithWebhookDuration(apiHandler)
handler = filterlatency.TrackCompleted(handler)
handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
handler = filterlatency.TrackStarted(handler, "authorization")