From 45eb83583319f78d7d24cb0fe848c76a337204fa Mon Sep 17 00:00:00 2001 From: Prashanth Balasubramanian Date: Tue, 23 Feb 2016 18:44:59 -0800 Subject: [PATCH] Lock across item expiration in the ttl store. --- pkg/client/cache/expiration_cache.go | 35 ++++++++----- pkg/client/cache/expiration_cache_fakes.go | 1 + pkg/client/cache/expiration_cache_test.go | 57 +++++++++++++++++++++- 3 files changed, 78 insertions(+), 15 deletions(-) diff --git a/pkg/client/cache/expiration_cache.go b/pkg/client/cache/expiration_cache.go index 5eb996b66bf36..c5797bcd8a1b3 100644 --- a/pkg/client/cache/expiration_cache.go +++ b/pkg/client/cache/expiration_cache.go @@ -17,11 +17,11 @@ limitations under the License. package cache import ( + "sync" "time" "github.com/golang/glog" "k8s.io/kubernetes/pkg/util" - "k8s.io/kubernetes/pkg/util/runtime" ) // ExpirationCache implements the store interface @@ -29,12 +29,20 @@ import ( // a. The key is computed based off the original item/keyFunc // b. The value inserted under that key is the timestamped item // 2. Expiration happens lazily on read based on the expiration policy +// a. No item can be inserted into the store while we're expiring +// *any* item in the cache. // 3. Time-stamps are stripped off unexpired entries before return +// Note that the ExpirationCache is inherently slower than a normal +// threadSafeStore because it takes a write lock everytime it checks if +// an item has expired. type ExpirationCache struct { cacheStorage ThreadSafeStore keyFunc KeyFunc clock util.Clock expirationPolicy ExpirationPolicy + // expirationLock is a write lock used to guarantee that we don't clobber + // newly inserted objects because of a stale expiration timestamp comparison + expirationLock sync.Mutex } // ExpirationPolicy dictates when an object expires. Currently only abstracted out @@ -68,7 +76,6 @@ type timestampedEntry struct { // getTimestampedEntry returnes the timestampedEntry stored under the given key. func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bool) { item, _ := c.cacheStorage.Get(key) - // TODO: Check the cast instead if tsEntry, ok := item.(*timestampedEntry); ok { return tsEntry, true } @@ -76,24 +83,20 @@ func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bo } // getOrExpire retrieves the object from the timestampedEntry if and only if it hasn't -// already expired. It kicks-off a go routine to delete expired objects from -// the store and sets exists=false. +// already expired. It holds a write lock across deletion. func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) { + // Prevent all inserts from the time we deem an item as "expired" to when we + // delete it, so an un-expired item doesn't sneak in under the same key, just + // before the Delete. + c.expirationLock.Lock() + defer c.expirationLock.Unlock() timestampedItem, exists := c.getTimestampedEntry(key) if !exists { return nil, false } if c.expirationPolicy.IsExpired(timestampedItem) { glog.V(4).Infof("Entry %v: %+v has expired", key, timestampedItem.obj) - // Since expiration happens lazily on read, don't hold up - // the reader trying to acquire a write lock for the delete. - // The next reader will retry the delete even if this one - // fails; as long as we only return un-expired entries a - // reader doesn't need to wait for the result of the delete. - go func() { - defer runtime.HandleCrash() - c.cacheStorage.Delete(key) - }() + c.cacheStorage.Delete(key) return nil, false } return timestampedItem.obj, true @@ -141,6 +144,8 @@ func (c *ExpirationCache) ListKeys() []string { // Add timestamps an item and inserts it into the cache, overwriting entries // that might exist under the same key. func (c *ExpirationCache) Add(obj interface{}) error { + c.expirationLock.Lock() + defer c.expirationLock.Unlock() key, err := c.keyFunc(obj) if err != nil { return KeyError{obj, err} @@ -157,6 +162,8 @@ func (c *ExpirationCache) Update(obj interface{}) error { // Delete removes an item from the cache. func (c *ExpirationCache) Delete(obj interface{}) error { + c.expirationLock.Lock() + defer c.expirationLock.Unlock() key, err := c.keyFunc(obj) if err != nil { return KeyError{obj, err} @@ -169,6 +176,8 @@ func (c *ExpirationCache) Delete(obj interface{}) error { // before attempting the replace operation. The replace operation will // delete the contents of the ExpirationCache `c`. func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) error { + c.expirationLock.Lock() + defer c.expirationLock.Unlock() items := map[string]interface{}{} ts := c.clock.Now() for _, item := range list { diff --git a/pkg/client/cache/expiration_cache_fakes.go b/pkg/client/cache/expiration_cache_fakes.go index 2e9a25d121a82..3b95977050e1e 100644 --- a/pkg/client/cache/expiration_cache_fakes.go +++ b/pkg/client/cache/expiration_cache_fakes.go @@ -28,6 +28,7 @@ type fakeThreadSafeMap struct { func (c *fakeThreadSafeMap) Delete(key string) { if c.deletedKeys != nil { + c.ThreadSafeStore.Delete(key) c.deletedKeys <- key } } diff --git a/pkg/client/cache/expiration_cache_test.go b/pkg/client/cache/expiration_cache_test.go index 2e8cc5b572fe3..04a05786f89c4 100644 --- a/pkg/client/cache/expiration_cache_test.go +++ b/pkg/client/cache/expiration_cache_test.go @@ -28,7 +28,7 @@ import ( func TestTTLExpirationBasic(t *testing.T) { testObj := testStoreObject{id: "foo", val: "bar"} - deleteChan := make(chan string) + deleteChan := make(chan string, 1) ttlStore := NewFakeExpirationStore( testStoreKeyFunc, deleteChan, &FakeExpirationPolicy{ @@ -62,6 +62,59 @@ func TestTTLExpirationBasic(t *testing.T) { close(deleteChan) } +func TestReAddExpiredItem(t *testing.T) { + deleteChan := make(chan string, 1) + exp := &FakeExpirationPolicy{ + NeverExpire: sets.NewString(), + RetrieveKeyFunc: func(obj interface{}) (string, error) { + return obj.(*timestampedEntry).obj.(testStoreObject).id, nil + }, + } + ttlStore := NewFakeExpirationStore( + testStoreKeyFunc, deleteChan, exp, util.RealClock{}) + testKey := "foo" + testObj := testStoreObject{id: testKey, val: "bar"} + err := ttlStore.Add(testObj) + if err != nil { + t.Errorf("Unable to add obj %#v", testObj) + } + + // This get will expire the item. + item, exists, err := ttlStore.Get(testObj) + if err != nil { + t.Errorf("Failed to get from store, %v", err) + } + if exists || item != nil { + t.Errorf("Got unexpected item %#v", item) + } + + key, _ := testStoreKeyFunc(testObj) + differentValue := "different_bar" + err = ttlStore.Add( + testStoreObject{id: testKey, val: differentValue}) + if err != nil { + t.Errorf("Failed to add second value") + } + + select { + case delKey := <-deleteChan: + if delKey != key { + t.Errorf("Unexpected delete for key %s", key) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Unexpected timeout waiting on delete") + } + exp.NeverExpire = sets.NewString(testKey) + item, exists, err = ttlStore.GetByKey(testKey) + if err != nil { + t.Errorf("Failed to get from store, %v", err) + } + if !exists || item == nil || item.(testStoreObject).val != differentValue { + t.Errorf("Got unexpected item %#v", item) + } + close(deleteChan) +} + func TestTTLList(t *testing.T) { testObjs := []testStoreObject{ {id: "foo", val: "bar"}, @@ -69,7 +122,7 @@ func TestTTLList(t *testing.T) { {id: "foo2", val: "bar2"}, } expireKeys := sets.NewString(testObjs[0].id, testObjs[2].id) - deleteChan := make(chan string) + deleteChan := make(chan string, len(testObjs)) defer close(deleteChan) ttlStore := NewFakeExpirationStore(