Merge pull request #38119 from liggitt/long-running
Automatic merge from submit-queue (batch tested with PRs 37032, 38119, 38186, 38200, 38139) Detect long-running requests from parsed request info Follow up to https://github.com/kubernetes/kubernetes/pull/36064 Uses parsed request info to more tightly match verbs and subresources Removes regex-based long-running request path matching (which is easily fooled) ```release-note The --long-running-request-regexp flag to kube-apiserver is deprecated and will be removed in a future release. Long-running requests are now detected based on specific verbs (watch, proxy) or subresources (proxy, portforward, log, exec, attach). ```
This commit is contained in:
commit
cbf497b749
@ -35,11 +35,13 @@ go_library(
|
|||||||
"//pkg/generated/openapi:go_default_library",
|
"//pkg/generated/openapi:go_default_library",
|
||||||
"//pkg/genericapiserver:go_default_library",
|
"//pkg/genericapiserver:go_default_library",
|
||||||
"//pkg/genericapiserver/authorizer:go_default_library",
|
"//pkg/genericapiserver/authorizer:go_default_library",
|
||||||
|
"//pkg/genericapiserver/filters:go_default_library",
|
||||||
"//pkg/master:go_default_library",
|
"//pkg/master:go_default_library",
|
||||||
"//pkg/registry/cachesize:go_default_library",
|
"//pkg/registry/cachesize:go_default_library",
|
||||||
"//pkg/runtime/schema:go_default_library",
|
"//pkg/runtime/schema:go_default_library",
|
||||||
"//pkg/util/errors:go_default_library",
|
"//pkg/util/errors:go_default_library",
|
||||||
"//pkg/util/net:go_default_library",
|
"//pkg/util/net:go_default_library",
|
||||||
|
"//pkg/util/sets:go_default_library",
|
||||||
"//pkg/util/wait:go_default_library",
|
"//pkg/util/wait:go_default_library",
|
||||||
"//pkg/version:go_default_library",
|
"//pkg/version:go_default_library",
|
||||||
"//plugin/pkg/admission/admit:go_default_library",
|
"//plugin/pkg/admission/admit:go_default_library",
|
||||||
@ -67,11 +69,3 @@ go_library(
|
|||||||
"//vendor:github.com/spf13/pflag",
|
"//vendor:github.com/spf13/pflag",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
go_test(
|
|
||||||
name = "go_default_test",
|
|
||||||
srcs = ["server_test.go"],
|
|
||||||
library = "go_default_library",
|
|
||||||
tags = ["automanaged"],
|
|
||||||
deps = ["//cmd/kube-apiserver/app/options:go_default_library"],
|
|
||||||
)
|
|
||||||
|
@ -50,11 +50,13 @@ import (
|
|||||||
generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
|
generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
|
||||||
"k8s.io/kubernetes/pkg/genericapiserver"
|
"k8s.io/kubernetes/pkg/genericapiserver"
|
||||||
"k8s.io/kubernetes/pkg/genericapiserver/authorizer"
|
"k8s.io/kubernetes/pkg/genericapiserver/authorizer"
|
||||||
|
"k8s.io/kubernetes/pkg/genericapiserver/filters"
|
||||||
"k8s.io/kubernetes/pkg/master"
|
"k8s.io/kubernetes/pkg/master"
|
||||||
"k8s.io/kubernetes/pkg/registry/cachesize"
|
"k8s.io/kubernetes/pkg/registry/cachesize"
|
||||||
"k8s.io/kubernetes/pkg/runtime/schema"
|
"k8s.io/kubernetes/pkg/runtime/schema"
|
||||||
utilerrors "k8s.io/kubernetes/pkg/util/errors"
|
utilerrors "k8s.io/kubernetes/pkg/util/errors"
|
||||||
utilnet "k8s.io/kubernetes/pkg/util/net"
|
utilnet "k8s.io/kubernetes/pkg/util/net"
|
||||||
|
"k8s.io/kubernetes/pkg/util/sets"
|
||||||
"k8s.io/kubernetes/pkg/util/wait"
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
"k8s.io/kubernetes/pkg/version"
|
"k8s.io/kubernetes/pkg/version"
|
||||||
)
|
)
|
||||||
@ -277,6 +279,10 @@ func Run(s *options.ServerRunOptions) error {
|
|||||||
genericConfig.EnableOpenAPISupport = true
|
genericConfig.EnableOpenAPISupport = true
|
||||||
genericConfig.EnableMetrics = true
|
genericConfig.EnableMetrics = true
|
||||||
genericConfig.OpenAPIConfig.SecurityDefinitions = securityDefinitions
|
genericConfig.OpenAPIConfig.SecurityDefinitions = securityDefinitions
|
||||||
|
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
|
||||||
|
sets.NewString("watch", "proxy"),
|
||||||
|
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
|
||||||
|
)
|
||||||
|
|
||||||
config := &master.Config{
|
config := &master.Config{
|
||||||
GenericConfig: genericConfig,
|
GenericConfig: genericConfig,
|
||||||
|
@ -1,65 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2015 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package app
|
|
||||||
|
|
||||||
import (
|
|
||||||
"regexp"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestLongRunningRequestRegexp(t *testing.T) {
|
|
||||||
regexp := regexp.MustCompile(options.NewServerRunOptions().GenericServerRunOptions.LongRunningRequestRE)
|
|
||||||
dontMatch := []string{
|
|
||||||
"/api/v1/watch-namespace/",
|
|
||||||
"/api/v1/namespace-proxy/",
|
|
||||||
"/api/v1/namespace-watch",
|
|
||||||
"/api/v1/namespace-proxy",
|
|
||||||
"/api/v1/namespace-portforward/pods",
|
|
||||||
"/api/v1/portforward/pods",
|
|
||||||
". anything",
|
|
||||||
"/ that",
|
|
||||||
}
|
|
||||||
doMatch := []string{
|
|
||||||
"/api/v1/pods/watch",
|
|
||||||
"/api/v1/watch/stuff",
|
|
||||||
"/api/v1/default/service/proxy",
|
|
||||||
"/api/v1/pods/proxy/path/to/thing",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/log",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/logs",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/portforward",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/exec",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/attach",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/log/",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/logs/",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/portforward/",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/exec/",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/attach/",
|
|
||||||
"/api/v1/watch/namespaces/myns/pods",
|
|
||||||
}
|
|
||||||
for _, path := range dontMatch {
|
|
||||||
if regexp.MatchString(path) {
|
|
||||||
t.Errorf("path should not have match regexp but did: %s", path)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, path := range doMatch {
|
|
||||||
if !regexp.MatchString(path) {
|
|
||||||
t.Errorf("path should have match regexp did not: %s", path)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -42,6 +42,7 @@ go_library(
|
|||||||
"//pkg/generated/openapi:go_default_library",
|
"//pkg/generated/openapi:go_default_library",
|
||||||
"//pkg/genericapiserver:go_default_library",
|
"//pkg/genericapiserver:go_default_library",
|
||||||
"//pkg/genericapiserver/authorizer:go_default_library",
|
"//pkg/genericapiserver/authorizer:go_default_library",
|
||||||
|
"//pkg/genericapiserver/filters:go_default_library",
|
||||||
"//pkg/registry/cachesize:go_default_library",
|
"//pkg/registry/cachesize:go_default_library",
|
||||||
"//pkg/registry/core/configmap/etcd:go_default_library",
|
"//pkg/registry/core/configmap/etcd:go_default_library",
|
||||||
"//pkg/registry/core/event/etcd:go_default_library",
|
"//pkg/registry/core/event/etcd:go_default_library",
|
||||||
@ -57,6 +58,7 @@ go_library(
|
|||||||
"//pkg/routes:go_default_library",
|
"//pkg/routes:go_default_library",
|
||||||
"//pkg/runtime/schema:go_default_library",
|
"//pkg/runtime/schema:go_default_library",
|
||||||
"//pkg/util/errors:go_default_library",
|
"//pkg/util/errors:go_default_library",
|
||||||
|
"//pkg/util/sets:go_default_library",
|
||||||
"//pkg/util/wait:go_default_library",
|
"//pkg/util/wait:go_default_library",
|
||||||
"//pkg/version:go_default_library",
|
"//pkg/version:go_default_library",
|
||||||
"//plugin/pkg/admission/admit:go_default_library",
|
"//plugin/pkg/admission/admit:go_default_library",
|
||||||
|
@ -38,12 +38,14 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/generated/openapi"
|
"k8s.io/kubernetes/pkg/generated/openapi"
|
||||||
"k8s.io/kubernetes/pkg/genericapiserver"
|
"k8s.io/kubernetes/pkg/genericapiserver"
|
||||||
"k8s.io/kubernetes/pkg/genericapiserver/authorizer"
|
"k8s.io/kubernetes/pkg/genericapiserver/authorizer"
|
||||||
|
"k8s.io/kubernetes/pkg/genericapiserver/filters"
|
||||||
"k8s.io/kubernetes/pkg/registry/cachesize"
|
"k8s.io/kubernetes/pkg/registry/cachesize"
|
||||||
"k8s.io/kubernetes/pkg/registry/generic"
|
"k8s.io/kubernetes/pkg/registry/generic"
|
||||||
genericregistry "k8s.io/kubernetes/pkg/registry/generic/registry"
|
genericregistry "k8s.io/kubernetes/pkg/registry/generic/registry"
|
||||||
"k8s.io/kubernetes/pkg/routes"
|
"k8s.io/kubernetes/pkg/routes"
|
||||||
"k8s.io/kubernetes/pkg/runtime/schema"
|
"k8s.io/kubernetes/pkg/runtime/schema"
|
||||||
utilerrors "k8s.io/kubernetes/pkg/util/errors"
|
utilerrors "k8s.io/kubernetes/pkg/util/errors"
|
||||||
|
"k8s.io/kubernetes/pkg/util/sets"
|
||||||
"k8s.io/kubernetes/pkg/util/wait"
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
"k8s.io/kubernetes/pkg/version"
|
"k8s.io/kubernetes/pkg/version"
|
||||||
)
|
)
|
||||||
@ -166,6 +168,10 @@ func Run(s *options.ServerRunOptions) error {
|
|||||||
genericConfig.OpenAPIConfig.Definitions = openapi.OpenAPIDefinitions
|
genericConfig.OpenAPIConfig.Definitions = openapi.OpenAPIDefinitions
|
||||||
genericConfig.EnableOpenAPISupport = true
|
genericConfig.EnableOpenAPISupport = true
|
||||||
genericConfig.OpenAPIConfig.SecurityDefinitions = securityDefinitions
|
genericConfig.OpenAPIConfig.SecurityDefinitions = securityDefinitions
|
||||||
|
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
|
||||||
|
sets.NewString("watch", "proxy"),
|
||||||
|
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
|
||||||
|
)
|
||||||
|
|
||||||
// TODO: Move this to generic api server (Need to move the command line flag).
|
// TODO: Move this to generic api server (Need to move the command line flag).
|
||||||
if s.GenericServerRunOptions.EnableWatchCache {
|
if s.GenericServerRunOptions.EnableWatchCache {
|
||||||
|
@ -26,7 +26,6 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"regexp"
|
|
||||||
goruntime "runtime"
|
goruntime "runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -193,8 +192,6 @@ type SecureServingInfo struct {
|
|||||||
|
|
||||||
// NewConfig returns a Config struct with the default values
|
// NewConfig returns a Config struct with the default values
|
||||||
func NewConfig() *Config {
|
func NewConfig() *Config {
|
||||||
longRunningRE := regexp.MustCompile(options.DefaultLongRunningRequestRE)
|
|
||||||
|
|
||||||
config := &Config{
|
config := &Config{
|
||||||
Serializer: api.Codecs,
|
Serializer: api.Codecs,
|
||||||
ReadWritePort: 6443,
|
ReadWritePort: 6443,
|
||||||
@ -220,7 +217,10 @@ func NewConfig() *Config {
|
|||||||
},
|
},
|
||||||
GetOperationIDAndTags: apiserveropenapi.GetOperationIDAndTags,
|
GetOperationIDAndTags: apiserveropenapi.GetOperationIDAndTags,
|
||||||
},
|
},
|
||||||
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(longRunningRE, map[string]string{"watch": "true"}),
|
|
||||||
|
// Default to treating watch as a long-running operation
|
||||||
|
// Generic API servers have no inherent long-running subresources
|
||||||
|
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
|
||||||
}
|
}
|
||||||
|
|
||||||
// this keeps the defaults in sync
|
// this keeps the defaults in sync
|
||||||
@ -498,7 +498,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) (secure, insec
|
|||||||
generic := func(handler http.Handler) http.Handler {
|
generic := func(handler http.Handler) http.Handler {
|
||||||
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
|
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
|
||||||
handler = genericfilters.WithPanicRecovery(handler, c.RequestContextMapper)
|
handler = genericfilters.WithPanicRecovery(handler, c.RequestContextMapper)
|
||||||
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)
|
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc)
|
||||||
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc)
|
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc)
|
||||||
handler = apiserverfilters.WithRequestInfo(handler, NewRequestInfoResolver(c), c.RequestContextMapper)
|
handler = apiserverfilters.WithRequestInfo(handler, NewRequestInfoResolver(c), c.RequestContextMapper)
|
||||||
handler = api.WithRequestContext(handler, c.RequestContextMapper)
|
handler = api.WithRequestContext(handler, c.RequestContextMapper)
|
||||||
|
@ -47,5 +47,6 @@ go_test(
|
|||||||
"//pkg/api/errors:go_default_library",
|
"//pkg/api/errors:go_default_library",
|
||||||
"//pkg/apiserver/filters:go_default_library",
|
"//pkg/apiserver/filters:go_default_library",
|
||||||
"//pkg/apiserver/request:go_default_library",
|
"//pkg/apiserver/request:go_default_library",
|
||||||
|
"//pkg/util/sets:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -18,29 +18,23 @@ package filters
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"regexp"
|
|
||||||
"strings"
|
"k8s.io/kubernetes/pkg/apiserver/request"
|
||||||
|
"k8s.io/kubernetes/pkg/util/sets"
|
||||||
)
|
)
|
||||||
|
|
||||||
// LongRunningRequestCheck is a predicate which is true for long-running http requests.
|
// LongRunningRequestCheck is a predicate which is true for long-running http requests.
|
||||||
type LongRunningRequestCheck func(r *http.Request) bool
|
type LongRunningRequestCheck func(r *http.Request, requestInfo *request.RequestInfo) bool
|
||||||
|
|
||||||
// BasicLongRunningRequestCheck pathRegex operates against the url path, the queryParams match is case insensitive.
|
// BasicLongRunningRequestCheck returns true if the given request has one of the specified verbs or one of the specified subresources
|
||||||
// Any one match flags the request.
|
func BasicLongRunningRequestCheck(longRunningVerbs, longRunningSubresources sets.String) LongRunningRequestCheck {
|
||||||
// TODO tighten this check to eliminate the abuse potential by malicious clients that start setting queryParameters
|
return func(r *http.Request, requestInfo *request.RequestInfo) bool {
|
||||||
// to bypass the rate limitter. This could be done using a full parse and special casing the bits we need.
|
if longRunningVerbs.Has(requestInfo.Verb) {
|
||||||
func BasicLongRunningRequestCheck(pathRegex *regexp.Regexp, queryParams map[string]string) LongRunningRequestCheck {
|
|
||||||
return func(r *http.Request) bool {
|
|
||||||
if pathRegex.MatchString(r.URL.Path) {
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
if requestInfo.IsResourceRequest && longRunningSubresources.Has(requestInfo.Subresource) {
|
||||||
for key, expectedValue := range queryParams {
|
return true
|
||||||
if strings.ToLower(expectedValue) == strings.ToLower(r.URL.Query().Get(key)) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -62,13 +62,6 @@ func WithMaxInFlightLimit(
|
|||||||
}
|
}
|
||||||
|
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
// TODO: migrate to use requestInfo instead of having custom request parser.
|
|
||||||
if longRunningRequestCheck(r) {
|
|
||||||
// Skip tracking long running events.
|
|
||||||
handler.ServeHTTP(w, r)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, ok := requestContextMapper.Get(r)
|
ctx, ok := requestContextMapper.Get(r)
|
||||||
if !ok {
|
if !ok {
|
||||||
handleError(w, r, fmt.Errorf("no context found for request, handler chain must be wrong"))
|
handleError(w, r, fmt.Errorf("no context found for request, handler chain must be wrong"))
|
||||||
@ -80,6 +73,12 @@ func WithMaxInFlightLimit(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Skip tracking long running events.
|
||||||
|
if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) {
|
||||||
|
handler.ServeHTTP(w, r)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
var c chan bool
|
var c chan bool
|
||||||
if !nonMutatingRequestVerbs.Has(requestInfo.Verb) {
|
if !nonMutatingRequestVerbs.Has(requestInfo.Verb) {
|
||||||
c = mutatingChan
|
c = mutatingChan
|
||||||
|
@ -20,7 +20,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"regexp"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
@ -29,17 +28,15 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api/errors"
|
"k8s.io/kubernetes/pkg/api/errors"
|
||||||
apiserverfilters "k8s.io/kubernetes/pkg/apiserver/filters"
|
apiserverfilters "k8s.io/kubernetes/pkg/apiserver/filters"
|
||||||
"k8s.io/kubernetes/pkg/apiserver/request"
|
"k8s.io/kubernetes/pkg/apiserver/request"
|
||||||
|
"k8s.io/kubernetes/pkg/util/sets"
|
||||||
)
|
)
|
||||||
|
|
||||||
func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *bool, disableCallsWgMutex *sync.Mutex, nonMutating, mutating int) *httptest.Server {
|
func createMaxInflightServer(callsWg, blockWg *sync.WaitGroup, disableCallsWg *bool, disableCallsWgMutex *sync.Mutex, nonMutating, mutating int) *httptest.Server {
|
||||||
|
|
||||||
// notAccountedPathsRegexp specifies paths requests to which we don't account into
|
longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
|
||||||
// requests in flight.
|
|
||||||
notAccountedPathsRegexp := regexp.MustCompile(".*\\/watch")
|
|
||||||
longRunningRequestCheck := BasicLongRunningRequestCheck(notAccountedPathsRegexp, map[string]string{"watch": "true"})
|
|
||||||
|
|
||||||
requestContextMapper := api.NewRequestContextMapper()
|
requestContextMapper := api.NewRequestContextMapper()
|
||||||
requestInfoFactory := &request.RequestInfoFactory{}
|
requestInfoFactory := &request.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
|
||||||
handler := WithMaxInFlightLimit(
|
handler := WithMaxInFlightLimit(
|
||||||
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
// A short, accounted request that does not wait for block WaitGroup.
|
// A short, accounted request that does not wait for block WaitGroup.
|
||||||
@ -103,7 +100,7 @@ func TestMaxInFlightNonMutating(t *testing.T) {
|
|||||||
for i := 0; i < AllowedNonMutatingInflightRequestsNo; i++ {
|
for i := 0; i < AllowedNonMutatingInflightRequestsNo; i++ {
|
||||||
// These should hang waiting on block...
|
// These should hang waiting on block...
|
||||||
go func() {
|
go func() {
|
||||||
if err := expectHTTPGet(server.URL+"/foo/bar?watch=true", http.StatusOK); err != nil {
|
if err := expectHTTPGet(server.URL+"/api/v1/namespaces/default/wait?watch=true", http.StatusOK); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
responses.Done()
|
responses.Done()
|
||||||
@ -139,7 +136,7 @@ func TestMaxInFlightNonMutating(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Validate that non-accounted URLs still work. use a path regex match
|
// Validate that non-accounted URLs still work. use a path regex match
|
||||||
if err := expectHTTPGet(server.URL+"/dontwait/watch", http.StatusOK); err != nil {
|
if err := expectHTTPGet(server.URL+"/api/v1/watch/namespaces/default/dontwait", http.StatusOK); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/errors"
|
"k8s.io/kubernetes/pkg/api/errors"
|
||||||
|
"k8s.io/kubernetes/pkg/apiserver/request"
|
||||||
)
|
)
|
||||||
|
|
||||||
const globalTimeout = time.Minute
|
const globalTimeout = time.Minute
|
||||||
@ -34,13 +35,23 @@ const globalTimeout = time.Minute
|
|||||||
var errConnKilled = fmt.Errorf("kill connection/stream")
|
var errConnKilled = fmt.Errorf("kill connection/stream")
|
||||||
|
|
||||||
// WithTimeoutForNonLongRunningRequests times out non-long-running requests after the time given by globalTimeout.
|
// WithTimeoutForNonLongRunningRequests times out non-long-running requests after the time given by globalTimeout.
|
||||||
func WithTimeoutForNonLongRunningRequests(handler http.Handler, longRunning LongRunningRequestCheck) http.Handler {
|
func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMapper api.RequestContextMapper, longRunning LongRunningRequestCheck) http.Handler {
|
||||||
if longRunning == nil {
|
if longRunning == nil {
|
||||||
return handler
|
return handler
|
||||||
}
|
}
|
||||||
timeoutFunc := func(req *http.Request) (<-chan time.Time, string) {
|
timeoutFunc := func(req *http.Request) (<-chan time.Time, string) {
|
||||||
// TODO unify this with apiserver.MaxInFlightLimit
|
// TODO unify this with apiserver.MaxInFlightLimit
|
||||||
if longRunning(req) {
|
ctx, ok := requestContextMapper.Get(req)
|
||||||
|
if !ok {
|
||||||
|
return time.After(globalTimeout), ""
|
||||||
|
}
|
||||||
|
|
||||||
|
requestInfo, ok := request.RequestInfoFrom(ctx)
|
||||||
|
if !ok {
|
||||||
|
return time.After(globalTimeout), ""
|
||||||
|
}
|
||||||
|
|
||||||
|
if longRunning(req, requestInfo) {
|
||||||
return nil, ""
|
return nil, ""
|
||||||
}
|
}
|
||||||
return time.After(globalTimeout), ""
|
return time.After(globalTimeout), ""
|
||||||
|
@ -31,11 +31,6 @@ import (
|
|||||||
"github.com/spf13/pflag"
|
"github.com/spf13/pflag"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
// TODO: This can be tightened up. It still matches objects named watch or proxy.
|
|
||||||
DefaultLongRunningRequestRE = "(/|^)((watch|proxy)(/|$)|(logs?|portforward|exec|attach)/?$)"
|
|
||||||
)
|
|
||||||
|
|
||||||
var DefaultServiceNodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}
|
var DefaultServiceNodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}
|
||||||
|
|
||||||
// ServerRunOptions contains the options while running a generic api server.
|
// ServerRunOptions contains the options while running a generic api server.
|
||||||
@ -60,7 +55,6 @@ type ServerRunOptions struct {
|
|||||||
EnableWatchCache bool
|
EnableWatchCache bool
|
||||||
ExternalHost string
|
ExternalHost string
|
||||||
KubernetesServiceNodePort int
|
KubernetesServiceNodePort int
|
||||||
LongRunningRequestRE string
|
|
||||||
MasterCount int
|
MasterCount int
|
||||||
MasterServiceNamespace string
|
MasterServiceNamespace string
|
||||||
MaxRequestsInFlight int
|
MaxRequestsInFlight int
|
||||||
@ -88,7 +82,6 @@ func NewServerRunOptions() *ServerRunOptions {
|
|||||||
EnableProfiling: true,
|
EnableProfiling: true,
|
||||||
EnableContentionProfiling: false,
|
EnableContentionProfiling: false,
|
||||||
EnableWatchCache: true,
|
EnableWatchCache: true,
|
||||||
LongRunningRequestRE: DefaultLongRunningRequestRE,
|
|
||||||
MasterCount: 1,
|
MasterCount: 1,
|
||||||
MasterServiceNamespace: api.NamespaceDefault,
|
MasterServiceNamespace: api.NamespaceDefault,
|
||||||
MaxRequestsInFlight: 400,
|
MaxRequestsInFlight: 400,
|
||||||
@ -243,9 +236,11 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
|
|||||||
"of type NodePort, using this as the value of the port. If zero, the Kubernetes master "+
|
"of type NodePort, using this as the value of the port. If zero, the Kubernetes master "+
|
||||||
"service will be of type ClusterIP.")
|
"service will be of type ClusterIP.")
|
||||||
|
|
||||||
fs.StringVar(&s.LongRunningRequestRE, "long-running-request-regexp", s.LongRunningRequestRE, ""+
|
// TODO: remove post-1.6
|
||||||
|
fs.String("long-running-request-regexp", "", ""+
|
||||||
"A regular expression matching long running requests which should "+
|
"A regular expression matching long running requests which should "+
|
||||||
"be excluded from maximum inflight request handling.")
|
"be excluded from maximum inflight request handling.")
|
||||||
|
fs.MarkDeprecated("long-running-request-regexp", "regular expression matching of long-running requests is no longer supported")
|
||||||
|
|
||||||
fs.IntVar(&s.MasterCount, "apiserver-count", s.MasterCount,
|
fs.IntVar(&s.MasterCount, "apiserver-count", s.MasterCount,
|
||||||
"The number of apiservers running in the cluster.")
|
"The number of apiservers running in the cluster.")
|
||||||
|
@ -22,7 +22,6 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"regexp"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -37,47 +36,6 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/runtime/schema"
|
"k8s.io/kubernetes/pkg/runtime/schema"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestLongRunningRequestRegexp(t *testing.T) {
|
|
||||||
regexp := regexp.MustCompile(options.NewServerRunOptions().GenericServerRunOptions.LongRunningRequestRE)
|
|
||||||
dontMatch := []string{
|
|
||||||
"/api/v1/watch-namespace/",
|
|
||||||
"/api/v1/namespace-proxy/",
|
|
||||||
"/api/v1/namespace-watch",
|
|
||||||
"/api/v1/namespace-proxy",
|
|
||||||
"/api/v1/namespace-portforward/pods",
|
|
||||||
"/api/v1/portforward/pods",
|
|
||||||
". anything",
|
|
||||||
"/ that",
|
|
||||||
}
|
|
||||||
doMatch := []string{
|
|
||||||
"/api/v1/pods/watch",
|
|
||||||
"/api/v1/watch/stuff",
|
|
||||||
"/api/v1/default/service/proxy",
|
|
||||||
"/api/v1/pods/proxy/path/to/thing",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/log",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/logs",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/portforward",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/exec",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/attach",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/log/",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/logs/",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/portforward/",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/exec/",
|
|
||||||
"/api/v1/namespaces/myns/pods/mypod/attach/",
|
|
||||||
"/api/v1/watch/namespaces/myns/pods",
|
|
||||||
}
|
|
||||||
for _, path := range dontMatch {
|
|
||||||
if regexp.MatchString(path) {
|
|
||||||
t.Errorf("path should not have match regexp but did: %s", path)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, path := range doMatch {
|
|
||||||
if !regexp.MatchString(path) {
|
|
||||||
t.Errorf("path should have match regexp did not: %s", path)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var securePort = 6443 + 2
|
var securePort = 6443 + 2
|
||||||
var insecurePort = 8080 + 2
|
var insecurePort = 8080 + 2
|
||||||
var serverIP = fmt.Sprintf("http://localhost:%v", insecurePort)
|
var serverIP = fmt.Sprintf("http://localhost:%v", insecurePort)
|
||||||
|
Loading…
Reference in New Issue
Block a user