Skip to content

Commit

Permalink
Merge pull request kubernetes#102168 from adisky/credential-provider-1
Browse files Browse the repository at this point in the history
Improve concurrency and cache for kubelet credential provider
  • Loading branch information
k8s-ci-robot authored Jul 2, 2021
2 parents 659c7e7 + def9331 commit 3e0432c
Show file tree
Hide file tree
Showing 4 changed files with 285 additions and 34 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ require (
golang.org/x/exp v0.0.0-20210220032938-85be41e4509f // indirect
golang.org/x/net v0.0.0-20210520170846-37e1c6afe023
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
Expand Down
67 changes: 53 additions & 14 deletions pkg/credentialprovider/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ import (
"sync"
"time"

"golang.org/x/sync/singleflight"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
credentialproviderapi "k8s.io/kubelet/pkg/apis/credentialprovider"
Expand All @@ -43,7 +46,8 @@ import (
)

const (
globalCacheKey = "global"
globalCacheKey = "global"
cachePurgeInterval = time.Minute * 15
)

var (
Expand Down Expand Up @@ -116,10 +120,14 @@ func newPluginProvider(pluginBinDir string, provider kubeletconfig.CredentialPro
return nil, fmt.Errorf("invalid apiVersion: %q", provider.APIVersion)
}

clock := clock.RealClock{}

return &pluginProvider{
clock: clock,
matchImages: provider.MatchImages,
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{}),
cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: clock}),
defaultCacheDuration: provider.DefaultCacheDuration.Duration,
lastCachePurge: clock.Now(),
plugin: &execPlugin{
name: provider.Name,
apiVersion: provider.APIVersion,
Expand All @@ -133,8 +141,12 @@ func newPluginProvider(pluginBinDir string, provider kubeletconfig.CredentialPro

// pluginProvider is the plugin-based implementation of the DockerConfigProvider interface.
type pluginProvider struct {
clock clock.Clock

sync.Mutex

group singleflight.Group

// matchImages defines the matching image URLs this plugin should operate against.
// The plugin provider will not return any credentials for images that do not match
// against this list of match URLs.
Expand All @@ -149,6 +161,9 @@ type pluginProvider struct {

// plugin is the exec implementation of the credential providing plugin.
plugin Plugin

// lastCachePurge is the last time cache is cleaned for expired entries.
lastCachePurge time.Time
}

// cacheEntry is the cache object that will be stored in cache.Store.
Expand All @@ -165,12 +180,14 @@ func cacheKeyFunc(obj interface{}) (string, error) {
}

// cacheExpirationPolicy defines implements cache.ExpirationPolicy, determining expiration based on the expiresAt timestamp.
type cacheExpirationPolicy struct{}
type cacheExpirationPolicy struct {
clock clock.Clock
}

// IsExpired returns true if the current time is after cacheEntry.expiresAt, which is determined by the
// cache duration returned from the credential provider plugin response.
func (c *cacheExpirationPolicy) IsExpired(entry *cache.TimestampedEntry) bool {
return time.Now().After(entry.Obj.(*cacheEntry).expiresAt)
return c.clock.Now().After(entry.Obj.(*cacheEntry).expiresAt)
}

// Provide returns a credentialprovider.DockerConfig based on the credentials returned
Expand All @@ -180,9 +197,6 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig {
return credentialprovider.DockerConfig{}
}

p.Lock()
defer p.Unlock()

cachedConfig, found, err := p.getCachedCredentials(image)
if err != nil {
klog.Errorf("Failed to get cached docker config: %v", err)
Expand All @@ -193,12 +207,27 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig {
return cachedConfig
}

response, err := p.plugin.ExecPlugin(context.Background(), image)
// ExecPlugin is wrapped in single flight to exec plugin once for concurrent same image request.
// The caveat here is we don't know cacheKeyType yet, so if cacheKeyType is registry/global and credentials saved in cache
// on per registry/global basis then exec will be called for all requests if requests are made concurrently.
// foo.bar.registry
// foo.bar.registry/image1
// foo.bar.registry/image2
res, err, _ := p.group.Do(image, func() (interface{}, error) {
return p.plugin.ExecPlugin(context.Background(), image)
})

if err != nil {
klog.Errorf("Failed getting credential from external registry credential provider: %v", err)
return credentialprovider.DockerConfig{}
}

response, ok := res.(*credentialproviderapi.CredentialProviderResponse)
if !ok {
klog.Errorf("Invalid response type returned by external credential provider")
return credentialprovider.DockerConfig{}
}

var cacheKey string
switch cacheKeyType := response.CacheKeyType; cacheKeyType {
case credentialproviderapi.ImagePluginCacheKeyType:
Expand Down Expand Up @@ -232,10 +261,9 @@ func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig {
if p.defaultCacheDuration == 0 {
return dockerConfig
}

expiresAt = time.Now().Add(p.defaultCacheDuration)
expiresAt = p.clock.Now().Add(p.defaultCacheDuration)
} else {
expiresAt = time.Now().Add(response.CacheDuration.Duration)
expiresAt = p.clock.Now().Add(response.CacheDuration.Duration)
}

cachedEntry := &cacheEntry{
Expand Down Expand Up @@ -269,6 +297,16 @@ func (p *pluginProvider) isImageAllowed(image string) bool {

// getCachedCredentials returns a credentialprovider.DockerConfig if cached from the plugin.
func (p *pluginProvider) getCachedCredentials(image string) (credentialprovider.DockerConfig, bool, error) {
p.Lock()
if p.clock.Now().After(p.lastCachePurge.Add(cachePurgeInterval)) {
// NewExpirationCache purges expired entries when List() is called
// The expired entry in the cache is removed only when Get or List called on it.
// List() is called on some interval to remove those expired entries on which Get is never called.
_ = p.cache.List()
p.lastCachePurge = p.clock.Now()
}
p.Unlock()

obj, found, err := p.cache.GetByKey(image)
if err != nil {
return nil, false, err
Expand Down Expand Up @@ -325,6 +363,8 @@ type execPlugin struct {
// The plugin is expected to receive the CredentialProviderRequest API via stdin from the kubelet and
// return CredentialProviderResponse via stdout.
func (e *execPlugin) ExecPlugin(ctx context.Context, image string) (*credentialproviderapi.CredentialProviderResponse, error) {
klog.V(5).Infof("Getting image %s credentials from external exec plugin %s", image, e.name)

authRequest := &credentialproviderapi.CredentialProviderRequest{Image: image}
data, err := e.encodeRequest(authRequest)
if err != nil {
Expand Down Expand Up @@ -361,18 +401,17 @@ func (e *execPlugin) ExecPlugin(ctx context.Context, image string) (*credentialp
}

data = stdout.Bytes()

// check that the response apiVersion matches what is expected
gvk, err := json.DefaultMetaFactory.Interpret(data)
if err != nil {
return nil, fmt.Errorf("error reading GVK from response: %w", err)
}

if gvk.GroupVersion().String() != e.apiVersion {
return nil, errors.New("apiVersion from credential plugin response did not match")
return nil, fmt.Errorf("apiVersion from credential plugin response did not match expected apiVersion:%s, actual apiVersion:%s", e.apiVersion, gvk.GroupVersion().String())
}

response, err := e.decodeResponse(stdout.Bytes())
response, err := e.decodeResponse(data)
if err != nil {
// err is explicitly not wrapped since it may contain credentials in the response.
return nil, errors.New("error decoding credential provider plugin response from stdout")
Expand Down
Loading

0 comments on commit 3e0432c

Please sign in to comment.