Use cri streaming pkg from k8s staging

Use staging k8s.io/kubelet/cri/streaming package

Signed-off-by: Aditi Sharma <adi.sky17@gmail.com>
This commit is contained in:
Aditi Sharma
2023-10-17 11:18:20 +05:30
parent aef2ebc76a
commit 03d81f595f
275 changed files with 7417 additions and 31858 deletions

View File

@@ -43,7 +43,8 @@ var (
gitMinor string = "" // minor version, numeric possibly followed by "+"
// semantic version, derived by build scripts (see
// https://git.k8s.io/community/contributors/design-proposals/release/versioning.md
// https://github.com/kubernetes/sig-release/blob/master/release-engineering/versioning.md#kubernetes-release-versioning
// https://kubernetes.io/releases/version-skew-policy/
// for a detailed discussion of this field)
//
// TODO: This field is still called "gitVersion" for legacy

View File

@@ -32,12 +32,12 @@ import (
"sync"
"time"
"github.com/davecgh/go-spew/spew"
"golang.org/x/term"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/dump"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/client-go/pkg/apis/clientauthentication"
"k8s.io/client-go/pkg/apis/clientauthentication/install"
@@ -81,8 +81,6 @@ func newCache() *cache {
return &cache{m: make(map[string]*Authenticator)}
}
var spewConfig = &spew.ConfigState{DisableMethods: true, Indent: " "}
func cacheKey(conf *api.ExecConfig, cluster *clientauthentication.Cluster) string {
key := struct {
conf *api.ExecConfig
@@ -91,7 +89,7 @@ func cacheKey(conf *api.ExecConfig, cluster *clientauthentication.Cluster) strin
conf: conf,
cluster: cluster,
}
return spewConfig.Sprint(key)
return dump.Pretty(key)
}
type cache struct {

View File

@@ -52,8 +52,7 @@ type Interface interface {
// ClientContentConfig controls how RESTClient communicates with the server.
//
// TODO: ContentConfig will be updated to accept a Negotiator instead of a
//
// NegotiatedSerializer and NegotiatedSerializer will be removed.
// NegotiatedSerializer and NegotiatedSerializer will be removed.
type ClientContentConfig struct {
// AcceptContentTypes specifies the types the client will accept and is optional.
// If not set, ContentType will be used to define the Accept header

View File

@@ -316,7 +316,7 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
// Validate config.Host before constructing the transport/client so we can fail fast.
// ServerURL will be obtained later in RESTClientForConfigAndClient()
_, _, err := defaultServerUrlFor(config)
_, _, err := DefaultServerUrlFor(config)
if err != nil {
return nil, err
}
@@ -343,7 +343,7 @@ func RESTClientForConfigAndClient(config *Config, httpClient *http.Client) (*RES
return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
}
baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
baseURL, versionedAPIPath, err := DefaultServerUrlFor(config)
if err != nil {
return nil, err
}
@@ -390,7 +390,7 @@ func UnversionedRESTClientFor(config *Config) (*RESTClient, error) {
// Validate config.Host before constructing the transport/client so we can fail fast.
// ServerURL will be obtained later in UnversionedRESTClientForConfigAndClient()
_, _, err := defaultServerUrlFor(config)
_, _, err := DefaultServerUrlFor(config)
if err != nil {
return nil, err
}
@@ -410,7 +410,7 @@ func UnversionedRESTClientForConfigAndClient(config *Config, httpClient *http.Cl
return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
}
baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
baseURL, versionedAPIPath, err := DefaultServerUrlFor(config)
if err != nil {
return nil, err
}
@@ -548,7 +548,7 @@ func InClusterConfig() (*Config, error) {
// Note: the Insecure flag is ignored when testing for this value, so MITM attacks are
// still possible.
func IsConfigTransportTLS(config Config) bool {
baseURL, _, err := defaultServerUrlFor(&config)
baseURL, _, err := DefaultServerUrlFor(&config)
if err != nil {
return false
}

View File

@@ -24,6 +24,7 @@ import (
"io"
"mime"
"net/http"
"net/http/httptrace"
"net/url"
"os"
"path"
@@ -481,7 +482,13 @@ func (r *Request) Body(obj interface{}) *Request {
return r
}
// URL returns the current working URL.
// Error returns any error encountered constructing the request, if any.
func (r *Request) Error() error {
return r.err
}
// URL returns the current working URL. Check the result of Error() to ensure
// that the returned URL is valid.
func (r *Request) URL() *url.URL {
p := r.pathPrefix
if r.namespaceSet && len(r.namespace) > 0 {
@@ -726,7 +733,6 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
retry.After(ctx, r, resp, err)
if err == nil && resp.StatusCode == http.StatusOK {
return r.newStreamWatcher(resp)
@@ -786,22 +792,36 @@ func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error)
), nil
}
// updateURLMetrics is a convenience function for pushing metrics.
// It also handles corner cases for incomplete/invalid request data.
func updateURLMetrics(ctx context.Context, req *Request, resp *http.Response, err error) {
url := "none"
// updateRequestResultMetric increments the RequestResult metric counter,
// it should be called with the (response, err) tuple from the final
// reply from the server.
func updateRequestResultMetric(ctx context.Context, req *Request, resp *http.Response, err error) {
code, host := sanitize(req, resp, err)
metrics.RequestResult.Increment(ctx, code, req.verb, host)
}
// updateRequestRetryMetric increments the RequestRetry metric counter,
// it should be called with the (response, err) tuple for each retry
// except for the final attempt.
func updateRequestRetryMetric(ctx context.Context, req *Request, resp *http.Response, err error) {
code, host := sanitize(req, resp, err)
metrics.RequestRetry.IncrementRetry(ctx, code, req.verb, host)
}
func sanitize(req *Request, resp *http.Response, err error) (string, string) {
host := "none"
if req.c.base != nil {
url = req.c.base.Host
host = req.c.base.Host
}
// Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric
// system so we just report them as `<error>`.
if err != nil {
metrics.RequestResult.Increment(ctx, "<error>", req.verb, url)
} else {
// Metrics for failure codes
metrics.RequestResult.Increment(ctx, strconv.Itoa(resp.StatusCode), req.verb, url)
code := "<error>"
if resp != nil {
code = strconv.Itoa(resp.StatusCode)
}
return code, host
}
// Stream formats and executes the request, and offers streaming of the response.
@@ -834,7 +854,6 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
return nil, err
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
retry.After(ctx, r, resp, err)
if err != nil {
// we only retry on an HTTP response with 'Retry-After' header
@@ -907,15 +926,38 @@ func (r *Request) newHTTPRequest(ctx context.Context) (*http.Request, error) {
}
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, body)
req, err := http.NewRequestWithContext(httptrace.WithClientTrace(ctx, newDNSMetricsTrace(ctx)), r.verb, url, body)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
req.Header = r.headers
return req, nil
}
// newDNSMetricsTrace returns an HTTP trace that tracks time spent on DNS lookups per host.
// This metric is available in client as "rest_client_dns_resolution_duration_seconds".
func newDNSMetricsTrace(ctx context.Context) *httptrace.ClientTrace {
type dnsMetric struct {
start time.Time
host string
sync.Mutex
}
dns := &dnsMetric{}
return &httptrace.ClientTrace{
DNSStart: func(info httptrace.DNSStartInfo) {
dns.Lock()
defer dns.Unlock()
dns.start = time.Now()
dns.host = info.Host
},
DNSDone: func(info httptrace.DNSDoneInfo) {
dns.Lock()
defer dns.Unlock()
metrics.ResolverLatency.Observe(ctx, dns.host, time.Since(dns.start))
},
}
}
// request connects to the server and invokes the provided function when a server response is
// received. It handles retry behavior and up front validation of requests. It will invoke
// fn at most once. It will return an error if a problem occurred prior to connecting to the
@@ -979,7 +1021,6 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
return err
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
// The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.
// https://pkg.go.dev/net/http#Request
if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {

View File

@@ -77,9 +77,9 @@ func DefaultVersionedAPIPath(apiPath string, groupVersion schema.GroupVersion) s
return versionedAPIPath
}
// defaultServerUrlFor is shared between IsConfigTransportTLS and RESTClientFor. It
// DefaultServerUrlFor is shared between IsConfigTransportTLS and RESTClientFor. It
// requires Host and Version to be set prior to being called.
func defaultServerUrlFor(config *Config) (*url.URL, string, error) {
func DefaultServerUrlFor(config *Config) (*url.URL, string, error) {
// TODO: move the default to secure when the apiserver supports TLS by default
// config.Insecure is taken to mean "I want HTTPS but don't bother checking the certs against a CA."
hasCA := len(config.CAFile) != 0 || len(config.CAData) != 0

View File

@@ -242,8 +242,20 @@ func (r *withRetry) After(ctx context.Context, request *Request, resp *http.Resp
// parameters calculated from the (response, err) tuple from
// attempt N-1, so r.retryAfter is outdated and should not be
// referred to here.
isRetry := r.retryAfter != nil
r.retryAfter = nil
// the client finishes a single request after N attempts (1..N)
// - all attempts (1..N) are counted to the rest_client_requests_total
// metric (current behavior).
// - every attempt after the first (2..N) are counted to the
// rest_client_request_retries_total metric.
updateRequestResultMetric(ctx, request, resp, err)
if isRetry {
// this is attempt 2 or later
updateRequestRetryMetric(ctx, request, resp, err)
}
if request.c.base != nil {
if err != nil {
request.backoff.UpdateBackoff(request.URL(), err, 0)
@@ -346,8 +358,12 @@ func retryAfterResponse() *http.Response {
}
func retryAfterResponseWithDelay(delay string) *http.Response {
return retryAfterResponseWithCodeAndDelay(http.StatusInternalServerError, delay)
}
func retryAfterResponseWithCodeAndDelay(code int, delay string) *http.Response {
return &http.Response{
StatusCode: http.StatusInternalServerError,
StatusCode: code,
Header: http.Header{"Retry-After": []string{delay}},
}
}

View File

@@ -67,7 +67,7 @@ type Preferences struct {
type Cluster struct {
// LocationOfOrigin indicates where this object came from. It is used for round tripping config post-merge, but never serialized.
// +k8s:conversion-gen=false
LocationOfOrigin string
LocationOfOrigin string `json:"-"`
// Server is the address of the kubernetes cluster (https://hostname:port).
Server string `json:"server"`
// TLSServerName is used to check server certificate. If TLSServerName is empty, the hostname used to contact the server is used.
@@ -107,7 +107,7 @@ type Cluster struct {
type AuthInfo struct {
// LocationOfOrigin indicates where this object came from. It is used for round tripping config post-merge, but never serialized.
// +k8s:conversion-gen=false
LocationOfOrigin string
LocationOfOrigin string `json:"-"`
// ClientCertificate is the path to a client cert file for TLS.
// +optional
ClientCertificate string `json:"client-certificate,omitempty"`
@@ -159,7 +159,7 @@ type AuthInfo struct {
type Context struct {
// LocationOfOrigin indicates where this object came from. It is used for round tripping config post-merge, but never serialized.
// +k8s:conversion-gen=false
LocationOfOrigin string
LocationOfOrigin string `json:"-"`
// Cluster is the name of the cluster for this context
Cluster string `json:"cluster"`
// AuthInfo is the name of the authInfo for this context
@@ -252,7 +252,7 @@ type ExecConfig struct {
// recommended as one of the prime benefits of exec plugins is that no secrets need
// to be stored directly in the kubeconfig.
// +k8s:conversion-gen=false
Config runtime.Object
Config runtime.Object `json:"-"`
// InteractiveMode determines this plugin's relationship with standard input. Valid
// values are "Never" (this exec plugin never uses standard input), "IfAvailable" (this
@@ -264,7 +264,7 @@ type ExecConfig struct {
// client.authentication.k8s.io/v1beta1, then this field is optional and defaults
// to "IfAvailable" when unset. Otherwise, this field is required.
// +optional
InteractiveMode ExecInteractiveMode
InteractiveMode ExecInteractiveMode `json:"interactiveMode,omitempty"`
// StdinUnavailable indicates whether the exec authenticator can pass standard
// input through to this exec plugin. For example, a higher level entity might be using
@@ -272,14 +272,14 @@ type ExecConfig struct {
// plugin to use standard input. This is kept here in order to keep all of the exec configuration
// together, but it is never serialized.
// +k8s:conversion-gen=false
StdinUnavailable bool
StdinUnavailable bool `json:"-"`
// StdinUnavailableMessage is an optional message to be displayed when the exec authenticator
// cannot successfully run this exec plugin because it needs to use standard input and
// StdinUnavailable is true. For example, a process that is already using standard input to
// read user instructions might set this to "used by my-program to read user instructions".
// +k8s:conversion-gen=false
StdinUnavailableMessage string
StdinUnavailableMessage string `json:"-"`
}
var _ fmt.Stringer = new(ExecConfig)

View File

@@ -42,6 +42,10 @@ type LatencyMetric interface {
Observe(ctx context.Context, verb string, u url.URL, latency time.Duration)
}
type ResolverLatencyMetric interface {
Observe(ctx context.Context, host string, latency time.Duration)
}
// SizeMetric observes client response size partitioned by verb and host.
type SizeMetric interface {
Observe(ctx context.Context, verb string, host string, size float64)
@@ -58,6 +62,23 @@ type CallsMetric interface {
Increment(exitCode int, callStatus string)
}
// RetryMetric counts the number of retries sent to the server
// partitioned by code, method, and host.
type RetryMetric interface {
IncrementRetry(ctx context.Context, code string, method string, host string)
}
// TransportCacheMetric shows the number of entries in the internal transport cache
type TransportCacheMetric interface {
Observe(value int)
}
// TransportCreateCallsMetric counts the number of times a transport is created
// partitioned by the result of the cache: hit, miss, uncacheable
type TransportCreateCallsMetric interface {
Increment(result string)
}
var (
// ClientCertExpiry is the expiry time of a client certificate
ClientCertExpiry ExpiryMetric = noopExpiry{}
@@ -65,6 +86,8 @@ var (
ClientCertRotationAge DurationMetric = noopDuration{}
// RequestLatency is the latency metric that rest clients will update.
RequestLatency LatencyMetric = noopLatency{}
// ResolverLatency is the latency metric that DNS resolver will update
ResolverLatency ResolverLatencyMetric = noopResolverLatency{}
// RequestSize is the request size metric that rest clients will update.
RequestSize SizeMetric = noopSize{}
// ResponseSize is the response size metric that rest clients will update.
@@ -76,6 +99,15 @@ var (
// ExecPluginCalls is the number of calls made to an exec plugin, partitioned by
// exit code and call status.
ExecPluginCalls CallsMetric = noopCalls{}
// RequestRetry is the retry metric that tracks the number of
// retries sent to the server.
RequestRetry RetryMetric = noopRetry{}
// TransportCacheEntries is the metric that tracks the number of entries in the
// internal transport cache.
TransportCacheEntries TransportCacheMetric = noopTransportCache{}
// TransportCreateCalls is the metric that counts the number of times a new transport
// is created
TransportCreateCalls TransportCreateCallsMetric = noopTransportCreateCalls{}
)
// RegisterOpts contains all the metrics to register. Metrics may be nil.
@@ -83,11 +115,15 @@ type RegisterOpts struct {
ClientCertExpiry ExpiryMetric
ClientCertRotationAge DurationMetric
RequestLatency LatencyMetric
ResolverLatency ResolverLatencyMetric
RequestSize SizeMetric
ResponseSize SizeMetric
RateLimiterLatency LatencyMetric
RequestResult ResultMetric
ExecPluginCalls CallsMetric
RequestRetry RetryMetric
TransportCacheEntries TransportCacheMetric
TransportCreateCalls TransportCreateCallsMetric
}
// Register registers metrics for the rest client to use. This can
@@ -103,6 +139,9 @@ func Register(opts RegisterOpts) {
if opts.RequestLatency != nil {
RequestLatency = opts.RequestLatency
}
if opts.ResolverLatency != nil {
ResolverLatency = opts.ResolverLatency
}
if opts.RequestSize != nil {
RequestSize = opts.RequestSize
}
@@ -118,6 +157,15 @@ func Register(opts RegisterOpts) {
if opts.ExecPluginCalls != nil {
ExecPluginCalls = opts.ExecPluginCalls
}
if opts.RequestRetry != nil {
RequestRetry = opts.RequestRetry
}
if opts.TransportCacheEntries != nil {
TransportCacheEntries = opts.TransportCacheEntries
}
if opts.TransportCreateCalls != nil {
TransportCreateCalls = opts.TransportCreateCalls
}
})
}
@@ -133,6 +181,11 @@ type noopLatency struct{}
func (noopLatency) Observe(context.Context, string, url.URL, time.Duration) {}
type noopResolverLatency struct{}
func (n noopResolverLatency) Observe(ctx context.Context, host string, latency time.Duration) {
}
type noopSize struct{}
func (noopSize) Observe(context.Context, string, string, float64) {}
@@ -144,3 +197,15 @@ func (noopResult) Increment(context.Context, string, string, string) {}
type noopCalls struct{}
func (noopCalls) Increment(int, string) {}
type noopRetry struct{}
func (noopRetry) IncrementRetry(context.Context, string, string, string) {}
type noopTransportCache struct{}
func (noopTransportCache) Observe(int) {}
type noopTransportCreateCalls struct{}
func (noopTransportCreateCalls) Increment(string) {}

View File

@@ -27,6 +27,7 @@ import (
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/metrics"
)
// TlsTransportCache caches TLS http.RoundTrippers different configurations. The
@@ -80,11 +81,16 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
// Ensure we only create a single transport for the given TLS options
c.mu.Lock()
defer c.mu.Unlock()
defer metrics.TransportCacheEntries.Observe(len(c.transports))
// See if we already have a custom transport for this config
if t, ok := c.transports[key]; ok {
metrics.TransportCreateCalls.Increment("hit")
return t, nil
}
metrics.TransportCreateCalls.Increment("miss")
} else {
metrics.TransportCreateCalls.Increment("uncacheable")
}
// Get the TLS options for this client config

View File

@@ -25,6 +25,7 @@ import (
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"math"
"math/big"
"net"
"os"
@@ -44,6 +45,7 @@ type Config struct {
Organization []string
AltNames AltNames
Usages []x509.ExtKeyUsage
NotBefore time.Time
}
// AltNames contains the domain names and IP addresses that will be added
@@ -57,14 +59,24 @@ type AltNames struct {
// NewSelfSignedCACert creates a CA certificate
func NewSelfSignedCACert(cfg Config, key crypto.Signer) (*x509.Certificate, error) {
now := time.Now()
// returns a uniform random value in [0, max-1), then add 1 to serial to make it a uniform random value in [1, max).
serial, err := cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64-1))
if err != nil {
return nil, err
}
serial = new(big.Int).Add(serial, big.NewInt(1))
notBefore := now.UTC()
if !cfg.NotBefore.IsZero() {
notBefore = cfg.NotBefore.UTC()
}
tmpl := x509.Certificate{
SerialNumber: new(big.Int).SetInt64(0),
SerialNumber: serial,
Subject: pkix.Name{
CommonName: cfg.CommonName,
Organization: cfg.Organization,
},
DNSNames: []string{cfg.CommonName},
NotBefore: now.UTC(),
NotBefore: notBefore,
NotAfter: now.Add(duration365d * 10).UTC(),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
BasicConstraintsValid: true,
@@ -116,9 +128,14 @@ func GenerateSelfSignedCertKeyWithFixtures(host string, alternateIPs []net.IP, a
if err != nil {
return nil, nil, err
}
// returns a uniform random value in [0, max-1), then add 1 to serial to make it a uniform random value in [1, max).
serial, err := cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64-1))
if err != nil {
return nil, nil, err
}
serial = new(big.Int).Add(serial, big.NewInt(1))
caTemplate := x509.Certificate{
SerialNumber: big.NewInt(1),
SerialNumber: serial,
Subject: pkix.Name{
CommonName: fmt.Sprintf("%s-ca@%d", host, time.Now().Unix()),
},
@@ -144,9 +161,14 @@ func GenerateSelfSignedCertKeyWithFixtures(host string, alternateIPs []net.IP, a
if err != nil {
return nil, nil, err
}
// returns a uniform random value in [0, max-1), then add 1 to serial to make it a uniform random value in [1, max).
serial, err = cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64-1))
if err != nil {
return nil, nil, err
}
serial = new(big.Int).Add(serial, big.NewInt(1))
template := x509.Certificate{
SerialNumber: big.NewInt(2),
SerialNumber: serial,
Subject: pkix.Name{
CommonName: fmt.Sprintf("%s@%d", host, time.Now().Unix()),
},
@@ -191,7 +213,7 @@ func GenerateSelfSignedCertKeyWithFixtures(host string, alternateIPs []net.IP, a
if err := os.WriteFile(certFixturePath, certBuffer.Bytes(), 0644); err != nil {
return nil, nil, fmt.Errorf("failed to write cert fixture to %s: %v", certFixturePath, err)
}
if err := os.WriteFile(keyFixturePath, keyBuffer.Bytes(), 0644); err != nil {
if err := os.WriteFile(keyFixturePath, keyBuffer.Bytes(), 0600); err != nil {
return nil, nil, fmt.Errorf("failed to write key fixture to %s: %v", certFixturePath, err)
}
}

View File

@@ -33,38 +33,81 @@ type DelayingInterface interface {
AddAfter(item interface{}, duration time.Duration)
}
// DelayingQueueConfig specifies optional configurations to customize a DelayingInterface.
type DelayingQueueConfig struct {
// Name for the queue. If unnamed, the metrics will not be registered.
Name string
// MetricsProvider optionally allows specifying a metrics provider to use for the queue
// instead of the global provider.
MetricsProvider MetricsProvider
// Clock optionally allows injecting a real or fake clock for testing purposes.
Clock clock.WithTicker
// Queue optionally allows injecting custom queue Interface instead of the default one.
Queue Interface
}
// NewDelayingQueue constructs a new workqueue with delayed queuing ability.
// NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use
// NewNamedDelayingQueue instead.
// NewDelayingQueueWithConfig instead and specify a name.
func NewDelayingQueue() DelayingInterface {
return NewDelayingQueueWithCustomClock(clock.RealClock{}, "")
return NewDelayingQueueWithConfig(DelayingQueueConfig{})
}
// NewDelayingQueueWithConfig constructs a new workqueue with options to
// customize different properties.
func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface {
if config.Clock == nil {
config.Clock = clock.RealClock{}
}
if config.Queue == nil {
config.Queue = NewWithConfig(QueueConfig{
Name: config.Name,
MetricsProvider: config.MetricsProvider,
Clock: config.Clock,
})
}
return newDelayingQueue(config.Clock, config.Queue, config.Name, config.MetricsProvider)
}
// NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to
// inject custom queue Interface instead of the default one
// Deprecated: Use NewDelayingQueueWithConfig instead.
func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface {
return newDelayingQueue(clock.RealClock{}, q, name)
return NewDelayingQueueWithConfig(DelayingQueueConfig{
Name: name,
Queue: q,
})
}
// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability
// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability.
// Deprecated: Use NewDelayingQueueWithConfig instead.
func NewNamedDelayingQueue(name string) DelayingInterface {
return NewDelayingQueueWithCustomClock(clock.RealClock{}, name)
return NewDelayingQueueWithConfig(DelayingQueueConfig{Name: name})
}
// NewDelayingQueueWithCustomClock constructs a new named workqueue
// with ability to inject real or fake clock for testing purposes
// with ability to inject real or fake clock for testing purposes.
// Deprecated: Use NewDelayingQueueWithConfig instead.
func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface {
return newDelayingQueue(clock, NewNamed(name), name)
return NewDelayingQueueWithConfig(DelayingQueueConfig{
Name: name,
Clock: clock,
})
}
func newDelayingQueue(clock clock.WithTicker, q Interface, name string) *delayingType {
func newDelayingQueue(clock clock.WithTicker, q Interface, name string, provider MetricsProvider) *delayingType {
ret := &delayingType{
Interface: q,
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
metrics: newRetryMetrics(name, provider),
}
go ret.waitingLoop()

View File

@@ -244,13 +244,18 @@ func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) qu
}
}
func newRetryMetrics(name string) retryMetrics {
func newRetryMetrics(name string, provider MetricsProvider) retryMetrics {
var ret *defaultRetryMetrics
if len(name) == 0 {
return ret
}
if provider == nil {
provider = globalMetricsFactory.metricsProvider
}
return &defaultRetryMetrics{
retries: globalMetricsFactory.metricsProvider.NewRetriesMetric(name),
retries: provider.NewRetriesMetric(name),
}
}

View File

@@ -33,17 +33,60 @@ type Interface interface {
ShuttingDown() bool
}
// New constructs a new work queue (see the package comment).
func New() *Type {
return NewNamed("")
// QueueConfig specifies optional configurations to customize an Interface.
type QueueConfig struct {
// Name for the queue. If unnamed, the metrics will not be registered.
Name string
// MetricsProvider optionally allows specifying a metrics provider to use for the queue
// instead of the global provider.
MetricsProvider MetricsProvider
// Clock ability to inject real or fake clock for testing purposes.
Clock clock.WithTicker
}
// New constructs a new work queue (see the package comment).
func New() *Type {
return NewWithConfig(QueueConfig{
Name: "",
})
}
// NewWithConfig constructs a new workqueue with ability to
// customize different properties.
func NewWithConfig(config QueueConfig) *Type {
return newQueueWithConfig(config, defaultUnfinishedWorkUpdatePeriod)
}
// NewNamed creates a new named queue.
// Deprecated: Use NewWithConfig instead.
func NewNamed(name string) *Type {
rc := clock.RealClock{}
return NewWithConfig(QueueConfig{
Name: name,
})
}
// newQueueWithConfig constructs a new named workqueue
// with the ability to customize different properties for testing purposes
func newQueueWithConfig(config QueueConfig, updatePeriod time.Duration) *Type {
var metricsFactory *queueMetricsFactory
if config.MetricsProvider != nil {
metricsFactory = &queueMetricsFactory{
metricsProvider: config.MetricsProvider,
}
} else {
metricsFactory = &globalMetricsFactory
}
if config.Clock == nil {
config.Clock = clock.RealClock{}
}
return newQueue(
rc,
globalMetricsFactory.newQueueMetrics(name, rc),
defaultUnfinishedWorkUpdatePeriod,
config.Clock,
metricsFactory.newQueueMetrics(config.Name, config.Clock),
updatePeriod,
)
}

View File

@@ -16,6 +16,8 @@ limitations under the License.
package workqueue
import "k8s.io/utils/clock"
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
DelayingInterface
@@ -32,29 +34,68 @@ type RateLimitingInterface interface {
NumRequeues(item interface{}) int
}
// RateLimitingQueueConfig specifies optional configurations to customize a RateLimitingInterface.
type RateLimitingQueueConfig struct {
// Name for the queue. If unnamed, the metrics will not be registered.
Name string
// MetricsProvider optionally allows specifying a metrics provider to use for the queue
// instead of the global provider.
MetricsProvider MetricsProvider
// Clock optionally allows injecting a real or fake clock for testing purposes.
Clock clock.WithTicker
// DelayingQueue optionally allows injecting custom delaying queue DelayingInterface instead of the default one.
DelayingQueue DelayingInterface
}
// NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability
// Remember to call Forget! If you don't, you may end up tracking failures forever.
// NewRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use
// NewNamedRateLimitingQueue instead.
// NewRateLimitingQueueWithConfig instead and specify a name.
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{})
}
// NewRateLimitingQueueWithConfig constructs a new workqueue with rateLimited queuing ability
// with options to customize different properties.
// Remember to call Forget! If you don't, you may end up tracking failures forever.
func NewRateLimitingQueueWithConfig(rateLimiter RateLimiter, config RateLimitingQueueConfig) RateLimitingInterface {
if config.Clock == nil {
config.Clock = clock.RealClock{}
}
if config.DelayingQueue == nil {
config.DelayingQueue = NewDelayingQueueWithConfig(DelayingQueueConfig{
Name: config.Name,
MetricsProvider: config.MetricsProvider,
Clock: config.Clock,
})
}
return &rateLimitingType{
DelayingInterface: NewDelayingQueue(),
DelayingInterface: config.DelayingQueue,
rateLimiter: rateLimiter,
}
}
// NewNamedRateLimitingQueue constructs a new named workqueue with rateLimited queuing ability.
// Deprecated: Use NewRateLimitingQueueWithConfig instead.
func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface {
return &rateLimitingType{
DelayingInterface: NewNamedDelayingQueue(name),
rateLimiter: rateLimiter,
}
return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{
Name: name,
})
}
// NewRateLimitingQueueWithDelayingInterface constructs a new named workqueue with rateLimited queuing ability
// with the option to inject a custom delaying queue instead of the default one.
// Deprecated: Use NewRateLimitingQueueWithConfig instead.
func NewRateLimitingQueueWithDelayingInterface(di DelayingInterface, rateLimiter RateLimiter) RateLimitingInterface {
return &rateLimitingType{
DelayingInterface: di,
rateLimiter: rateLimiter,
}
return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{
DelayingQueue: di,
})
}
// rateLimitingType wraps an Interface and provides rateLimited re-enquing