Fix httplog not logging watch duration in separate goroutines
Signed-off-by: Eric Lin <exlin@google.com>
This commit is contained in:
		| @@ -41,7 +41,7 @@ import ( | ||||
| 	"k8s.io/apiserver/pkg/endpoints/request" | ||||
| 	"k8s.io/apiserver/pkg/features" | ||||
| 	"k8s.io/apiserver/pkg/registry/rest" | ||||
| 	genericfilters "k8s.io/apiserver/pkg/server/filters" | ||||
| 	"k8s.io/apiserver/pkg/server/routine" | ||||
| 	utilfeature "k8s.io/apiserver/pkg/util/feature" | ||||
| 	"k8s.io/component-base/tracing" | ||||
| 	"k8s.io/klog/v2" | ||||
| @@ -285,7 +285,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc | ||||
| 			} | ||||
|  | ||||
| 			// Run watch serving in a separate goroutine to allow freeing current stack memory | ||||
| 			t := genericfilters.TaskFrom(req.Context()) | ||||
| 			t := routine.TaskFrom(req.Context()) | ||||
| 			if t != nil { | ||||
| 				t.Func = serve | ||||
| 			} else { | ||||
|   | ||||
| @@ -64,6 +64,7 @@ import ( | ||||
| 	genericfilters "k8s.io/apiserver/pkg/server/filters" | ||||
| 	"k8s.io/apiserver/pkg/server/healthz" | ||||
| 	"k8s.io/apiserver/pkg/server/routes" | ||||
| 	"k8s.io/apiserver/pkg/server/routine" | ||||
| 	serverstore "k8s.io/apiserver/pkg/server/storage" | ||||
| 	storagevalue "k8s.io/apiserver/pkg/storage/value" | ||||
| 	"k8s.io/apiserver/pkg/storageversion" | ||||
| @@ -1055,7 +1056,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { | ||||
| 	// handler in current goroutine to minimize the stack memory usage. It must be | ||||
| 	// after WithPanicRecover() to be protected from panics. | ||||
| 	if c.FeatureGate.Enabled(genericfeatures.APIServingWithRoutine) { | ||||
| 		handler = genericfilters.WithRoutine(handler, c.LongRunningFunc) | ||||
| 		handler = routine.WithRoutine(handler, c.LongRunningFunc) | ||||
| 	} | ||||
| 	handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver) | ||||
| 	handler = genericapifilters.WithRequestReceivedTimestamp(handler) | ||||
|   | ||||
| @@ -1,5 +1,5 @@ | ||||
| /* | ||||
| Copyright 2023 The Kubernetes Authors. | ||||
| Copyright 2024 The Kubernetes Authors. | ||||
| 
 | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| @@ -25,6 +25,7 @@ import ( | ||||
| 
 | ||||
| 	"k8s.io/apimachinery/pkg/util/sets" | ||||
| 	"k8s.io/apiserver/pkg/endpoints/request" | ||||
| 	"k8s.io/apiserver/pkg/server/routine" | ||||
| 	"k8s.io/klog/v2" | ||||
| ) | ||||
| 
 | ||||
| @@ -42,7 +43,7 @@ func TestPropogatingPanic(t *testing.T) { | ||||
| 		APIPrefixes:          sets.NewString("api", "apis"), | ||||
| 		GrouplessAPIPrefixes: sets.NewString("api"), | ||||
| 	} | ||||
| 	ts := httptest.NewServer(WithRoutine(WithPanicRecovery(handler, resolver), func(_ *http.Request, _ *request.RequestInfo) bool { return true })) | ||||
| 	ts := httptest.NewServer(routine.WithRoutine(WithPanicRecovery(handler, resolver), func(_ *http.Request, _ *request.RequestInfo) bool { return true })) | ||||
| 	defer ts.Close() | ||||
| 	_, err := http.Get(ts.URL) | ||||
| 	if err == nil { | ||||
| @@ -57,23 +58,3 @@ func TestPropogatingPanic(t *testing.T) { | ||||
| 		t.Errorf("unexpected out captured actual = %v", capturedOutput) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func TestExecutionWithRoutine(t *testing.T) { | ||||
| 	var executed bool | ||||
| 	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 		t := TaskFrom(r.Context()) | ||||
| 		t.Func = func() { | ||||
| 			executed = true | ||||
| 		} | ||||
| 	}) | ||||
| 	ts := httptest.NewServer(WithRoutine(handler, func(_ *http.Request, _ *request.RequestInfo) bool { return true })) | ||||
| 	defer ts.Close() | ||||
| 
 | ||||
| 	_, err := http.Get(ts.URL) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("got unexpected error on request: %v", err) | ||||
| 	} | ||||
| 	if !executed { | ||||
| 		t.Error("expected to execute") | ||||
| 	} | ||||
| } | ||||
| @@ -31,6 +31,7 @@ import ( | ||||
| 	"k8s.io/apiserver/pkg/endpoints/metrics" | ||||
| 	"k8s.io/apiserver/pkg/endpoints/request" | ||||
| 	"k8s.io/apiserver/pkg/endpoints/responsewriter" | ||||
| 	"k8s.io/apiserver/pkg/server/routine" | ||||
| 	"k8s.io/klog/v2" | ||||
| ) | ||||
|  | ||||
| @@ -125,10 +126,26 @@ func withLogging(handler http.Handler, stackTracePred StacktracePred, shouldLogR | ||||
| 		rl := newLoggedWithStartTime(req, w, startTime) | ||||
| 		rl.StacktraceWhen(stackTracePred) | ||||
| 		req = req.WithContext(context.WithValue(ctx, respLoggerContextKey, rl)) | ||||
| 		defer rl.Log() | ||||
|  | ||||
| 		var logFunc func() | ||||
| 		logFunc = rl.Log | ||||
| 		defer func() { | ||||
| 			if logFunc != nil { | ||||
| 				logFunc() | ||||
| 			} | ||||
| 		}() | ||||
|  | ||||
| 		w = responsewriter.WrapForHTTP1Or2(rl) | ||||
| 		handler.ServeHTTP(w, req) | ||||
|  | ||||
| 		// We need to ensure that the request is logged after it is processed. | ||||
| 		// In case the request is executed in a separate goroutine created via | ||||
| 		// WithRoutine handler in the handler chain (i.e. above handler.ServeHTTP() | ||||
| 		// would return request is completely responsed), we want the logging to | ||||
| 		// happen in that goroutine too, so we append it to the task. | ||||
| 		if routine.AppendTask(ctx, &routine.Task{Func: rl.Log}) { | ||||
| 			logFunc = nil | ||||
| 		} | ||||
| 	}) | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
| 
 | ||||
| package filters | ||||
| package routine | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| @@ -35,6 +35,20 @@ func WithTask(parent context.Context, t *Task) context.Context { | ||||
| 	return request.WithValue(parent, taskKey, t) | ||||
| } | ||||
| 
 | ||||
| // AppendTask appends a task executed after completion of existing task. | ||||
| // It is a no-op if there is no existing task. | ||||
| func AppendTask(ctx context.Context, t *Task) bool { | ||||
| 	if existTask := TaskFrom(ctx); existTask != nil && existTask.Func != nil { | ||||
| 		existFunc := existTask.Func | ||||
| 		existTask.Func = func() { | ||||
| 			existFunc() | ||||
| 			t.Func() | ||||
| 		} | ||||
| 		return true | ||||
| 	} | ||||
| 	return false | ||||
| } | ||||
| 
 | ||||
| func TaskFrom(ctx context.Context) *Task { | ||||
| 	t, _ := ctx.Value(taskKey).(*Task) | ||||
| 	return t | ||||
| @@ -0,0 +1,99 @@ | ||||
| /* | ||||
| Copyright 2023 The Kubernetes Authors. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package routine | ||||
|  | ||||
| import ( | ||||
| 	"net/http" | ||||
| 	"net/http/httptest" | ||||
| 	"testing" | ||||
|  | ||||
| 	"k8s.io/apiserver/pkg/endpoints/request" | ||||
| ) | ||||
|  | ||||
| func TestExecutionWithRoutine(t *testing.T) { | ||||
| 	var executed bool | ||||
| 	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 		t := TaskFrom(r.Context()) | ||||
| 		t.Func = func() { | ||||
| 			executed = true | ||||
| 		} | ||||
| 	}) | ||||
| 	ts := httptest.NewServer(WithRoutine(handler, func(_ *http.Request, _ *request.RequestInfo) bool { return true })) | ||||
| 	defer ts.Close() | ||||
|  | ||||
| 	_, err := http.Get(ts.URL) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("got unexpected error on request: %v", err) | ||||
| 	} | ||||
| 	if !executed { | ||||
| 		t.Error("expected to execute") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestAppendTask(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		name          string | ||||
| 		existingTask  bool | ||||
| 		taskAppended  bool | ||||
| 		shouldExecute bool | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name:          "append task when existing", | ||||
| 			existingTask:  true, | ||||
| 			taskAppended:  true, | ||||
| 			shouldExecute: true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:          "not append task when no existing tasks", | ||||
| 			existingTask:  false, | ||||
| 			taskAppended:  false, | ||||
| 			shouldExecute: false, | ||||
| 		}, | ||||
| 	} | ||||
| 	for _, test := range tests { | ||||
| 		t.Run(test.name, func(t *testing.T) { | ||||
| 			var executed, appended bool | ||||
| 			taskToAppend := func() { | ||||
| 				executed = true | ||||
| 			} | ||||
| 			handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 				ctx := r.Context() | ||||
| 				if test.existingTask { | ||||
| 					t := TaskFrom(ctx) | ||||
| 					t.Func = func() {} | ||||
| 				} | ||||
|  | ||||
| 				appended = AppendTask(ctx, &Task{taskToAppend}) | ||||
| 			}) | ||||
| 			ts := httptest.NewServer(WithRoutine(handler, func(_ *http.Request, _ *request.RequestInfo) bool { return true })) | ||||
| 			defer ts.Close() | ||||
|  | ||||
| 			_, err := http.Get(ts.URL) | ||||
| 			if err != nil { | ||||
| 				t.Errorf("got unexpected error on request: %v", err) | ||||
| 			} | ||||
|  | ||||
| 			if test.taskAppended != appended { | ||||
| 				t.Errorf("expected taskAppended: %t, got: %t", test.taskAppended, executed) | ||||
| 			} | ||||
|  | ||||
| 			if test.shouldExecute != executed { | ||||
| 				t.Errorf("expected shouldExecute: %t, got: %t", test.shouldExecute, executed) | ||||
| 			} | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
		Reference in New Issue
	
	Block a user
	 Eric Lin
					Eric Lin