Skip to content

Commit

Permalink
reduce conflict retries
Browse files Browse the repository at this point in the history
  • Loading branch information
deads2k committed May 23, 2016
1 parent 8f104a7 commit 02c0181
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 3 deletions.
26 changes: 25 additions & 1 deletion pkg/storage/etcd/api_object_versioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,28 @@ func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, e
}

// APIObjectVersioner implements Versioner
var _ storage.Versioner = APIObjectVersioner{}
var Versioner storage.Versioner = APIObjectVersioner{}

// CompareResourceVersion compares etcd resource versions. Outside this API they are all strings,
// but etcd resource versions are special, they're actually ints, so we can easily compare them.
func (a APIObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int {
lhsVersion, err := Versioner.ObjectResourceVersion(lhs)
if err != nil {
// coder error
panic(err)
}
rhsVersion, err := Versioner.ObjectResourceVersion(rhs)
if err != nil {
// coder error
panic(err)
}

if lhsVersion == rhsVersion {
return 0
}
if lhsVersion < rhsVersion {
return -1
}

return 1
}
17 changes: 17 additions & 0 deletions pkg/storage/etcd/api_object_versioner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,20 @@ func TestObjectVersioner(t *testing.T) {
t.Errorf("unexpected resource version: %#v", obj)
}
}

func TestCompareResourceVersion(t *testing.T) {
five := &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "5"}}
six := &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "6"}}

versioner := APIObjectVersioner{}

if e, a := -1, versioner.CompareResourceVersion(five, six); e != a {
t.Errorf("expected %v got %v", e, a)
}
if e, a := 1, versioner.CompareResourceVersion(six, five); e != a {
t.Errorf("expected %v got %v", e, a)
}
if e, a := 0, versioner.CompareResourceVersion(six, six); e != a {
t.Errorf("expected %v got %v", e, a)
}
}
46 changes: 44 additions & 2 deletions plugin/pkg/admission/resourcequota/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage/etcd"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
Expand All @@ -52,6 +53,10 @@ type quotaEvaluator struct {
// We track the lookup result here so that for repeated requests, we don't look it up very often.
liveLookupCache *lru.Cache
liveTTL time.Duration
// updatedQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to
// back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions
// for the same resource as integers. Before this change: 22 updates with 12 conflicts. after this change: 15 updates with 0 conflicts
updatedQuotas *lru.Cache

// TODO these are used together to bucket items by namespace and then batch them up for processing.
// The technique is valuable for rollup activities to avoid fanout and reduce resource contention.
Expand Down Expand Up @@ -101,6 +106,10 @@ func newQuotaEvaluator(client clientset.Interface, registry quota.Registry) (*qu
if err != nil {
return nil, err
}
updatedCache, err := lru.New(100)
if err != nil {
return nil, err
}
lw := &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().ResourceQuotas(api.NamespaceAll).List(options)
Expand All @@ -118,6 +127,7 @@ func newQuotaEvaluator(client clientset.Interface, registry quota.Registry) (*qu
registry: registry,
liveLookupCache: liveLookupCache,
liveTTL: time.Duration(30 * time.Second),
updatedQuotas: updatedCache,

queue: workqueue.New(),
work: map[string][]*admissionWaiter{},
Expand Down Expand Up @@ -247,9 +257,14 @@ func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttrib
continue
}

if _, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(&newQuota); err != nil {
if updatedQuota, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(&newQuota); err != nil {
updatedFailedQuotas = append(updatedFailedQuotas, newQuota)
lastErr = err

} else {
// update our cache
e.updateCache(updatedQuota)

}
}

Expand Down Expand Up @@ -472,6 +487,31 @@ func (e *quotaEvaluator) getWork() (string, []*admissionWaiter, bool) {
return ns, []*admissionWaiter{}, false
}

func (e *quotaEvaluator) updateCache(quota *api.ResourceQuota) {
key := quota.Namespace + "/" + quota.Name
e.updatedQuotas.Add(key, quota)
}

var etcdVersioner = etcd.APIObjectVersioner{}

// checkCache compares the passed quota against the value in the look-aside cache and returns the newer
// if the cache is out of date, it deletes the stale entry. This only works because of etcd resourceVersions
// being monotonically increasing integers
func (e *quotaEvaluator) checkCache(quota *api.ResourceQuota) *api.ResourceQuota {
key := quota.Namespace + "/" + quota.Name
uncastCachedQuota, ok := e.updatedQuotas.Get(key)
if !ok {
return quota
}
cachedQuota := uncastCachedQuota.(*api.ResourceQuota)

if etcdVersioner.CompareResourceVersion(quota, cachedQuota) >= 0 {
e.updatedQuotas.Remove(key)
return quota
}
return cachedQuota
}

func (e *quotaEvaluator) getQuotas(namespace string) ([]api.ResourceQuota, error) {
// determine if there are any quotas in this namespace
// if there are no quotas, we don't need to do anything
Expand Down Expand Up @@ -508,8 +548,10 @@ func (e *quotaEvaluator) getQuotas(namespace string) ([]api.ResourceQuota, error

resourceQuotas := []api.ResourceQuota{}
for i := range items {
quota := items[i].(*api.ResourceQuota)
quota = e.checkCache(quota)
// always make a copy. We're going to muck around with this and we should never mutate the originals
resourceQuotas = append(resourceQuotas, *items[i].(*api.ResourceQuota))
resourceQuotas = append(resourceQuotas, *quota)
}

return resourceQuotas, nil
Expand Down

0 comments on commit 02c0181

Please sign in to comment.