Skip to content

Commit

Permalink
Use backoff from util/flowcontroll in federated namespace controller …
Browse files Browse the repository at this point in the history
…and other minor fixes
  • Loading branch information
mwielgus committed Aug 16, 2016
1 parent 378a496 commit c1cbe47
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err

nsClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "namespace-controller"))
namespaceController := namespacecontroller.NewNamespaceController(nsClientset)
namespaceController.Start()
namespaceController.Run(wait.NeverStop)

select {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/watch"

"github.com/golang/glog"
Expand Down Expand Up @@ -60,31 +61,24 @@ type NamespaceController struct {
// Client to federated api server.
federatedApiClient federation_release_1_4.Interface

stopChan chan struct{}
// Backoff manager for namespaces
namespaceBackoff *flowcontrol.Backoff

namespaceReviewDelay time.Duration
clusterAvailableDelay time.Duration
smallDelay time.Duration
updateTimeout time.Duration
}

// A structure passed by delying deliver. It contains a namespace that should be reconciled and
// the number of trials that were made previously and ended up in some kind of namespace-related
// error (like failure to create).
type namespaceItem struct {
namespace string
trial int64
}

// NewNamespaceController returns a new namespace controller
func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceController {
nc := &NamespaceController{
federatedApiClient: client,
stopChan: make(chan struct{}),
namespaceReviewDelay: time.Second * 10,
clusterAvailableDelay: time.Second * 20,
smallDelay: time.Second * 3,
updateTimeout: time.Second * 30,
namespaceBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
}

// Build delivereres for triggering reconcilations.
Expand All @@ -103,7 +97,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC
},
&api_v1.Namespace{},
controller.NoResyncPeriodFunc(),
util.NewTriggerOnAllChanges(func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, 0, 0) }))
util.NewTriggerOnAllChanges(func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, 0, false) }))

// Federated informer on namespaces in members of federation.
nc.namespaceFederatedInformer = util.NewFederatedInformer(
Expand All @@ -123,15 +117,15 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC
// Trigger reconcilation whenever something in federated cluster is changed. In most cases it
// would be just confirmation that some namespace opration suceeded.
util.NewTriggerOnMetaAndSpecChangesPreproc(
func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, 0) },
func(obj pkg_runtime.Object) { nc.deliverNamespaceObj(obj, nc.namespaceReviewDelay, false) },
func(obj pkg_runtime.Object) { util.SetClusterName(obj, cluster.Name) },
))
},

&util.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *federation_api.Cluster) {
// When new cluster becomes available process all the namespaces again.
nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(nc.clusterAvailableDelay))
nc.clusterDeliverer.DeliverAfter(allClustersKey, nil, nc.clusterAvailableDelay)
},
},
)
Expand All @@ -156,30 +150,44 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC
return nc
}

func (nc *NamespaceController) Start() {
go nc.namespaceInformerController.Run(nc.stopChan)
func (nc *NamespaceController) Run(stopChan <-chan struct{}) {
go nc.namespaceInformerController.Run(stopChan)
nc.namespaceFederatedInformer.Start()
go func() {
<-stopChan
nc.namespaceFederatedInformer.Stop()
}()
nc.namespaceDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
ni := item.Value.(*namespaceItem)
nc.reconcileNamespace(ni.namespace, ni.trial)
namespace := item.Value.(string)
nc.reconcileNamespace(namespace)
})
nc.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
nc.reconcileNamespacesOnClusterChange()
})
go func() {
select {
case <-time.After(time.Minute):
nc.namespaceBackoff.GC()
case <-stopChan:
return
}
}()
}

func (nc *NamespaceController) Stop() {
nc.namespaceFederatedInformer.Stop()
close(nc.stopChan)
}

func (nc *NamespaceController) deliverNamespaceObj(obj interface{}, delay time.Duration, trial int64) {
func (nc *NamespaceController) deliverNamespaceObj(obj interface{}, delay time.Duration, failed bool) {
namespace := obj.(*api_v1.Namespace)
nc.deliverNamespace(namespace.Name, delay, trial)
nc.deliverNamespace(namespace.Name, delay, failed)
}

func (nc *NamespaceController) deliverNamespace(namespace string, delay time.Duration, trial int64) {
nc.namespaceDeliverer.DeliverAfter(namespace, &namespaceItem{namespace: namespace, trial: trial}, delay)
// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure.
func (nc *NamespaceController) deliverNamespace(namespace string, delay time.Duration, failed bool) {
if failed {
nc.namespaceBackoff.Next(namespace, time.Now())
delay = delay + nc.namespaceBackoff.Get(namespace)
} else {
nc.namespaceBackoff.Reset(namespace)
}
nc.namespaceDeliverer.DeliverAfter(namespace, namespace, delay)
}

// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
Expand All @@ -203,30 +211,23 @@ func (nc *NamespaceController) isSynced() bool {
// The function triggers reconcilation of all federated namespaces.
func (nc *NamespaceController) reconcileNamespacesOnClusterChange() {
if !nc.isSynced() {
nc.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(nc.clusterAvailableDelay))
nc.clusterDeliverer.DeliverAfter(allClustersKey, nil, nc.clusterAvailableDelay)
}
for _, obj := range nc.namespaceInformerStore.List() {
namespace := obj.(*api_v1.Namespace)
nc.deliverNamespace(namespace.Name, nc.smallDelay, 0)
}
}

func backoff(trial int64) time.Duration {
if trial > 12 {
return 12 * 5 * time.Second
nc.deliverNamespace(namespace.Name, nc.smallDelay, false)
}
return time.Duration(trial) * 5 * time.Second
}

func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64) {
func (nc *NamespaceController) reconcileNamespace(namespace string) {
if !nc.isSynced() {
nc.deliverNamespace(namespace, nc.clusterAvailableDelay, trial)
nc.deliverNamespace(namespace, nc.clusterAvailableDelay, false)
}

baseNamespaceObj, exist, err := nc.namespaceInformerStore.GetByKey(namespace)
if err != nil {
glog.Errorf("Failed to query main namespace store for %v: %v", namespace, err)
nc.deliverNamespace(namespace, backoff(trial+1), trial+1)
nc.deliverNamespace(namespace, 0, true)
return
}

Expand All @@ -240,15 +241,15 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64)
err = nc.federatedApiClient.Core().Namespaces().Delete(baseNamespace.Name, &api.DeleteOptions{})
if err != nil {
glog.Errorf("Failed to delete namespace %s: %v", baseNamespace.Name, err)
nc.deliverNamespace(namespace, backoff(trial+1), trial+1)
nc.deliverNamespace(namespace, 0, true)
}
return
}

clusters, err := nc.namespaceFederatedInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get cluster list: %v", err)
nc.deliverNamespace(namespace, nc.clusterAvailableDelay, trial)
nc.deliverNamespace(namespace, nc.clusterAvailableDelay, false)
return
}

Expand All @@ -257,7 +258,7 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64)
clusterNamespaceObj, found, err := nc.namespaceFederatedInformer.GetTargetStore().GetByKey(cluster.Name, namespace)
if err != nil {
glog.Errorf("Failed to get %s from %s: %v", namespace, cluster.Name, err)
nc.deliverNamespace(namespace, backoff(trial+1), trial+1)
nc.deliverNamespace(namespace, 0, true)
return
}
desiredNamespace := &api_v1.Namespace{
Expand Down Expand Up @@ -292,10 +293,10 @@ func (nc *NamespaceController) reconcileNamespace(namespace string, trial int64)
err = nc.federatedUpdater.Update(operations, nc.updateTimeout)
if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", namespace, err)
nc.deliverNamespace(namespace, backoff(trial+1), trial+1)
nc.deliverNamespace(namespace, 0, true)
return
}

// Evertyhing is in order but lets be double sure
nc.deliverNamespace(namespace, nc.namespaceReviewDelay, 0)
nc.deliverNamespace(namespace, nc.namespaceReviewDelay, false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@ func TestNamespaceController(t *testing.T) {
return nil, fmt.Errorf("Unknown cluster")
}
})
namespaceController.clusterAvailableDelay = 1000 * time.Millisecond
namespaceController.clusterAvailableDelay = time.Second
namespaceController.namespaceReviewDelay = 50 * time.Millisecond
namespaceController.smallDelay = 20 * time.Millisecond
namespaceController.updateTimeout = 5 * time.Second
namespaceController.Start()

stop := make(chan struct{})
namespaceController.Run(stop)

ns1 := api_v1.Namespace{
ObjectMeta: api_v1.ObjectMeta{
Expand Down Expand Up @@ -101,7 +103,7 @@ func TestNamespaceController(t *testing.T) {
assert.Equal(t, ns1.Name, createdNamespace2.Name)
// assert.Contains(t, createdNamespace2.Annotations, "A")

namespaceController.Stop()
close(stop)
}

func toFederatedInformerForTestOnly(informer util.FederatedInformer) util.FederatedInformerForTestOnly {
Expand Down

0 comments on commit c1cbe47

Please sign in to comment.