Merge pull request #113217 from dashpole/filterlatency_tracing
FilterLatency tracing for APIServerTracing
This commit is contained in:
		| @@ -22,6 +22,8 @@ import ( | ||||
| 	"net/http" | ||||
| 	"time" | ||||
|  | ||||
| 	"go.opentelemetry.io/otel/trace" | ||||
|  | ||||
| 	"k8s.io/apiserver/pkg/endpoints/metrics" | ||||
| 	apirequest "k8s.io/apiserver/pkg/endpoints/request" | ||||
| 	"k8s.io/apiserver/pkg/server/httplog" | ||||
| @@ -54,8 +56,8 @@ func requestFilterRecordFrom(ctx context.Context) *requestFilterRecord { | ||||
|  | ||||
| // TrackStarted measures the timestamp the given handler has started execution | ||||
| // by attaching a handler to the chain. | ||||
| func TrackStarted(handler http.Handler, name string) http.Handler { | ||||
| 	return trackStarted(handler, name, clock.RealClock{}) | ||||
| func TrackStarted(handler http.Handler, tp trace.TracerProvider, name string) http.Handler { | ||||
| 	return trackStarted(handler, tp, name, clock.RealClock{}) | ||||
| } | ||||
|  | ||||
| // TrackCompleted measures the timestamp the given handler has completed execution and then | ||||
| @@ -70,7 +72,9 @@ func TrackCompleted(handler http.Handler) http.Handler { | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func trackStarted(handler http.Handler, name string, clock clock.PassiveClock) http.Handler { | ||||
| func trackStarted(handler http.Handler, tp trace.TracerProvider, name string, clock clock.PassiveClock) http.Handler { | ||||
| 	// This is a noop if the tracing is disabled, since tp will be a NoopTracerProvider | ||||
| 	tracer := tp.Tracer("k8s.op/apiserver/pkg/endpoints/filterlatency") | ||||
| 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 		ctx := r.Context() | ||||
| 		if fr := requestFilterRecordFrom(ctx); fr != nil { | ||||
| @@ -85,6 +89,7 @@ func trackStarted(handler http.Handler, name string, clock clock.PassiveClock) h | ||||
| 			name:             name, | ||||
| 			startedTimestamp: clock.Now(), | ||||
| 		} | ||||
| 		ctx, _ = tracer.Start(ctx, name) | ||||
| 		r = r.WithContext(withRequestFilterRecord(ctx, fr)) | ||||
| 		handler.ServeHTTP(w, r) | ||||
| 	}) | ||||
| @@ -101,5 +106,6 @@ func trackCompleted(handler http.Handler, clock clock.PassiveClock, action func( | ||||
| 		if fr := requestFilterRecordFrom(ctx); fr != nil { | ||||
| 			action(ctx, fr, completedAt) | ||||
| 		} | ||||
| 		trace.SpanFromContext(ctx).End() | ||||
| 	}) | ||||
| } | ||||
|   | ||||
| @@ -23,6 +23,10 @@ import ( | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	sdktrace "go.opentelemetry.io/otel/sdk/trace" | ||||
| 	"go.opentelemetry.io/otel/sdk/trace/tracetest" | ||||
| 	"go.opentelemetry.io/otel/trace" | ||||
|  | ||||
| 	testingclock "k8s.io/utils/clock/testing" | ||||
| ) | ||||
|  | ||||
| @@ -41,7 +45,7 @@ func TestTrackStartedWithContextAlreadyHasFilterRecord(t *testing.T) { | ||||
| 	}) | ||||
|  | ||||
| 	requestFilterStarted := time.Now() | ||||
| 	wrapped := trackStarted(handler, filterName, testingclock.NewFakeClock(requestFilterStarted)) | ||||
| 	wrapped := trackStarted(handler, trace.NewNoopTracerProvider(), filterName, testingclock.NewFakeClock(requestFilterStarted)) | ||||
|  | ||||
| 	testRequest, err := http.NewRequest(http.MethodGet, "/api/v1/namespaces", nil) | ||||
| 	if err != nil { | ||||
| @@ -84,7 +88,7 @@ func TestTrackStartedWithContextDoesNotHaveFilterRecord(t *testing.T) { | ||||
| 	}) | ||||
|  | ||||
| 	requestFilterStarted := time.Now() | ||||
| 	wrapped := trackStarted(handler, filterName, testingclock.NewFakeClock(requestFilterStarted)) | ||||
| 	wrapped := trackStarted(handler, trace.NewNoopTracerProvider(), filterName, testingclock.NewFakeClock(requestFilterStarted)) | ||||
|  | ||||
| 	testRequest, err := http.NewRequest(http.MethodGet, "/api/v1/namespaces", nil) | ||||
| 	if err != nil { | ||||
| @@ -176,3 +180,39 @@ func TestTrackCompletedContextDoesNotHaveFilterRecord(t *testing.T) { | ||||
| 		t.Errorf("expected the callback to not be invoked, but was actually invoked %d times", actionCallCount) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestStartedAndCompletedOpenTelemetryTracing(t *testing.T) { | ||||
| 	filterName := "my-filter" | ||||
| 	// Seup OTel for testing | ||||
| 	fakeRecorder := tracetest.NewSpanRecorder() | ||||
| 	tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(fakeRecorder)) | ||||
|  | ||||
| 	// base handler func | ||||
| 	var callCount int | ||||
| 	handler := http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) { | ||||
| 		// we expect the handler to be invoked just once. | ||||
| 		callCount++ | ||||
| 	}) | ||||
| 	// wrap with start and completed handler | ||||
| 	wrapped := TrackCompleted(handler) | ||||
| 	wrapped = TrackStarted(wrapped, tp, filterName) | ||||
|  | ||||
| 	testRequest, err := http.NewRequest(http.MethodGet, "/api/v1/namespaces", nil) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("failed to create new http request - %v", err) | ||||
| 	} | ||||
|  | ||||
| 	wrapped.ServeHTTP(httptest.NewRecorder(), testRequest) | ||||
|  | ||||
| 	if callCount != 1 { | ||||
| 		t.Errorf("expected the given handler to be invoked once, but was actually invoked %d times", callCount) | ||||
| 	} | ||||
| 	output := fakeRecorder.Ended() | ||||
| 	if len(output) != 1 { | ||||
| 		t.Fatalf("got %d; expected len(output) == 1", len(output)) | ||||
| 	} | ||||
| 	span := output[0] | ||||
| 	if span.Name() != filterName { | ||||
| 		t.Fatalf("got %s; expected span.Name == my-filter", span.Name()) | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -828,7 +828,7 @@ func BuildHandlerChainWithStorageVersionPrecondition(apiHandler http.Handler, c | ||||
| func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { | ||||
| 	handler := filterlatency.TrackCompleted(apiHandler) | ||||
| 	handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer) | ||||
| 	handler = filterlatency.TrackStarted(handler, "authorization") | ||||
| 	handler = filterlatency.TrackStarted(handler, c.TracerProvider, "authorization") | ||||
|  | ||||
| 	if c.FlowControl != nil { | ||||
| 		workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig() | ||||
| @@ -836,18 +836,18 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { | ||||
| 			c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg) | ||||
| 		handler = filterlatency.TrackCompleted(handler) | ||||
| 		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator) | ||||
| 		handler = filterlatency.TrackStarted(handler, "priorityandfairness") | ||||
| 		handler = filterlatency.TrackStarted(handler, c.TracerProvider, "priorityandfairness") | ||||
| 	} else { | ||||
| 		handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc) | ||||
| 	} | ||||
|  | ||||
| 	handler = filterlatency.TrackCompleted(handler) | ||||
| 	handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer) | ||||
| 	handler = filterlatency.TrackStarted(handler, "impersonation") | ||||
| 	handler = filterlatency.TrackStarted(handler, c.TracerProvider, "impersonation") | ||||
|  | ||||
| 	handler = filterlatency.TrackCompleted(handler) | ||||
| 	handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc) | ||||
| 	handler = filterlatency.TrackStarted(handler, "audit") | ||||
| 	handler = filterlatency.TrackStarted(handler, c.TracerProvider, "audit") | ||||
|  | ||||
| 	failedHandler := genericapifilters.Unauthorized(c.Serializer) | ||||
| 	failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator) | ||||
| @@ -855,7 +855,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { | ||||
| 	failedHandler = filterlatency.TrackCompleted(failedHandler) | ||||
| 	handler = filterlatency.TrackCompleted(handler) | ||||
| 	handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences) | ||||
| 	handler = filterlatency.TrackStarted(handler, "authentication") | ||||
| 	handler = filterlatency.TrackStarted(handler, c.TracerProvider, "authentication") | ||||
|  | ||||
| 	handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true") | ||||
|  | ||||
|   | ||||
| @@ -40,6 +40,7 @@ import ( | ||||
| 	"k8s.io/client-go/informers" | ||||
| 	"k8s.io/client-go/kubernetes/fake" | ||||
| 	"k8s.io/client-go/rest" | ||||
| 	"k8s.io/component-base/tracing" | ||||
| 	netutils "k8s.io/utils/net" | ||||
| ) | ||||
|  | ||||
| @@ -302,6 +303,7 @@ func TestAuthenticationAuditAnnotationsDefaultChain(t *testing.T) { | ||||
| 		RequestTimeout:        10 * time.Second, | ||||
| 		LongRunningFunc:       func(_ *http.Request, _ *request.RequestInfo) bool { return false }, | ||||
| 		lifecycleSignals:      newLifecycleSignals(), | ||||
| 		TracerProvider:        tracing.NewNoopTracerProvider(), | ||||
| 	} | ||||
|  | ||||
| 	h := DefaultBuildHandlerChain(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot