diff --git a/go.mod b/go.mod index 3230a9fb47670..6287e12df289d 100644 --- a/go.mod +++ b/go.mod @@ -88,6 +88,7 @@ require ( golang.org/x/exp v0.0.0-20210220032938-85be41e4509f // indirect golang.org/x/net v0.0.0-20210520170846-37e1c6afe023 golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba diff --git a/pkg/credentialprovider/plugin/plugin.go b/pkg/credentialprovider/plugin/plugin.go index eee135b942377..30b5b0439c913 100644 --- a/pkg/credentialprovider/plugin/plugin.go +++ b/pkg/credentialprovider/plugin/plugin.go @@ -28,10 +28,13 @@ import ( "sync" "time" + "golang.org/x/sync/singleflight" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer/json" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" credentialproviderapi "k8s.io/kubelet/pkg/apis/credentialprovider" @@ -43,7 +46,8 @@ import ( ) const ( - globalCacheKey = "global" + globalCacheKey = "global" + cachePurgeInterval = time.Minute * 15 ) var ( @@ -116,10 +120,14 @@ func newPluginProvider(pluginBinDir string, provider kubeletconfig.CredentialPro return nil, fmt.Errorf("invalid apiVersion: %q", provider.APIVersion) } + clock := clock.RealClock{} + return &pluginProvider{ + clock: clock, matchImages: provider.MatchImages, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: clock}), defaultCacheDuration: provider.DefaultCacheDuration.Duration, + lastCachePurge: clock.Now(), plugin: &execPlugin{ name: provider.Name, apiVersion: provider.APIVersion, @@ -133,8 +141,12 @@ func newPluginProvider(pluginBinDir string, provider kubeletconfig.CredentialPro // pluginProvider is the plugin-based implementation of the DockerConfigProvider interface. type pluginProvider struct { + clock clock.Clock + sync.Mutex + group singleflight.Group + // matchImages defines the matching image URLs this plugin should operate against. // The plugin provider will not return any credentials for images that do not match // against this list of match URLs. @@ -149,6 +161,9 @@ type pluginProvider struct { // plugin is the exec implementation of the credential providing plugin. plugin Plugin + + // lastCachePurge is the last time cache is cleaned for expired entries. + lastCachePurge time.Time } // cacheEntry is the cache object that will be stored in cache.Store. @@ -165,12 +180,14 @@ func cacheKeyFunc(obj interface{}) (string, error) { } // cacheExpirationPolicy defines implements cache.ExpirationPolicy, determining expiration based on the expiresAt timestamp. -type cacheExpirationPolicy struct{} +type cacheExpirationPolicy struct { + clock clock.Clock +} // IsExpired returns true if the current time is after cacheEntry.expiresAt, which is determined by the // cache duration returned from the credential provider plugin response. func (c *cacheExpirationPolicy) IsExpired(entry *cache.TimestampedEntry) bool { - return time.Now().After(entry.Obj.(*cacheEntry).expiresAt) + return c.clock.Now().After(entry.Obj.(*cacheEntry).expiresAt) } // Provide returns a credentialprovider.DockerConfig based on the credentials returned @@ -180,9 +197,6 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig { return credentialprovider.DockerConfig{} } - p.Lock() - defer p.Unlock() - cachedConfig, found, err := p.getCachedCredentials(image) if err != nil { klog.Errorf("Failed to get cached docker config: %v", err) @@ -193,12 +207,27 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig { return cachedConfig } - response, err := p.plugin.ExecPlugin(context.Background(), image) + // ExecPlugin is wrapped in single flight to exec plugin once for concurrent same image request. + // The caveat here is we don't know cacheKeyType yet, so if cacheKeyType is registry/global and credentials saved in cache + // on per registry/global basis then exec will be called for all requests if requests are made concurrently. + // foo.bar.registry + // foo.bar.registry/image1 + // foo.bar.registry/image2 + res, err, _ := p.group.Do(image, func() (interface{}, error) { + return p.plugin.ExecPlugin(context.Background(), image) + }) + if err != nil { klog.Errorf("Failed getting credential from external registry credential provider: %v", err) return credentialprovider.DockerConfig{} } + response, ok := res.(*credentialproviderapi.CredentialProviderResponse) + if !ok { + klog.Errorf("Invalid response type returned by external credential provider") + return credentialprovider.DockerConfig{} + } + var cacheKey string switch cacheKeyType := response.CacheKeyType; cacheKeyType { case credentialproviderapi.ImagePluginCacheKeyType: @@ -232,10 +261,9 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig { if p.defaultCacheDuration == 0 { return dockerConfig } - - expiresAt = time.Now().Add(p.defaultCacheDuration) + expiresAt = p.clock.Now().Add(p.defaultCacheDuration) } else { - expiresAt = time.Now().Add(response.CacheDuration.Duration) + expiresAt = p.clock.Now().Add(response.CacheDuration.Duration) } cachedEntry := &cacheEntry{ @@ -269,6 +297,16 @@ func (p *pluginProvider) isImageAllowed(image string) bool { // getCachedCredentials returns a credentialprovider.DockerConfig if cached from the plugin. func (p *pluginProvider) getCachedCredentials(image string) (credentialprovider.DockerConfig, bool, error) { + p.Lock() + if p.clock.Now().After(p.lastCachePurge.Add(cachePurgeInterval)) { + // NewExpirationCache purges expired entries when List() is called + // The expired entry in the cache is removed only when Get or List called on it. + // List() is called on some interval to remove those expired entries on which Get is never called. + _ = p.cache.List() + p.lastCachePurge = p.clock.Now() + } + p.Unlock() + obj, found, err := p.cache.GetByKey(image) if err != nil { return nil, false, err @@ -325,6 +363,8 @@ type execPlugin struct { // The plugin is expected to receive the CredentialProviderRequest API via stdin from the kubelet and // return CredentialProviderResponse via stdout. func (e *execPlugin) ExecPlugin(ctx context.Context, image string) (*credentialproviderapi.CredentialProviderResponse, error) { + klog.V(5).Infof("Getting image %s credentials from external exec plugin %s", image, e.name) + authRequest := &credentialproviderapi.CredentialProviderRequest{Image: image} data, err := e.encodeRequest(authRequest) if err != nil { @@ -361,7 +401,6 @@ func (e *execPlugin) ExecPlugin(ctx context.Context, image string) (*credentialp } data = stdout.Bytes() - // check that the response apiVersion matches what is expected gvk, err := json.DefaultMetaFactory.Interpret(data) if err != nil { @@ -369,10 +408,10 @@ func (e *execPlugin) ExecPlugin(ctx context.Context, image string) (*credentialp } if gvk.GroupVersion().String() != e.apiVersion { - return nil, errors.New("apiVersion from credential plugin response did not match") + return nil, fmt.Errorf("apiVersion from credential plugin response did not match expected apiVersion:%s, actual apiVersion:%s", e.apiVersion, gvk.GroupVersion().String()) } - response, err := e.decodeResponse(stdout.Bytes()) + response, err := e.decodeResponse(data) if err != nil { // err is explicitly not wrapped since it may contain credentials in the response. return nil, errors.New("error decoding credential provider plugin response from stdout") diff --git a/pkg/credentialprovider/plugin/plugin_test.go b/pkg/credentialprovider/plugin/plugin_test.go index 6c9a9358e56dd..817ad486f0882 100644 --- a/pkg/credentialprovider/plugin/plugin_test.go +++ b/pkg/credentialprovider/plugin/plugin_test.go @@ -18,12 +18,17 @@ package plugin import ( "context" + "fmt" "reflect" + "sync" "testing" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/client-go/tools/cache" credentialproviderapi "k8s.io/kubelet/pkg/apis/credentialprovider" credentialproviderv1alpha1 "k8s.io/kubelet/pkg/apis/credentialprovider/v1alpha1" @@ -48,6 +53,7 @@ func (f *fakeExecPlugin) ExecPlugin(ctx context.Context, image string) (*credent } func Test_Provide(t *testing.T) { + tclock := clock.RealClock{} testcases := []struct { name string pluginProvider *pluginProvider @@ -57,8 +63,10 @@ func Test_Provide(t *testing.T) { { name: "exact image match, with Registry cache key", pluginProvider: &pluginProvider{ - matchImages: []string{"test.registry.io"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"test.registry.io"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.RegistryPluginCacheKeyType, auth: map[string]credentialproviderapi.AuthConfig{ @@ -80,8 +88,10 @@ func Test_Provide(t *testing.T) { { name: "exact image match, with Image cache key", pluginProvider: &pluginProvider{ - matchImages: []string{"test.registry.io/foo/bar"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"test.registry.io/foo/bar"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.ImagePluginCacheKeyType, auth: map[string]credentialproviderapi.AuthConfig{ @@ -103,8 +113,10 @@ func Test_Provide(t *testing.T) { { name: "exact image match, with Global cache key", pluginProvider: &pluginProvider{ - matchImages: []string{"test.registry.io"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"test.registry.io"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.GlobalPluginCacheKeyType, auth: map[string]credentialproviderapi.AuthConfig{ @@ -126,8 +138,10 @@ func Test_Provide(t *testing.T) { { name: "wild card image match, with Registry cache key", pluginProvider: &pluginProvider{ - matchImages: []string{"*.registry.io:8080"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"*.registry.io:8080"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.RegistryPluginCacheKeyType, auth: map[string]credentialproviderapi.AuthConfig{ @@ -149,8 +163,10 @@ func Test_Provide(t *testing.T) { { name: "wild card image match, with Image cache key", pluginProvider: &pluginProvider{ - matchImages: []string{"*.*.registry.io"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"*.*.registry.io"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.ImagePluginCacheKeyType, auth: map[string]credentialproviderapi.AuthConfig{ @@ -172,8 +188,10 @@ func Test_Provide(t *testing.T) { { name: "wild card image match, with Global cache key", pluginProvider: &pluginProvider{ - matchImages: []string{"*.registry.io"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"*.registry.io"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.GlobalPluginCacheKeyType, auth: map[string]credentialproviderapi.AuthConfig{ @@ -195,7 +213,9 @@ func Test_Provide(t *testing.T) { } for _, testcase := range testcases { + testcase := testcase t.Run(testcase.name, func(t *testing.T) { + t.Parallel() dockerconfig := testcase.pluginProvider.Provide(testcase.image) if !reflect.DeepEqual(dockerconfig, testcase.dockerconfig) { t.Logf("actual docker config: %v", dockerconfig) @@ -206,6 +226,184 @@ func Test_Provide(t *testing.T) { } } +// This test calls Provide in parallel for different registries and images +// The purpose of this is to detect any race conditions while cache rw. +func Test_ProvideParallel(t *testing.T) { + tclock := clock.RealClock{} + + testcases := []struct { + name string + registry string + }{ + { + name: "provide for registry 1", + registry: "test1.registry.io", + }, + { + name: "provide for registry 2", + registry: "test2.registry.io", + }, + { + name: "provide for registry 3", + registry: "test3.registry.io", + }, + { + name: "provide for registry 4", + registry: "test4.registry.io", + }, + } + + pluginProvider := &pluginProvider{ + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"test1.registry.io", "test2.registry.io", "test3.registry.io", "test4.registry.io"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), + plugin: &fakeExecPlugin{ + cacheDuration: time.Minute * 1, + cacheKeyType: credentialproviderapi.RegistryPluginCacheKeyType, + auth: map[string]credentialproviderapi.AuthConfig{ + "test.registry.io": { + Username: "user", + Password: "password", + }, + }, + }, + } + + dockerconfig := credentialprovider.DockerConfig{ + "test.registry.io": credentialprovider.DockerConfigEntry{ + Username: "user", + Password: "password", + }, + } + + for _, testcase := range testcases { + testcase := testcase + t.Run(testcase.name, func(t *testing.T) { + t.Parallel() + var wg sync.WaitGroup + wg.Add(5) + + for i := 0; i < 5; i++ { + go func(w *sync.WaitGroup) { + image := fmt.Sprintf(testcase.registry+"/%s", rand.String(5)) + dockerconfigResponse := pluginProvider.Provide(image) + if !reflect.DeepEqual(dockerconfigResponse, dockerconfig) { + t.Logf("actual docker config: %v", dockerconfigResponse) + t.Logf("expected docker config: %v", dockerconfig) + t.Error("unexpected docker config") + } + w.Done() + }(&wg) + } + wg.Wait() + + }) + } +} + +func Test_getCachedCredentials(t *testing.T) { + fakeClock := clock.NewFakeClock(time.Now()) + p := &pluginProvider{ + clock: fakeClock, + lastCachePurge: fakeClock.Now(), + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: fakeClock}), + plugin: &fakeExecPlugin{}, + } + + testcases := []struct { + name string + step time.Duration + cacheEntry cacheEntry + expectedResponse credentialprovider.DockerConfig + keyLength int + getKey string + }{ + { + name: "It should return not expired credential", + step: 1 * time.Second, + keyLength: 1, + getKey: "image1", + expectedResponse: map[string]credentialprovider.DockerConfigEntry{ + "image1": { + Username: "user1", + Password: "pass1", + }, + }, + cacheEntry: cacheEntry{ + key: "image1", + expiresAt: fakeClock.Now().Add(1 * time.Minute), + credentials: map[string]credentialprovider.DockerConfigEntry{ + "image1": { + Username: "user1", + Password: "pass1", + }, + }, + }, + }, + + { + name: "It should not return expired credential", + step: 2 * time.Minute, + getKey: "image2", + keyLength: 1, + cacheEntry: cacheEntry{ + key: "image2", + expiresAt: fakeClock.Now(), + credentials: map[string]credentialprovider.DockerConfigEntry{ + "image2": { + Username: "user2", + Password: "pass2", + }, + }, + }, + }, + + { + name: "It should delete expired credential during purge", + step: 18 * time.Minute, + keyLength: 0, + // while get call for random, cache purge will be called and it will delete expired + // image3 credentials. We cannot use image3 as getKey here, as it will get deleted during + // get only, we will not be able verify the purge call. + getKey: "random", + cacheEntry: cacheEntry{ + key: "image3", + expiresAt: fakeClock.Now().Add(2 * time.Minute), + credentials: map[string]credentialprovider.DockerConfigEntry{ + "image3": { + Username: "user3", + Password: "pass3", + }, + }, + }, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + p.cache.Add(&tc.cacheEntry) + fakeClock.Step(tc.step) + + // getCachedCredentials returns unexpired credentials. + res, _, err := p.getCachedCredentials(tc.getKey) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + if !reflect.DeepEqual(res, tc.expectedResponse) { + t.Logf("response %v", res) + t.Logf("expected response %v", tc.expectedResponse) + t.Errorf("Unexpected response") + } + + // Listkeys returns all the keys present in cache including expired keys. + if len(p.cache.ListKeys()) != tc.keyLength { + t.Errorf("Unexpected cache key length") + } + }) + } +} + func Test_encodeRequest(t *testing.T) { testcases := []struct { name string @@ -316,9 +514,12 @@ func Test_decodeResponse(t *testing.T) { } func Test_RegistryCacheKeyType(t *testing.T) { + tclock := clock.RealClock{} pluginProvider := &pluginProvider{ - matchImages: []string{"*.registry.io"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"*.registry.io"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.RegistryPluginCacheKeyType, cacheDuration: time.Hour, @@ -366,9 +567,12 @@ func Test_RegistryCacheKeyType(t *testing.T) { } func Test_ImageCacheKeyType(t *testing.T) { + tclock := clock.RealClock{} pluginProvider := &pluginProvider{ - matchImages: []string{"*.registry.io"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"*.registry.io"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.ImagePluginCacheKeyType, cacheDuration: time.Hour, @@ -416,9 +620,12 @@ func Test_ImageCacheKeyType(t *testing.T) { } func Test_GlobalCacheKeyType(t *testing.T) { + tclock := clock.RealClock{} pluginProvider := &pluginProvider{ - matchImages: []string{"*.registry.io"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"*.registry.io"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.GlobalPluginCacheKeyType, cacheDuration: time.Hour, @@ -466,9 +673,12 @@ func Test_GlobalCacheKeyType(t *testing.T) { } func Test_NoCacheResponse(t *testing.T) { + tclock := clock.RealClock{} pluginProvider := &pluginProvider{ - matchImages: []string{"*.registry.io"}, - cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}), + clock: tclock, + lastCachePurge: tclock.Now(), + matchImages: []string{"*.registry.io"}, + cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: tclock}), plugin: &fakeExecPlugin{ cacheKeyType: credentialproviderapi.GlobalPluginCacheKeyType, cacheDuration: 0, // no cache diff --git a/vendor/modules.txt b/vendor/modules.txt index 83d30878ac352..622b269312a89 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1011,6 +1011,7 @@ golang.org/x/oauth2/internal golang.org/x/oauth2/jws golang.org/x/oauth2/jwt # golang.org/x/sync v0.0.0-20210220032951-036812b2e83c => golang.org/x/sync v0.0.0-20210220032951-036812b2e83c +## explicit golang.org/x/sync/singleflight # golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 => golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 ## explicit