vendor: update containerd/cri 61b7af7564
full diff:92cb4ed978..61b7af7564This adds new dependency github.com/fsnotify/fsnotify since4ce334aa49Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
This commit is contained in:
10
vendor/k8s.io/client-go/go.mod
generated
vendored
10
vendor/k8s.io/client-go/go.mod
generated
vendored
@@ -28,16 +28,16 @@ require (
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
|
||||
google.golang.org/appengine v1.5.0 // indirect
|
||||
k8s.io/api v0.18.0-beta.1
|
||||
k8s.io/apimachinery v0.18.0-beta.1
|
||||
k8s.io/api v0.18.0
|
||||
k8s.io/apimachinery v0.18.0
|
||||
k8s.io/klog v1.0.0
|
||||
k8s.io/utils v0.0.0-20200117235808-5f6fbceb4c31
|
||||
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89
|
||||
sigs.k8s.io/yaml v1.2.0
|
||||
)
|
||||
|
||||
replace (
|
||||
golang.org/x/sys => golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a // pinned to release-branch.go1.13
|
||||
golang.org/x/tools => golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7 // pinned to release-branch.go1.13
|
||||
k8s.io/api => k8s.io/api v0.18.0-beta.1
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.18.0-beta.1
|
||||
k8s.io/api => k8s.io/api v0.18.0
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.18.0
|
||||
)
|
||||
|
||||
69
vendor/k8s.io/client-go/rest/request.go
generated
vendored
69
vendor/k8s.io/client-go/rest/request.go
generated
vendored
@@ -39,6 +39,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
|
||||
utilclock "k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
restclientwatch "k8s.io/client-go/rest/watch"
|
||||
@@ -556,37 +557,69 @@ func (r *Request) tryThrottle(ctx context.Context) error {
|
||||
klog.V(3).Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
|
||||
}
|
||||
if latency > extraLongThrottleLatency {
|
||||
globalThrottledLogger.Log(2, fmt.Sprintf("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String()))
|
||||
// If the rate limiter latency is very high, the log message should be printed at a higher log level,
|
||||
// but we use a throttled logger to prevent spamming.
|
||||
globalThrottledLogger.Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
|
||||
}
|
||||
metrics.RateLimiterLatency.Observe(r.verb, r.finalURLTemplate(), latency)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
type throttledLogger struct {
|
||||
logTimeLock sync.RWMutex
|
||||
lastLogTime time.Time
|
||||
type throttleSettings struct {
|
||||
logLevel klog.Level
|
||||
minLogInterval time.Duration
|
||||
|
||||
lastLogTime time.Time
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
type throttledLogger struct {
|
||||
clock utilclock.PassiveClock
|
||||
settings []*throttleSettings
|
||||
}
|
||||
|
||||
var globalThrottledLogger = &throttledLogger{
|
||||
minLogInterval: 1 * time.Second,
|
||||
clock: utilclock.RealClock{},
|
||||
settings: []*throttleSettings{
|
||||
{
|
||||
logLevel: 2,
|
||||
minLogInterval: 1 * time.Second,
|
||||
}, {
|
||||
logLevel: 0,
|
||||
minLogInterval: 10 * time.Second,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func (b *throttledLogger) Log(level klog.Level, message string) {
|
||||
if bool(klog.V(level)) {
|
||||
if func() bool {
|
||||
b.logTimeLock.RLock()
|
||||
defer b.logTimeLock.RUnlock()
|
||||
return time.Since(b.lastLogTime) > b.minLogInterval
|
||||
}() {
|
||||
b.logTimeLock.Lock()
|
||||
defer b.logTimeLock.Unlock()
|
||||
if time.Since(b.lastLogTime) > b.minLogInterval {
|
||||
klog.V(level).Info(message)
|
||||
b.lastLogTime = time.Now()
|
||||
func (b *throttledLogger) attemptToLog() (klog.Level, bool) {
|
||||
for _, setting := range b.settings {
|
||||
if bool(klog.V(setting.logLevel)) {
|
||||
// Return early without write locking if possible.
|
||||
if func() bool {
|
||||
setting.lock.RLock()
|
||||
defer setting.lock.RUnlock()
|
||||
return b.clock.Since(setting.lastLogTime) >= setting.minLogInterval
|
||||
}() {
|
||||
setting.lock.Lock()
|
||||
defer setting.lock.Unlock()
|
||||
if b.clock.Since(setting.lastLogTime) >= setting.minLogInterval {
|
||||
setting.lastLogTime = b.clock.Now()
|
||||
return setting.logLevel, true
|
||||
}
|
||||
}
|
||||
return -1, false
|
||||
}
|
||||
}
|
||||
return -1, false
|
||||
}
|
||||
|
||||
// Infof will write a log message at each logLevel specified by the reciever's throttleSettings
|
||||
// as long as it hasn't written a log message more recently than minLogInterval.
|
||||
func (b *throttledLogger) Infof(message string, args ...interface{}) {
|
||||
if logLevel, ok := b.attemptToLog(); ok {
|
||||
klog.V(logLevel).Infof(message, args...)
|
||||
}
|
||||
}
|
||||
|
||||
// Watch attempts to begin watching the requested location.
|
||||
@@ -835,7 +868,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
|
||||
return err
|
||||
}
|
||||
// For connection errors and apiserver shutdown errors retry.
|
||||
if net.IsConnectionReset(err) {
|
||||
if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
|
||||
// For the purpose of retry, we set the artificial "retry-after" response.
|
||||
// TODO: Should we clean the original response if it exists?
|
||||
resp = &http.Response{
|
||||
|
||||
3
vendor/k8s.io/client-go/tools/clientcmd/api/types.go
generated
vendored
3
vendor/k8s.io/client-go/tools/clientcmd/api/types.go
generated
vendored
@@ -70,6 +70,9 @@ type Cluster struct {
|
||||
LocationOfOrigin string
|
||||
// Server is the address of the kubernetes cluster (https://hostname:port).
|
||||
Server string `json:"server"`
|
||||
// TLSServerName is used to check server certificate. If TLSServerName is empty, the hostname used to contact the server is used.
|
||||
// +optional
|
||||
TLSServerName string `json:"tls-server-name,omitempty"`
|
||||
// InsecureSkipTLSVerify skips the validity check for the server's certificate. This will make your HTTPS connections insecure.
|
||||
// +optional
|
||||
InsecureSkipTLSVerify bool `json:"insecure-skip-tls-verify,omitempty"`
|
||||
|
||||
6
vendor/k8s.io/client-go/tools/metrics/metrics.go
generated
vendored
6
vendor/k8s.io/client-go/tools/metrics/metrics.go
generated
vendored
@@ -53,6 +53,8 @@ var (
|
||||
ClientCertRotationAge DurationMetric = noopDuration{}
|
||||
// RequestLatency is the latency metric that rest clients will update.
|
||||
RequestLatency LatencyMetric = noopLatency{}
|
||||
// RateLimiterLatency is the client side rate limiter latency metric.
|
||||
RateLimiterLatency LatencyMetric = noopLatency{}
|
||||
// RequestResult is the result metric that rest clients will update.
|
||||
RequestResult ResultMetric = noopResult{}
|
||||
)
|
||||
@@ -62,6 +64,7 @@ type RegisterOpts struct {
|
||||
ClientCertExpiry ExpiryMetric
|
||||
ClientCertRotationAge DurationMetric
|
||||
RequestLatency LatencyMetric
|
||||
RateLimiterLatency LatencyMetric
|
||||
RequestResult ResultMetric
|
||||
}
|
||||
|
||||
@@ -78,6 +81,9 @@ func Register(opts RegisterOpts) {
|
||||
if opts.RequestLatency != nil {
|
||||
RequestLatency = opts.RequestLatency
|
||||
}
|
||||
if opts.RateLimiterLatency != nil {
|
||||
RateLimiterLatency = opts.RateLimiterLatency
|
||||
}
|
||||
if opts.RequestResult != nil {
|
||||
RequestResult = opts.RequestResult
|
||||
}
|
||||
|
||||
29
vendor/k8s.io/client-go/transport/cache.go
generated
vendored
29
vendor/k8s.io/client-go/transport/cache.go
generated
vendored
@@ -25,6 +25,7 @@ import (
|
||||
"time"
|
||||
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
)
|
||||
|
||||
// TlsTransportCache caches TLS http.RoundTrippers different configurations. The
|
||||
@@ -44,6 +45,8 @@ type tlsCacheKey struct {
|
||||
caData string
|
||||
certData string
|
||||
keyData string
|
||||
certFile string
|
||||
keyFile string
|
||||
getCert string
|
||||
serverName string
|
||||
nextProtos string
|
||||
@@ -91,6 +94,16 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).DialContext
|
||||
}
|
||||
|
||||
// If we use are reloading files, we need to handle certificate rotation properly
|
||||
// TODO(jackkleeman): We can also add rotation here when config.HasCertCallback() is true
|
||||
if config.TLS.ReloadTLSFiles {
|
||||
dynamicCertDialer := certRotatingDialer(tlsConfig.GetClientCertificate, dial)
|
||||
tlsConfig.GetClientCertificate = dynamicCertDialer.GetClientCertificate
|
||||
dial = dynamicCertDialer.connDialer.DialContext
|
||||
go dynamicCertDialer.Run(wait.NeverStop)
|
||||
}
|
||||
|
||||
// Cache a single transport for these options
|
||||
c.transports[key] = utilnet.SetTransportDefaults(&http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
@@ -109,15 +122,23 @@ func tlsConfigKey(c *Config) (tlsCacheKey, error) {
|
||||
if err := loadTLSFiles(c); err != nil {
|
||||
return tlsCacheKey{}, err
|
||||
}
|
||||
return tlsCacheKey{
|
||||
k := tlsCacheKey{
|
||||
insecure: c.TLS.Insecure,
|
||||
caData: string(c.TLS.CAData),
|
||||
certData: string(c.TLS.CertData),
|
||||
keyData: string(c.TLS.KeyData),
|
||||
getCert: fmt.Sprintf("%p", c.TLS.GetCert),
|
||||
serverName: c.TLS.ServerName,
|
||||
nextProtos: strings.Join(c.TLS.NextProtos, ","),
|
||||
dial: fmt.Sprintf("%p", c.Dial),
|
||||
disableCompression: c.DisableCompression,
|
||||
}, nil
|
||||
}
|
||||
|
||||
if c.TLS.ReloadTLSFiles {
|
||||
k.certFile = c.TLS.CertFile
|
||||
k.keyFile = c.TLS.KeyFile
|
||||
} else {
|
||||
k.certData = string(c.TLS.CertData)
|
||||
k.keyData = string(c.TLS.KeyData)
|
||||
}
|
||||
|
||||
return k, nil
|
||||
}
|
||||
|
||||
176
vendor/k8s.io/client-go/transport/cert_rotation.go
generated
vendored
Normal file
176
vendor/k8s.io/client-go/transport/cert_rotation.go
generated
vendored
Normal file
@@ -0,0 +1,176 @@
|
||||
/*
|
||||
Copyright 2020 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package transport
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/util/connrotation"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
const workItemKey = "key"
|
||||
|
||||
// CertCallbackRefreshDuration is exposed so that integration tests can crank up the reload speed.
|
||||
var CertCallbackRefreshDuration = 5 * time.Minute
|
||||
|
||||
type reloadFunc func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
|
||||
|
||||
type dynamicClientCert struct {
|
||||
clientCert *tls.Certificate
|
||||
certMtx sync.RWMutex
|
||||
|
||||
reload reloadFunc
|
||||
connDialer *connrotation.Dialer
|
||||
|
||||
// queue only ever has one item, but it has nice error handling backoff/retry semantics
|
||||
queue workqueue.RateLimitingInterface
|
||||
}
|
||||
|
||||
func certRotatingDialer(reload reloadFunc, dial utilnet.DialFunc) *dynamicClientCert {
|
||||
d := &dynamicClientCert{
|
||||
reload: reload,
|
||||
connDialer: connrotation.NewDialer(connrotation.DialFunc(dial)),
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DynamicClientCertificate"),
|
||||
}
|
||||
|
||||
return d
|
||||
}
|
||||
|
||||
// loadClientCert calls the callback and rotates connections if needed
|
||||
func (c *dynamicClientCert) loadClientCert() (*tls.Certificate, error) {
|
||||
cert, err := c.reload(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// check to see if we have a change. If the values are the same, do nothing.
|
||||
c.certMtx.RLock()
|
||||
haveCert := c.clientCert != nil
|
||||
if certsEqual(c.clientCert, cert) {
|
||||
c.certMtx.RUnlock()
|
||||
return c.clientCert, nil
|
||||
}
|
||||
c.certMtx.RUnlock()
|
||||
|
||||
c.certMtx.Lock()
|
||||
c.clientCert = cert
|
||||
c.certMtx.Unlock()
|
||||
|
||||
// The first certificate requested is not a rotation that is worth closing connections for
|
||||
if !haveCert {
|
||||
return cert, nil
|
||||
}
|
||||
|
||||
klog.V(1).Infof("certificate rotation detected, shutting down client connections to start using new credentials")
|
||||
c.connDialer.CloseAll()
|
||||
|
||||
return cert, nil
|
||||
}
|
||||
|
||||
// certsEqual compares tls Certificates, ignoring the Leaf which may get filled in dynamically
|
||||
func certsEqual(left, right *tls.Certificate) bool {
|
||||
if left == nil || right == nil {
|
||||
return left == right
|
||||
}
|
||||
|
||||
if !byteMatrixEqual(left.Certificate, right.Certificate) {
|
||||
return false
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(left.PrivateKey, right.PrivateKey) {
|
||||
return false
|
||||
}
|
||||
|
||||
if !byteMatrixEqual(left.SignedCertificateTimestamps, right.SignedCertificateTimestamps) {
|
||||
return false
|
||||
}
|
||||
|
||||
if !bytes.Equal(left.OCSPStaple, right.OCSPStaple) {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func byteMatrixEqual(left, right [][]byte) bool {
|
||||
if len(left) != len(right) {
|
||||
return false
|
||||
}
|
||||
|
||||
for i := range left {
|
||||
if !bytes.Equal(left[i], right[i]) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// run starts the controller and blocks until stopCh is closed.
|
||||
func (c *dynamicClientCert) Run(stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer c.queue.ShutDown()
|
||||
|
||||
klog.V(3).Infof("Starting client certificate rotation controller")
|
||||
defer klog.V(3).Infof("Shutting down client certificate rotation controller")
|
||||
|
||||
go wait.Until(c.runWorker, time.Second, stopCh)
|
||||
|
||||
go wait.PollImmediateUntil(CertCallbackRefreshDuration, func() (bool, error) {
|
||||
c.queue.Add(workItemKey)
|
||||
return false, nil
|
||||
}, stopCh)
|
||||
|
||||
<-stopCh
|
||||
}
|
||||
|
||||
func (c *dynamicClientCert) runWorker() {
|
||||
for c.processNextWorkItem() {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *dynamicClientCert) processNextWorkItem() bool {
|
||||
dsKey, quit := c.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer c.queue.Done(dsKey)
|
||||
|
||||
_, err := c.loadClientCert()
|
||||
if err == nil {
|
||||
c.queue.Forget(dsKey)
|
||||
return true
|
||||
}
|
||||
|
||||
utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
|
||||
c.queue.AddRateLimited(dsKey)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *dynamicClientCert) GetClientCertificate(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
|
||||
return c.loadClientCert()
|
||||
}
|
||||
7
vendor/k8s.io/client-go/transport/config.go
generated
vendored
7
vendor/k8s.io/client-go/transport/config.go
generated
vendored
@@ -115,9 +115,10 @@ func (c *Config) Wrap(fn WrapperFunc) {
|
||||
|
||||
// TLSConfig holds the information needed to set up a TLS transport.
|
||||
type TLSConfig struct {
|
||||
CAFile string // Path of the PEM-encoded server trusted root certificates.
|
||||
CertFile string // Path of the PEM-encoded client certificate.
|
||||
KeyFile string // Path of the PEM-encoded client key.
|
||||
CAFile string // Path of the PEM-encoded server trusted root certificates.
|
||||
CertFile string // Path of the PEM-encoded client certificate.
|
||||
KeyFile string // Path of the PEM-encoded client key.
|
||||
ReloadTLSFiles bool // Set to indicate that the original config provided files, and that they should be reloaded
|
||||
|
||||
Insecure bool // Server should be accessed without verifying the certificate. For testing only.
|
||||
ServerName string // Override for the server name passed to the server for SNI and used to verify certificates.
|
||||
|
||||
60
vendor/k8s.io/client-go/transport/transport.go
generated
vendored
60
vendor/k8s.io/client-go/transport/transport.go
generated
vendored
@@ -23,6 +23,8 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/klog"
|
||||
@@ -81,7 +83,8 @@ func TLSConfigFor(c *Config) (*tls.Config, error) {
|
||||
}
|
||||
|
||||
var staticCert *tls.Certificate
|
||||
if c.HasCertAuth() {
|
||||
// Treat cert as static if either key or cert was data, not a file
|
||||
if c.HasCertAuth() && !c.TLS.ReloadTLSFiles {
|
||||
// If key/cert were provided, verify them before setting up
|
||||
// tlsConfig.GetClientCertificate.
|
||||
cert, err := tls.X509KeyPair(c.TLS.CertData, c.TLS.KeyData)
|
||||
@@ -91,6 +94,11 @@ func TLSConfigFor(c *Config) (*tls.Config, error) {
|
||||
staticCert = &cert
|
||||
}
|
||||
|
||||
var dynamicCertLoader func() (*tls.Certificate, error)
|
||||
if c.TLS.ReloadTLSFiles {
|
||||
dynamicCertLoader = cachingCertificateLoader(c.TLS.CertFile, c.TLS.KeyFile)
|
||||
}
|
||||
|
||||
if c.HasCertAuth() || c.HasCertCallback() {
|
||||
tlsConfig.GetClientCertificate = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
|
||||
// Note: static key/cert data always take precedence over cert
|
||||
@@ -98,6 +106,10 @@ func TLSConfigFor(c *Config) (*tls.Config, error) {
|
||||
if staticCert != nil {
|
||||
return staticCert, nil
|
||||
}
|
||||
// key/cert files lead to ReloadTLSFiles being set - takes precedence over cert callback
|
||||
if dynamicCertLoader != nil {
|
||||
return dynamicCertLoader()
|
||||
}
|
||||
if c.HasCertCallback() {
|
||||
cert, err := c.TLS.GetCert()
|
||||
if err != nil {
|
||||
@@ -129,6 +141,11 @@ func loadTLSFiles(c *Config) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check that we are purely loading from files
|
||||
if len(c.TLS.CertFile) > 0 && len(c.TLS.CertData) == 0 && len(c.TLS.KeyFile) > 0 && len(c.TLS.KeyData) == 0 {
|
||||
c.TLS.ReloadTLSFiles = true
|
||||
}
|
||||
|
||||
c.TLS.CertData, err = dataFromSliceOrFile(c.TLS.CertData, c.TLS.CertFile)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -243,3 +260,44 @@ func tryCancelRequest(rt http.RoundTripper, req *http.Request) {
|
||||
klog.Warningf("Unable to cancel request for %T", rt)
|
||||
}
|
||||
}
|
||||
|
||||
type certificateCacheEntry struct {
|
||||
cert *tls.Certificate
|
||||
err error
|
||||
birth time.Time
|
||||
}
|
||||
|
||||
// isStale returns true when this cache entry is too old to be usable
|
||||
func (c *certificateCacheEntry) isStale() bool {
|
||||
return time.Now().Sub(c.birth) > time.Second
|
||||
}
|
||||
|
||||
func newCertificateCacheEntry(certFile, keyFile string) certificateCacheEntry {
|
||||
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
|
||||
return certificateCacheEntry{cert: &cert, err: err, birth: time.Now()}
|
||||
}
|
||||
|
||||
// cachingCertificateLoader ensures that we don't hammer the filesystem when opening many connections
|
||||
// the underlying cert files are read at most once every second
|
||||
func cachingCertificateLoader(certFile, keyFile string) func() (*tls.Certificate, error) {
|
||||
current := newCertificateCacheEntry(certFile, keyFile)
|
||||
var currentMtx sync.RWMutex
|
||||
|
||||
return func() (*tls.Certificate, error) {
|
||||
currentMtx.RLock()
|
||||
if current.isStale() {
|
||||
currentMtx.RUnlock()
|
||||
|
||||
currentMtx.Lock()
|
||||
defer currentMtx.Unlock()
|
||||
|
||||
if current.isStale() {
|
||||
current = newCertificateCacheEntry(certFile, keyFile)
|
||||
}
|
||||
} else {
|
||||
defer currentMtx.RUnlock()
|
||||
}
|
||||
|
||||
return current.cert, current.err
|
||||
}
|
||||
}
|
||||
|
||||
259
vendor/k8s.io/client-go/util/workqueue/default_rate_limiters.go
generated
vendored
Normal file
259
vendor/k8s.io/client-go/util/workqueue/default_rate_limiters.go
generated
vendored
Normal file
@@ -0,0 +1,259 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
type RateLimiter interface {
|
||||
// When gets an item and gets to decide how long that item should wait
|
||||
When(item interface{}) time.Duration
|
||||
// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
|
||||
// or for success, we'll stop tracking it
|
||||
Forget(item interface{})
|
||||
// NumRequeues returns back how many failures the item has had
|
||||
NumRequeues(item interface{}) int
|
||||
}
|
||||
|
||||
// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
|
||||
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
|
||||
func DefaultControllerRateLimiter() RateLimiter {
|
||||
return NewMaxOfRateLimiter(
|
||||
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
|
||||
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
|
||||
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
|
||||
)
|
||||
}
|
||||
|
||||
// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
|
||||
type BucketRateLimiter struct {
|
||||
*rate.Limiter
|
||||
}
|
||||
|
||||
var _ RateLimiter = &BucketRateLimiter{}
|
||||
|
||||
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
|
||||
return r.Limiter.Reserve().Delay()
|
||||
}
|
||||
|
||||
func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (r *BucketRateLimiter) Forget(item interface{}) {
|
||||
}
|
||||
|
||||
// ItemBucketRateLimiter implements a workqueue ratelimiter API using standard rate.Limiter.
|
||||
// Each key is using a separate limiter.
|
||||
type ItemBucketRateLimiter struct {
|
||||
r rate.Limit
|
||||
burst int
|
||||
|
||||
limitersLock sync.Mutex
|
||||
limiters map[interface{}]*rate.Limiter
|
||||
}
|
||||
|
||||
var _ RateLimiter = &ItemBucketRateLimiter{}
|
||||
|
||||
// NewItemBucketRateLimiter creates new ItemBucketRateLimiter instance.
|
||||
func NewItemBucketRateLimiter(r rate.Limit, burst int) *ItemBucketRateLimiter {
|
||||
return &ItemBucketRateLimiter{
|
||||
r: r,
|
||||
burst: burst,
|
||||
limiters: make(map[interface{}]*rate.Limiter),
|
||||
}
|
||||
}
|
||||
|
||||
// When returns a time.Duration which we need to wait before item is processed.
|
||||
func (r *ItemBucketRateLimiter) When(item interface{}) time.Duration {
|
||||
r.limitersLock.Lock()
|
||||
defer r.limitersLock.Unlock()
|
||||
|
||||
limiter, ok := r.limiters[item]
|
||||
if !ok {
|
||||
limiter = rate.NewLimiter(r.r, r.burst)
|
||||
r.limiters[item] = limiter
|
||||
}
|
||||
|
||||
return limiter.Reserve().Delay()
|
||||
}
|
||||
|
||||
// NumRequeues returns always 0 (doesn't apply to ItemBucketRateLimiter).
|
||||
func (r *ItemBucketRateLimiter) NumRequeues(item interface{}) int {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Forget removes item from the internal state.
|
||||
func (r *ItemBucketRateLimiter) Forget(item interface{}) {
|
||||
r.limitersLock.Lock()
|
||||
defer r.limitersLock.Unlock()
|
||||
|
||||
delete(r.limiters, item)
|
||||
}
|
||||
|
||||
// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
|
||||
// dealing with max failures and expiration are up to the caller
|
||||
type ItemExponentialFailureRateLimiter struct {
|
||||
failuresLock sync.Mutex
|
||||
failures map[interface{}]int
|
||||
|
||||
baseDelay time.Duration
|
||||
maxDelay time.Duration
|
||||
}
|
||||
|
||||
var _ RateLimiter = &ItemExponentialFailureRateLimiter{}
|
||||
|
||||
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
|
||||
return &ItemExponentialFailureRateLimiter{
|
||||
failures: map[interface{}]int{},
|
||||
baseDelay: baseDelay,
|
||||
maxDelay: maxDelay,
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultItemBasedRateLimiter() RateLimiter {
|
||||
return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)
|
||||
}
|
||||
|
||||
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
|
||||
r.failuresLock.Lock()
|
||||
defer r.failuresLock.Unlock()
|
||||
|
||||
exp := r.failures[item]
|
||||
r.failures[item] = r.failures[item] + 1
|
||||
|
||||
// The backoff is capped such that 'calculated' value never overflows.
|
||||
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
|
||||
if backoff > math.MaxInt64 {
|
||||
return r.maxDelay
|
||||
}
|
||||
|
||||
calculated := time.Duration(backoff)
|
||||
if calculated > r.maxDelay {
|
||||
return r.maxDelay
|
||||
}
|
||||
|
||||
return calculated
|
||||
}
|
||||
|
||||
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
|
||||
r.failuresLock.Lock()
|
||||
defer r.failuresLock.Unlock()
|
||||
|
||||
return r.failures[item]
|
||||
}
|
||||
|
||||
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
|
||||
r.failuresLock.Lock()
|
||||
defer r.failuresLock.Unlock()
|
||||
|
||||
delete(r.failures, item)
|
||||
}
|
||||
|
||||
// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
|
||||
type ItemFastSlowRateLimiter struct {
|
||||
failuresLock sync.Mutex
|
||||
failures map[interface{}]int
|
||||
|
||||
maxFastAttempts int
|
||||
fastDelay time.Duration
|
||||
slowDelay time.Duration
|
||||
}
|
||||
|
||||
var _ RateLimiter = &ItemFastSlowRateLimiter{}
|
||||
|
||||
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
|
||||
return &ItemFastSlowRateLimiter{
|
||||
failures: map[interface{}]int{},
|
||||
fastDelay: fastDelay,
|
||||
slowDelay: slowDelay,
|
||||
maxFastAttempts: maxFastAttempts,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
|
||||
r.failuresLock.Lock()
|
||||
defer r.failuresLock.Unlock()
|
||||
|
||||
r.failures[item] = r.failures[item] + 1
|
||||
|
||||
if r.failures[item] <= r.maxFastAttempts {
|
||||
return r.fastDelay
|
||||
}
|
||||
|
||||
return r.slowDelay
|
||||
}
|
||||
|
||||
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
|
||||
r.failuresLock.Lock()
|
||||
defer r.failuresLock.Unlock()
|
||||
|
||||
return r.failures[item]
|
||||
}
|
||||
|
||||
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
|
||||
r.failuresLock.Lock()
|
||||
defer r.failuresLock.Unlock()
|
||||
|
||||
delete(r.failures, item)
|
||||
}
|
||||
|
||||
// MaxOfRateLimiter calls every RateLimiter and returns the worst case response
|
||||
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
|
||||
// were separately delayed a longer time.
|
||||
type MaxOfRateLimiter struct {
|
||||
limiters []RateLimiter
|
||||
}
|
||||
|
||||
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
|
||||
ret := time.Duration(0)
|
||||
for _, limiter := range r.limiters {
|
||||
curr := limiter.When(item)
|
||||
if curr > ret {
|
||||
ret = curr
|
||||
}
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
|
||||
return &MaxOfRateLimiter{limiters: limiters}
|
||||
}
|
||||
|
||||
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
|
||||
ret := 0
|
||||
for _, limiter := range r.limiters {
|
||||
curr := limiter.NumRequeues(item)
|
||||
if curr > ret {
|
||||
ret = curr
|
||||
}
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
func (r *MaxOfRateLimiter) Forget(item interface{}) {
|
||||
for _, limiter := range r.limiters {
|
||||
limiter.Forget(item)
|
||||
}
|
||||
}
|
||||
271
vendor/k8s.io/client-go/util/workqueue/delaying_queue.go
generated
vendored
Normal file
271
vendor/k8s.io/client-go/util/workqueue/delaying_queue.go
generated
vendored
Normal file
@@ -0,0 +1,271 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
)
|
||||
|
||||
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
|
||||
// requeue items after failures without ending up in a hot-loop.
|
||||
type DelayingInterface interface {
|
||||
Interface
|
||||
// AddAfter adds an item to the workqueue after the indicated duration has passed
|
||||
AddAfter(item interface{}, duration time.Duration)
|
||||
}
|
||||
|
||||
// NewDelayingQueue constructs a new workqueue with delayed queuing ability
|
||||
func NewDelayingQueue() DelayingInterface {
|
||||
return NewDelayingQueueWithCustomClock(clock.RealClock{}, "")
|
||||
}
|
||||
|
||||
// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability
|
||||
func NewNamedDelayingQueue(name string) DelayingInterface {
|
||||
return NewDelayingQueueWithCustomClock(clock.RealClock{}, name)
|
||||
}
|
||||
|
||||
// NewDelayingQueueWithCustomClock constructs a new named workqueue
|
||||
// with ability to inject real or fake clock for testing purposes
|
||||
func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
|
||||
ret := &delayingType{
|
||||
Interface: NewNamed(name),
|
||||
clock: clock,
|
||||
heartbeat: clock.NewTicker(maxWait),
|
||||
stopCh: make(chan struct{}),
|
||||
waitingForAddCh: make(chan *waitFor, 1000),
|
||||
metrics: newRetryMetrics(name),
|
||||
}
|
||||
|
||||
go ret.waitingLoop()
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
// delayingType wraps an Interface and provides delayed re-enquing
|
||||
type delayingType struct {
|
||||
Interface
|
||||
|
||||
// clock tracks time for delayed firing
|
||||
clock clock.Clock
|
||||
|
||||
// stopCh lets us signal a shutdown to the waiting loop
|
||||
stopCh chan struct{}
|
||||
// stopOnce guarantees we only signal shutdown a single time
|
||||
stopOnce sync.Once
|
||||
|
||||
// heartbeat ensures we wait no more than maxWait before firing
|
||||
heartbeat clock.Ticker
|
||||
|
||||
// waitingForAddCh is a buffered channel that feeds waitingForAdd
|
||||
waitingForAddCh chan *waitFor
|
||||
|
||||
// metrics counts the number of retries
|
||||
metrics retryMetrics
|
||||
}
|
||||
|
||||
// waitFor holds the data to add and the time it should be added
|
||||
type waitFor struct {
|
||||
data t
|
||||
readyAt time.Time
|
||||
// index in the priority queue (heap)
|
||||
index int
|
||||
}
|
||||
|
||||
// waitForPriorityQueue implements a priority queue for waitFor items.
|
||||
//
|
||||
// waitForPriorityQueue implements heap.Interface. The item occurring next in
|
||||
// time (i.e., the item with the smallest readyAt) is at the root (index 0).
|
||||
// Peek returns this minimum item at index 0. Pop returns the minimum item after
|
||||
// it has been removed from the queue and placed at index Len()-1 by
|
||||
// container/heap. Push adds an item at index Len(), and container/heap
|
||||
// percolates it into the correct location.
|
||||
type waitForPriorityQueue []*waitFor
|
||||
|
||||
func (pq waitForPriorityQueue) Len() int {
|
||||
return len(pq)
|
||||
}
|
||||
func (pq waitForPriorityQueue) Less(i, j int) bool {
|
||||
return pq[i].readyAt.Before(pq[j].readyAt)
|
||||
}
|
||||
func (pq waitForPriorityQueue) Swap(i, j int) {
|
||||
pq[i], pq[j] = pq[j], pq[i]
|
||||
pq[i].index = i
|
||||
pq[j].index = j
|
||||
}
|
||||
|
||||
// Push adds an item to the queue. Push should not be called directly; instead,
|
||||
// use `heap.Push`.
|
||||
func (pq *waitForPriorityQueue) Push(x interface{}) {
|
||||
n := len(*pq)
|
||||
item := x.(*waitFor)
|
||||
item.index = n
|
||||
*pq = append(*pq, item)
|
||||
}
|
||||
|
||||
// Pop removes an item from the queue. Pop should not be called directly;
|
||||
// instead, use `heap.Pop`.
|
||||
func (pq *waitForPriorityQueue) Pop() interface{} {
|
||||
n := len(*pq)
|
||||
item := (*pq)[n-1]
|
||||
item.index = -1
|
||||
*pq = (*pq)[0:(n - 1)]
|
||||
return item
|
||||
}
|
||||
|
||||
// Peek returns the item at the beginning of the queue, without removing the
|
||||
// item or otherwise mutating the queue. It is safe to call directly.
|
||||
func (pq waitForPriorityQueue) Peek() interface{} {
|
||||
return pq[0]
|
||||
}
|
||||
|
||||
// ShutDown stops the queue. After the queue drains, the returned shutdown bool
|
||||
// on Get() will be true. This method may be invoked more than once.
|
||||
func (q *delayingType) ShutDown() {
|
||||
q.stopOnce.Do(func() {
|
||||
q.Interface.ShutDown()
|
||||
close(q.stopCh)
|
||||
q.heartbeat.Stop()
|
||||
})
|
||||
}
|
||||
|
||||
// AddAfter adds the given item to the work queue after the given delay
|
||||
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
|
||||
// don't add if we're already shutting down
|
||||
if q.ShuttingDown() {
|
||||
return
|
||||
}
|
||||
|
||||
q.metrics.retry()
|
||||
|
||||
// immediately add things with no delay
|
||||
if duration <= 0 {
|
||||
q.Add(item)
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-q.stopCh:
|
||||
// unblock if ShutDown() is called
|
||||
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
|
||||
}
|
||||
}
|
||||
|
||||
// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening.
|
||||
// Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an
|
||||
// expired item sitting for more than 10 seconds.
|
||||
const maxWait = 10 * time.Second
|
||||
|
||||
// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
|
||||
func (q *delayingType) waitingLoop() {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
// Make a placeholder channel to use when there are no items in our list
|
||||
never := make(<-chan time.Time)
|
||||
|
||||
// Make a timer that expires when the item at the head of the waiting queue is ready
|
||||
var nextReadyAtTimer clock.Timer
|
||||
|
||||
waitingForQueue := &waitForPriorityQueue{}
|
||||
heap.Init(waitingForQueue)
|
||||
|
||||
waitingEntryByData := map[t]*waitFor{}
|
||||
|
||||
for {
|
||||
if q.Interface.ShuttingDown() {
|
||||
return
|
||||
}
|
||||
|
||||
now := q.clock.Now()
|
||||
|
||||
// Add ready entries
|
||||
for waitingForQueue.Len() > 0 {
|
||||
entry := waitingForQueue.Peek().(*waitFor)
|
||||
if entry.readyAt.After(now) {
|
||||
break
|
||||
}
|
||||
|
||||
entry = heap.Pop(waitingForQueue).(*waitFor)
|
||||
q.Add(entry.data)
|
||||
delete(waitingEntryByData, entry.data)
|
||||
}
|
||||
|
||||
// Set up a wait for the first item's readyAt (if one exists)
|
||||
nextReadyAt := never
|
||||
if waitingForQueue.Len() > 0 {
|
||||
if nextReadyAtTimer != nil {
|
||||
nextReadyAtTimer.Stop()
|
||||
}
|
||||
entry := waitingForQueue.Peek().(*waitFor)
|
||||
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
|
||||
nextReadyAt = nextReadyAtTimer.C()
|
||||
}
|
||||
|
||||
select {
|
||||
case <-q.stopCh:
|
||||
return
|
||||
|
||||
case <-q.heartbeat.C():
|
||||
// continue the loop, which will add ready items
|
||||
|
||||
case <-nextReadyAt:
|
||||
// continue the loop, which will add ready items
|
||||
|
||||
case waitEntry := <-q.waitingForAddCh:
|
||||
if waitEntry.readyAt.After(q.clock.Now()) {
|
||||
insert(waitingForQueue, waitingEntryByData, waitEntry)
|
||||
} else {
|
||||
q.Add(waitEntry.data)
|
||||
}
|
||||
|
||||
drained := false
|
||||
for !drained {
|
||||
select {
|
||||
case waitEntry := <-q.waitingForAddCh:
|
||||
if waitEntry.readyAt.After(q.clock.Now()) {
|
||||
insert(waitingForQueue, waitingEntryByData, waitEntry)
|
||||
} else {
|
||||
q.Add(waitEntry.data)
|
||||
}
|
||||
default:
|
||||
drained = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
|
||||
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
|
||||
// if the entry already exists, update the time only if it would cause the item to be queued sooner
|
||||
existing, exists := knownEntries[entry.data]
|
||||
if exists {
|
||||
if existing.readyAt.After(entry.readyAt) {
|
||||
existing.readyAt = entry.readyAt
|
||||
heap.Fix(q, existing.index)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
heap.Push(q, entry)
|
||||
knownEntries[entry.data] = entry
|
||||
}
|
||||
26
vendor/k8s.io/client-go/util/workqueue/doc.go
generated
vendored
Normal file
26
vendor/k8s.io/client-go/util/workqueue/doc.go
generated
vendored
Normal file
@@ -0,0 +1,26 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package workqueue provides a simple queue that supports the following
|
||||
// features:
|
||||
// * Fair: items processed in the order in which they are added.
|
||||
// * Stingy: a single item will not be processed multiple times concurrently,
|
||||
// and if an item is added multiple times before it can be processed, it
|
||||
// will only be processed once.
|
||||
// * Multiple consumers and producers. In particular, it is allowed for an
|
||||
// item to be reenqueued while it is being processed.
|
||||
// * Shutdown notifications.
|
||||
package workqueue // import "k8s.io/client-go/util/workqueue"
|
||||
261
vendor/k8s.io/client-go/util/workqueue/metrics.go
generated
vendored
Normal file
261
vendor/k8s.io/client-go/util/workqueue/metrics.go
generated
vendored
Normal file
@@ -0,0 +1,261 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
)
|
||||
|
||||
// This file provides abstractions for setting the provider (e.g., prometheus)
|
||||
// of metrics.
|
||||
|
||||
type queueMetrics interface {
|
||||
add(item t)
|
||||
get(item t)
|
||||
done(item t)
|
||||
updateUnfinishedWork()
|
||||
}
|
||||
|
||||
// GaugeMetric represents a single numerical value that can arbitrarily go up
|
||||
// and down.
|
||||
type GaugeMetric interface {
|
||||
Inc()
|
||||
Dec()
|
||||
}
|
||||
|
||||
// SettableGaugeMetric represents a single numerical value that can arbitrarily go up
|
||||
// and down. (Separate from GaugeMetric to preserve backwards compatibility.)
|
||||
type SettableGaugeMetric interface {
|
||||
Set(float64)
|
||||
}
|
||||
|
||||
// CounterMetric represents a single numerical value that only ever
|
||||
// goes up.
|
||||
type CounterMetric interface {
|
||||
Inc()
|
||||
}
|
||||
|
||||
// SummaryMetric captures individual observations.
|
||||
type SummaryMetric interface {
|
||||
Observe(float64)
|
||||
}
|
||||
|
||||
// HistogramMetric counts individual observations.
|
||||
type HistogramMetric interface {
|
||||
Observe(float64)
|
||||
}
|
||||
|
||||
type noopMetric struct{}
|
||||
|
||||
func (noopMetric) Inc() {}
|
||||
func (noopMetric) Dec() {}
|
||||
func (noopMetric) Set(float64) {}
|
||||
func (noopMetric) Observe(float64) {}
|
||||
|
||||
// defaultQueueMetrics expects the caller to lock before setting any metrics.
|
||||
type defaultQueueMetrics struct {
|
||||
clock clock.Clock
|
||||
|
||||
// current depth of a workqueue
|
||||
depth GaugeMetric
|
||||
// total number of adds handled by a workqueue
|
||||
adds CounterMetric
|
||||
// how long an item stays in a workqueue
|
||||
latency HistogramMetric
|
||||
// how long processing an item from a workqueue takes
|
||||
workDuration HistogramMetric
|
||||
addTimes map[t]time.Time
|
||||
processingStartTimes map[t]time.Time
|
||||
|
||||
// how long have current threads been working?
|
||||
unfinishedWorkSeconds SettableGaugeMetric
|
||||
longestRunningProcessor SettableGaugeMetric
|
||||
}
|
||||
|
||||
func (m *defaultQueueMetrics) add(item t) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
|
||||
m.adds.Inc()
|
||||
m.depth.Inc()
|
||||
if _, exists := m.addTimes[item]; !exists {
|
||||
m.addTimes[item] = m.clock.Now()
|
||||
}
|
||||
}
|
||||
|
||||
func (m *defaultQueueMetrics) get(item t) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
|
||||
m.depth.Dec()
|
||||
m.processingStartTimes[item] = m.clock.Now()
|
||||
if startTime, exists := m.addTimes[item]; exists {
|
||||
m.latency.Observe(m.sinceInSeconds(startTime))
|
||||
delete(m.addTimes, item)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *defaultQueueMetrics) done(item t) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if startTime, exists := m.processingStartTimes[item]; exists {
|
||||
m.workDuration.Observe(m.sinceInSeconds(startTime))
|
||||
delete(m.processingStartTimes, item)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *defaultQueueMetrics) updateUnfinishedWork() {
|
||||
// Note that a summary metric would be better for this, but prometheus
|
||||
// doesn't seem to have non-hacky ways to reset the summary metrics.
|
||||
var total float64
|
||||
var oldest float64
|
||||
for _, t := range m.processingStartTimes {
|
||||
age := m.sinceInSeconds(t)
|
||||
total += age
|
||||
if age > oldest {
|
||||
oldest = age
|
||||
}
|
||||
}
|
||||
m.unfinishedWorkSeconds.Set(total)
|
||||
m.longestRunningProcessor.Set(oldest)
|
||||
}
|
||||
|
||||
type noMetrics struct{}
|
||||
|
||||
func (noMetrics) add(item t) {}
|
||||
func (noMetrics) get(item t) {}
|
||||
func (noMetrics) done(item t) {}
|
||||
func (noMetrics) updateUnfinishedWork() {}
|
||||
|
||||
// Gets the time since the specified start in seconds.
|
||||
func (m *defaultQueueMetrics) sinceInSeconds(start time.Time) float64 {
|
||||
return m.clock.Since(start).Seconds()
|
||||
}
|
||||
|
||||
type retryMetrics interface {
|
||||
retry()
|
||||
}
|
||||
|
||||
type defaultRetryMetrics struct {
|
||||
retries CounterMetric
|
||||
}
|
||||
|
||||
func (m *defaultRetryMetrics) retry() {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
|
||||
m.retries.Inc()
|
||||
}
|
||||
|
||||
// MetricsProvider generates various metrics used by the queue.
|
||||
type MetricsProvider interface {
|
||||
NewDepthMetric(name string) GaugeMetric
|
||||
NewAddsMetric(name string) CounterMetric
|
||||
NewLatencyMetric(name string) HistogramMetric
|
||||
NewWorkDurationMetric(name string) HistogramMetric
|
||||
NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
|
||||
NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
|
||||
NewRetriesMetric(name string) CounterMetric
|
||||
}
|
||||
|
||||
type noopMetricsProvider struct{}
|
||||
|
||||
func (_ noopMetricsProvider) NewDepthMetric(name string) GaugeMetric {
|
||||
return noopMetric{}
|
||||
}
|
||||
|
||||
func (_ noopMetricsProvider) NewAddsMetric(name string) CounterMetric {
|
||||
return noopMetric{}
|
||||
}
|
||||
|
||||
func (_ noopMetricsProvider) NewLatencyMetric(name string) HistogramMetric {
|
||||
return noopMetric{}
|
||||
}
|
||||
|
||||
func (_ noopMetricsProvider) NewWorkDurationMetric(name string) HistogramMetric {
|
||||
return noopMetric{}
|
||||
}
|
||||
|
||||
func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
|
||||
return noopMetric{}
|
||||
}
|
||||
|
||||
func (_ noopMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric {
|
||||
return noopMetric{}
|
||||
}
|
||||
|
||||
func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
|
||||
return noopMetric{}
|
||||
}
|
||||
|
||||
var globalMetricsFactory = queueMetricsFactory{
|
||||
metricsProvider: noopMetricsProvider{},
|
||||
}
|
||||
|
||||
type queueMetricsFactory struct {
|
||||
metricsProvider MetricsProvider
|
||||
|
||||
onlyOnce sync.Once
|
||||
}
|
||||
|
||||
func (f *queueMetricsFactory) setProvider(mp MetricsProvider) {
|
||||
f.onlyOnce.Do(func() {
|
||||
f.metricsProvider = mp
|
||||
})
|
||||
}
|
||||
|
||||
func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics {
|
||||
mp := f.metricsProvider
|
||||
if len(name) == 0 || mp == (noopMetricsProvider{}) {
|
||||
return noMetrics{}
|
||||
}
|
||||
return &defaultQueueMetrics{
|
||||
clock: clock,
|
||||
depth: mp.NewDepthMetric(name),
|
||||
adds: mp.NewAddsMetric(name),
|
||||
latency: mp.NewLatencyMetric(name),
|
||||
workDuration: mp.NewWorkDurationMetric(name),
|
||||
unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
|
||||
longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name),
|
||||
addTimes: map[t]time.Time{},
|
||||
processingStartTimes: map[t]time.Time{},
|
||||
}
|
||||
}
|
||||
|
||||
func newRetryMetrics(name string) retryMetrics {
|
||||
var ret *defaultRetryMetrics
|
||||
if len(name) == 0 {
|
||||
return ret
|
||||
}
|
||||
return &defaultRetryMetrics{
|
||||
retries: globalMetricsFactory.metricsProvider.NewRetriesMetric(name),
|
||||
}
|
||||
}
|
||||
|
||||
// SetProvider sets the metrics provider for all subsequently created work
|
||||
// queues. Only the first call has an effect.
|
||||
func SetProvider(metricsProvider MetricsProvider) {
|
||||
globalMetricsFactory.setProvider(metricsProvider)
|
||||
}
|
||||
63
vendor/k8s.io/client-go/util/workqueue/parallelizer.go
generated
vendored
Normal file
63
vendor/k8s.io/client-go/util/workqueue/parallelizer.go
generated
vendored
Normal file
@@ -0,0 +1,63 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
)
|
||||
|
||||
type DoWorkPieceFunc func(piece int)
|
||||
|
||||
// ParallelizeUntil is a framework that allows for parallelizing N
|
||||
// independent pieces of work until done or the context is canceled.
|
||||
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
|
||||
var stop <-chan struct{}
|
||||
if ctx != nil {
|
||||
stop = ctx.Done()
|
||||
}
|
||||
|
||||
toProcess := make(chan int, pieces)
|
||||
for i := 0; i < pieces; i++ {
|
||||
toProcess <- i
|
||||
}
|
||||
close(toProcess)
|
||||
|
||||
if pieces < workers {
|
||||
workers = pieces
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(workers)
|
||||
for i := 0; i < workers; i++ {
|
||||
go func() {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer wg.Done()
|
||||
for piece := range toProcess {
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
default:
|
||||
doWorkPiece(piece)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
212
vendor/k8s.io/client-go/util/workqueue/queue.go
generated
vendored
Normal file
212
vendor/k8s.io/client-go/util/workqueue/queue.go
generated
vendored
Normal file
@@ -0,0 +1,212 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
)
|
||||
|
||||
type Interface interface {
|
||||
Add(item interface{})
|
||||
Len() int
|
||||
Get() (item interface{}, shutdown bool)
|
||||
Done(item interface{})
|
||||
ShutDown()
|
||||
ShuttingDown() bool
|
||||
}
|
||||
|
||||
// New constructs a new work queue (see the package comment).
|
||||
func New() *Type {
|
||||
return NewNamed("")
|
||||
}
|
||||
|
||||
func NewNamed(name string) *Type {
|
||||
rc := clock.RealClock{}
|
||||
return newQueue(
|
||||
rc,
|
||||
globalMetricsFactory.newQueueMetrics(name, rc),
|
||||
defaultUnfinishedWorkUpdatePeriod,
|
||||
)
|
||||
}
|
||||
|
||||
func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
|
||||
t := &Type{
|
||||
clock: c,
|
||||
dirty: set{},
|
||||
processing: set{},
|
||||
cond: sync.NewCond(&sync.Mutex{}),
|
||||
metrics: metrics,
|
||||
unfinishedWorkUpdatePeriod: updatePeriod,
|
||||
}
|
||||
go t.updateUnfinishedWorkLoop()
|
||||
return t
|
||||
}
|
||||
|
||||
const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond
|
||||
|
||||
// Type is a work queue (see the package comment).
|
||||
type Type struct {
|
||||
// queue defines the order in which we will work on items. Every
|
||||
// element of queue should be in the dirty set and not in the
|
||||
// processing set.
|
||||
queue []t
|
||||
|
||||
// dirty defines all of the items that need to be processed.
|
||||
dirty set
|
||||
|
||||
// Things that are currently being processed are in the processing set.
|
||||
// These things may be simultaneously in the dirty set. When we finish
|
||||
// processing something and remove it from this set, we'll check if
|
||||
// it's in the dirty set, and if so, add it to the queue.
|
||||
processing set
|
||||
|
||||
cond *sync.Cond
|
||||
|
||||
shuttingDown bool
|
||||
|
||||
metrics queueMetrics
|
||||
|
||||
unfinishedWorkUpdatePeriod time.Duration
|
||||
clock clock.Clock
|
||||
}
|
||||
|
||||
type empty struct{}
|
||||
type t interface{}
|
||||
type set map[t]empty
|
||||
|
||||
func (s set) has(item t) bool {
|
||||
_, exists := s[item]
|
||||
return exists
|
||||
}
|
||||
|
||||
func (s set) insert(item t) {
|
||||
s[item] = empty{}
|
||||
}
|
||||
|
||||
func (s set) delete(item t) {
|
||||
delete(s, item)
|
||||
}
|
||||
|
||||
// Add marks item as needing processing.
|
||||
func (q *Type) Add(item interface{}) {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
if q.shuttingDown {
|
||||
return
|
||||
}
|
||||
if q.dirty.has(item) {
|
||||
return
|
||||
}
|
||||
|
||||
q.metrics.add(item)
|
||||
|
||||
q.dirty.insert(item)
|
||||
if q.processing.has(item) {
|
||||
return
|
||||
}
|
||||
|
||||
q.queue = append(q.queue, item)
|
||||
q.cond.Signal()
|
||||
}
|
||||
|
||||
// Len returns the current queue length, for informational purposes only. You
|
||||
// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
|
||||
// value, that can't be synchronized properly.
|
||||
func (q *Type) Len() int {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
return len(q.queue)
|
||||
}
|
||||
|
||||
// Get blocks until it can return an item to be processed. If shutdown = true,
|
||||
// the caller should end their goroutine. You must call Done with item when you
|
||||
// have finished processing it.
|
||||
func (q *Type) Get() (item interface{}, shutdown bool) {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
for len(q.queue) == 0 && !q.shuttingDown {
|
||||
q.cond.Wait()
|
||||
}
|
||||
if len(q.queue) == 0 {
|
||||
// We must be shutting down.
|
||||
return nil, true
|
||||
}
|
||||
|
||||
item, q.queue = q.queue[0], q.queue[1:]
|
||||
|
||||
q.metrics.get(item)
|
||||
|
||||
q.processing.insert(item)
|
||||
q.dirty.delete(item)
|
||||
|
||||
return item, false
|
||||
}
|
||||
|
||||
// Done marks item as done processing, and if it has been marked as dirty again
|
||||
// while it was being processed, it will be re-added to the queue for
|
||||
// re-processing.
|
||||
func (q *Type) Done(item interface{}) {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
|
||||
q.metrics.done(item)
|
||||
|
||||
q.processing.delete(item)
|
||||
if q.dirty.has(item) {
|
||||
q.queue = append(q.queue, item)
|
||||
q.cond.Signal()
|
||||
}
|
||||
}
|
||||
|
||||
// ShutDown will cause q to ignore all new items added to it. As soon as the
|
||||
// worker goroutines have drained the existing items in the queue, they will be
|
||||
// instructed to exit.
|
||||
func (q *Type) ShutDown() {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
q.shuttingDown = true
|
||||
q.cond.Broadcast()
|
||||
}
|
||||
|
||||
func (q *Type) ShuttingDown() bool {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
|
||||
return q.shuttingDown
|
||||
}
|
||||
|
||||
func (q *Type) updateUnfinishedWorkLoop() {
|
||||
t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
|
||||
defer t.Stop()
|
||||
for range t.C() {
|
||||
if !func() bool {
|
||||
q.cond.L.Lock()
|
||||
defer q.cond.L.Unlock()
|
||||
if !q.shuttingDown {
|
||||
q.metrics.updateUnfinishedWork()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
||||
}() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
69
vendor/k8s.io/client-go/util/workqueue/rate_limiting_queue.go
generated
vendored
Normal file
69
vendor/k8s.io/client-go/util/workqueue/rate_limiting_queue.go
generated
vendored
Normal file
@@ -0,0 +1,69 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
// RateLimitingInterface is an interface that rate limits items being added to the queue.
|
||||
type RateLimitingInterface interface {
|
||||
DelayingInterface
|
||||
|
||||
// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
|
||||
AddRateLimited(item interface{})
|
||||
|
||||
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
|
||||
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
|
||||
// still have to call `Done` on the queue.
|
||||
Forget(item interface{})
|
||||
|
||||
// NumRequeues returns back how many times the item was requeued
|
||||
NumRequeues(item interface{}) int
|
||||
}
|
||||
|
||||
// NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability
|
||||
// Remember to call Forget! If you don't, you may end up tracking failures forever.
|
||||
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
|
||||
return &rateLimitingType{
|
||||
DelayingInterface: NewDelayingQueue(),
|
||||
rateLimiter: rateLimiter,
|
||||
}
|
||||
}
|
||||
|
||||
func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface {
|
||||
return &rateLimitingType{
|
||||
DelayingInterface: NewNamedDelayingQueue(name),
|
||||
rateLimiter: rateLimiter,
|
||||
}
|
||||
}
|
||||
|
||||
// rateLimitingType wraps an Interface and provides rateLimited re-enquing
|
||||
type rateLimitingType struct {
|
||||
DelayingInterface
|
||||
|
||||
rateLimiter RateLimiter
|
||||
}
|
||||
|
||||
// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
|
||||
func (q *rateLimitingType) AddRateLimited(item interface{}) {
|
||||
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
|
||||
}
|
||||
|
||||
func (q *rateLimitingType) NumRequeues(item interface{}) int {
|
||||
return q.rateLimiter.NumRequeues(item)
|
||||
}
|
||||
|
||||
func (q *rateLimitingType) Forget(item interface{}) {
|
||||
q.rateLimiter.Forget(item)
|
||||
}
|
||||
Reference in New Issue
Block a user