Skip to content

Commit

Permalink
Move NamespaceLifecycle to use shared informers
Browse files Browse the repository at this point in the history
  • Loading branch information
derekwaynecarr committed Aug 4, 2016
1 parent 33239c1 commit 4c37a81
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 105 deletions.
122 changes: 80 additions & 42 deletions plugin/pkg/admission/namespace/lifecycle/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,28 @@ import (
"io"
"time"

lru "github.com/hashicorp/golang-lru"

clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"

"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch"
)

const PluginName = "NamespaceLifecycle"
const (
// Name of admission plug-in
PluginName = "NamespaceLifecycle"
// how long a namespace stays in the force live lookup cache before expiration.
forceLiveLookupTTL = 30 * time.Second
)

func init() {
admission.RegisterPlugin(PluginName, func(client clientset.Interface, config io.Reader) (admission.Interface, error) {
return NewLifecycle(client, sets.NewString(api.NamespaceDefault, api.NamespaceSystem)), nil
return NewLifecycle(client, sets.NewString(api.NamespaceDefault, api.NamespaceSystem))
})
}

Expand All @@ -45,11 +51,29 @@ func init() {
type lifecycle struct {
*admission.Handler
client clientset.Interface
store cache.Store
immortalNamespaces sets.String
namespaceInformer framework.SharedIndexInformer
// forceLiveLookupCache holds a list of entries for namespaces that we have a strong reason to believe are stale in our local cache.
// if a namespace is in this cache, then we will ignore our local state and always fetch latest from api server.
forceLiveLookupCache *lru.Cache
}

type forceLiveLookupEntry struct {
expiry time.Time
}

var _ = admission.WantsInformerFactory(&lifecycle{})

func makeNamespaceKey(namespace string) *api.Namespace {
return &api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: namespace,
Namespace: "",
},
}
}

func (l *lifecycle) Admit(a admission.Attributes) (err error) {
func (l *lifecycle) Admit(a admission.Attributes) error {
// prevent deletion of immortal namespaces
if a.GetOperation() == admission.Delete && a.GetKind().GroupKind() == api.Kind("Namespace") && l.immortalNamespaces.Has(a.GetName()) {
return errors.NewForbidden(a.GetResource().GroupResource(), a.GetName(), fmt.Errorf("this namespace may not be deleted"))
Expand All @@ -61,31 +85,44 @@ func (l *lifecycle) Admit(a admission.Attributes) (err error) {
if len(a.GetNamespace()) == 0 || a.GetKind().GroupKind() == api.Kind("Namespace") {
// if a namespace is deleted, we want to prevent all further creates into it
// while it is undergoing termination. to reduce incidences where the cache
// is slow to update, we forcefully remove the namespace from our local cache.
// this will cause a live lookup of the namespace to get its latest state even
// before the watch notification is received.
// is slow to update, we add the namespace into a force live lookup list to ensure
// we are not looking at stale state.
if a.GetOperation() == admission.Delete {
l.store.Delete(&api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: a.GetName(),
},
})
newEntry := forceLiveLookupEntry{
expiry: time.Now().Add(forceLiveLookupTTL),
}
l.forceLiveLookupCache.Add(a.GetName(), newEntry)
}
return nil
}

namespaceObj, exists, err := l.store.Get(&api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: a.GetNamespace(),
Namespace: "",
},
})
// we need to wait for our caches to warm
if !l.WaitForReady() {
return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request"))
}

var (
namespaceObj interface{}
exists bool
err error
)

key := makeNamespaceKey(a.GetNamespace())
namespaceObj, exists, err = l.namespaceInformer.GetStore().Get(key)
if err != nil {
return errors.NewInternalError(err)
}

// forceLiveLookup if true will skip looking at local cache state and instead always make a live call to server.
forceLiveLookup := false
lruItemObj, ok := l.forceLiveLookupCache.Get(a.GetNamespace())
if ok && lruItemObj.(forceLiveLookupEntry).expiry.Before(time.Now()) {
// we think the namespace was marked for deletion, but our current local cache says otherwise, we will force a live lookup.
forceLiveLookup = exists && namespaceObj.(*api.Namespace).Status.Phase == api.NamespaceActive
}

// refuse to operate on non-existent namespaces
if !exists {
if !exists || forceLiveLookup {
// in case of latency in our caches, make a call direct to storage to verify that it truly exists or not
namespaceObj, err = l.client.Core().Namespaces().Get(a.GetNamespace())
if err != nil {
Expand All @@ -111,26 +148,27 @@ func (l *lifecycle) Admit(a admission.Attributes) (err error) {
}

// NewLifecycle creates a new namespace lifecycle admission control handler
func NewLifecycle(c clientset.Interface, immortalNamespaces sets.String) admission.Interface {
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
reflector := cache.NewReflector(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return c.Core().Namespaces().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return c.Core().Namespaces().Watch(options)
},
},
&api.Namespace{},
store,
5*time.Minute,
)
reflector.Run()
func NewLifecycle(c clientset.Interface, immortalNamespaces sets.String) (admission.Interface, error) {
forceLiveLookupCache, err := lru.New(100)
if err != nil {
panic(err)
}
return &lifecycle{
Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete),
client: c,
store: store,
immortalNamespaces: immortalNamespaces,
Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete),
client: c,
immortalNamespaces: immortalNamespaces,
forceLiveLookupCache: forceLiveLookupCache,
}, nil
}

func (l *lifecycle) SetInformerFactory(f informers.SharedInformerFactory) {
l.namespaceInformer = f.Namespaces().Informer()
l.SetReadyFunc(l.namespaceInformer.HasSynced)
}

func (l *lifecycle) Validate() error {
if l.namespaceInformer == nil {
return fmt.Errorf("missing namespaceInformer")
}
return nil
}
164 changes: 101 additions & 63 deletions plugin/pkg/admission/namespace/lifecycle/admission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,77 +18,131 @@ package lifecycle

import (
"fmt"
"sync"
"testing"
"time"

"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/api/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
)

// TestAdmission
func TestAdmission(t *testing.T) {
namespaceObj := &api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: "test",
Namespace: "",
},
Status: api.NamespaceStatus{
Phase: api.NamespaceActive,
},
// newHandlerForTest returns a configured handler for testing.
func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) {
f := informers.NewSharedInformerFactory(c, 5*time.Minute)
handler, err := NewLifecycle(c, sets.NewString(api.NamespaceDefault, api.NamespaceSystem))
if err != nil {
return nil, f, err
}
var namespaceLock sync.RWMutex

store := cache.NewStore(cache.MetaNamespaceKeyFunc)
store.Add(namespaceObj)
mockClient := fake.NewSimpleClientset()
mockClient.PrependReactor("get", "namespaces", func(action core.Action) (bool, runtime.Object, error) {
namespaceLock.RLock()
defer namespaceLock.RUnlock()
if getAction, ok := action.(testclient.GetAction); ok && getAction.GetName() == namespaceObj.Name {
return true, namespaceObj, nil
plugins := []admission.Interface{handler}
pluginInitializer := admission.NewPluginInitializer(f)
pluginInitializer.Initialize(plugins)
err = admission.Validate(plugins)
return handler, f, err
}

// newMockClientForTest creates a mock client that returns a client configured for the specified list of namespaces with the specified phase.
func newMockClientForTest(namespaces map[string]api.NamespacePhase) *fake.Clientset {
mockClient := &fake.Clientset{}
mockClient.AddReactor("list", "namespaces", func(action core.Action) (bool, runtime.Object, error) {
namespaceList := &api.NamespaceList{
ListMeta: unversioned.ListMeta{
ResourceVersion: fmt.Sprintf("%d", len(namespaces)),
},
}
return true, nil, fmt.Errorf("No result for action %v", action)
})
mockClient.PrependReactor("list", "namespaces", func(action core.Action) (bool, runtime.Object, error) {
namespaceLock.RLock()
defer namespaceLock.RUnlock()
return true, &api.NamespaceList{Items: []api.Namespace{*namespaceObj}}, nil
index := 0
for name, phase := range namespaces {
namespaceList.Items = append(namespaceList.Items, api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: name,
ResourceVersion: fmt.Sprintf("%d", index),
},
Status: api.NamespaceStatus{
Phase: phase,
},
})
index++
}
return true, namespaceList, nil
})
return mockClient
}

lfhandler := NewLifecycle(mockClient, sets.NewString("default")).(*lifecycle)
lfhandler.store = store
handler := admission.NewChainHandler(lfhandler)
pod := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespaceObj.Name},
// newPod returns a new pod for the specified namespace
func newPod(namespace string) api.Pod {
return api.Pod{
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace},
Spec: api.PodSpec{
Volumes: []api.Volume{{Name: "vol"}},
Containers: []api.Container{{Name: "ctr", Image: "image"}},
},
}
badPod := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "456", Namespace: "doesnotexist"},
Spec: api.PodSpec{
Volumes: []api.Volume{{Name: "vol"}},
Containers: []api.Container{{Name: "ctr", Image: "image"}},
},
}

// TestAdmissionNamespaceDoesNotExist verifies pod is not admitted if namespace does not exist.
func TestAdmissionNamespaceDoesNotExist(t *testing.T) {
namespace := "test"
mockClient := newMockClientForTest(map[string]api.NamespacePhase{})
mockClient.AddReactor("get", "namespaces", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, fmt.Errorf("nope, out of luck")
})
handler, informerFactory, err := newHandlerForTest(mockClient)
if err != nil {
t.Errorf("unexpected error initializing handler: %v", err)
}
informerFactory.Start(wait.NeverStop)

pod := newPod(namespace)
err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err == nil {
actions := ""
for _, action := range mockClient.Actions() {
actions = actions + action.GetVerb() + ":" + action.GetResource().Resource + ":" + action.GetSubresource() + ", "
}
t.Errorf("expected error returned from admission handler: %v", actions)
}
}

// TestAdmissionNamespaceActive verifies a resource is admitted when the namespace is active.
func TestAdmissionNamespaceActive(t *testing.T) {
namespace := "test"
mockClient := newMockClientForTest(map[string]api.NamespacePhase{
namespace: api.NamespaceActive,
})

handler, informerFactory, err := newHandlerForTest(mockClient)
if err != nil {
t.Errorf("unexpected error initializing handler: %v", err)
}
err := handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
informerFactory.Start(wait.NeverStop)

pod := newPod(namespace)
err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err != nil {
t.Errorf("Unexpected error returned from admission handler: %v", err)
t.Errorf("unexpected error returned from admission handler")
}
}

// TestAdmissionNamespaceTerminating verifies a resource is not created when the namespace is active.
func TestAdmissionNamespaceTerminating(t *testing.T) {
namespace := "test"
mockClient := newMockClientForTest(map[string]api.NamespacePhase{
namespace: api.NamespaceTerminating,
})

// change namespace state to terminating
namespaceLock.Lock()
namespaceObj.Status.Phase = api.NamespaceTerminating
namespaceLock.Unlock()
store.Add(namespaceObj)
handler, informerFactory, err := newHandlerForTest(mockClient)
if err != nil {
t.Errorf("unexpected error initializing handler: %v", err)
}
informerFactory.Start(wait.NeverStop)

pod := newPod(namespace)
// verify create operations in the namespace cause an error
err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err == nil {
Expand Down Expand Up @@ -118,20 +172,4 @@ func TestAdmission(t *testing.T) {
if err != nil {
t.Errorf("Did not expect an error %v", err)
}

// verify create/update/delete of object in non-existent namespace throws error
err = handler.Admit(admission.NewAttributesRecord(&badPod, nil, api.Kind("Pod").WithVersion("version"), badPod.Namespace, badPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
if err == nil {
t.Errorf("Expected, but didn't get, an error (%v) that objects cannot be created in non-existant namespaces", err)
}

err = handler.Admit(admission.NewAttributesRecord(&badPod, nil, api.Kind("Pod").WithVersion("version"), badPod.Namespace, badPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Update, nil))
if err == nil {
t.Errorf("Expected, but didn't get, an error (%v) that objects cannot be updated in non-existant namespaces", err)
}

err = handler.Admit(admission.NewAttributesRecord(&badPod, nil, api.Kind("Pod").WithVersion("version"), badPod.Namespace, badPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Delete, nil))
if err == nil {
t.Errorf("Expected, but didn't get, an error (%v) that objects cannot be deleted in non-existant namespaces", err)
}
}

0 comments on commit 4c37a81

Please sign in to comment.