Set controllerRef in RCs owned by DC #14322

Merged
7 changes: 6 additions & 1 deletion pkg/cmd/server/bootstrappolicy/controller_policy.go
@@ -64,6 +64,11 @@ func init() {
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + InfraDeployerControllerServiceAccountName},
Rules: []rbac.PolicyRule{
rbac.NewRule("create", "get", "list", "watch", "patch", "delete").Groups(kapiGroup).Resources("pods").RuleOrDie(),

// "delete" is required here for compatibility with older deployer images
// (see https://github.com/openshift/origin/pull/14322#issuecomment-303968976)
// TODO: remove the "delete" rule a few releases after 3.6
rbac.NewRule("delete").Groups(kapiGroup).Resources("replicationcontrollers").RuleOrDie(),
rbac.NewRule("get", "list", "watch", "update").Groups(kapiGroup).Resources("replicationcontrollers").RuleOrDie(),
eventsRule(),
},
@@ -73,7 +78,7 @@ func init() {
addControllerRole(rbac.ClusterRole{
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + InfraDeploymentConfigControllerServiceAccountName},
Rules: []rbac.PolicyRule{
rbac.NewRule("create", "get", "list", "watch", "update", "delete").Groups(kapiGroup).Resources("replicationcontrollers").RuleOrDie(),
rbac.NewRule("create", "get", "list", "watch", "update", "patch", "delete").Groups(kapiGroup).Resources("replicationcontrollers").RuleOrDie(),
rbac.NewRule("update").Groups(deployGroup, legacyDeployGroup).Resources("deploymentconfigs/status").RuleOrDie(),
rbac.NewRule("get", "list", "watch").Groups(deployGroup, legacyDeployGroup).Resources("deploymentconfigs").RuleOrDie(),
eventsRule(),
4 changes: 4 additions & 0 deletions pkg/cmd/server/bootstrappolicy/policy.go
@@ -631,6 +631,10 @@ func GetOpenshiftBootstrapClusterRoles() []authorizationapi.ClusterRole {
},
},
Rules: []authorizationapi.PolicyRule{
// "delete" is required here for compatibility with older deployer images
// (see https://github.com/openshift/origin/pull/14322#issuecomment-303968976)
// TODO: remove the "delete" rule a few releases after 3.6
authorizationapi.NewRule("delete").Groups(kapiGroup).Resources("replicationcontrollers").RuleOrDie(),
authorizationapi.NewRule("get", "list", "watch", "update").Groups(kapiGroup).Resources("replicationcontrollers").RuleOrDie(),
authorizationapi.NewRule("get", "list", "watch", "create").Groups(kapiGroup).Resources("pods").RuleOrDie(),
authorizationapi.NewRule("get").Groups(kapiGroup).Resources("pods/log").RuleOrDie(),
175 changes: 175 additions & 0 deletions pkg/controller/controller_ref_manager.go
@@ -0,0 +1,175 @@
package controller

import (
"fmt"

"github.com/golang/glog"
kerrors "k8s.io/apimachinery/pkg/api/errors"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klabels "k8s.io/apimachinery/pkg/labels"
kschema "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
kutilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record"
kapi "k8s.io/kubernetes/pkg/api"
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
kcontroller "k8s.io/kubernetes/pkg/controller"
)

// RCControlInterface is an interface that knows how to patch
// ReplicationControllers. It is used by the DeploymentConfig controller
// to ease testing of actions that it takes.
type RCControlInterface interface {
PatchReplicationController(namespace, name string, data []byte) error
}

// RealRCControl is the default implementation of RCControlInterface.
type RealRCControl struct {
KubeClient kclientset.Interface
Contributor: do we need this exported?

Author: We do; otherwise you couldn't initialize RealRCControl in factory.go

Contributor: @tnozicka we can create initializer func()

Author: we could, although I don't think that it would bring any value here. Also I would rather leave the upstream code mostly unchanged

Contributor: ok, I think for many of our controllers we have initializers, that is why I asked for one :-) But if this is copying the upstream pattern I'm ok with it.

Recorder record.EventRecorder
}

// To make sure RealRCControl implements RCControlInterface
var _ RCControlInterface = &RealRCControl{}

// PatchReplicationController executes a strategic merge patch contained in 'data' on RC specified by 'namespace' and 'name'
func (r RealRCControl) PatchReplicationController(namespace, name string, data []byte) error {
_, err := r.KubeClient.Core().ReplicationControllers(namespace).Patch(name, types.StrategicMergePatchType, data)
return err
}

type RCControllerRefManager struct {
kcontroller.BaseControllerRefManager
controllerKind kschema.GroupVersionKind
rcControl RCControlInterface
}

// NewRCControllerRefManager returns a RCControllerRefManager that exposes
// methods to manage the controllerRef of ReplicationControllers.
//
// The CanAdopt() function can be used to perform a potentially expensive check
// (such as a live GET from the API server) prior to the first adoption.
// It will only be called (at most once) if an adoption is actually attempted.
// If CanAdopt() returns a non-nil error, all adoptions will fail.
//
// NOTE: Once CanAdopt() is called, it will not be called again by the same
// RCControllerRefManager instance. Create a new instance if it
// makes sense to check CanAdopt() again (e.g. in a different sync pass).
func NewRCControllerRefManager(
rcControl RCControlInterface,
controller kmetav1.Object,
selector klabels.Selector,
controllerKind kschema.GroupVersionKind,
canAdopt func() error,
) *RCControllerRefManager {
return &RCControllerRefManager{
BaseControllerRefManager: kcontroller.BaseControllerRefManager{
Controller: controller,
Selector: selector,
CanAdoptFunc: canAdopt,
},
controllerKind: controllerKind,
rcControl: rcControl,
}
}

// ClaimReplicationController tries to take ownership of a ReplicationController.
//
// It will reconcile the following:
// * Adopt the ReplicationController if it's an orphan.
// * Release owned ReplicationController if the selector no longer matches.
//
// A non-nil error is returned if some form of reconciliation was attempted and
// failed. Usually, controllers should try again later in case reconciliation
// is still needed.
//
// If the error is nil, either the reconciliation succeeded, or no
// reconciliation was necessary. The returned boolean indicates whether you now
// own the object.
func (m *RCControllerRefManager) ClaimReplicationController(rc *kapi.ReplicationController) (bool, error) {
match := func(obj kmetav1.Object) bool {
return m.Selector.Matches(klabels.Set(obj.GetLabels()))
}
adopt := func(obj kmetav1.Object) error {
return m.AdoptReplicationController(obj.(*kapi.ReplicationController))
}
release := func(obj kmetav1.Object) error {
return m.ReleaseReplicationController(obj.(*kapi.ReplicationController))
}

return m.ClaimObject(rc, match, adopt, release)
}

// ClaimReplicationControllers tries to take ownership of a list of ReplicationControllers.
//
// It will reconcile the following:
// * Adopt orphans if the selector matches.
// * Release owned objects if the selector no longer matches.
//
// A non-nil error is returned if some form of reconciliation was attempted and
// failed. Usually, controllers should try again later in case reconciliation
// is still needed.
//
// If the error is nil, either the reconciliation succeeded, or no
// reconciliation was necessary. The list of ReplicationControllers that you now own is
// returned.
func (m *RCControllerRefManager) ClaimReplicationControllers(rcs []*kapi.ReplicationController) ([]*kapi.ReplicationController, error) {
var claimed []*kapi.ReplicationController
var errlist []error

for _, rc := range rcs {
ok, err := m.ClaimReplicationController(rc)
if err != nil {
errlist = append(errlist, err)
continue
}
if ok {
claimed = append(claimed, rc)
}
}
return claimed, kutilerrors.NewAggregate(errlist)
}

// AdoptReplicationController sends a patch to take control of the ReplicationController. It returns the error if
// the patching fails.
func (m *RCControllerRefManager) AdoptReplicationController(rs *kapi.ReplicationController) error {
if err := m.CanAdopt(); err != nil {
return fmt.Errorf("can't adopt ReplicationController %s/%s (%s): %v", rs.Namespace, rs.Name, rs.UID, err)
}
// Note that ValidateOwnerReferences() will reject this patch if another
// OwnerReference exists with controller=true.
addControllerPatch := fmt.Sprintf(
`{"metadata":{
"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],
"uid":"%s",
"finalizers": ["%s"]
}
}`,
m.controllerKind.GroupVersion(), m.controllerKind.Kind,
m.Controller.GetName(), m.Controller.GetUID(), rs.UID,
kmetav1.FinalizerDeleteDependents)
return m.rcControl.PatchReplicationController(rs.Namespace, rs.Name, []byte(addControllerPatch))
}

// ReleaseReplicationController sends a patch to free the ReplicationController from the control of the Deployment controller.
// It returns the error if the patching fails. 404 and 422 errors are ignored.
func (m *RCControllerRefManager) ReleaseReplicationController(rc *kapi.ReplicationController) error {
glog.V(4).Infof("patching ReplicationController %s/%s to remove its controllerRef to %s/%s:%s",
rc.Namespace, rc.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName())
deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.Controller.GetUID(), rc.UID)
err := m.rcControl.PatchReplicationController(rc.Namespace, rc.Name, []byte(deleteOwnerRefPatch))
if err != nil {
if kerrors.IsNotFound(err) {
// If the ReplicationController no longer exists, ignore it.
return nil
}
if kerrors.IsInvalid(err) {
Contributor: merge this into one if or make a switch

Author: @mfojtik essentially this whole file is from upstream with a bit of sed s/RS/RC/g. Do we want to deviate here w.r.t. future diffs?

Contributor: this is fine then

// Invalid error will be returned in two cases: 1. the ReplicationController
// has no owner reference, 2. the uid of the ReplicationController doesn't
// match, which means the ReplicationController is deleted and then recreated.
// In both cases, the error can be ignored.
return nil
}
}
return err
}
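
To make the adopt and release patches above concrete, here is a small, self-contained sketch (written for this review, not part of the PR) that fills the same two templates with made-up values. The apiVersion, names, and UIDs are hypothetical; in the controller the real values come from m.controllerKind, m.Controller, and the RC being claimed.

```go
package main

import "fmt"

func main() {
	// Hypothetical values for illustration only; the real ones are taken
	// from the owning DeploymentConfig and the claimed ReplicationController.
	const (
		apiVersion = "apps.openshift.io/v1" // assumption; actually m.controllerKind.GroupVersion()
		kind       = "DeploymentConfig"
		dcName     = "frontend"
		dcUID      = "11111111-1111-1111-1111-111111111111"
		rcUID      = "22222222-2222-2222-2222-222222222222"
	)

	// Same shape as addControllerPatch in AdoptReplicationController
	// (whitespace normalized): sets the controllerRef, asserts the RC's uid,
	// and adds the foregroundDeletion finalizer (metav1.FinalizerDeleteDependents).
	adopt := fmt.Sprintf(
		`{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s","finalizers":["foregroundDeletion"]}}`,
		apiVersion, kind, dcName, dcUID, rcUID)

	// Same shape as deleteOwnerRefPatch in ReleaseReplicationController:
	// removes the owner reference carrying the DC's uid, again asserting the RC's uid.
	release := fmt.Sprintf(
		`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`,
		dcUID, rcUID)

	fmt.Println("adopt patch:  ", adopt)
	fmt.Println("release patch:", release)
}
```

Because both patches assert the RC's own uid, they fail with a 422 (Invalid) if the RC was deleted and recreated in the meantime, which is exactly the case ReleaseReplicationController treats as ignorable.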
pkg/deploy/controller/deploymentconfig/deploymentconfig_controller.go
@@ -8,6 +8,7 @@ import (

kapierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
kutilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -17,9 +18,11 @@ import (
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
kcorelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion"
"k8s.io/kubernetes/pkg/client/retry"
kcontroller "k8s.io/kubernetes/pkg/controller"

osclient "github.com/openshift/origin/pkg/client"
oscache "github.com/openshift/origin/pkg/client/cache"
oscontroller "github.com/openshift/origin/pkg/controller"
deployapi "github.com/openshift/origin/pkg/deploy/api"
deployutil "github.com/openshift/origin/pkg/deploy/util"
)
@@ -68,6 +71,8 @@ type DeploymentConfigController struct {
rcLister kcorelisters.ReplicationControllerLister
// rcListerSynced makes sure the rc shared informer is synced before reconciling any deployment config.
rcListerSynced func() bool
// rcControl is used for adopting/releasing replication controllers.
rcControl oscontroller.RCControlInterface

// codec is used to build deployments from configs.
codec runtime.Codec
@@ -84,11 +89,29 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
return c.updateStatus(config, []*kapi.ReplicationController{})
}

// Find all deployments owned by the deployment config.
// List all ReplicationControllers to find also those we own but that no longer match our selector.
// They will be orphaned by ClaimReplicationControllers().
rcList, err := c.rcLister.ReplicationControllers(config.Namespace).List(labels.Everything())
if err != nil {
return fmt.Errorf("error while deploymentConfigController listing replication controllers: %v", err)
}
selector := deployutil.ConfigSelector(config.Name)
existingDeployments, err := c.rcLister.ReplicationControllers(config.Namespace).List(selector)
// If any adoptions are attempted, we should first recheck for deletion with
// an uncached quorum read sometime after listing ReplicationControllers (see Kubernetes #42639).
canAdoptFunc := kcontroller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
fresh, err := c.dn.DeploymentConfigs(config.Namespace).Get(config.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
if fresh.UID != config.UID {
return nil, fmt.Errorf("original DeploymentConfig %v/%v is gone: got uid %v, wanted %v", config.Namespace, config.Name, fresh.UID, config.UID)
Contributor: %s/%s and uuid's are %s

also error matching the deployment config foo/bar: current uuid UUID does not match UUID... (if this is matching the upstream error, ignore this comment ;-)

Author: yes, upstream

}
return fresh, nil
})
cm := oscontroller.NewRCControllerRefManager(c.rcControl, config, selector, deployutil.ControllerKind, canAdoptFunc)
existingDeployments, err := cm.ClaimReplicationControllers(rcList)
if err != nil {
return err
return fmt.Errorf("error while deploymentConfigController claiming replication controllers: %v", err)
}

// In case the deployment config has been marked for deletion, merely update its status with
@@ -125,6 +148,15 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
if err != nil {
return err
}
// We need to make sure we own that RC or adopt it if possible
isOurs, err := cm.ClaimReplicationController(rc)
Contributor: You have already claimed those RCs above, no?

Author: @Kargakis the one we get with the Get could be a different one. What if it no longer matches the selector and was/should be released, or it could have been changed, or deleted and recreated, and we don't own it.

Contributor: Not sure about having the RC diverge just a couple of lines of Go after the initial adoption, and this is a cache lookup, but maybe this doesn't hurt as a last line of defense? What happens if the cache copy is not yet updated with the owner reference you set above? Will the 2nd patch simply succeed because there is no diff with the server copy or is there an actual error returned?

Now I am thinking that we never supported adopting RCs for DCs, before owner refs, so I am wondering if we are going to break users that have fiddled with RC labels. Maybe warn in the release notes since we are departing from the old way of selecting RCs.

Contributor: > Now I am thinking that we never supported adopting RCs for DCs, before owner refs, so I am wondering if we are going to break users that have fiddled with RC labels.

Nevermind this one, rechecked and you merely add owner refs on top - should be fine.

Author: @Kargakis

> What happens if the cache copy is not yet updated with the owner reference you set above?

This is in a RetryOnConflict block, so in the worst case caches should propagate while doing the backoff, right? We could use an uncached client here.

> Will the 2nd patch simply succeed because there is no diff with the server copy or is there an actual error returned?

I think that the second patch should result in a noop (and succeed).

> Not sure about having the RC diverge just a couple of lines of Go after the initial adoption, and this is a cache lookup, but maybe this doesn't hurt as a last line of defense?

Well, not doing this and patching an RC we don't own due to a race condition would break the 2nd or 3rd controller rule.

But that also gets me thinking that we can break it under some conditions. There is still a race condition in the patches we use (taken from upstream). I think they should really use optimistic concurrency in the form of resourceVersion to be completely correct. Hope that works with PATCH. I'll take the discussion upstream to figure it out because they do the same.

if err != nil {
return fmt.Errorf("error while deploymentConfigController claiming the replication controller %s/%s: %v", rc.Namespace, rc.Name, err)
}
if !isOurs {
return nil
}

copied, err := deployutil.DeploymentDeepCopy(rc)
if err != nil {
return err
@@ -157,7 +189,7 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
return c.updateStatus(config, existingDeployments)
}

return c.reconcileDeployments(existingDeployments, config)
return c.reconcileDeployments(existingDeployments, config, cm)
}
// If the config is paused we shouldn't create new deployments for it.
if config.Spec.Paused {
@@ -177,10 +209,26 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
}
created, err := c.rn.ReplicationControllers(config.Namespace).Create(deployment)
if err != nil {
// If the deployment was already created, just move on. The cache could be
// stale, or another process could have already handled this update.
// We need to find out if our controller owns that deployment and report error if not
if kapierrors.IsAlreadyExists(err) {
return c.updateStatus(config, existingDeployments)
rc, err := c.rcLister.ReplicationControllers(deployment.Namespace).Get(deployment.Name)
if err != nil {
return fmt.Errorf("error while deploymentConfigController getting the replication controller %s/%s: %v", rc.Namespace, rc.Name, err)
}
// We need to make sure we own that RC or adopt it if possible
isOurs, err := cm.ClaimReplicationController(rc)
if err != nil {
return fmt.Errorf("error while deploymentConfigController claiming the replication controller: %v", err)
}
if isOurs {
// If the deployment was already created, just move on. The cache could be
// stale, or another process could have already handled this update.
return c.updateStatus(config, existingDeployments)
} else {
Contributor: No need for else

Author: I know, I have changed it 2 times while writing it but somehow I wanted to explicitly state that this is for the case where it isn't ours :) and to be resilient to future changes


err = fmt.Errorf("replication controller %s already exists and deployment config is not allowed to claim it.", deployment.Name)
c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCreationFailed", "Couldn't deploy version %d: %v", config.Status.LatestVersion, err)
return c.updateStatus(config, existingDeployments)
}
}
c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCreationFailed", "Couldn't deploy version %d: %s", config.Status.LatestVersion, err)
// We don't care about this error since we need to report the create failure.
@@ -208,7 +256,7 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
// successful deployment, not necessarily the latest in terms of the config
// version. The active deployment replica count should follow the config, and
// all other deployments should be scaled to zero.
func (c *DeploymentConfigController) reconcileDeployments(existingDeployments []*kapi.ReplicationController, config *deployapi.DeploymentConfig) error {
func (c *DeploymentConfigController) reconcileDeployments(existingDeployments []*kapi.ReplicationController, config *deployapi.DeploymentConfig, cm *oscontroller.RCControllerRefManager) error {
activeDeployment := deployutil.ActiveDeployment(existingDeployments)

// Reconcile deployments. The active deployment follows the config, and all
@@ -239,6 +287,18 @@ func (c *DeploymentConfigController) reconcileDeployments(existingDeployments []
if err != nil {
return err
}
// We need to make sure we own that RC or adopt it if possible
isOurs, err := cm.ClaimReplicationController(rc)
Contributor: Same comment as above

Author: same as well :)

if err != nil {
return fmt.Errorf("error while deploymentConfigController claiming the replication controller %s/%s: %v", rc.Namespace, rc.Name, err)
}
if !isOurs {
return fmt.Errorf("deployment config %s/%s (%v) no longer owns replication controller %s/%s (%v)",
config.Namespace, config.Name, config.UID,
deployment.Namespace, deployment.Name, deployment.UID,
)
}

copied, err = deployutil.DeploymentDeepCopy(rc)
if err != nil {
return err
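A note on the canAdoptFunc wiring in Handle() above: kcontroller.RecheckDeletionTimestamp is an upstream helper, and the sketch below is only a paraphrase of its behavior written for this review (the vendored k8s.io/kubernetes/pkg/controller source is authoritative). The point is that the potentially expensive uncached GET only runs if an adoption is actually attempted, and adoption is refused once the freshly read DeploymentConfig is already being deleted.

```go
package example

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// recheckDeletionTimestamp paraphrases kcontroller.RecheckDeletionTimestamp:
// it wraps a "fetch the controller again, uncached" closure into a CanAdopt
// check that returns an error (blocking adoption) when the freshly fetched
// controller object is already marked for deletion.
func recheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() error {
	return func() error {
		obj, err := getObject()
		if err != nil {
			return fmt.Errorf("can't recheck DeletionTimestamp: %v", err)
		}
		if obj.GetDeletionTimestamp() != nil {
			return fmt.Errorf("%v/%v has just been deleted at %v",
				obj.GetNamespace(), obj.GetName(), obj.GetDeletionTimestamp())
		}
		return nil
	}
}
```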
5 changes: 5 additions & 0 deletions pkg/deploy/controller/deploymentconfig/factory.go
@@ -20,6 +20,7 @@ import (
kcontroller "k8s.io/kubernetes/pkg/controller"

osclient "github.com/openshift/origin/pkg/client"
oscontroller "github.com/openshift/origin/pkg/controller"
deployapi "github.com/openshift/origin/pkg/deploy/api"
)

@@ -51,6 +52,10 @@ func NewDeploymentConfigController(

rcLister: rcInformer.Lister(),
rcListerSynced: rcInformer.Informer().HasSynced,
rcControl: oscontroller.RealRCControl{
KubeClient: internalKubeClientset,
Contributor: i would create NewRealRCControl() and pass the client and recorder and make them unexported.

Author: upstream pattern as per #14322 (comment)


Recorder: recorder,
},

recorder: recorder,
codec: codec,
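Because rcControl is injected as an interface, unit tests do not have to go through RealRCControl and the API server; a fake that records patches is enough to exercise adoption and release. A minimal, hypothetical sketch (the type and helper below are invented for illustration and are not part of this PR):

```go
package example

import (
	"fmt"
	"sync"
)

// fakeRCControl is a hypothetical test double for RCControlInterface:
// instead of sending patches to the API server it records them per RC,
// so a test can assert what the ref manager tried to do.
type fakeRCControl struct {
	sync.Mutex
	patches map[string][]string // "namespace/name" -> raw patch bodies
	err     error               // optional error to inject
}

func newFakeRCControl() *fakeRCControl {
	return &fakeRCControl{patches: map[string][]string{}}
}

// PatchReplicationController satisfies the single method of RCControlInterface.
func (f *fakeRCControl) PatchReplicationController(namespace, name string, data []byte) error {
	f.Lock()
	defer f.Unlock()
	if f.err != nil {
		return f.err
	}
	key := fmt.Sprintf("%s/%s", namespace, name)
	f.patches[key] = append(f.patches[key], string(data))
	return nil
}
```

A test could then build the manager with something like oscontroller.NewRCControllerRefManager(newFakeRCControl(), dc, selector, deployutil.ControllerKind, func() error { return nil }) and inspect the recorded patches after calling ClaimReplicationControllers.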