Skip to content

Commit

Permalink
Merge pull request kubernetes#21856 from bprashanth/ttl_race
Browse files Browse the repository at this point in the history
Lock across item expiration in the ttl store.
  • Loading branch information
bgrant0607 committed Feb 24, 2016
2 parents 88056ed + 7e88b3e commit bea349a
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 39 deletions.
35 changes: 22 additions & 13 deletions pkg/client/cache/expiration_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,32 @@ limitations under the License.
package cache

import (
"sync"
"time"

"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/runtime"
)

// ExpirationCache implements the store interface
// 1. All entries are automatically time stamped on insert
// a. The key is computed based off the original item/keyFunc
// b. The value inserted under that key is the timestamped item
// 2. Expiration happens lazily on read based on the expiration policy
// a. No item can be inserted into the store while we're expiring
// *any* item in the cache.
// 3. Time-stamps are stripped off unexpired entries before return
// Note that the ExpirationCache is inherently slower than a normal
// threadSafeStore because it takes a write lock everytime it checks if
// an item has expired.
type ExpirationCache struct {
cacheStorage ThreadSafeStore
keyFunc KeyFunc
clock util.Clock
expirationPolicy ExpirationPolicy
// expirationLock is a write lock used to guarantee that we don't clobber
// newly inserted objects because of a stale expiration timestamp comparison
expirationLock sync.Mutex
}

// ExpirationPolicy dictates when an object expires. Currently only abstracted out
Expand Down Expand Up @@ -68,32 +76,27 @@ type timestampedEntry struct {
// getTimestampedEntry returnes the timestampedEntry stored under the given key.
func (c *ExpirationCache) getTimestampedEntry(key string) (*timestampedEntry, bool) {
item, _ := c.cacheStorage.Get(key)
// TODO: Check the cast instead
if tsEntry, ok := item.(*timestampedEntry); ok {
return tsEntry, true
}
return nil, false
}

// getOrExpire retrieves the object from the timestampedEntry if and only if it hasn't
// already expired. It kicks-off a go routine to delete expired objects from
// the store and sets exists=false.
// already expired. It holds a write lock across deletion.
func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) {
// Prevent all inserts from the time we deem an item as "expired" to when we
// delete it, so an un-expired item doesn't sneak in under the same key, just
// before the Delete.
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
timestampedItem, exists := c.getTimestampedEntry(key)
if !exists {
return nil, false
}
if c.expirationPolicy.IsExpired(timestampedItem) {
glog.V(4).Infof("Entry %v: %+v has expired", key, timestampedItem.obj)
// Since expiration happens lazily on read, don't hold up
// the reader trying to acquire a write lock for the delete.
// The next reader will retry the delete even if this one
// fails; as long as we only return un-expired entries a
// reader doesn't need to wait for the result of the delete.
go func() {
defer runtime.HandleCrash()
c.cacheStorage.Delete(key)
}()
c.cacheStorage.Delete(key)
return nil, false
}
return timestampedItem.obj, true
Expand Down Expand Up @@ -141,6 +144,8 @@ func (c *ExpirationCache) ListKeys() []string {
// Add timestamps an item and inserts it into the cache, overwriting entries
// that might exist under the same key.
func (c *ExpirationCache) Add(obj interface{}) error {
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
Expand All @@ -157,6 +162,8 @@ func (c *ExpirationCache) Update(obj interface{}) error {

// Delete removes an item from the cache.
func (c *ExpirationCache) Delete(obj interface{}) error {
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
Expand All @@ -169,6 +176,8 @@ func (c *ExpirationCache) Delete(obj interface{}) error {
// before attempting the replace operation. The replace operation will
// delete the contents of the ExpirationCache `c`.
func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) error {
c.expirationLock.Lock()
defer c.expirationLock.Unlock()
items := map[string]interface{}{}
ts := c.clock.Now()
for _, item := range list {
Expand Down
1 change: 1 addition & 0 deletions pkg/client/cache/expiration_cache_fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type fakeThreadSafeMap struct {

func (c *fakeThreadSafeMap) Delete(key string) {
if c.deletedKeys != nil {
c.ThreadSafeStore.Delete(key)
c.deletedKeys <- key
}
}
Expand Down
57 changes: 55 additions & 2 deletions pkg/client/cache/expiration_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

func TestTTLExpirationBasic(t *testing.T) {
testObj := testStoreObject{id: "foo", val: "bar"}
deleteChan := make(chan string)
deleteChan := make(chan string, 1)
ttlStore := NewFakeExpirationStore(
testStoreKeyFunc, deleteChan,
&FakeExpirationPolicy{
Expand Down Expand Up @@ -62,14 +62,67 @@ func TestTTLExpirationBasic(t *testing.T) {
close(deleteChan)
}

func TestReAddExpiredItem(t *testing.T) {
deleteChan := make(chan string, 1)
exp := &FakeExpirationPolicy{
NeverExpire: sets.NewString(),
RetrieveKeyFunc: func(obj interface{}) (string, error) {
return obj.(*timestampedEntry).obj.(testStoreObject).id, nil
},
}
ttlStore := NewFakeExpirationStore(
testStoreKeyFunc, deleteChan, exp, util.RealClock{})
testKey := "foo"
testObj := testStoreObject{id: testKey, val: "bar"}
err := ttlStore.Add(testObj)
if err != nil {
t.Errorf("Unable to add obj %#v", testObj)
}

// This get will expire the item.
item, exists, err := ttlStore.Get(testObj)
if err != nil {
t.Errorf("Failed to get from store, %v", err)
}
if exists || item != nil {
t.Errorf("Got unexpected item %#v", item)
}

key, _ := testStoreKeyFunc(testObj)
differentValue := "different_bar"
err = ttlStore.Add(
testStoreObject{id: testKey, val: differentValue})
if err != nil {
t.Errorf("Failed to add second value")
}

select {
case delKey := <-deleteChan:
if delKey != key {
t.Errorf("Unexpected delete for key %s", key)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Unexpected timeout waiting on delete")
}
exp.NeverExpire = sets.NewString(testKey)
item, exists, err = ttlStore.GetByKey(testKey)
if err != nil {
t.Errorf("Failed to get from store, %v", err)
}
if !exists || item == nil || item.(testStoreObject).val != differentValue {
t.Errorf("Got unexpected item %#v", item)
}
close(deleteChan)
}

func TestTTLList(t *testing.T) {
testObjs := []testStoreObject{
{id: "foo", val: "bar"},
{id: "foo1", val: "bar1"},
{id: "foo2", val: "bar2"},
}
expireKeys := sets.NewString(testObjs[0].id, testObjs[2].id)
deleteChan := make(chan string)
deleteChan := make(chan string, len(testObjs))
defer close(deleteChan)

ttlStore := NewFakeExpirationStore(
Expand Down
18 changes: 2 additions & 16 deletions pkg/controller/controller_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
)

const (
CreatedByAnnotation = "kubernetes.io/created-by"

// If a watch drops a delete event for a pod, it'll take this long
// before a dormant controller waiting for those packets is woken up anyway. It is
// specifically targeted at the case where some problem prevents an update
// of expectations, without it the controller could stay asleep forever. This should
// be set based on the expected latency of watch events.
//
// Currently a controller can service (create *and* observe the watch events for said
// creation) about 10-20 pods a second, so it takes about 1 min to service
// 500 pods. Just creation is limited to 20qps, and watching happens with ~10-30s
// latency/pod at the scale of 3000 pods over 100 nodes.
ExpectationsTimeout = 3 * time.Minute
)
const CreatedByAnnotation = "kubernetes.io/created-by"

var (
KeyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc
Expand Down Expand Up @@ -221,7 +207,7 @@ func (e *ControlleeExpectations) GetExpectations() (int64, int64) {

// NewControllerExpectations returns a store for ControlleeExpectations.
func NewControllerExpectations() *ControllerExpectations {
return &ControllerExpectations{cache.NewTTLStore(ExpKeyFunc, ExpectationsTimeout)}
return &ControllerExpectations{cache.NewStore(ExpKeyFunc)}
}

// PodControlInterface is an interface that knows how to add or delete pods
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/deployment/deployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,12 +334,12 @@ func (dc *DeploymentController) deletePod(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a ReplicaSet recreates a replica", obj, controller.ExpectationsTimeout)
glog.Errorf("Couldn't get object from tombstone %+v", obj)
return
}
pod, ok = tombstone.Obj.(*api.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before ReplicaSet recreates a replica", obj, controller.ExpectationsTimeout)
glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
return
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/job/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,12 @@ func (jm *JobController) deletePod(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a job recreates a pod", obj, controller.ExpectationsTimeout)
glog.Errorf("Couldn't get object from tombstone %+v", obj)
return
}
pod, ok = tombstone.Obj.(*api.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before job recreates a pod", obj, controller.ExpectationsTimeout)
glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
return
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/replicaset/replica_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,12 +351,12 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a replica set recreates a replica", obj, controller.ExpectationsTimeout)
glog.Errorf("Couldn't get object from tombstone %+v", obj)
return
}
pod, ok = tombstone.Obj.(*api.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before replica set recreates a replica", obj, controller.ExpectationsTimeout)
glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
return
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/replication/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,12 +348,12 @@ func (rm *ReplicationManager) deletePod(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a controller recreates a replica", obj, controller.ExpectationsTimeout)
glog.Errorf("Couldn't get object from tombstone %+v", obj)
return
}
pod, ok = tombstone.Obj.(*api.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before controller recreates a replica", obj, controller.ExpectationsTimeout)
glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
return
}
}
Expand Down

0 comments on commit bea349a

Please sign in to comment.