Merge pull request #49395 from deads2k/server-33-rate
Automatic merge from submit-queue (batch tested with PRs 49444, 47864, 48584, 49395, 49118) rate limiting should not affect system masters A `system:masters` user is either a loopback connection or someone with complete access to the cluster. Either way, they should not be rate-limited like a normal client since their requests are more important. This moves the maxinflight checker post-authentication and excluded system:masters from the rate limit. @smarterclayton as discussed. @kubernetes/sig-api-machinery-misc
This commit is contained in:
		@@ -468,6 +468,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
 | 
					func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
 | 
				
			||||||
	handler := genericapifilters.WithAuthorization(apiHandler, c.RequestContextMapper, c.Authorizer, c.Serializer)
 | 
						handler := genericapifilters.WithAuthorization(apiHandler, c.RequestContextMapper, c.Authorizer, c.Serializer)
 | 
				
			||||||
 | 
						handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc)
 | 
				
			||||||
	handler = genericapifilters.WithImpersonation(handler, c.RequestContextMapper, c.Authorizer, c.Serializer)
 | 
						handler = genericapifilters.WithImpersonation(handler, c.RequestContextMapper, c.Authorizer, c.Serializer)
 | 
				
			||||||
	if utilfeature.DefaultFeatureGate.Enabled(features.AdvancedAuditing) {
 | 
						if utilfeature.DefaultFeatureGate.Enabled(features.AdvancedAuditing) {
 | 
				
			||||||
		handler = genericapifilters.WithAudit(handler, c.RequestContextMapper, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
 | 
							handler = genericapifilters.WithAudit(handler, c.RequestContextMapper, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
 | 
				
			||||||
@@ -477,7 +478,6 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
 | 
				
			|||||||
	handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, genericapifilters.Unauthorized(c.RequestContextMapper, c.Serializer, c.SupportsBasicAuth))
 | 
						handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, genericapifilters.Unauthorized(c.RequestContextMapper, c.Serializer, c.SupportsBasicAuth))
 | 
				
			||||||
	handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
 | 
						handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
 | 
				
			||||||
	handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc)
 | 
						handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc)
 | 
				
			||||||
	handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc)
 | 
					 | 
				
			||||||
	handler = genericapifilters.WithRequestInfo(handler, NewRequestInfoResolver(c), c.RequestContextMapper)
 | 
						handler = genericapifilters.WithRequestInfo(handler, NewRequestInfoResolver(c), c.RequestContextMapper)
 | 
				
			||||||
	handler = apirequest.WithRequestContext(handler, c.RequestContextMapper)
 | 
						handler = apirequest.WithRequestContext(handler, c.RequestContextMapper)
 | 
				
			||||||
	handler = genericfilters.WithPanicRecovery(handler)
 | 
						handler = genericfilters.WithPanicRecovery(handler)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -24,6 +24,7 @@ go_test(
 | 
				
			|||||||
        "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
 | 
					        "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library",
 | 
					        "//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
 | 
					        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
 | 
				
			||||||
 | 
					        "//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apiserver/pkg/endpoints/filters:go_default_library",
 | 
					        "//vendor/k8s.io/apiserver/pkg/endpoints/filters:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
 | 
					        "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
 | 
				
			||||||
    ],
 | 
					    ],
 | 
				
			||||||
@@ -48,6 +49,7 @@ go_library(
 | 
				
			|||||||
        "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
 | 
					        "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
 | 
					        "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
 | 
					        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
 | 
				
			||||||
 | 
					        "//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
 | 
					        "//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
 | 
					        "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apiserver/pkg/server/httplog:go_default_library",
 | 
					        "//vendor/k8s.io/apiserver/pkg/server/httplog:go_default_library",
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -24,6 +24,7 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/api/errors"
 | 
						"k8s.io/apimachinery/pkg/api/errors"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
						"k8s.io/apimachinery/pkg/util/sets"
 | 
				
			||||||
 | 
						"k8s.io/apiserver/pkg/authentication/user"
 | 
				
			||||||
	"k8s.io/apiserver/pkg/endpoints/metrics"
 | 
						"k8s.io/apiserver/pkg/endpoints/metrics"
 | 
				
			||||||
	apirequest "k8s.io/apiserver/pkg/endpoints/request"
 | 
						apirequest "k8s.io/apiserver/pkg/endpoints/request"
 | 
				
			||||||
	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
 | 
						genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
 | 
				
			||||||
@@ -91,11 +92,23 @@ func WithMaxInFlightLimit(
 | 
				
			|||||||
		if c == nil {
 | 
							if c == nil {
 | 
				
			||||||
			handler.ServeHTTP(w, r)
 | 
								handler.ServeHTTP(w, r)
 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			select {
 | 
								select {
 | 
				
			||||||
			case c <- true:
 | 
								case c <- true:
 | 
				
			||||||
				defer func() { <-c }()
 | 
									defer func() { <-c }()
 | 
				
			||||||
				handler.ServeHTTP(w, r)
 | 
									handler.ServeHTTP(w, r)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			default:
 | 
								default:
 | 
				
			||||||
 | 
									// at this point we're about to return a 429, BUT not all actors should be rate limited.  A system:master is so powerful
 | 
				
			||||||
 | 
									// that he should always get an answer.  It's a super-admin or a loopback connection.
 | 
				
			||||||
 | 
									if currUser, ok := apirequest.UserFrom(ctx); ok {
 | 
				
			||||||
 | 
										for _, group := range currUser.GetGroups() {
 | 
				
			||||||
 | 
											if group == user.SystemPrivilegedGroup {
 | 
				
			||||||
 | 
												handler.ServeHTTP(w, r)
 | 
				
			||||||
 | 
												return
 | 
				
			||||||
 | 
											}
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
				metrics.MonitorRequest(r, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", errors.StatusTooManyRequests, time.Now())
 | 
									metrics.MonitorRequest(r, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", errors.StatusTooManyRequests, time.Now())
 | 
				
			||||||
				tooManyRequests(r, w)
 | 
									tooManyRequests(r, w)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -26,12 +26,12 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/api/errors"
 | 
						"k8s.io/apimachinery/pkg/api/errors"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
						"k8s.io/apimachinery/pkg/util/sets"
 | 
				
			||||||
 | 
						"k8s.io/apiserver/pkg/authentication/user"
 | 
				
			||||||
	apifilters "k8s.io/apiserver/pkg/endpoints/filters"
 | 
						apifilters "k8s.io/apiserver/pkg/endpoints/filters"
 | 
				
			||||||
	apirequest "k8s.io/apiserver/pkg/endpoints/request"
 | 
						apirequest "k8s.io/apiserver/pkg/endpoints/request"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *bool, disableCallsWgMutex *sync.Mutex, nonMutating, mutating int) *httptest.Server {
 | 
					func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *bool, disableCallsWgMutex *sync.Mutex, nonMutating, mutating int) *httptest.Server {
 | 
				
			||||||
 | 
					 | 
				
			||||||
	longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
 | 
						longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	requestContextMapper := apirequest.NewRequestContextMapper()
 | 
						requestContextMapper := apirequest.NewRequestContextMapper()
 | 
				
			||||||
@@ -55,12 +55,30 @@ func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *b
 | 
				
			|||||||
		requestContextMapper,
 | 
							requestContextMapper,
 | 
				
			||||||
		longRunningRequestCheck,
 | 
							longRunningRequestCheck,
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
 | 
						handler = withFakeUser(handler, requestContextMapper)
 | 
				
			||||||
	handler = apifilters.WithRequestInfo(handler, requestInfoFactory, requestContextMapper)
 | 
						handler = apifilters.WithRequestInfo(handler, requestInfoFactory, requestContextMapper)
 | 
				
			||||||
	handler = apirequest.WithRequestContext(handler, requestContextMapper)
 | 
						handler = apirequest.WithRequestContext(handler, requestContextMapper)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return httptest.NewServer(handler)
 | 
						return httptest.NewServer(handler)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func withFakeUser(handler http.Handler, requestContextMapper apirequest.RequestContextMapper) http.Handler {
 | 
				
			||||||
 | 
						return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 | 
				
			||||||
 | 
							ctx, ok := requestContextMapper.Get(r)
 | 
				
			||||||
 | 
							if !ok {
 | 
				
			||||||
 | 
								handleError(w, r, fmt.Errorf("no context found for request, handler chain must be wrong"))
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if len(r.Header["Groups"]) > 0 {
 | 
				
			||||||
 | 
								requestContextMapper.Update(r, apirequest.WithUser(ctx, &user.DefaultInfo{
 | 
				
			||||||
 | 
									Groups: r.Header["Groups"],
 | 
				
			||||||
 | 
								}))
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							handler.ServeHTTP(w, r)
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Tests that MaxInFlightLimit works, i.e.
 | 
					// Tests that MaxInFlightLimit works, i.e.
 | 
				
			||||||
// - "long" requests such as proxy or watch, identified by regexp are not accounted despite
 | 
					// - "long" requests such as proxy or watch, identified by regexp are not accounted despite
 | 
				
			||||||
//   hanging for the long time,
 | 
					//   hanging for the long time,
 | 
				
			||||||
@@ -228,8 +246,16 @@ func expectHTTPGet(url string, code int) error {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// We use POST as a sample mutating request.
 | 
					// We use POST as a sample mutating request.
 | 
				
			||||||
func expectHTTPPost(url string, code int) error {
 | 
					func expectHTTPPost(url string, code int, groups ...string) error {
 | 
				
			||||||
	r, err := http.Post(url, "text/html", strings.NewReader("foo bar"))
 | 
						req, err := http.NewRequest(http.MethodPost, url, strings.NewReader("foo bar"))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for _, group := range groups {
 | 
				
			||||||
 | 
							req.Header.Add("Groups", group)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						r, err := http.DefaultClient.Do(req)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return fmt.Errorf("unexpected error: %v", err)
 | 
							return fmt.Errorf("unexpected error: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -238,3 +264,54 @@ func expectHTTPPost(url string, code int) error {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestMaxInFlightSkipsMasters(t *testing.T) {
 | 
				
			||||||
 | 
						const AllowedMutatingInflightRequestsNo = 3
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						calls := &sync.WaitGroup{}
 | 
				
			||||||
 | 
						calls.Add(AllowedMutatingInflightRequestsNo)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						responses := &sync.WaitGroup{}
 | 
				
			||||||
 | 
						responses.Add(AllowedMutatingInflightRequestsNo)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Block is used to keep requests in flight for as long as we need to. All requests will
 | 
				
			||||||
 | 
						// be unblocked at the same time.
 | 
				
			||||||
 | 
						block := &sync.WaitGroup{}
 | 
				
			||||||
 | 
						block.Add(1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						waitForCalls := true
 | 
				
			||||||
 | 
						waitForCallsMutex := sync.Mutex{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						server := createMaxInflightServer(calls, block, &waitForCalls, &waitForCallsMutex, 1, AllowedMutatingInflightRequestsNo)
 | 
				
			||||||
 | 
						defer server.Close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// These should hang and be accounted, i.e. saturate the server
 | 
				
			||||||
 | 
						for i := 0; i < AllowedMutatingInflightRequestsNo; i++ {
 | 
				
			||||||
 | 
							// These should hang waiting on block...
 | 
				
			||||||
 | 
							go func() {
 | 
				
			||||||
 | 
								if err := expectHTTPPost(server.URL+"/foo/bar", http.StatusOK); err != nil {
 | 
				
			||||||
 | 
									t.Error(err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								responses.Done()
 | 
				
			||||||
 | 
							}()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// We wait for all calls to be received by the server
 | 
				
			||||||
 | 
						calls.Wait()
 | 
				
			||||||
 | 
						// Disable calls notifications in the server
 | 
				
			||||||
 | 
						// Disable calls notifications in the server
 | 
				
			||||||
 | 
						waitForCallsMutex.Lock()
 | 
				
			||||||
 | 
						waitForCalls = false
 | 
				
			||||||
 | 
						waitForCallsMutex.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Do this multiple times to show that rate limit rejected requests don't block.
 | 
				
			||||||
 | 
						for i := 0; i < 2; i++ {
 | 
				
			||||||
 | 
							if err := expectHTTPPost(server.URL+"/dontwait", http.StatusOK, user.SystemPrivilegedGroup); err != nil {
 | 
				
			||||||
 | 
								t.Error(err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Let all hanging requests finish
 | 
				
			||||||
 | 
						block.Done()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						responses.Wait()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user