Merge pull request #102168 from adisky/credential-provider-1

Improve concurrency and cache for kubelet credential provider
This commit is contained in:
Kubernetes Prow Robot 2021-07-02 01:16:12 -07:00 committed by GitHub
commit 3e0432c3e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 285 additions and 34 deletions

1
go.mod
View File

@ -88,6 +88,7 @@ require (
golang.org/x/exp v0.0.0-20210220032938-85be41e4509f // indirect
golang.org/x/net v0.0.0-20210520170846-37e1c6afe023
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba

View File

@ -28,10 +28,13 @@ import (
"sync"
"time"
"golang.org/x/sync/singleflight"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
credentialproviderapi "k8s.io/kubelet/pkg/apis/credentialprovider"
@ -44,6 +47,7 @@ import (
const (
globalCacheKey = "global"
cachePurgeInterval = time.Minute * 15
)
var (
@ -116,10 +120,14 @@ func newPluginProvider(pluginBinDir string, provider kubeletconfig.CredentialPro
return nil, fmt.Errorf("invalid apiVersion: %q", provider.APIVersion)
}
clock := clock.RealClock{}
return &pluginProvider{
clock: clock,
matchImages: provider.MatchImages,
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: clock}),
defaultCacheDuration: provider.DefaultCacheDuration.Duration,
lastCachePurge: clock.Now(),
plugin: &execPlugin{
name: provider.Name,
apiVersion: provider.APIVersion,
@ -133,8 +141,12 @@ func newPluginProvider(pluginBinDir string, provider kubeletconfig.CredentialPro
// pluginProvider is the plugin-based implementation of the DockerConfigProvider interface.
type pluginProvider struct {
clock clock.Clock
sync.Mutex
group singleflight.Group
// matchImages defines the matching image URLs this plugin should operate against.
// The plugin provider will not return any credentials for images that do not match
// against this list of match URLs.
@ -149,6 +161,9 @@ type pluginProvider struct {
// plugin is the exec implementation of the credential providing plugin.
plugin Plugin
// lastCachePurge is the last time cache is cleaned for expired entries.
lastCachePurge time.Time
}
// cacheEntry is the cache object that will be stored in cache.Store.
@ -165,12 +180,14 @@ func cacheKeyFunc(obj interface{}) (string, error) {
}
// cacheExpirationPolicy defines implements cache.ExpirationPolicy, determining expiration based on the expiresAt timestamp.
type cacheExpirationPolicy struct{}
type cacheExpirationPolicy struct {
clock clock.Clock
}
// IsExpired returns true if the current time is after cacheEntry.expiresAt, which is determined by the
// cache duration returned from the credential provider plugin response.
func (c *cacheExpirationPolicy) IsExpired(entry *cache.TimestampedEntry) bool {
return time.Now().After(entry.Obj.(*cacheEntry).expiresAt)
return c.clock.Now().After(entry.Obj.(*cacheEntry).expiresAt)
}
// Provide returns a credentialprovider.DockerConfig based on the credentials returned
@ -180,9 +197,6 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig {
return credentialprovider.DockerConfig{}
}
p.Lock()
defer p.Unlock()
cachedConfig, found, err := p.getCachedCredentials(image)
if err != nil {
klog.Errorf("Failed to get cached docker config: %v", err)
@ -193,12 +207,27 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig {
return cachedConfig
}
response, err := p.plugin.ExecPlugin(context.Background(), image)
// ExecPlugin is wrapped in single flight to exec plugin once for concurrent same image request.
// The caveat here is we don't know cacheKeyType yet, so if cacheKeyType is registry/global and credentials saved in cache
// on per registry/global basis then exec will be called for all requests if requests are made concurrently.
// foo.bar.registry
// foo.bar.registry/image1
// foo.bar.registry/image2
res, err, _ := p.group.Do(image, func() (interface{}, error) {
return p.plugin.ExecPlugin(context.Background(), image)
})
if err != nil {
klog.Errorf("Failed getting credential from external registry credential provider: %v", err)
return credentialprovider.DockerConfig{}
}
response, ok := res.(*credentialproviderapi.CredentialProviderResponse)
if !ok {
klog.Errorf("Invalid response type returned by external credential provider")
return credentialprovider.DockerConfig{}
}
var cacheKey string
switch cacheKeyType := response.CacheKeyType; cacheKeyType {
case credentialproviderapi.ImagePluginCacheKeyType:
@ -232,10 +261,9 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig {
if p.defaultCacheDuration == 0 {
return dockerConfig
}
expiresAt = time.Now().Add(p.defaultCacheDuration)
expiresAt = p.clock.Now().Add(p.defaultCacheDuration)
} else {
expiresAt = time.Now().Add(response.CacheDuration.Duration)
expiresAt = p.clock.Now().Add(response.CacheDuration.Duration)
}
cachedEntry := &cacheEntry{
@ -269,6 +297,16 @@ func (p *pluginProvider) isImageAllowed(image string) bool {
// getCachedCredentials returns a credentialprovider.DockerConfig if cached from the plugin.
func (p *pluginProvider) getCachedCredentials(image string) (credentialprovider.DockerConfig, bool, error) {
p.Lock()
if p.clock.Now().After(p.lastCachePurge.Add(cachePurgeInterval)) {
// NewExpirationCache purges expired entries when List() is called
// The expired entry in the cache is removed only when Get or List called on it.
// List() is called on some interval to remove those expired entries on which Get is never called.
_ = p.cache.List()
p.lastCachePurge = p.clock.Now()
}
p.Unlock()
obj, found, err := p.cache.GetByKey(image)
if err != nil {
return nil, false, err
@ -325,6 +363,8 @@ type execPlugin struct {
// The plugin is expected to receive the CredentialProviderRequest API via stdin from the kubelet and
// return CredentialProviderResponse via stdout.
func (e *execPlugin) ExecPlugin(ctx context.Context, image string) (*credentialproviderapi.CredentialProviderResponse, error) {
klog.V(5).Infof("Getting image %s credentials from external exec plugin %s", image, e.name)
authRequest := &credentialproviderapi.CredentialProviderRequest{Image: image}
data, err := e.encodeRequest(authRequest)
if err != nil {
@ -361,7 +401,6 @@ func (e *execPlugin) ExecPlugin(ctx context.Context, image string) (*credentialp
}
data = stdout.Bytes()
// check that the response apiVersion matches what is expected
gvk, err := json.DefaultMetaFactory.Interpret(data)
if err != nil {
@ -369,10 +408,10 @@ func (e *execPlugin) ExecPlugin(ctx context.Context, image string) (*credentialp
}
if gvk.GroupVersion().String() != e.apiVersion {
return nil, errors.New("apiVersion from credential plugin response did not match")
return nil, fmt.Errorf("apiVersion from credential plugin response did not match expected apiVersion:%s, actual apiVersion:%s", e.apiVersion, gvk.GroupVersion().String())
}
response, err := e.decodeResponse(stdout.Bytes())
response, err := e.decodeResponse(data)
if err != nil {
// err is explicitly not wrapped since it may contain credentials in the response.
return nil, errors.New("error decoding credential provider plugin response from stdout")

View File

@ -18,12 +18,17 @@ package plugin
import (
"context"
"fmt"
"reflect"
"sync"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/tools/cache"
credentialproviderapi "k8s.io/kubelet/pkg/apis/credentialprovider"
credentialproviderv1alpha1 "k8s.io/kubelet/pkg/apis/credentialprovider/v1alpha1"
@ -48,6 +53,7 @@ func (f *fakeExecPlugin) ExecPlugin(ctx context.Context, image string) (*credent
}
func Test_Provide(t *testing.T) {
tclock := clock.RealClock{}
testcases := []struct {
name string
pluginProvider *pluginProvider
@ -57,8 +63,10 @@ func Test_Provide(t *testing.T) {
{
name: "exact image match, with Registry cache key",
pluginProvider: &pluginProvider{
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"test.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.RegistryPluginCacheKeyType,
auth: map[string]credentialproviderapi.AuthConfig{
@ -80,8 +88,10 @@ func Test_Provide(t *testing.T) {
{
name: "exact image match, with Image cache key",
pluginProvider: &pluginProvider{
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"test.registry.io/foo/bar"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.ImagePluginCacheKeyType,
auth: map[string]credentialproviderapi.AuthConfig{
@ -103,8 +113,10 @@ func Test_Provide(t *testing.T) {
{
name: "exact image match, with Global cache key",
pluginProvider: &pluginProvider{
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"test.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.GlobalPluginCacheKeyType,
auth: map[string]credentialproviderapi.AuthConfig{
@ -126,8 +138,10 @@ func Test_Provide(t *testing.T) {
{
name: "wild card image match, with Registry cache key",
pluginProvider: &pluginProvider{
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"*.registry.io:8080"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.RegistryPluginCacheKeyType,
auth: map[string]credentialproviderapi.AuthConfig{
@ -149,8 +163,10 @@ func Test_Provide(t *testing.T) {
{
name: "wild card image match, with Image cache key",
pluginProvider: &pluginProvider{
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"*.*.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.ImagePluginCacheKeyType,
auth: map[string]credentialproviderapi.AuthConfig{
@ -172,8 +188,10 @@ func Test_Provide(t *testing.T) {
{
name: "wild card image match, with Global cache key",
pluginProvider: &pluginProvider{
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"*.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.GlobalPluginCacheKeyType,
auth: map[string]credentialproviderapi.AuthConfig{
@ -195,7 +213,9 @@ func Test_Provide(t *testing.T) {
}
for _, testcase := range testcases {
testcase := testcase
t.Run(testcase.name, func(t *testing.T) {
t.Parallel()
dockerconfig := testcase.pluginProvider.Provide(testcase.image)
if !reflect.DeepEqual(dockerconfig, testcase.dockerconfig) {
t.Logf("actual docker config: %v", dockerconfig)
@ -206,6 +226,184 @@ func Test_Provide(t *testing.T) {
}
}
// This test calls Provide in parallel for different registries and images
// The purpose of this is to detect any race conditions while cache rw.
func Test_ProvideParallel(t *testing.T) {
tclock := clock.RealClock{}
testcases := []struct {
name string
registry string
}{
{
name: "provide for registry 1",
registry: "test1.registry.io",
},
{
name: "provide for registry 2",
registry: "test2.registry.io",
},
{
name: "provide for registry 3",
registry: "test3.registry.io",
},
{
name: "provide for registry 4",
registry: "test4.registry.io",
},
}
pluginProvider := &pluginProvider{
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"test1.registry.io", "test2.registry.io", "test3.registry.io", "test4.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheDuration: time.Minute * 1,
cacheKeyType: credentialproviderapi.RegistryPluginCacheKeyType,
auth: map[string]credentialproviderapi.AuthConfig{
"test.registry.io": {
Username: "user",
Password: "password",
},
},
},
}
dockerconfig := credentialprovider.DockerConfig{
"test.registry.io": credentialprovider.DockerConfigEntry{
Username: "user",
Password: "password",
},
}
for _, testcase := range testcases {
testcase := testcase
t.Run(testcase.name, func(t *testing.T) {
t.Parallel()
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func(w *sync.WaitGroup) {
image := fmt.Sprintf(testcase.registry+"/%s", rand.String(5))
dockerconfigResponse := pluginProvider.Provide(image)
if !reflect.DeepEqual(dockerconfigResponse, dockerconfig) {
t.Logf("actual docker config: %v", dockerconfigResponse)
t.Logf("expected docker config: %v", dockerconfig)
t.Error("unexpected docker config")
}
w.Done()
}(&wg)
}
wg.Wait()
})
}
}
func Test_getCachedCredentials(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
p := &pluginProvider{
clock: fakeClock,
lastCachePurge: fakeClock.Now(),
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: fakeClock}),
plugin: &fakeExecPlugin{},
}
testcases := []struct {
name string
step time.Duration
cacheEntry cacheEntry
expectedResponse credentialprovider.DockerConfig
keyLength int
getKey string
}{
{
name: "It should return not expired credential",
step: 1 * time.Second,
keyLength: 1,
getKey: "image1",
expectedResponse: map[string]credentialprovider.DockerConfigEntry{
"image1": {
Username: "user1",
Password: "pass1",
},
},
cacheEntry: cacheEntry{
key: "image1",
expiresAt: fakeClock.Now().Add(1 * time.Minute),
credentials: map[string]credentialprovider.DockerConfigEntry{
"image1": {
Username: "user1",
Password: "pass1",
},
},
},
},
{
name: "It should not return expired credential",
step: 2 * time.Minute,
getKey: "image2",
keyLength: 1,
cacheEntry: cacheEntry{
key: "image2",
expiresAt: fakeClock.Now(),
credentials: map[string]credentialprovider.DockerConfigEntry{
"image2": {
Username: "user2",
Password: "pass2",
},
},
},
},
{
name: "It should delete expired credential during purge",
step: 18 * time.Minute,
keyLength: 0,
// while get call for random, cache purge will be called and it will delete expired
// image3 credentials. We cannot use image3 as getKey here, as it will get deleted during
// get only, we will not be able verify the purge call.
getKey: "random",
cacheEntry: cacheEntry{
key: "image3",
expiresAt: fakeClock.Now().Add(2 * time.Minute),
credentials: map[string]credentialprovider.DockerConfigEntry{
"image3": {
Username: "user3",
Password: "pass3",
},
},
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
p.cache.Add(&tc.cacheEntry)
fakeClock.Step(tc.step)
// getCachedCredentials returns unexpired credentials.
res, _, err := p.getCachedCredentials(tc.getKey)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
if !reflect.DeepEqual(res, tc.expectedResponse) {
t.Logf("response %v", res)
t.Logf("expected response %v", tc.expectedResponse)
t.Errorf("Unexpected response")
}
// Listkeys returns all the keys present in cache including expired keys.
if len(p.cache.ListKeys()) != tc.keyLength {
t.Errorf("Unexpected cache key length")
}
})
}
}
func Test_encodeRequest(t *testing.T) {
testcases := []struct {
name string
@ -316,9 +514,12 @@ func Test_decodeResponse(t *testing.T) {
}
func Test_RegistryCacheKeyType(t *testing.T) {
tclock := clock.RealClock{}
pluginProvider := &pluginProvider{
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"*.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.RegistryPluginCacheKeyType,
cacheDuration: time.Hour,
@ -366,9 +567,12 @@ func Test_RegistryCacheKeyType(t *testing.T) {
}
func Test_ImageCacheKeyType(t *testing.T) {
tclock := clock.RealClock{}
pluginProvider := &pluginProvider{
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"*.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.ImagePluginCacheKeyType,
cacheDuration: time.Hour,
@ -416,9 +620,12 @@ func Test_ImageCacheKeyType(t *testing.T) {
}
func Test_GlobalCacheKeyType(t *testing.T) {
tclock := clock.RealClock{}
pluginProvider := &pluginProvider{
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"*.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.GlobalPluginCacheKeyType,
cacheDuration: time.Hour,
@ -466,9 +673,12 @@ func Test_GlobalCacheKeyType(t *testing.T) {
}
func Test_NoCacheResponse(t *testing.T) {
tclock := clock.RealClock{}
pluginProvider := &pluginProvider{
clock: tclock,
lastCachePurge: tclock.Now(),
matchImages: []string{"*.registry.io"},
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}),
plugin: &fakeExecPlugin{
cacheKeyType: credentialproviderapi.GlobalPluginCacheKeyType,
cacheDuration: 0, // no cache

1
vendor/modules.txt vendored
View File

@ -1011,6 +1011,7 @@ golang.org/x/oauth2/internal
golang.org/x/oauth2/jws
golang.org/x/oauth2/jwt
# golang.org/x/sync v0.0.0-20210220032951-036812b2e83c => golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
## explicit
golang.org/x/sync/singleflight
# golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 => golang.org/x/sys v0.0.0-20210616094352-59db8d763f22
## explicit