Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wire contexts to Apps controllers #105377

Merged
merged 1 commit into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/kube-controller-manager/app/apps.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func startDaemonSetController(ctx context.Context, controllerContext ControllerC
if err != nil {
return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
}
go dsc.Run(int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Done())
go dsc.Run(ctx, int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs))
return nil, true, nil
}

Expand All @@ -56,7 +56,7 @@ func startStatefulSetController(ctx context.Context, controllerContext Controlle
controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
controllerContext.InformerFactory.Apps().V1().ControllerRevisions(),
controllerContext.ClientBuilder.ClientOrDie("statefulset-controller"),
).Run(int(controllerContext.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs), ctx.Done())
).Run(ctx, int(controllerContext.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs))
return nil, true, nil
}

Expand All @@ -66,7 +66,7 @@ func startReplicaSetController(ctx context.Context, controllerContext Controller
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"),
replicaset.BurstReplicas,
).Run(int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Done())
).Run(ctx, int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs))
return nil, true, nil
}

Expand All @@ -80,6 +80,6 @@ func startDeploymentController(ctx context.Context, controllerContext Controller
if err != nil {
return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
}
go dc.Run(int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Done())
go dc.Run(ctx, int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs))
return nil, true, nil
}
2 changes: 1 addition & 1 deletion cmd/kube-controller-manager/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func startReplicationController(ctx context.Context, controllerContext Controlle
controllerContext.InformerFactory.Core().V1().ReplicationControllers(),
controllerContext.ClientBuilder.ClientOrDie("replication-controller"),
replicationcontroller.BurstReplicas,
).Run(int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Done())
).Run(ctx, int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs))
return nil, true, nil
}

Expand Down
59 changes: 30 additions & 29 deletions pkg/controller/controller_ref_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controller

import (
"context"
"encoding/json"
"fmt"
"sync"
Expand All @@ -38,13 +39,13 @@ type BaseControllerRefManager struct {

canAdoptErr error
canAdoptOnce sync.Once
CanAdoptFunc func() error
CanAdoptFunc func(ctx context.Context) error
}

func (m *BaseControllerRefManager) CanAdopt() error {
func (m *BaseControllerRefManager) CanAdopt(ctx context.Context) error {
m.canAdoptOnce.Do(func() {
if m.CanAdoptFunc != nil {
m.canAdoptErr = m.CanAdoptFunc()
m.canAdoptErr = m.CanAdoptFunc(ctx)
}
})
return m.canAdoptErr
Expand All @@ -65,7 +66,7 @@ func (m *BaseControllerRefManager) CanAdopt() error {
// own the object.
//
// No reconciliation will be attempted if the controller is being deleted.
func (m *BaseControllerRefManager) ClaimObject(obj metav1.Object, match func(metav1.Object) bool, adopt, release func(metav1.Object) error) (bool, error) {
func (m *BaseControllerRefManager) ClaimObject(ctx context.Context, obj metav1.Object, match func(metav1.Object) bool, adopt func(context.Context, metav1.Object) error, release func(metav1.Object) error) (bool, error) {
controllerRef := metav1.GetControllerOfNoCopy(obj)
if controllerRef != nil {
if controllerRef.UID != m.Controller.GetUID() {
Expand Down Expand Up @@ -107,7 +108,7 @@ func (m *BaseControllerRefManager) ClaimObject(obj metav1.Object, match func(met
return false, nil
}
// Selector matches. Try to adopt.
if err := adopt(obj); err != nil {
if err := adopt(ctx, obj); err != nil {
// If the pod no longer exists, ignore the error.
if errors.IsNotFound(err) {
return false, nil
Expand Down Expand Up @@ -143,7 +144,7 @@ func NewPodControllerRefManager(
controller metav1.Object,
selector labels.Selector,
controllerKind schema.GroupVersionKind,
canAdopt func() error,
canAdopt func(ctx context.Context) error,
finalizers ...string,
) *PodControllerRefManager {
return &PodControllerRefManager{
Expand Down Expand Up @@ -173,7 +174,7 @@ func NewPodControllerRefManager(
//
// If the error is nil, either the reconciliation succeeded, or no
// reconciliation was necessary. The list of Pods that you now own is returned.
func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {
func (m *PodControllerRefManager) ClaimPods(ctx context.Context, pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {
var claimed []*v1.Pod
var errlist []error

Expand All @@ -190,15 +191,15 @@ func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.
}
return true
}
adopt := func(obj metav1.Object) error {
return m.AdoptPod(obj.(*v1.Pod))
adopt := func(ctx context.Context, obj metav1.Object) error {
return m.AdoptPod(ctx, obj.(*v1.Pod))
}
release := func(obj metav1.Object) error {
return m.ReleasePod(obj.(*v1.Pod))
}

for _, pod := range pods {
ok, err := m.ClaimObject(pod, match, adopt, release)
ok, err := m.ClaimObject(ctx, pod, match, adopt, release)
if err != nil {
errlist = append(errlist, err)
continue
Expand All @@ -212,8 +213,8 @@ func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.

// AdoptPod sends a patch to take control of the pod. It returns the error if
// the patching fails.
func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error {
if err := m.CanAdopt(); err != nil {
func (m *PodControllerRefManager) AdoptPod(ctx context.Context, pod *v1.Pod) error {
if err := m.CanAdopt(ctx); err != nil {
return fmt.Errorf("can't adopt Pod %v/%v (%v): %v", pod.Namespace, pod.Name, pod.UID, err)
}
// Note that ValidateOwnerReferences() will reject this patch if another
Expand Down Expand Up @@ -283,7 +284,7 @@ func NewReplicaSetControllerRefManager(
controller metav1.Object,
selector labels.Selector,
controllerKind schema.GroupVersionKind,
canAdopt func() error,
canAdopt func(ctx context.Context) error,
) *ReplicaSetControllerRefManager {
return &ReplicaSetControllerRefManager{
BaseControllerRefManager: BaseControllerRefManager{
Expand All @@ -309,22 +310,22 @@ func NewReplicaSetControllerRefManager(
// If the error is nil, either the reconciliation succeeded, or no
// reconciliation was necessary. The list of ReplicaSets that you now own is
// returned.
func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(sets []*apps.ReplicaSet) ([]*apps.ReplicaSet, error) {
func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(ctx context.Context, sets []*apps.ReplicaSet) ([]*apps.ReplicaSet, error) {
var claimed []*apps.ReplicaSet
var errlist []error

match := func(obj metav1.Object) bool {
return m.Selector.Matches(labels.Set(obj.GetLabels()))
}
adopt := func(obj metav1.Object) error {
return m.AdoptReplicaSet(obj.(*apps.ReplicaSet))
adopt := func(ctx context.Context, obj metav1.Object) error {
return m.AdoptReplicaSet(ctx, obj.(*apps.ReplicaSet))
}
release := func(obj metav1.Object) error {
return m.ReleaseReplicaSet(obj.(*apps.ReplicaSet))
}

for _, rs := range sets {
ok, err := m.ClaimObject(rs, match, adopt, release)
ok, err := m.ClaimObject(ctx, rs, match, adopt, release)
if err != nil {
errlist = append(errlist, err)
continue
Expand All @@ -338,8 +339,8 @@ func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(sets []*apps.ReplicaSe

// AdoptReplicaSet sends a patch to take control of the ReplicaSet. It returns
// the error if the patching fails.
func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(rs *apps.ReplicaSet) error {
if err := m.CanAdopt(); err != nil {
func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(ctx context.Context, rs *apps.ReplicaSet) error {
if err := m.CanAdopt(ctx); err != nil {
return fmt.Errorf("can't adopt ReplicaSet %v/%v (%v): %v", rs.Namespace, rs.Name, rs.UID, err)
}
// Note that ValidateOwnerReferences() will reject this patch if another
Expand Down Expand Up @@ -381,9 +382,9 @@ func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(replicaSet *apps.Repl
//
// The CanAdopt() function calls getObject() to fetch the latest value,
// and denies adoption attempts if that object has a non-nil DeletionTimestamp.
func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() error {
return func() error {
obj, err := getObject()
func RecheckDeletionTimestamp(getObject func(context.Context) (metav1.Object, error)) func(context.Context) error {
return func(ctx context.Context) error {
obj, err := getObject(ctx)
if err != nil {
return fmt.Errorf("can't recheck DeletionTimestamp: %v", err)
}
Expand Down Expand Up @@ -421,7 +422,7 @@ func NewControllerRevisionControllerRefManager(
controller metav1.Object,
selector labels.Selector,
controllerKind schema.GroupVersionKind,
canAdopt func() error,
canAdopt func(ctx context.Context) error,
) *ControllerRevisionControllerRefManager {
return &ControllerRevisionControllerRefManager{
BaseControllerRefManager: BaseControllerRefManager{
Expand All @@ -447,22 +448,22 @@ func NewControllerRevisionControllerRefManager(
// If the error is nil, either the reconciliation succeeded, or no
// reconciliation was necessary. The list of ControllerRevisions that you now own is
// returned.
func (m *ControllerRevisionControllerRefManager) ClaimControllerRevisions(histories []*apps.ControllerRevision) ([]*apps.ControllerRevision, error) {
func (m *ControllerRevisionControllerRefManager) ClaimControllerRevisions(ctx context.Context, histories []*apps.ControllerRevision) ([]*apps.ControllerRevision, error) {
var claimed []*apps.ControllerRevision
var errlist []error

match := func(obj metav1.Object) bool {
return m.Selector.Matches(labels.Set(obj.GetLabels()))
}
adopt := func(obj metav1.Object) error {
return m.AdoptControllerRevision(obj.(*apps.ControllerRevision))
adopt := func(ctx context.Context, obj metav1.Object) error {
return m.AdoptControllerRevision(ctx, obj.(*apps.ControllerRevision))
}
release := func(obj metav1.Object) error {
return m.ReleaseControllerRevision(obj.(*apps.ControllerRevision))
}

for _, h := range histories {
ok, err := m.ClaimObject(h, match, adopt, release)
ok, err := m.ClaimObject(ctx, h, match, adopt, release)
if err != nil {
errlist = append(errlist, err)
continue
Expand All @@ -476,8 +477,8 @@ func (m *ControllerRevisionControllerRefManager) ClaimControllerRevisions(histor

// AdoptControllerRevision sends a patch to take control of the ControllerRevision. It returns the error if
// the patching fails.
func (m *ControllerRevisionControllerRefManager) AdoptControllerRevision(history *apps.ControllerRevision) error {
if err := m.CanAdopt(); err != nil {
func (m *ControllerRevisionControllerRefManager) AdoptControllerRevision(ctx context.Context, history *apps.ControllerRevision) error {
if err := m.CanAdopt(ctx); err != nil {
return fmt.Errorf("can't adopt ControllerRevision %v/%v (%v): %v", history.Namespace, history.Name, history.UID, err)
}
// Note that ValidateOwnerReferences() will reject this patch if another
Expand Down
17 changes: 9 additions & 8 deletions pkg/controller/controller_ref_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controller

import (
"context"
"strings"
"testing"

Expand Down Expand Up @@ -73,7 +74,7 @@ func TestClaimPods(t *testing.T) {
&v1.ReplicationController{},
productionLabelSelector,
controllerKind,
func() error { return nil }),
func(ctx context.Context) error { return nil }),
pods: []*v1.Pod{newPod("pod1", productionLabel, nil), newPod("pod2", testLabel, nil)},
claimed: []*v1.Pod{newPod("pod1", productionLabel, nil)},
patches: 1,
Expand All @@ -89,7 +90,7 @@ func TestClaimPods(t *testing.T) {
&controller,
productionLabelSelector,
controllerKind,
func() error { return nil }),
func(ctx context.Context) error { return nil }),
pods: []*v1.Pod{newPod("pod1", productionLabel, nil), newPod("pod2", productionLabel, nil)},
claimed: nil,
}
Expand All @@ -105,7 +106,7 @@ func TestClaimPods(t *testing.T) {
&controller,
productionLabelSelector,
controllerKind,
func() error { return nil }),
func(ctx context.Context) error { return nil }),
pods: []*v1.Pod{newPod("pod1", productionLabel, &controller), newPod("pod2", productionLabel, nil)},
claimed: []*v1.Pod{newPod("pod1", productionLabel, &controller)},
}
Expand All @@ -121,7 +122,7 @@ func TestClaimPods(t *testing.T) {
&controller,
productionLabelSelector,
controllerKind,
func() error { return nil }),
func(ctx context.Context) error { return nil }),
pods: []*v1.Pod{newPod("pod1", productionLabel, &controller), newPod("pod2", productionLabel, &controller2)},
claimed: []*v1.Pod{newPod("pod1", productionLabel, &controller)},
}
Expand All @@ -135,7 +136,7 @@ func TestClaimPods(t *testing.T) {
&controller,
productionLabelSelector,
controllerKind,
func() error { return nil }),
func(ctx context.Context) error { return nil }),
pods: []*v1.Pod{newPod("pod1", productionLabel, &controller), newPod("pod2", testLabel, &controller)},
claimed: []*v1.Pod{newPod("pod1", productionLabel, &controller)},
patches: 1,
Expand All @@ -156,7 +157,7 @@ func TestClaimPods(t *testing.T) {
&controller,
productionLabelSelector,
controllerKind,
func() error { return nil }),
func(ctx context.Context) error { return nil }),
pods: []*v1.Pod{podToDelete1, podToDelete2},
claimed: []*v1.Pod{podToDelete1},
}
Expand All @@ -170,7 +171,7 @@ func TestClaimPods(t *testing.T) {
&controller,
productionLabelSelector,
controllerKind,
func() error { return nil },
func(ctx context.Context) error { return nil },
"foo-finalizer", "bar-finalizer"),
pods: []*v1.Pod{newPod("pod1", productionLabel, &controller), newPod("pod2", testLabel, &controller), newPod("pod3", productionLabel, nil)},
claimed: []*v1.Pod{newPod("pod1", productionLabel, &controller), newPod("pod3", productionLabel, nil)},
Expand All @@ -180,7 +181,7 @@ func TestClaimPods(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
claimed, err := test.manager.ClaimPods(test.pods)
claimed, err := test.manager.ClaimPods(context.TODO(), test.pods)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down
Loading