Update kubernetes vendor to 0.22.5
Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
1
vendor/k8s.io/client-go/pkg/apis/clientauthentication/v1/zz_generated.conversion.go
generated
vendored
1
vendor/k8s.io/client-go/pkg/apis/clientauthentication/v1/zz_generated.conversion.go
generated
vendored
@@ -1,4 +1,3 @@
|
||||
//go:build !ignore_autogenerated
|
||||
// +build !ignore_autogenerated
|
||||
|
||||
/*
|
||||
|
||||
1
vendor/k8s.io/client-go/pkg/apis/clientauthentication/v1/zz_generated.deepcopy.go
generated
vendored
1
vendor/k8s.io/client-go/pkg/apis/clientauthentication/v1/zz_generated.deepcopy.go
generated
vendored
@@ -1,4 +1,3 @@
|
||||
//go:build !ignore_autogenerated
|
||||
// +build !ignore_autogenerated
|
||||
|
||||
/*
|
||||
|
||||
1
vendor/k8s.io/client-go/pkg/apis/clientauthentication/v1/zz_generated.defaults.go
generated
vendored
1
vendor/k8s.io/client-go/pkg/apis/clientauthentication/v1/zz_generated.defaults.go
generated
vendored
@@ -1,4 +1,3 @@
|
||||
//go:build !ignore_autogenerated
|
||||
// +build !ignore_autogenerated
|
||||
|
||||
/*
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
//go:build !ignore_autogenerated
|
||||
// +build !ignore_autogenerated
|
||||
|
||||
/*
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
//go:build !ignore_autogenerated
|
||||
// +build !ignore_autogenerated
|
||||
|
||||
/*
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
//go:build !ignore_autogenerated
|
||||
// +build !ignore_autogenerated
|
||||
|
||||
/*
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
//go:build !ignore_autogenerated
|
||||
// +build !ignore_autogenerated
|
||||
|
||||
/*
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
//go:build !ignore_autogenerated
|
||||
// +build !ignore_autogenerated
|
||||
|
||||
/*
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
//go:build !ignore_autogenerated
|
||||
// +build !ignore_autogenerated
|
||||
|
||||
/*
|
||||
|
||||
1
vendor/k8s.io/client-go/pkg/apis/clientauthentication/zz_generated.deepcopy.go
generated
vendored
1
vendor/k8s.io/client-go/pkg/apis/clientauthentication/zz_generated.deepcopy.go
generated
vendored
@@ -1,4 +1,3 @@
|
||||
//go:build !ignore_autogenerated
|
||||
// +build !ignore_autogenerated
|
||||
|
||||
/*
|
||||
|
||||
9
vendor/k8s.io/client-go/plugin/pkg/client/auth/exec/exec.go
generated
vendored
9
vendor/k8s.io/client-go/plugin/pkg/client/auth/exec/exec.go
generated
vendored
@@ -38,7 +38,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/client-go/pkg/apis/clientauthentication"
|
||||
"k8s.io/client-go/pkg/apis/clientauthentication/install"
|
||||
clientauthenticationv1 "k8s.io/client-go/pkg/apis/clientauthentication/v1"
|
||||
@@ -49,7 +49,6 @@ import (
|
||||
"k8s.io/client-go/transport"
|
||||
"k8s.io/client-go/util/connrotation"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
const execInfoEnv = "KUBERNETES_EXEC_INFO"
|
||||
@@ -317,17 +316,11 @@ func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ utilnet.RoundTripperWrapper = &roundTripper{}
|
||||
|
||||
type roundTripper struct {
|
||||
a *Authenticator
|
||||
base http.RoundTripper
|
||||
}
|
||||
|
||||
func (r *roundTripper) WrappedRoundTripper() http.RoundTripper {
|
||||
return r.base
|
||||
}
|
||||
|
||||
func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
// If a user has already set credentials, use that. This makes commands like
|
||||
// "kubectl get --token (token) pods" work.
|
||||
|
||||
84
vendor/k8s.io/client-go/rest/config.go
generated
vendored
84
vendor/k8s.io/client-go/rest/config.go
generated
vendored
@@ -202,8 +202,6 @@ func (c *Config) String() string {
|
||||
type ImpersonationConfig struct {
|
||||
// UserName is the username to impersonate on each request.
|
||||
UserName string
|
||||
// UID is a unique value that identifies the user.
|
||||
UID string
|
||||
// Groups are the groups to impersonate on each request.
|
||||
Groups []string
|
||||
// Extra is a free-form field which can be used to link some authentication information
|
||||
@@ -305,8 +303,6 @@ type ContentConfig struct {
|
||||
// object. Note that a RESTClient may require fields that are optional when initializing a Client.
|
||||
// A RESTClient created by this method is generic - it expects to operate on an API that follows
|
||||
// the Kubernetes conventions, but may not be the Kubernetes API.
|
||||
// RESTClientFor is equivalent to calling RESTClientForConfigAndClient(config, httpClient),
|
||||
// where httpClient was generated with HTTPClientFor(config).
|
||||
func RESTClientFor(config *Config) (*RESTClient, error) {
|
||||
if config.GroupVersion == nil {
|
||||
return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")
|
||||
@@ -315,40 +311,24 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
|
||||
return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
|
||||
}
|
||||
|
||||
// Validate config.Host before constructing the transport/client so we can fail fast.
|
||||
// ServerURL will be obtained later in RESTClientForConfigAndClient()
|
||||
_, _, err := defaultServerUrlFor(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
httpClient, err := HTTPClientFor(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return RESTClientForConfigAndClient(config, httpClient)
|
||||
}
|
||||
|
||||
// RESTClientForConfigAndClient returns a RESTClient that satisfies the requested attributes on a
|
||||
// client Config object.
|
||||
// Unlike RESTClientFor, RESTClientForConfigAndClient allows to pass an http.Client that is shared
|
||||
// between all the API Groups and Versions.
|
||||
// Note that the http client takes precedence over the transport values configured.
|
||||
// The http client defaults to the `http.DefaultClient` if nil.
|
||||
func RESTClientForConfigAndClient(config *Config, httpClient *http.Client) (*RESTClient, error) {
|
||||
if config.GroupVersion == nil {
|
||||
return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")
|
||||
}
|
||||
if config.NegotiatedSerializer == nil {
|
||||
return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
|
||||
}
|
||||
|
||||
baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
transport, err := TransportFor(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var httpClient *http.Client
|
||||
if transport != http.DefaultTransport {
|
||||
httpClient = &http.Client{Transport: transport}
|
||||
if config.Timeout > 0 {
|
||||
httpClient.Timeout = config.Timeout
|
||||
}
|
||||
}
|
||||
|
||||
rateLimiter := config.RateLimiter
|
||||
if rateLimiter == nil {
|
||||
qps := config.QPS
|
||||
@@ -389,33 +369,24 @@ func UnversionedRESTClientFor(config *Config) (*RESTClient, error) {
|
||||
return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
|
||||
}
|
||||
|
||||
// Validate config.Host before constructing the transport/client so we can fail fast.
|
||||
// ServerURL will be obtained later in UnversionedRESTClientForConfigAndClient()
|
||||
_, _, err := defaultServerUrlFor(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
httpClient, err := HTTPClientFor(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return UnversionedRESTClientForConfigAndClient(config, httpClient)
|
||||
}
|
||||
|
||||
// UnversionedRESTClientForConfigAndClient is the same as RESTClientForConfigAndClient,
|
||||
// except that it allows the config.Version to be empty.
|
||||
func UnversionedRESTClientForConfigAndClient(config *Config, httpClient *http.Client) (*RESTClient, error) {
|
||||
if config.NegotiatedSerializer == nil {
|
||||
return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
|
||||
}
|
||||
|
||||
baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
transport, err := TransportFor(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var httpClient *http.Client
|
||||
if transport != http.DefaultTransport {
|
||||
httpClient = &http.Client{Transport: transport}
|
||||
if config.Timeout > 0 {
|
||||
httpClient.Timeout = config.Timeout
|
||||
}
|
||||
}
|
||||
|
||||
rateLimiter := config.RateLimiter
|
||||
if rateLimiter == nil {
|
||||
qps := config.QPS
|
||||
@@ -637,10 +608,9 @@ func CopyConfig(config *Config) *Config {
|
||||
BearerToken: config.BearerToken,
|
||||
BearerTokenFile: config.BearerTokenFile,
|
||||
Impersonate: ImpersonationConfig{
|
||||
UserName: config.Impersonate.UserName,
|
||||
UID: config.Impersonate.UID,
|
||||
Groups: config.Impersonate.Groups,
|
||||
Extra: config.Impersonate.Extra,
|
||||
UserName: config.Impersonate.UserName,
|
||||
},
|
||||
AuthProvider: config.AuthProvider,
|
||||
AuthConfigPersister: config.AuthConfigPersister,
|
||||
|
||||
6
vendor/k8s.io/client-go/rest/request.go
generated
vendored
6
vendor/k8s.io/client-go/rest/request.go
generated
vendored
@@ -39,13 +39,13 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
|
||||
utilclock "k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
restclientwatch "k8s.io/client-go/rest/watch"
|
||||
"k8s.io/client-go/tools/metrics"
|
||||
"k8s.io/client-go/util/flowcontrol"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -619,12 +619,12 @@ type throttleSettings struct {
|
||||
}
|
||||
|
||||
type throttledLogger struct {
|
||||
clock clock.PassiveClock
|
||||
clock utilclock.PassiveClock
|
||||
settings []*throttleSettings
|
||||
}
|
||||
|
||||
var globalThrottledLogger = &throttledLogger{
|
||||
clock: clock.RealClock{},
|
||||
clock: utilclock.RealClock{},
|
||||
settings: []*throttleSettings{
|
||||
{
|
||||
logLevel: 2,
|
||||
|
||||
22
vendor/k8s.io/client-go/rest/transport.go
generated
vendored
22
vendor/k8s.io/client-go/rest/transport.go
generated
vendored
@@ -26,27 +26,6 @@ import (
|
||||
"k8s.io/client-go/transport"
|
||||
)
|
||||
|
||||
// HTTPClientFor returns an http.Client that will provide the authentication
|
||||
// or transport level security defined by the provided Config. Will return the
|
||||
// default http.DefaultClient if no special case behavior is needed.
|
||||
func HTTPClientFor(config *Config) (*http.Client, error) {
|
||||
transport, err := TransportFor(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var httpClient *http.Client
|
||||
if transport != http.DefaultTransport || config.Timeout > 0 {
|
||||
httpClient = &http.Client{
|
||||
Transport: transport,
|
||||
Timeout: config.Timeout,
|
||||
}
|
||||
} else {
|
||||
httpClient = http.DefaultClient
|
||||
}
|
||||
|
||||
return httpClient, nil
|
||||
}
|
||||
|
||||
// TLSConfigFor returns a tls.Config that will provide the transport level security defined
|
||||
// by the provided Config. Will return nil if no transport level security is requested.
|
||||
func TLSConfigFor(config *Config) (*tls.Config, error) {
|
||||
@@ -104,7 +83,6 @@ func (c *Config) TransportConfig() (*transport.Config, error) {
|
||||
BearerTokenFile: c.BearerTokenFile,
|
||||
Impersonate: transport.ImpersonationConfig{
|
||||
UserName: c.Impersonate.UserName,
|
||||
UID: c.Impersonate.UID,
|
||||
Groups: c.Impersonate.Groups,
|
||||
Extra: c.Impersonate.Extra,
|
||||
},
|
||||
|
||||
1
vendor/k8s.io/client-go/rest/zz_generated.deepcopy.go
generated
vendored
1
vendor/k8s.io/client-go/rest/zz_generated.deepcopy.go
generated
vendored
@@ -1,4 +1,3 @@
|
||||
//go:build !ignore_autogenerated
|
||||
// +build !ignore_autogenerated
|
||||
|
||||
/*
|
||||
|
||||
5
vendor/k8s.io/client-go/tools/clientcmd/api/types.go
generated
vendored
5
vendor/k8s.io/client-go/tools/clientcmd/api/types.go
generated
vendored
@@ -124,10 +124,7 @@ type AuthInfo struct {
|
||||
// Impersonate is the username to act-as.
|
||||
// +optional
|
||||
Impersonate string `json:"act-as,omitempty"`
|
||||
// ImpersonateUID is the uid to impersonate.
|
||||
// +optional
|
||||
ImpersonateUID string `json:"act-as-uid,omitempty"`
|
||||
// ImpersonateGroups is the groups to impersonate.
|
||||
// ImpersonateGroups is the groups to imperonate.
|
||||
// +optional
|
||||
ImpersonateGroups []string `json:"act-as-groups,omitempty"`
|
||||
// ImpersonateUserExtra contains additional information for impersonated user.
|
||||
|
||||
1
vendor/k8s.io/client-go/tools/clientcmd/api/zz_generated.deepcopy.go
generated
vendored
1
vendor/k8s.io/client-go/tools/clientcmd/api/zz_generated.deepcopy.go
generated
vendored
@@ -1,4 +1,3 @@
|
||||
//go:build !ignore_autogenerated
|
||||
// +build !ignore_autogenerated
|
||||
|
||||
/*
|
||||
|
||||
2
vendor/k8s.io/client-go/transport/config.go
generated
vendored
2
vendor/k8s.io/client-go/transport/config.go
generated
vendored
@@ -82,8 +82,6 @@ type Config struct {
|
||||
type ImpersonationConfig struct {
|
||||
// UserName matches user.Info.GetName()
|
||||
UserName string
|
||||
// UID matches user.Info.GetUID()
|
||||
UID string
|
||||
// Groups matches user.Info.GetGroups()
|
||||
Groups []string
|
||||
// Extra matches user.Info.GetExtra()
|
||||
|
||||
121
vendor/k8s.io/client-go/transport/round_trippers.go
generated
vendored
121
vendor/k8s.io/client-go/transport/round_trippers.go
generated
vendored
@@ -17,12 +17,9 @@ limitations under the License.
|
||||
package transport
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptrace"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/oauth2"
|
||||
@@ -60,7 +57,6 @@ func HTTPWrappersForConfig(config *Config, rt http.RoundTripper) (http.RoundTrip
|
||||
rt = NewUserAgentRoundTripper(config.UserAgent, rt)
|
||||
}
|
||||
if len(config.Impersonate.UserName) > 0 ||
|
||||
len(config.Impersonate.UID) > 0 ||
|
||||
len(config.Impersonate.Groups) > 0 ||
|
||||
len(config.Impersonate.Extra) > 0 {
|
||||
rt = NewImpersonatingRoundTripper(config.Impersonate, rt)
|
||||
@@ -72,7 +68,7 @@ func HTTPWrappersForConfig(config *Config, rt http.RoundTripper) (http.RoundTrip
|
||||
func DebugWrappers(rt http.RoundTripper) http.RoundTripper {
|
||||
switch {
|
||||
case bool(klog.V(9).Enabled()):
|
||||
rt = NewDebuggingRoundTripper(rt, DebugCurlCommand, DebugDetailedTiming, DebugResponseHeaders)
|
||||
rt = NewDebuggingRoundTripper(rt, DebugCurlCommand, DebugURLTiming, DebugResponseHeaders)
|
||||
case bool(klog.V(8).Enabled()):
|
||||
rt = NewDebuggingRoundTripper(rt, DebugJustURL, DebugRequestHeaders, DebugResponseStatus, DebugResponseHeaders)
|
||||
case bool(klog.V(7).Enabled()):
|
||||
@@ -92,8 +88,6 @@ type authProxyRoundTripper struct {
|
||||
rt http.RoundTripper
|
||||
}
|
||||
|
||||
var _ utilnet.RoundTripperWrapper = &authProxyRoundTripper{}
|
||||
|
||||
// NewAuthProxyRoundTripper provides a roundtripper which will add auth proxy fields to requests for
|
||||
// authentication terminating proxy cases
|
||||
// assuming you pull the user from the context:
|
||||
@@ -152,8 +146,6 @@ type userAgentRoundTripper struct {
|
||||
rt http.RoundTripper
|
||||
}
|
||||
|
||||
var _ utilnet.RoundTripperWrapper = &userAgentRoundTripper{}
|
||||
|
||||
// NewUserAgentRoundTripper will add User-Agent header to a request unless it has already been set.
|
||||
func NewUserAgentRoundTripper(agent string, rt http.RoundTripper) http.RoundTripper {
|
||||
return &userAgentRoundTripper{agent, rt}
|
||||
@@ -180,8 +172,6 @@ type basicAuthRoundTripper struct {
|
||||
rt http.RoundTripper
|
||||
}
|
||||
|
||||
var _ utilnet.RoundTripperWrapper = &basicAuthRoundTripper{}
|
||||
|
||||
// NewBasicAuthRoundTripper will apply a BASIC auth authorization header to a
|
||||
// request unless it has already been set.
|
||||
func NewBasicAuthRoundTripper(username, password string, rt http.RoundTripper) http.RoundTripper {
|
||||
@@ -209,9 +199,6 @@ const (
|
||||
// ImpersonateUserHeader is used to impersonate a particular user during an API server request
|
||||
ImpersonateUserHeader = "Impersonate-User"
|
||||
|
||||
// ImpersonateUIDHeader is used to impersonate a particular UID during an API server request
|
||||
ImpersonateUIDHeader = "Impersonate-Uid"
|
||||
|
||||
// ImpersonateGroupHeader is used to impersonate a particular group during an API server request.
|
||||
// It can be repeated multiplied times for multiple groups.
|
||||
ImpersonateGroupHeader = "Impersonate-Group"
|
||||
@@ -231,8 +218,6 @@ type impersonatingRoundTripper struct {
|
||||
delegate http.RoundTripper
|
||||
}
|
||||
|
||||
var _ utilnet.RoundTripperWrapper = &impersonatingRoundTripper{}
|
||||
|
||||
// NewImpersonatingRoundTripper will add an Act-As header to a request unless it has already been set.
|
||||
func NewImpersonatingRoundTripper(impersonate ImpersonationConfig, delegate http.RoundTripper) http.RoundTripper {
|
||||
return &impersonatingRoundTripper{impersonate, delegate}
|
||||
@@ -245,9 +230,7 @@ func (rt *impersonatingRoundTripper) RoundTrip(req *http.Request) (*http.Respons
|
||||
}
|
||||
req = utilnet.CloneRequest(req)
|
||||
req.Header.Set(ImpersonateUserHeader, rt.impersonate.UserName)
|
||||
if rt.impersonate.UID != "" {
|
||||
req.Header.Set(ImpersonateUIDHeader, rt.impersonate.UID)
|
||||
}
|
||||
|
||||
for _, group := range rt.impersonate.Groups {
|
||||
req.Header.Add(ImpersonateGroupHeader, group)
|
||||
}
|
||||
@@ -272,8 +255,6 @@ type bearerAuthRoundTripper struct {
|
||||
rt http.RoundTripper
|
||||
}
|
||||
|
||||
var _ utilnet.RoundTripperWrapper = &bearerAuthRoundTripper{}
|
||||
|
||||
// NewBearerAuthRoundTripper adds the provided bearer token to a request
|
||||
// unless the authorization header has already been set.
|
||||
func NewBearerAuthRoundTripper(bearer string, rt http.RoundTripper) http.RoundTripper {
|
||||
@@ -333,14 +314,6 @@ type requestInfo struct {
|
||||
ResponseHeaders http.Header
|
||||
ResponseErr error
|
||||
|
||||
muTrace sync.Mutex // Protect trace fields
|
||||
DNSLookup time.Duration
|
||||
Dialing time.Duration
|
||||
GetConnection time.Duration
|
||||
TLSHandshake time.Duration
|
||||
ServerProcessing time.Duration
|
||||
ConnectionReused bool
|
||||
|
||||
Duration time.Duration
|
||||
}
|
||||
|
||||
@@ -383,8 +356,6 @@ type debuggingRoundTripper struct {
|
||||
levels map[DebugLevel]bool
|
||||
}
|
||||
|
||||
var _ utilnet.RoundTripperWrapper = &debuggingRoundTripper{}
|
||||
|
||||
// DebugLevel is used to enable debugging of certain
|
||||
// HTTP requests and responses fields via the debuggingRoundTripper.
|
||||
type DebugLevel int
|
||||
@@ -403,8 +374,6 @@ const (
|
||||
DebugResponseStatus
|
||||
// DebugResponseHeaders will add to the debug output the HTTP response headers.
|
||||
DebugResponseHeaders
|
||||
// DebugDetailedTiming will add to the debug output the duration of the HTTP requests events.
|
||||
DebugDetailedTiming
|
||||
)
|
||||
|
||||
// NewDebuggingRoundTripper allows to display in the logs output debug information
|
||||
@@ -476,74 +445,6 @@ func (rt *debuggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, e
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
|
||||
if rt.levels[DebugDetailedTiming] {
|
||||
var getConn, dnsStart, dialStart, tlsStart, serverStart time.Time
|
||||
var host string
|
||||
trace := &httptrace.ClientTrace{
|
||||
// DNS
|
||||
DNSStart: func(info httptrace.DNSStartInfo) {
|
||||
reqInfo.muTrace.Lock()
|
||||
defer reqInfo.muTrace.Unlock()
|
||||
dnsStart = time.Now()
|
||||
host = info.Host
|
||||
},
|
||||
DNSDone: func(info httptrace.DNSDoneInfo) {
|
||||
reqInfo.muTrace.Lock()
|
||||
defer reqInfo.muTrace.Unlock()
|
||||
reqInfo.DNSLookup = time.Now().Sub(dnsStart)
|
||||
klog.Infof("HTTP Trace: DNS Lookup for %s resolved to %v", host, info.Addrs)
|
||||
},
|
||||
// Dial
|
||||
ConnectStart: func(network, addr string) {
|
||||
reqInfo.muTrace.Lock()
|
||||
defer reqInfo.muTrace.Unlock()
|
||||
dialStart = time.Now()
|
||||
},
|
||||
ConnectDone: func(network, addr string, err error) {
|
||||
reqInfo.muTrace.Lock()
|
||||
defer reqInfo.muTrace.Unlock()
|
||||
reqInfo.Dialing = time.Now().Sub(dialStart)
|
||||
if err != nil {
|
||||
klog.Infof("HTTP Trace: Dial to %s:%s failed: %v", network, addr, err)
|
||||
} else {
|
||||
klog.Infof("HTTP Trace: Dial to %s:%s succeed", network, addr)
|
||||
}
|
||||
},
|
||||
// TLS
|
||||
TLSHandshakeStart: func() {
|
||||
tlsStart = time.Now()
|
||||
},
|
||||
TLSHandshakeDone: func(_ tls.ConnectionState, _ error) {
|
||||
reqInfo.muTrace.Lock()
|
||||
defer reqInfo.muTrace.Unlock()
|
||||
reqInfo.TLSHandshake = time.Now().Sub(tlsStart)
|
||||
},
|
||||
// Connection (it can be DNS + Dial or just the time to get one from the connection pool)
|
||||
GetConn: func(hostPort string) {
|
||||
getConn = time.Now()
|
||||
},
|
||||
GotConn: func(info httptrace.GotConnInfo) {
|
||||
reqInfo.muTrace.Lock()
|
||||
defer reqInfo.muTrace.Unlock()
|
||||
reqInfo.GetConnection = time.Now().Sub(getConn)
|
||||
reqInfo.ConnectionReused = info.Reused
|
||||
},
|
||||
// Server Processing (time since we wrote the request until first byte is received)
|
||||
WroteRequest: func(info httptrace.WroteRequestInfo) {
|
||||
reqInfo.muTrace.Lock()
|
||||
defer reqInfo.muTrace.Unlock()
|
||||
serverStart = time.Now()
|
||||
},
|
||||
GotFirstResponseByte: func() {
|
||||
reqInfo.muTrace.Lock()
|
||||
defer reqInfo.muTrace.Unlock()
|
||||
reqInfo.ServerProcessing = time.Now().Sub(serverStart)
|
||||
},
|
||||
}
|
||||
req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
|
||||
}
|
||||
|
||||
response, err := rt.delegatedRoundTripper.RoundTrip(req)
|
||||
reqInfo.Duration = time.Since(startTime)
|
||||
|
||||
@@ -552,24 +453,6 @@ func (rt *debuggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, e
|
||||
if rt.levels[DebugURLTiming] {
|
||||
klog.Infof("%s %s %s in %d milliseconds", reqInfo.RequestVerb, reqInfo.RequestURL, reqInfo.ResponseStatus, reqInfo.Duration.Nanoseconds()/int64(time.Millisecond))
|
||||
}
|
||||
if rt.levels[DebugDetailedTiming] {
|
||||
stats := ""
|
||||
if !reqInfo.ConnectionReused {
|
||||
stats += fmt.Sprintf(`DNSLookup %d ms Dial %d ms TLSHandshake %d ms`,
|
||||
reqInfo.DNSLookup.Nanoseconds()/int64(time.Millisecond),
|
||||
reqInfo.Dialing.Nanoseconds()/int64(time.Millisecond),
|
||||
reqInfo.TLSHandshake.Nanoseconds()/int64(time.Millisecond),
|
||||
)
|
||||
} else {
|
||||
stats += fmt.Sprintf(`GetConnection %d ms`, reqInfo.GetConnection.Nanoseconds()/int64(time.Millisecond))
|
||||
}
|
||||
if reqInfo.ServerProcessing != 0 {
|
||||
stats += fmt.Sprintf(` ServerProcessing %d ms`, reqInfo.ServerProcessing.Nanoseconds()/int64(time.Millisecond))
|
||||
}
|
||||
stats += fmt.Sprintf(` Duration %d ms`, reqInfo.Duration.Nanoseconds()/int64(time.Millisecond))
|
||||
klog.Infof("HTTP Statistics: %s", stats)
|
||||
}
|
||||
|
||||
if rt.levels[DebugResponseStatus] {
|
||||
klog.Infof("Response Status: %s in %d milliseconds", reqInfo.ResponseStatus, reqInfo.Duration.Nanoseconds()/int64(time.Millisecond))
|
||||
}
|
||||
|
||||
5
vendor/k8s.io/client-go/transport/token_source.go
generated
vendored
5
vendor/k8s.io/client-go/transport/token_source.go
generated
vendored
@@ -26,7 +26,6 @@ import (
|
||||
|
||||
"golang.org/x/oauth2"
|
||||
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
@@ -96,8 +95,6 @@ type tokenSourceTransport struct {
|
||||
src ResettableTokenSource
|
||||
}
|
||||
|
||||
var _ utilnet.RoundTripperWrapper = &tokenSourceTransport{}
|
||||
|
||||
func (tst *tokenSourceTransport) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
// This is to allow --token to override other bearer token providers.
|
||||
if req.Header.Get("Authorization") != "" {
|
||||
@@ -122,8 +119,6 @@ func (tst *tokenSourceTransport) CancelRequest(req *http.Request) {
|
||||
tryCancelRequest(tst.ort, req)
|
||||
}
|
||||
|
||||
func (tst *tokenSourceTransport) WrappedRoundTripper() http.RoundTripper { return tst.base }
|
||||
|
||||
type fileTokenSource struct {
|
||||
path string
|
||||
period time.Duration
|
||||
|
||||
3
vendor/k8s.io/client-go/util/cert/cert.go
generated
vendored
3
vendor/k8s.io/client-go/util/cert/cert.go
generated
vendored
@@ -33,7 +33,6 @@ import (
|
||||
"time"
|
||||
|
||||
"k8s.io/client-go/util/keyutil"
|
||||
netutils "k8s.io/utils/net"
|
||||
)
|
||||
|
||||
const duration365d = time.Hour * 24 * 365
|
||||
@@ -158,7 +157,7 @@ func GenerateSelfSignedCertKeyWithFixtures(host string, alternateIPs []net.IP, a
|
||||
BasicConstraintsValid: true,
|
||||
}
|
||||
|
||||
if ip := netutils.ParseIPSloppy(host); ip != nil {
|
||||
if ip := net.ParseIP(host); ip != nil {
|
||||
template.IPAddresses = append(template.IPAddresses, ip)
|
||||
} else {
|
||||
template.DNSNames = append(template.DNSNames, host)
|
||||
|
||||
52
vendor/k8s.io/client-go/util/flowcontrol/backoff.go
generated
vendored
52
vendor/k8s.io/client-go/util/flowcontrol/backoff.go
generated
vendored
@@ -17,12 +17,10 @@ limitations under the License.
|
||||
package flowcontrol
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/utils/clock"
|
||||
testingclock "k8s.io/utils/clock/testing"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/utils/integer"
|
||||
)
|
||||
|
||||
@@ -37,43 +35,23 @@ type Backoff struct {
|
||||
defaultDuration time.Duration
|
||||
maxDuration time.Duration
|
||||
perItemBackoff map[string]*backoffEntry
|
||||
rand *rand.Rand
|
||||
|
||||
// maxJitterFactor adds jitter to the exponentially backed off delay.
|
||||
// if maxJitterFactor is zero, no jitter is added to the delay in
|
||||
// order to maintain current behavior.
|
||||
maxJitterFactor float64
|
||||
}
|
||||
|
||||
func NewFakeBackOff(initial, max time.Duration, tc *testingclock.FakeClock) *Backoff {
|
||||
return newBackoff(tc, initial, max, 0.0)
|
||||
func NewFakeBackOff(initial, max time.Duration, tc *clock.FakeClock) *Backoff {
|
||||
return &Backoff{
|
||||
perItemBackoff: map[string]*backoffEntry{},
|
||||
Clock: tc,
|
||||
defaultDuration: initial,
|
||||
maxDuration: max,
|
||||
}
|
||||
}
|
||||
|
||||
func NewBackOff(initial, max time.Duration) *Backoff {
|
||||
return NewBackOffWithJitter(initial, max, 0.0)
|
||||
}
|
||||
|
||||
func NewFakeBackOffWithJitter(initial, max time.Duration, tc *testingclock.FakeClock, maxJitterFactor float64) *Backoff {
|
||||
return newBackoff(tc, initial, max, maxJitterFactor)
|
||||
}
|
||||
|
||||
func NewBackOffWithJitter(initial, max time.Duration, maxJitterFactor float64) *Backoff {
|
||||
clock := clock.RealClock{}
|
||||
return newBackoff(clock, initial, max, maxJitterFactor)
|
||||
}
|
||||
|
||||
func newBackoff(clock clock.Clock, initial, max time.Duration, maxJitterFactor float64) *Backoff {
|
||||
var random *rand.Rand
|
||||
if maxJitterFactor > 0 {
|
||||
random = rand.New(rand.NewSource(clock.Now().UnixNano()))
|
||||
}
|
||||
return &Backoff{
|
||||
perItemBackoff: map[string]*backoffEntry{},
|
||||
Clock: clock,
|
||||
Clock: clock.RealClock{},
|
||||
defaultDuration: initial,
|
||||
maxDuration: max,
|
||||
maxJitterFactor: maxJitterFactor,
|
||||
rand: random,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,10 +74,8 @@ func (p *Backoff) Next(id string, eventTime time.Time) {
|
||||
entry, ok := p.perItemBackoff[id]
|
||||
if !ok || hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
|
||||
entry = p.initEntryUnsafe(id)
|
||||
entry.backoff += p.jitter(entry.backoff)
|
||||
} else {
|
||||
delay := entry.backoff * 2 // exponential
|
||||
delay += p.jitter(entry.backoff) // add some jitter to the delay
|
||||
delay := entry.backoff * 2 // exponential
|
||||
entry.backoff = time.Duration(integer.Int64Min(int64(delay), int64(p.maxDuration)))
|
||||
}
|
||||
entry.lastUpdate = p.Clock.Now()
|
||||
@@ -167,14 +143,6 @@ func (p *Backoff) initEntryUnsafe(id string) *backoffEntry {
|
||||
return entry
|
||||
}
|
||||
|
||||
func (p *Backoff) jitter(delay time.Duration) time.Duration {
|
||||
if p.rand == nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
return time.Duration(p.rand.Float64() * p.maxJitterFactor * float64(delay))
|
||||
}
|
||||
|
||||
// After 2*maxDuration we restart the backoff factor to the beginning
|
||||
func hasExpired(eventTime time.Time, lastUpdate time.Time, maxDuration time.Duration) bool {
|
||||
return eventTime.Sub(lastUpdate) > maxDuration*2 // consider stable if it's ok for twice the maxDuration
|
||||
|
||||
99
vendor/k8s.io/client-go/util/flowcontrol/throttle.go
generated
vendored
99
vendor/k8s.io/client-go/util/flowcontrol/throttle.go
generated
vendored
@@ -23,36 +23,26 @@ import (
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
type PassiveRateLimiter interface {
|
||||
type RateLimiter interface {
|
||||
// TryAccept returns true if a token is taken immediately. Otherwise,
|
||||
// it returns false.
|
||||
TryAccept() bool
|
||||
// Accept returns once a token becomes available.
|
||||
Accept()
|
||||
// Stop stops the rate limiter, subsequent calls to CanAccept will return false
|
||||
Stop()
|
||||
// QPS returns QPS of this rate limiter
|
||||
QPS() float32
|
||||
}
|
||||
|
||||
type RateLimiter interface {
|
||||
PassiveRateLimiter
|
||||
// Accept returns once a token becomes available.
|
||||
Accept()
|
||||
// Wait returns nil if a token is taken before the Context is done.
|
||||
Wait(ctx context.Context) error
|
||||
}
|
||||
|
||||
type tokenBucketPassiveRateLimiter struct {
|
||||
limiter *rate.Limiter
|
||||
qps float32
|
||||
clock clock.PassiveClock
|
||||
}
|
||||
|
||||
type tokenBucketRateLimiter struct {
|
||||
tokenBucketPassiveRateLimiter
|
||||
clock Clock
|
||||
limiter *rate.Limiter
|
||||
clock Clock
|
||||
qps float32
|
||||
}
|
||||
|
||||
// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
|
||||
@@ -62,73 +52,58 @@ type tokenBucketRateLimiter struct {
|
||||
// The maximum number of tokens in the bucket is capped at 'burst'.
|
||||
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
|
||||
limiter := rate.NewLimiter(rate.Limit(qps), burst)
|
||||
return newTokenBucketRateLimiterWithClock(limiter, clock.RealClock{}, qps)
|
||||
}
|
||||
|
||||
// NewTokenBucketPassiveRateLimiter is similar to NewTokenBucketRateLimiter except that it returns
|
||||
// a PassiveRateLimiter which does not have Accept() and Wait() methods.
|
||||
func NewTokenBucketPassiveRateLimiter(qps float32, burst int) PassiveRateLimiter {
|
||||
limiter := rate.NewLimiter(rate.Limit(qps), burst)
|
||||
return newTokenBucketRateLimiterWithPassiveClock(limiter, clock.RealClock{}, qps)
|
||||
return newTokenBucketRateLimiter(limiter, realClock{}, qps)
|
||||
}
|
||||
|
||||
// An injectable, mockable clock interface.
|
||||
type Clock interface {
|
||||
clock.PassiveClock
|
||||
Now() time.Time
|
||||
Sleep(time.Duration)
|
||||
}
|
||||
|
||||
var _ Clock = (*clock.RealClock)(nil)
|
||||
type realClock struct{}
|
||||
|
||||
func (realClock) Now() time.Time {
|
||||
return time.Now()
|
||||
}
|
||||
func (realClock) Sleep(d time.Duration) {
|
||||
time.Sleep(d)
|
||||
}
|
||||
|
||||
// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter
|
||||
// but allows an injectable clock, for testing.
|
||||
func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter {
|
||||
limiter := rate.NewLimiter(rate.Limit(qps), burst)
|
||||
return newTokenBucketRateLimiterWithClock(limiter, c, qps)
|
||||
return newTokenBucketRateLimiter(limiter, c, qps)
|
||||
}
|
||||
|
||||
// NewTokenBucketPassiveRateLimiterWithClock is similar to NewTokenBucketRateLimiterWithClock
|
||||
// except that it returns a PassiveRateLimiter which does not have Accept() and Wait() methods
|
||||
// and uses a PassiveClock.
|
||||
func NewTokenBucketPassiveRateLimiterWithClock(qps float32, burst int, c clock.PassiveClock) PassiveRateLimiter {
|
||||
limiter := rate.NewLimiter(rate.Limit(qps), burst)
|
||||
return newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps)
|
||||
}
|
||||
|
||||
func newTokenBucketRateLimiterWithClock(limiter *rate.Limiter, c Clock, qps float32) *tokenBucketRateLimiter {
|
||||
func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter {
|
||||
return &tokenBucketRateLimiter{
|
||||
tokenBucketPassiveRateLimiter: *newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps),
|
||||
clock: c,
|
||||
}
|
||||
}
|
||||
|
||||
func newTokenBucketRateLimiterWithPassiveClock(limiter *rate.Limiter, c clock.PassiveClock, qps float32) *tokenBucketPassiveRateLimiter {
|
||||
return &tokenBucketPassiveRateLimiter{
|
||||
limiter: limiter,
|
||||
qps: qps,
|
||||
clock: c,
|
||||
qps: qps,
|
||||
}
|
||||
}
|
||||
|
||||
func (tbprl *tokenBucketPassiveRateLimiter) Stop() {
|
||||
}
|
||||
|
||||
func (tbprl *tokenBucketPassiveRateLimiter) QPS() float32 {
|
||||
return tbprl.qps
|
||||
}
|
||||
|
||||
func (tbprl *tokenBucketPassiveRateLimiter) TryAccept() bool {
|
||||
return tbprl.limiter.AllowN(tbprl.clock.Now(), 1)
|
||||
func (t *tokenBucketRateLimiter) TryAccept() bool {
|
||||
return t.limiter.AllowN(t.clock.Now(), 1)
|
||||
}
|
||||
|
||||
// Accept will block until a token becomes available
|
||||
func (tbrl *tokenBucketRateLimiter) Accept() {
|
||||
now := tbrl.clock.Now()
|
||||
tbrl.clock.Sleep(tbrl.limiter.ReserveN(now, 1).DelayFrom(now))
|
||||
func (t *tokenBucketRateLimiter) Accept() {
|
||||
now := t.clock.Now()
|
||||
t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now))
|
||||
}
|
||||
|
||||
func (tbrl *tokenBucketRateLimiter) Wait(ctx context.Context) error {
|
||||
return tbrl.limiter.Wait(ctx)
|
||||
func (t *tokenBucketRateLimiter) Stop() {
|
||||
}
|
||||
|
||||
func (t *tokenBucketRateLimiter) QPS() float32 {
|
||||
return t.qps
|
||||
}
|
||||
|
||||
func (t *tokenBucketRateLimiter) Wait(ctx context.Context) error {
|
||||
return t.limiter.Wait(ctx)
|
||||
}
|
||||
|
||||
type fakeAlwaysRateLimiter struct{}
|
||||
@@ -182,11 +157,3 @@ func (t *fakeNeverRateLimiter) QPS() float32 {
|
||||
func (t *fakeNeverRateLimiter) Wait(ctx context.Context) error {
|
||||
return errors.New("can not be accept")
|
||||
}
|
||||
|
||||
var (
|
||||
_ RateLimiter = (*tokenBucketRateLimiter)(nil)
|
||||
_ RateLimiter = (*fakeAlwaysRateLimiter)(nil)
|
||||
_ RateLimiter = (*fakeNeverRateLimiter)(nil)
|
||||
)
|
||||
|
||||
var _ PassiveRateLimiter = (*tokenBucketPassiveRateLimiter)(nil)
|
||||
|
||||
29
vendor/k8s.io/client-go/util/workqueue/default_rate_limiters.go
generated
vendored
29
vendor/k8s.io/client-go/util/workqueue/default_rate_limiters.go
generated
vendored
@@ -27,7 +27,7 @@ import (
|
||||
type RateLimiter interface {
|
||||
// When gets an item and gets to decide how long that item should wait
|
||||
When(item interface{}) time.Duration
|
||||
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for failing
|
||||
// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
|
||||
// or for success, we'll stop tracking it
|
||||
Forget(item interface{})
|
||||
// NumRequeues returns back how many failures the item has had
|
||||
@@ -209,30 +209,3 @@ func (r *MaxOfRateLimiter) Forget(item interface{}) {
|
||||
limiter.Forget(item)
|
||||
}
|
||||
}
|
||||
|
||||
// WithMaxWaitRateLimiter have maxDelay which avoids waiting too long
|
||||
type WithMaxWaitRateLimiter struct {
|
||||
limiter RateLimiter
|
||||
maxDelay time.Duration
|
||||
}
|
||||
|
||||
func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter {
|
||||
return &WithMaxWaitRateLimiter{limiter: limiter, maxDelay: maxDelay}
|
||||
}
|
||||
|
||||
func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
|
||||
delay := w.limiter.When(item)
|
||||
if delay > w.maxDelay {
|
||||
return w.maxDelay
|
||||
}
|
||||
|
||||
return delay
|
||||
}
|
||||
|
||||
func (w WithMaxWaitRateLimiter) Forget(item interface{}) {
|
||||
w.limiter.Forget(item)
|
||||
}
|
||||
|
||||
func (w WithMaxWaitRateLimiter) NumRequeues(item interface{}) int {
|
||||
return w.limiter.NumRequeues(item)
|
||||
}
|
||||
|
||||
6
vendor/k8s.io/client-go/util/workqueue/delaying_queue.go
generated
vendored
6
vendor/k8s.io/client-go/util/workqueue/delaying_queue.go
generated
vendored
@@ -21,8 +21,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
|
||||
@@ -51,11 +51,11 @@ func NewNamedDelayingQueue(name string) DelayingInterface {
|
||||
|
||||
// NewDelayingQueueWithCustomClock constructs a new named workqueue
|
||||
// with ability to inject real or fake clock for testing purposes
|
||||
func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface {
|
||||
func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
|
||||
return newDelayingQueue(clock, NewNamed(name), name)
|
||||
}
|
||||
|
||||
func newDelayingQueue(clock clock.WithTicker, q Interface, name string) *delayingType {
|
||||
func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType {
|
||||
ret := &delayingType{
|
||||
Interface: q,
|
||||
clock: clock,
|
||||
|
||||
2
vendor/k8s.io/client-go/util/workqueue/metrics.go
generated
vendored
2
vendor/k8s.io/client-go/util/workqueue/metrics.go
generated
vendored
@@ -20,7 +20,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/utils/clock"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
)
|
||||
|
||||
// This file provides abstractions for setting the provider (e.g., prometheus)
|
||||
|
||||
81
vendor/k8s.io/client-go/util/workqueue/queue.go
generated
vendored
81
vendor/k8s.io/client-go/util/workqueue/queue.go
generated
vendored
@@ -20,7 +20,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/utils/clock"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
)
|
||||
|
||||
type Interface interface {
|
||||
@@ -29,7 +29,6 @@ type Interface interface {
|
||||
Get() (item interface{}, shutdown bool)
|
||||
Done(item interface{})
|
||||
ShutDown()
|
||||
ShutDownWithDrain()
|
||||
ShuttingDown() bool
|
||||
}
|
||||
|
||||
@@ -47,7 +46,7 @@ func NewNamed(name string) *Type {
|
||||
)
|
||||
}
|
||||
|
||||
func newQueue(c clock.WithTicker, metrics queueMetrics, updatePeriod time.Duration) *Type {
|
||||
func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
|
||||
t := &Type{
|
||||
clock: c,
|
||||
dirty: set{},
|
||||
@@ -87,12 +86,11 @@ type Type struct {
|
||||
cond *sync.Cond
|
||||
|
||||
shuttingDown bool
|
||||
drain bool
|
||||
|
||||
metrics queueMetrics
|
||||
|
||||
unfinishedWorkUpdatePeriod time.Duration
|
||||
clock clock.WithTicker
|
||||
clock clock.Clock
|
||||
}
|
||||
|
||||
type empty struct{}
|
||||
@@ -112,10 +110,6 @@ func (s set) delete(item t) {
|
||||
delete(s, item)
|
||||
}
|
||||
|
||||
func (s set) len() int {
|
||||
return len(s)
|
||||
}
|
||||
|
||||
// Add marks item as needing processing.
|
||||
func (q *Type) Add(item interface{}) {
|
||||
q.cond.L.Lock()
|
||||
@@ -161,10 +155,7 @@ func (q *Type) Get() (item interface{}, shutdown bool) {
|
||||
return nil, true
|
||||
}
|
||||
|
||||
item = q.queue[0]
|
||||
// The underlying array still exists and reference this object, so the object will not be garbage collected.
|
||||
q.queue[0] = nil
|
||||
q.queue = q.queue[1:]
|
||||
item, q.queue = q.queue[0], q.queue[1:]
|
||||
|
||||
q.metrics.get(item)
|
||||
|
||||
@@ -187,71 +178,13 @@ func (q *Type) Done(item interface{}) {
|
||||
if q.dirty.has(item) {
|
||||
q.queue = append(q.queue, item)
|
||||
q.cond.Signal()
|
||||
} else if q.processing.len() == 0 {
|
||||
q.cond.Signal()
|
||||
}
|
||||
}
|
||||
|
||||
// ShutDown will cause q to ignore all new items added to it and
|
||||
// immediately instruct the worker goroutines to exit.
|
||||
// ShutDown will cause q to ignore all new items added to it. As soon as the
|
||||
// worker goroutines have drained the existing items in the queue, they will be
|
||||
// instructed to exit.
|
||||
func (q *Type) ShutDown() {
|
||||
q.setDrain(false)
|
||||
q.shutdown()
|
||||
}
|
||||
|
||||
// ShutDownWithDrain will cause q to ignore all new items added to it. As soon
|
||||
// as the worker goroutines have "drained", i.e: finished processing and called
|
||||
// Done on all existing items in the queue; they will be instructed to exit and
|
||||
// ShutDownWithDrain will return. Hence: a strict requirement for using this is;
|
||||
// your workers must ensure that Done is called on all items in the queue once
|
||||
// the shut down has been initiated, if that is not the case: this will block
|
||||
// indefinitely. It is, however, safe to call ShutDown after having called
|
||||
// ShutDownWithDrain, as to force the queue shut down to terminate immediately
|
||||
// without waiting for the drainage.
|
||||
func (q *Type) ShutDownWithDrain() {
|
||||
q.setDrain(true)
|
||||
q.shutdown()
|
||||
for q.isProcessing() && q.shouldDrain() {
|
||||
q.waitForProcessing()
|
||||
}
|
||||
}
|
||||
|
||||
// isProcessing indicates if there are still items on the work queue being
|
||||
// processed. It's used to drain the work queue on an eventual shutdown.
|
||||
func (q *Type) isProcessing() bool {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
return q.processing.len() != 0
|
||||
}
|
||||
|
||||
// waitForProcessing waits for the worker goroutines to finish processing items
|
||||
// and call Done on them.
|
||||
func (q *Type) waitForProcessing() {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
// Ensure that we do not wait on a queue which is already empty, as that
|
||||
// could result in waiting for Done to be called on items in an empty queue
|
||||
// which has already been shut down, which will result in waiting
|
||||
// indefinitely.
|
||||
if q.processing.len() == 0 {
|
||||
return
|
||||
}
|
||||
q.cond.Wait()
|
||||
}
|
||||
|
||||
func (q *Type) setDrain(shouldDrain bool) {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
q.drain = shouldDrain
|
||||
}
|
||||
|
||||
func (q *Type) shouldDrain() bool {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
return q.drain
|
||||
}
|
||||
|
||||
func (q *Type) shutdown() {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
q.shuttingDown = true
|
||||
|
||||
Reference in New Issue
Block a user