Merge pull request #124189 from toddtreece/kube-aggregator-proxy-tracing
Add tracing to kube-aggregator proxyHandler
This commit is contained in:
		| @@ -37,6 +37,7 @@ import ( | ||||
| 	utilfeature "k8s.io/apiserver/pkg/util/feature" | ||||
| 	"k8s.io/client-go/kubernetes" | ||||
| 	"k8s.io/client-go/transport" | ||||
| 	"k8s.io/component-base/tracing" | ||||
| 	"k8s.io/component-base/version" | ||||
| 	v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" | ||||
| 	v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" | ||||
| @@ -169,6 +170,9 @@ type APIAggregator struct { | ||||
|  | ||||
| 	// rejectForwardingRedirects is whether to allow to forward redirect response | ||||
| 	rejectForwardingRedirects bool | ||||
|  | ||||
| 	// tracerProvider is used to wrap the proxy transport and handler with tracing | ||||
| 	tracerProvider tracing.TracerProvider | ||||
| } | ||||
|  | ||||
| // Complete fills in any fields not set that are required to have valid data. It's mutating the receiver. | ||||
| @@ -239,6 +243,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg | ||||
| 		openAPIV3Config:            c.GenericConfig.OpenAPIV3Config, | ||||
| 		proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil }, | ||||
| 		rejectForwardingRedirects:  c.ExtraConfig.RejectForwardingRedirects, | ||||
| 		tracerProvider:             c.GenericConfig.TracerProvider, | ||||
| 	} | ||||
|  | ||||
| 	// used later  to filter the served resource by those that have expired. | ||||
| @@ -518,6 +523,7 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error { | ||||
| 		proxyTransportDial:         s.proxyTransportDial, | ||||
| 		serviceResolver:            s.serviceResolver, | ||||
| 		rejectForwardingRedirects:  s.rejectForwardingRedirects, | ||||
| 		tracerProvider:             s.tracerProvider, | ||||
| 	} | ||||
| 	proxyHandler.updateAPIService(apiService) | ||||
| 	if s.openAPIAggregationController != nil { | ||||
|   | ||||
| @@ -27,10 +27,13 @@ import ( | ||||
| 	"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" | ||||
| 	endpointmetrics "k8s.io/apiserver/pkg/endpoints/metrics" | ||||
| 	genericapirequest "k8s.io/apiserver/pkg/endpoints/request" | ||||
| 	genericfeatures "k8s.io/apiserver/pkg/features" | ||||
| 	utilfeature "k8s.io/apiserver/pkg/util/feature" | ||||
| 	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" | ||||
| 	apiserverproxyutil "k8s.io/apiserver/pkg/util/proxy" | ||||
| 	"k8s.io/apiserver/pkg/util/x509metrics" | ||||
| 	"k8s.io/client-go/transport" | ||||
| 	"k8s.io/component-base/tracing" | ||||
| 	"k8s.io/klog/v2" | ||||
| 	apiregistrationv1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" | ||||
| 	apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" | ||||
| @@ -59,6 +62,9 @@ type proxyHandler struct { | ||||
|  | ||||
| 	// reject to forward redirect response | ||||
| 	rejectForwardingRedirects bool | ||||
|  | ||||
| 	// tracerProvider is used to wrap the proxy transport and handler with tracing | ||||
| 	tracerProvider tracing.TracerProvider | ||||
| } | ||||
|  | ||||
| type proxyHandlingInfo struct { | ||||
| @@ -155,6 +161,11 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { | ||||
|  | ||||
| 	proxyRoundTripper = transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), proxyRoundTripper) | ||||
|  | ||||
| 	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) && !upgrade { | ||||
| 		tracingWrapper := tracing.WrapperFor(r.tracerProvider) | ||||
| 		proxyRoundTripper = tracingWrapper(proxyRoundTripper) | ||||
| 	} | ||||
|  | ||||
| 	// If we are upgrading, then the upgrade path tries to use this request with the TLS config we provide, but it does | ||||
| 	// NOT use the proxyRoundTripper.  It's a direct dial that bypasses the proxyRoundTripper.  This means that we have to | ||||
| 	// attach the "correct" user headers to the request ahead of time. | ||||
|   | ||||
| @@ -40,12 +40,17 @@ import ( | ||||
|  | ||||
| 	"golang.org/x/net/websocket" | ||||
|  | ||||
| 	"go.opentelemetry.io/otel/propagation" | ||||
| 	sdktrace "go.opentelemetry.io/otel/sdk/trace" | ||||
| 	"go.opentelemetry.io/otel/sdk/trace/tracetest" | ||||
| 	"go.opentelemetry.io/otel/trace" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	utilnet "k8s.io/apimachinery/pkg/util/net" | ||||
| 	"k8s.io/apimachinery/pkg/util/proxy" | ||||
| 	"k8s.io/apimachinery/pkg/util/sets" | ||||
| 	"k8s.io/apiserver/pkg/authentication/user" | ||||
| 	"k8s.io/apiserver/pkg/endpoints/filters" | ||||
| 	genericapirequest "k8s.io/apiserver/pkg/endpoints/request" | ||||
| 	"k8s.io/apiserver/pkg/server/egressselector" | ||||
| 	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" | ||||
| @@ -54,6 +59,7 @@ import ( | ||||
| 	"k8s.io/component-base/metrics/legacyregistry" | ||||
| 	apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" | ||||
| 	"k8s.io/utils/pointer" | ||||
| 	"k8s.io/utils/ptr" | ||||
| ) | ||||
|  | ||||
| type targetHTTPHandler struct { | ||||
| @@ -774,6 +780,116 @@ func TestGetContextForNewRequest(t *testing.T) { | ||||
|  | ||||
| } | ||||
|  | ||||
| func TestTracerProvider(t *testing.T) { | ||||
| 	fakeRecorder := tracetest.NewSpanRecorder() | ||||
| 	otelTracer := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(fakeRecorder)) | ||||
| 	target := &targetHTTPHandler{} | ||||
| 	user := &user.DefaultInfo{ | ||||
| 		Name:   "username", | ||||
| 		Groups: []string{"one", "two"}, | ||||
| 	} | ||||
| 	path := "/request/path" | ||||
| 	apiService := &apiregistration.APIService{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"}, | ||||
| 		Spec: apiregistration.APIServiceSpec{ | ||||
| 			Service:  &apiregistration.ServiceReference{Name: "test-service", Namespace: "test-ns", Port: ptr.To(int32(443))}, | ||||
| 			Group:    "foo", | ||||
| 			Version:  "v1", | ||||
| 			CABundle: testCACrt, | ||||
| 		}, | ||||
| 		Status: apiregistration.APIServiceStatus{ | ||||
| 			Conditions: []apiregistration.APIServiceCondition{ | ||||
| 				{Type: apiregistration.Available, Status: apiregistration.ConditionTrue}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	targetServer := httptest.NewUnstartedServer(target) | ||||
| 	serviceCert := svcCrt | ||||
| 	if cert, err := tls.X509KeyPair(serviceCert, svcKey); err != nil { | ||||
| 		t.Fatalf("TestTracerProvider: failed to parse key pair: %v", err) | ||||
| 	} else { | ||||
| 		targetServer.TLS = &tls.Config{Certificates: []tls.Certificate{cert}} | ||||
| 	} | ||||
| 	targetServer.StartTLS() | ||||
| 	defer targetServer.Close() | ||||
|  | ||||
| 	serviceResolver := &mockedRouter{destinationHost: targetServer.Listener.Addr().String()} | ||||
| 	handler := &proxyHandler{ | ||||
| 		localDelegate:              http.NewServeMux(), | ||||
| 		serviceResolver:            serviceResolver, | ||||
| 		proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() }, | ||||
| 		tracerProvider:             otelTracer, | ||||
| 	} | ||||
|  | ||||
| 	server := httptest.NewServer(contextHandler(filters.WithTracing(handler, otelTracer), user)) | ||||
| 	defer server.Close() | ||||
|  | ||||
| 	handler.updateAPIService(apiService) | ||||
| 	curr := handler.handlingInfo.Load().(proxyHandlingInfo) | ||||
| 	handler.handlingInfo.Store(curr) | ||||
| 	var propagator propagation.TraceContext | ||||
| 	req, err := http.NewRequest(http.MethodGet, server.URL+path, nil) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("expected new request: %v", err) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	t.Logf("Sending request: %v", req) | ||||
| 	_, err = http.DefaultClient.Do(req) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("http request failed: %v", err) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	t.Log("Ensure the target server received the traceparent header") | ||||
| 	id, ok := target.headers["Traceparent"] | ||||
| 	if !ok { | ||||
| 		t.Error("expected traceparent header") | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	t.Log("Get the span context from the traceparent header") | ||||
| 	h := http.Header{ | ||||
| 		"Traceparent": id, | ||||
| 	} | ||||
| 	ctx := propagator.Extract(context.Background(), propagation.HeaderCarrier(h)) | ||||
| 	span := trace.SpanFromContext(ctx) | ||||
|  | ||||
| 	t.Log("Ensure that the span context is valid and remote") | ||||
| 	if !span.SpanContext().IsValid() { | ||||
| 		t.Error("expected valid span context") | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	if !span.SpanContext().IsRemote() { | ||||
| 		t.Error("expected remote span context") | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	t.Log("Ensure that the span ID and trace ID match the expected values") | ||||
| 	expectedSpanCtx := fakeRecorder.Ended()[0].SpanContext() | ||||
| 	if expectedSpanCtx.TraceID() != span.SpanContext().TraceID() { | ||||
| 		t.Errorf("expected trace id to match. expected: %v, but got %v", expectedSpanCtx.TraceID(), span.SpanContext().TraceID()) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	if expectedSpanCtx.SpanID() != span.SpanContext().SpanID() { | ||||
| 		t.Errorf("expected span id to match. expected: %v, but got: %v", expectedSpanCtx.SpanID(), span.SpanContext().SpanID()) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	t.Log("Ensure that the expected spans were recorded when sending a request through the proxy") | ||||
| 	expectedSpanNames := []string{"HTTP GET", "GET"} | ||||
| 	spanNames := []string{} | ||||
| 	for _, span := range fakeRecorder.Ended() { | ||||
| 		spanNames = append(spanNames, span.Name()) | ||||
| 	} | ||||
| 	if e, a := expectedSpanNames, spanNames; !reflect.DeepEqual(e, a) { | ||||
| 		t.Errorf("expected span names %v, got %v", e, a) | ||||
| 		return | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestNewRequestForProxyWithAuditID(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		name    string | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot