Skip to content

Commit

Permalink
Wire contexts to Apps controllers
Browse files Browse the repository at this point in the history
  • Loading branch information
damemi committed Oct 13, 2021
1 parent 3874675 commit 41fcb95
Show file tree
Hide file tree
Showing 36 changed files with 404 additions and 375 deletions.
8 changes: 4 additions & 4 deletions cmd/kube-controller-manager/app/apps.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func startDaemonSetController(ctx context.Context, controllerContext ControllerC
if err != nil {
return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
}
go dsc.Run(int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Done())
go dsc.Run(ctx, int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs))
return nil, true, nil
}

Expand All @@ -56,7 +56,7 @@ func startStatefulSetController(ctx context.Context, controllerContext Controlle
controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
controllerContext.InformerFactory.Apps().V1().ControllerRevisions(),
controllerContext.ClientBuilder.ClientOrDie("statefulset-controller"),
).Run(int(controllerContext.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs), ctx.Done())
).Run(ctx, int(controllerContext.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs))
return nil, true, nil
}

Expand All @@ -66,7 +66,7 @@ func startReplicaSetController(ctx context.Context, controllerContext Controller
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"),
replicaset.BurstReplicas,
).Run(int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Done())
).Run(ctx, int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs))
return nil, true, nil
}

Expand All @@ -80,6 +80,6 @@ func startDeploymentController(ctx context.Context, controllerContext Controller
if err != nil {
return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
}
go dc.Run(int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Done())
go dc.Run(ctx, int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs))
return nil, true, nil
}
2 changes: 1 addition & 1 deletion cmd/kube-controller-manager/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func startReplicationController(ctx context.Context, controllerContext Controlle
controllerContext.InformerFactory.Core().V1().ReplicationControllers(),
controllerContext.ClientBuilder.ClientOrDie("replication-controller"),
replicationcontroller.BurstReplicas,
).Run(int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Done())
).Run(ctx, int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs))
return nil, true, nil
}

Expand Down
59 changes: 30 additions & 29 deletions pkg/controller/controller_ref_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controller

import (
"context"
"encoding/json"
"fmt"
"sync"
Expand All @@ -38,13 +39,13 @@ type BaseControllerRefManager struct {

canAdoptErr error
canAdoptOnce sync.Once
CanAdoptFunc func() error
CanAdoptFunc func(ctx context.Context) error
}

func (m *BaseControllerRefManager) CanAdopt() error {
func (m *BaseControllerRefManager) CanAdopt(ctx context.Context) error {
m.canAdoptOnce.Do(func() {
if m.CanAdoptFunc != nil {
m.canAdoptErr = m.CanAdoptFunc()
m.canAdoptErr = m.CanAdoptFunc(ctx)
}
})
return m.canAdoptErr
Expand All @@ -65,7 +66,7 @@ func (m *BaseControllerRefManager) CanAdopt() error {
// own the object.
//
// No reconciliation will be attempted if the controller is being deleted.
func (m *BaseControllerRefManager) ClaimObject(obj metav1.Object, match func(metav1.Object) bool, adopt, release func(metav1.Object) error) (bool, error) {
func (m *BaseControllerRefManager) ClaimObject(ctx context.Context, obj metav1.Object, match func(metav1.Object) bool, adopt func(context.Context, metav1.Object) error, release func(metav1.Object) error) (bool, error) {
controllerRef := metav1.GetControllerOfNoCopy(obj)
if controllerRef != nil {
if controllerRef.UID != m.Controller.GetUID() {
Expand Down Expand Up @@ -107,7 +108,7 @@ func (m *BaseControllerRefManager) ClaimObject(obj metav1.Object, match func(met
return false, nil
}
// Selector matches. Try to adopt.
if err := adopt(obj); err != nil {
if err := adopt(ctx, obj); err != nil {
// If the pod no longer exists, ignore the error.
if errors.IsNotFound(err) {
return false, nil
Expand Down Expand Up @@ -143,7 +144,7 @@ func NewPodControllerRefManager(
controller metav1.Object,
selector labels.Selector,
controllerKind schema.GroupVersionKind,
canAdopt func() error,
canAdopt func(ctx context.Context) error,
finalizers ...string,
) *PodControllerRefManager {
return &PodControllerRefManager{
Expand Down Expand Up @@ -173,7 +174,7 @@ func NewPodControllerRefManager(
//
// If the error is nil, either the reconciliation succeeded, or no
// reconciliation was necessary. The list of Pods that you now own is returned.
func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {
func (m *PodControllerRefManager) ClaimPods(ctx context.Context, pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {
var claimed []*v1.Pod
var errlist []error

Expand All @@ -190,15 +191,15 @@ func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.
}
return true
}
adopt := func(obj metav1.Object) error {
return m.AdoptPod(obj.(*v1.Pod))
adopt := func(ctx context.Context, obj metav1.Object) error {
return m.AdoptPod(ctx, obj.(*v1.Pod))
}
release := func(obj metav1.Object) error {
return m.ReleasePod(obj.(*v1.Pod))
}

for _, pod := range pods {
ok, err := m.ClaimObject(pod, match, adopt, release)
ok, err := m.ClaimObject(ctx, pod, match, adopt, release)
if err != nil {
errlist = append(errlist, err)
continue
Expand All @@ -212,8 +213,8 @@ func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.

// AdoptPod sends a patch to take control of the pod. It returns the error if
// the patching fails.
func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error {
if err := m.CanAdopt(); err != nil {
func (m *PodControllerRefManager) AdoptPod(ctx context.Context, pod *v1.Pod) error {
if err := m.CanAdopt(ctx); err != nil {
return fmt.Errorf("can't adopt Pod %v/%v (%v): %v", pod.Namespace, pod.Name, pod.UID, err)
}
// Note that ValidateOwnerReferences() will reject this patch if another
Expand Down Expand Up @@ -283,7 +284,7 @@ func NewReplicaSetControllerRefManager(
controller metav1.Object,
selector labels.Selector,
controllerKind schema.GroupVersionKind,
canAdopt func() error,
canAdopt func(ctx context.Context) error,
) *ReplicaSetControllerRefManager {
return &ReplicaSetControllerRefManager{
BaseControllerRefManager: BaseControllerRefManager{
Expand All @@ -309,22 +310,22 @@ func NewReplicaSetControllerRefManager(
// If the error is nil, either the reconciliation succeeded, or no
// reconciliation was necessary. The list of ReplicaSets that you now own is
// returned.
func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(sets []*apps.ReplicaSet) ([]*apps.ReplicaSet, error) {
func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(ctx context.Context, sets []*apps.ReplicaSet) ([]*apps.ReplicaSet, error) {
var claimed []*apps.ReplicaSet
var errlist []error

match := func(obj metav1.Object) bool {
return m.Selector.Matches(labels.Set(obj.GetLabels()))
}
adopt := func(obj metav1.Object) error {
return m.AdoptReplicaSet(obj.(*apps.ReplicaSet))
adopt := func(ctx context.Context, obj metav1.Object) error {
return m.AdoptReplicaSet(ctx, obj.(*apps.ReplicaSet))
}
release := func(obj metav1.Object) error {
return m.ReleaseReplicaSet(obj.(*apps.ReplicaSet))
}

for _, rs := range sets {
ok, err := m.ClaimObject(rs, match, adopt, release)
ok, err := m.ClaimObject(ctx, rs, match, adopt, release)
if err != nil {
errlist = append(errlist, err)
continue
Expand All @@ -338,8 +339,8 @@ func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(sets []*apps.ReplicaSe

// AdoptReplicaSet sends a patch to take control of the ReplicaSet. It returns
// the error if the patching fails.
func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(rs *apps.ReplicaSet) error {
if err := m.CanAdopt(); err != nil {
func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(ctx context.Context, rs *apps.ReplicaSet) error {
if err := m.CanAdopt(ctx); err != nil {
return fmt.Errorf("can't adopt ReplicaSet %v/%v (%v): %v", rs.Namespace, rs.Name, rs.UID, err)
}
// Note that ValidateOwnerReferences() will reject this patch if another
Expand Down Expand Up @@ -381,9 +382,9 @@ func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(replicaSet *apps.Repl
//
// The CanAdopt() function calls getObject() to fetch the latest value,
// and denies adoption attempts if that object has a non-nil DeletionTimestamp.
func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() error {
return func() error {
obj, err := getObject()
func RecheckDeletionTimestamp(getObject func(context.Context) (metav1.Object, error)) func(context.Context) error {
return func(ctx context.Context) error {
obj, err := getObject(ctx)
if err != nil {
return fmt.Errorf("can't recheck DeletionTimestamp: %v", err)
}
Expand Down Expand Up @@ -421,7 +422,7 @@ func NewControllerRevisionControllerRefManager(
controller metav1.Object,
selector labels.Selector,
controllerKind schema.GroupVersionKind,
canAdopt func() error,
canAdopt func(ctx context.Context) error,
) *ControllerRevisionControllerRefManager {
return &ControllerRevisionControllerRefManager{
BaseControllerRefManager: BaseControllerRefManager{
Expand All @@ -447,22 +448,22 @@ func NewControllerRevisionControllerRefManager(
// If the error is nil, either the reconciliation succeeded, or no
// reconciliation was necessary. The list of ControllerRevisions that you now own is
// returned.
func (m *ControllerRevisionControllerRefManager) ClaimControllerRevisions(histories []*apps.ControllerRevision) ([]*apps.ControllerRevision, error) {
func (m *ControllerRevisionControllerRefManager) ClaimControllerRevisions(ctx context.Context, histories []*apps.ControllerRevision) ([]*apps.ControllerRevision, error) {
var claimed []*apps.ControllerRevision
var errlist []error

match := func(obj metav1.Object) bool {
return m.Selector.Matches(labels.Set(obj.GetLabels()))
}
adopt := func(obj metav1.Object) error {
return m.AdoptControllerRevision(obj.(*apps.ControllerRevision))
adopt := func(ctx context.Context, obj metav1.Object) error {
return m.AdoptControllerRevision(ctx, obj.(*apps.ControllerRevision))
}
release := func(obj metav1.Object) error {
return m.ReleaseControllerRevision(obj.(*apps.ControllerRevision))
}

for _, h := range histories {
ok, err := m.ClaimObject(h, match, adopt, release)
ok, err := m.ClaimObject(ctx, h, match, adopt, release)
if err != nil {
errlist = append(errlist, err)
continue
Expand All @@ -476,8 +477,8 @@ func (m *ControllerRevisionControllerRefManager) ClaimControllerRevisions(histor

// AdoptControllerRevision sends a patch to take control of the ControllerRevision. It returns the error if
// the patching fails.
func (m *ControllerRevisionControllerRefManager) AdoptControllerRevision(history *apps.ControllerRevision) error {
if err := m.CanAdopt(); err != nil {
func (m *ControllerRevisionControllerRefManager) AdoptControllerRevision(ctx context.Context, history *apps.ControllerRevision) error {
if err := m.CanAdopt(ctx); err != nil {
return fmt.Errorf("can't adopt ControllerRevision %v/%v (%v): %v", history.Namespace, history.Name, history.UID, err)
}
// Note that ValidateOwnerReferences() will reject this patch if another
Expand Down
17 changes: 9 additions & 8 deletions pkg/controller/controller_ref_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controller

import (
"context"
"strings"
"testing"

Expand Down Expand Up @@ -73,7 +74,7 @@ func TestClaimPods(t *testing.T) {
&v1.ReplicationController{},
productionLabelSelector,
controllerKind,
func() error { return nil }),
func(ctx context.Context) error { return nil }),
pods: []*v1.Pod{newPod("pod1", productionLabel, nil), newPod("pod2", testLabel, nil)},
claimed: []*v1.Pod{newPod("pod1", productionLabel, nil)},
patches: 1,
Expand All @@ -89,7 +90,7 @@ func TestClaimPods(t *testing.T) {
&controller,
productionLabelSelector,
controllerKind,
func() error { return nil }),
func(ctx context.Context) error { return nil }),
pods: []*v1.Pod{newPod("pod1", productionLabel, nil), newPod("pod2", productionLabel, nil)},
claimed: nil,
}
Expand All @@ -105,7 +106,7 @@ func TestClaimPods(t *testing.T) {
&controller,
productionLabelSelector,
controllerKind,
func() error { return nil }),
func(ctx context.Context) error { return nil }),
pods: []*v1.Pod{newPod("pod1", productionLabel, &controller), newPod("pod2", productionLabel, nil)},
claimed: []*v1.Pod{newPod("pod1", productionLabel, &controller)},
}
Expand All @@ -121,7 +122,7 @@ func TestClaimPods(t *testing.T) {
&controller,
productionLabelSelector,
controllerKind,
func() error { return nil }),
func(ctx context.Context) error { return nil }),
pods: []*v1.Pod{newPod("pod1", productionLabel, &controller), newPod("pod2", productionLabel, &controller2)},
claimed: []*v1.Pod{newPod("pod1", productionLabel, &controller)},
}
Expand All @@ -135,7 +136,7 @@ func TestClaimPods(t *testing.T) {
&controller,
productionLabelSelector,
controllerKind,
func() error { return nil }),
func(ctx context.Context) error { return nil }),
pods: []*v1.Pod{newPod("pod1", productionLabel, &controller), newPod("pod2", testLabel, &controller)},
claimed: []*v1.Pod{newPod("pod1", productionLabel, &controller)},
patches: 1,
Expand All @@ -156,7 +157,7 @@ func TestClaimPods(t *testing.T) {
&controller,
productionLabelSelector,
controllerKind,
func() error { return nil }),
func(ctx context.Context) error { return nil }),
pods: []*v1.Pod{podToDelete1, podToDelete2},
claimed: []*v1.Pod{podToDelete1},
}
Expand All @@ -170,7 +171,7 @@ func TestClaimPods(t *testing.T) {
&controller,
productionLabelSelector,
controllerKind,
func() error { return nil },
func(ctx context.Context) error { return nil },
"foo-finalizer", "bar-finalizer"),
pods: []*v1.Pod{newPod("pod1", productionLabel, &controller), newPod("pod2", testLabel, &controller), newPod("pod3", productionLabel, nil)},
claimed: []*v1.Pod{newPod("pod1", productionLabel, &controller), newPod("pod3", productionLabel, nil)},
Expand All @@ -180,7 +181,7 @@ func TestClaimPods(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
claimed, err := test.manager.ClaimPods(test.pods)
claimed, err := test.manager.ClaimPods(context.TODO(), test.pods)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down
Loading

0 comments on commit 41fcb95

Please sign in to comment.