diff --git a/pkg/client/cache/expiration_cache.go b/pkg/client/cache/expiration_cache.go index 5eb996b66bf36..c5797bcd8a1b3 100644 --- a/pkg/client/cache/expiration_cache.go +++ b/pkg/client/cache/expiration_cache.go @@ -17,11 +17,11 @@ limitations under the License. package cache import ( + "sync" "time" "github.com/golang/glog" "k8s.io/kubernetes/pkg/util" - "k8s.io/kubernetes/pkg/util/runtime" ) // ExpirationCache implements the store interface @@ -29,12 +29,20 @@ import ( // a. The key is computed based off the original item/keyFunc // b. The value inserted under that key is the timestamped item // 2. Expiration happens lazily on read based on the expiration policy +// a. No item can be inserted into the store while we're expiring +// *any* item in the cache. // 3. Time-stamps are stripped off unexpired entries before return +// Note that the ExpirationCache is inherently slower than a normal +// threadSafeStore because it takes a write lock everytime it checks if +// an item has expired. type ExpirationCache struct { cacheStorage ThreadSafeStore keyFunc KeyFunc clock util.Clock expirationPolicy ExpirationPolicy + // expirationLock is a write lock used to guarantee that we don't clobber + // newly inserted objects because of a stale expiration timestamp comparison + expirationLock sync.Mutex } // ExpirationPolicy dictates when an object expires. Currently only abstracted out @@ -68,7 +76,6 @@ type timestampedEntry struct { // getTimestampedEntry returnes the timestampedEntry stored under the given key. func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bool) { item, _ := c.cacheStorage.Get(key) - // TODO: Check the cast instead if tsEntry, ok := item.(*timestampedEntry); ok { return tsEntry, true } @@ -76,24 +83,20 @@ func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bo } // getOrExpire retrieves the object from the timestampedEntry if and only if it hasn't -// already expired. It kicks-off a go routine to delete expired objects from -// the store and sets exists=false. +// already expired. It holds a write lock across deletion. func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) { + // Prevent all inserts from the time we deem an item as "expired" to when we + // delete it, so an un-expired item doesn't sneak in under the same key, just + // before the Delete. + c.expirationLock.Lock() + defer c.expirationLock.Unlock() timestampedItem, exists := c.getTimestampedEntry(key) if !exists { return nil, false } if c.expirationPolicy.IsExpired(timestampedItem) { glog.V(4).Infof("Entry %v: %+v has expired", key, timestampedItem.obj) - // Since expiration happens lazily on read, don't hold up - // the reader trying to acquire a write lock for the delete. - // The next reader will retry the delete even if this one - // fails; as long as we only return un-expired entries a - // reader doesn't need to wait for the result of the delete. - go func() { - defer runtime.HandleCrash() - c.cacheStorage.Delete(key) - }() + c.cacheStorage.Delete(key) return nil, false } return timestampedItem.obj, true @@ -141,6 +144,8 @@ func (c *ExpirationCache) ListKeys() []string { // Add timestamps an item and inserts it into the cache, overwriting entries // that might exist under the same key. func (c *ExpirationCache) Add(obj interface{}) error { + c.expirationLock.Lock() + defer c.expirationLock.Unlock() key, err := c.keyFunc(obj) if err != nil { return KeyError{obj, err} @@ -157,6 +162,8 @@ func (c *ExpirationCache) Update(obj interface{}) error { // Delete removes an item from the cache. func (c *ExpirationCache) Delete(obj interface{}) error { + c.expirationLock.Lock() + defer c.expirationLock.Unlock() key, err := c.keyFunc(obj) if err != nil { return KeyError{obj, err} @@ -169,6 +176,8 @@ func (c *ExpirationCache) Delete(obj interface{}) error { // before attempting the replace operation. The replace operation will // delete the contents of the ExpirationCache `c`. func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) error { + c.expirationLock.Lock() + defer c.expirationLock.Unlock() items := map[string]interface{}{} ts := c.clock.Now() for _, item := range list { diff --git a/pkg/client/cache/expiration_cache_fakes.go b/pkg/client/cache/expiration_cache_fakes.go index 2e9a25d121a82..3b95977050e1e 100644 --- a/pkg/client/cache/expiration_cache_fakes.go +++ b/pkg/client/cache/expiration_cache_fakes.go @@ -28,6 +28,7 @@ type fakeThreadSafeMap struct { func (c *fakeThreadSafeMap) Delete(key string) { if c.deletedKeys != nil { + c.ThreadSafeStore.Delete(key) c.deletedKeys <- key } } diff --git a/pkg/client/cache/expiration_cache_test.go b/pkg/client/cache/expiration_cache_test.go index 2e8cc5b572fe3..04a05786f89c4 100644 --- a/pkg/client/cache/expiration_cache_test.go +++ b/pkg/client/cache/expiration_cache_test.go @@ -28,7 +28,7 @@ import ( func TestTTLExpirationBasic(t *testing.T) { testObj := testStoreObject{id: "foo", val: "bar"} - deleteChan := make(chan string) + deleteChan := make(chan string, 1) ttlStore := NewFakeExpirationStore( testStoreKeyFunc, deleteChan, &FakeExpirationPolicy{ @@ -62,6 +62,59 @@ func TestTTLExpirationBasic(t *testing.T) { close(deleteChan) } +func TestReAddExpiredItem(t *testing.T) { + deleteChan := make(chan string, 1) + exp := &FakeExpirationPolicy{ + NeverExpire: sets.NewString(), + RetrieveKeyFunc: func(obj interface{}) (string, error) { + return obj.(*timestampedEntry).obj.(testStoreObject).id, nil + }, + } + ttlStore := NewFakeExpirationStore( + testStoreKeyFunc, deleteChan, exp, util.RealClock{}) + testKey := "foo" + testObj := testStoreObject{id: testKey, val: "bar"} + err := ttlStore.Add(testObj) + if err != nil { + t.Errorf("Unable to add obj %#v", testObj) + } + + // This get will expire the item. + item, exists, err := ttlStore.Get(testObj) + if err != nil { + t.Errorf("Failed to get from store, %v", err) + } + if exists || item != nil { + t.Errorf("Got unexpected item %#v", item) + } + + key, _ := testStoreKeyFunc(testObj) + differentValue := "different_bar" + err = ttlStore.Add( + testStoreObject{id: testKey, val: differentValue}) + if err != nil { + t.Errorf("Failed to add second value") + } + + select { + case delKey := <-deleteChan: + if delKey != key { + t.Errorf("Unexpected delete for key %s", key) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Unexpected timeout waiting on delete") + } + exp.NeverExpire = sets.NewString(testKey) + item, exists, err = ttlStore.GetByKey(testKey) + if err != nil { + t.Errorf("Failed to get from store, %v", err) + } + if !exists || item == nil || item.(testStoreObject).val != differentValue { + t.Errorf("Got unexpected item %#v", item) + } + close(deleteChan) +} + func TestTTLList(t *testing.T) { testObjs := []testStoreObject{ {id: "foo", val: "bar"}, @@ -69,7 +122,7 @@ func TestTTLList(t *testing.T) { {id: "foo2", val: "bar2"}, } expireKeys := sets.NewString(testObjs[0].id, testObjs[2].id) - deleteChan := make(chan string) + deleteChan := make(chan string, len(testObjs)) defer close(deleteChan) ttlStore := NewFakeExpirationStore( diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index 8524ee4d128c1..127fcc5a7f78e 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -35,21 +35,7 @@ import ( "k8s.io/kubernetes/pkg/runtime" ) -const ( - CreatedByAnnotation = "kubernetes.io/created-by" - - // If a watch drops a delete event for a pod, it'll take this long - // before a dormant controller waiting for those packets is woken up anyway. It is - // specifically targeted at the case where some problem prevents an update - // of expectations, without it the controller could stay asleep forever. This should - // be set based on the expected latency of watch events. - // - // Currently a controller can service (create *and* observe the watch events for said - // creation) about 10-20 pods a second, so it takes about 1 min to service - // 500 pods. Just creation is limited to 20qps, and watching happens with ~10-30s - // latency/pod at the scale of 3000 pods over 100 nodes. - ExpectationsTimeout = 3 * time.Minute -) +const CreatedByAnnotation = "kubernetes.io/created-by" var ( KeyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc @@ -221,7 +207,7 @@ func (e *ControlleeExpectations) GetExpectations() (int64, int64) { // NewControllerExpectations returns a store for ControlleeExpectations. func NewControllerExpectations() *ControllerExpectations { - return &ControllerExpectations{cache.NewTTLStore(ExpKeyFunc, ExpectationsTimeout)} + return &ControllerExpectations{cache.NewStore(ExpKeyFunc)} } // PodControlInterface is an interface that knows how to add or delete pods diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index c8f0df10ff97c..72a0797e742f8 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -334,12 +334,12 @@ func (dc *DeploymentController) deletePod(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a ReplicaSet recreates a replica", obj, controller.ExpectationsTimeout) + glog.Errorf("Couldn't get object from tombstone %+v", obj) return } pod, ok = tombstone.Obj.(*api.Pod) if !ok { - glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before ReplicaSet recreates a replica", obj, controller.ExpectationsTimeout) + glog.Errorf("Tombstone contained object that is not a pod %+v", obj) return } } diff --git a/pkg/controller/job/controller.go b/pkg/controller/job/controller.go index b1d7a6a5838c8..729862aae7375 100644 --- a/pkg/controller/job/controller.go +++ b/pkg/controller/job/controller.go @@ -224,12 +224,12 @@ func (jm *JobController) deletePod(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a job recreates a pod", obj, controller.ExpectationsTimeout) + glog.Errorf("Couldn't get object from tombstone %+v", obj) return } pod, ok = tombstone.Obj.(*api.Pod) if !ok { - glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before job recreates a pod", obj, controller.ExpectationsTimeout) + glog.Errorf("Tombstone contained object that is not a pod %+v", obj) return } } diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index ba9bb1acdf229..7132c6198058a 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -351,12 +351,12 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a replica set recreates a replica", obj, controller.ExpectationsTimeout) + glog.Errorf("Couldn't get object from tombstone %+v", obj) return } pod, ok = tombstone.Obj.(*api.Pod) if !ok { - glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before replica set recreates a replica", obj, controller.ExpectationsTimeout) + glog.Errorf("Tombstone contained object that is not a pod %+v", obj) return } } diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 1c5729622e591..2e5b0aed816f7 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -348,12 +348,12 @@ func (rm *ReplicationManager) deletePod(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a controller recreates a replica", obj, controller.ExpectationsTimeout) + glog.Errorf("Couldn't get object from tombstone %+v", obj) return } pod, ok = tombstone.Obj.(*api.Pod) if !ok { - glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before controller recreates a replica", obj, controller.ExpectationsTimeout) + glog.Errorf("Tombstone contained object that is not a pod %+v", obj) return } }