-
Notifications
You must be signed in to change notification settings - Fork 4.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Set controllerRef in RCs owned by DC #14322
Changes from all commits
48e69a6
3ace21a
eeaf7bc
13326de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
package controller | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/golang/glog" | ||
kerrors "k8s.io/apimachinery/pkg/api/errors" | ||
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
klabels "k8s.io/apimachinery/pkg/labels" | ||
kschema "k8s.io/apimachinery/pkg/runtime/schema" | ||
"k8s.io/apimachinery/pkg/types" | ||
kutilerrors "k8s.io/apimachinery/pkg/util/errors" | ||
"k8s.io/client-go/tools/record" | ||
kapi "k8s.io/kubernetes/pkg/api" | ||
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" | ||
kcontroller "k8s.io/kubernetes/pkg/controller" | ||
) | ||
|
||
// RSControlInterface is an interface that knows how to add or delete | ||
// ReplicationControllers, as well as increment or decrement them. It is used | ||
// by the DeploymentConfig controller to ease testing of actions that it takes. | ||
type RCControlInterface interface { | ||
PatchReplicationController(namespace, name string, data []byte) error | ||
} | ||
|
||
// RealRCControl is the default implementation of RCControlInterface. | ||
type RealRCControl struct { | ||
KubeClient kclientset.Interface | ||
Recorder record.EventRecorder | ||
} | ||
|
||
// To make sure RealRCControl implements RCControlInterface | ||
var _ RCControlInterface = &RealRCControl{} | ||
|
||
// PatchReplicationController executes a strategic merge patch contained in 'data' on RC specified by 'namespace' and 'name' | ||
func (r RealRCControl) PatchReplicationController(namespace, name string, data []byte) error { | ||
_, err := r.KubeClient.Core().ReplicationControllers(namespace).Patch(name, types.StrategicMergePatchType, data) | ||
return err | ||
} | ||
|
||
type RCControllerRefManager struct { | ||
kcontroller.BaseControllerRefManager | ||
controllerKind kschema.GroupVersionKind | ||
rcControl RCControlInterface | ||
} | ||
|
||
// NewRCControllerRefManager returns a RCControllerRefManager that exposes | ||
// methods to manage the controllerRef of ReplicationControllers. | ||
// | ||
// The CanAdopt() function can be used to perform a potentially expensive check | ||
// (such as a live GET from the API server) prior to the first adoption. | ||
// It will only be called (at most once) if an adoption is actually attempted. | ||
// If CanAdopt() returns a non-nil error, all adoptions will fail. | ||
// | ||
// NOTE: Once CanAdopt() is called, it will not be called again by the same | ||
// RCControllerRefManager instance. Create a new instance if it | ||
// makes sense to check CanAdopt() again (e.g. in a different sync pass). | ||
func NewRCControllerRefManager( | ||
rcControl RCControlInterface, | ||
controller kmetav1.Object, | ||
selector klabels.Selector, | ||
controllerKind kschema.GroupVersionKind, | ||
canAdopt func() error, | ||
) *RCControllerRefManager { | ||
return &RCControllerRefManager{ | ||
BaseControllerRefManager: kcontroller.BaseControllerRefManager{ | ||
Controller: controller, | ||
Selector: selector, | ||
CanAdoptFunc: canAdopt, | ||
}, | ||
controllerKind: controllerKind, | ||
rcControl: rcControl, | ||
} | ||
} | ||
|
||
// ClaimReplicationController tries to take ownership of a ReplicationController. | ||
// | ||
// It will reconcile the following: | ||
// * Adopt the ReplicationController if it's an orphan. | ||
// * Release owned ReplicationController if the selector no longer matches. | ||
// | ||
// A non-nil error is returned if some form of reconciliation was attempted and | ||
// failed. Usually, controllers should try again later in case reconciliation | ||
// is still needed. | ||
// | ||
// If the error is nil, either the reconciliation succeeded, or no | ||
// reconciliation was necessary. The returned boolean indicates whether you now | ||
// own the object. | ||
func (m *RCControllerRefManager) ClaimReplicationController(rc *kapi.ReplicationController) (bool, error) { | ||
match := func(obj kmetav1.Object) bool { | ||
return m.Selector.Matches(klabels.Set(obj.GetLabels())) | ||
} | ||
adopt := func(obj kmetav1.Object) error { | ||
return m.AdoptReplicationController(obj.(*kapi.ReplicationController)) | ||
} | ||
release := func(obj kmetav1.Object) error { | ||
return m.ReleaseReplicationController(obj.(*kapi.ReplicationController)) | ||
} | ||
|
||
return m.ClaimObject(rc, match, adopt, release) | ||
} | ||
|
||
// ClaimReplicationControllers tries to take ownership of a list of ReplicationControllers. | ||
// | ||
// It will reconcile the following: | ||
// * Adopt orphans if the selector matches. | ||
// * Release owned objects if the selector no longer matches. | ||
// | ||
// A non-nil error is returned if some form of reconciliation was attempted and | ||
// failed. Usually, controllers should try again later in case reconciliation | ||
// is still needed. | ||
// | ||
// If the error is nil, either the reconciliation succeeded, or no | ||
// reconciliation was necessary. The list of ReplicationControllers that you now own is | ||
// returned. | ||
func (m *RCControllerRefManager) ClaimReplicationControllers(rcs []*kapi.ReplicationController) ([]*kapi.ReplicationController, error) { | ||
var claimed []*kapi.ReplicationController | ||
var errlist []error | ||
|
||
for _, rc := range rcs { | ||
ok, err := m.ClaimReplicationController(rc) | ||
if err != nil { | ||
errlist = append(errlist, err) | ||
continue | ||
} | ||
if ok { | ||
claimed = append(claimed, rc) | ||
} | ||
} | ||
return claimed, kutilerrors.NewAggregate(errlist) | ||
} | ||
|
||
// AdoptReplicationController sends a patch to take control of the ReplicationController. It returns the error if | ||
// the patching fails. | ||
func (m *RCControllerRefManager) AdoptReplicationController(rs *kapi.ReplicationController) error { | ||
if err := m.CanAdopt(); err != nil { | ||
return fmt.Errorf("can't adopt ReplicationController %s/%s (%s): %v", rs.Namespace, rs.Name, rs.UID, err) | ||
} | ||
// Note that ValidateOwnerReferences() will reject this patch if another | ||
// OwnerReference exists with controller=true. | ||
addControllerPatch := fmt.Sprintf( | ||
`{"metadata":{ | ||
"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}], | ||
"uid":"%s", | ||
"finalizers": ["%s"] | ||
} | ||
}`, | ||
m.controllerKind.GroupVersion(), m.controllerKind.Kind, | ||
m.Controller.GetName(), m.Controller.GetUID(), rs.UID, | ||
kmetav1.FinalizerDeleteDependents) | ||
return m.rcControl.PatchReplicationController(rs.Namespace, rs.Name, []byte(addControllerPatch)) | ||
} | ||
|
||
// ReleaseReplicationController sends a patch to free the ReplicationController from the control of the Deployment controller. | ||
// It returns the error if the patching fails. 404 and 422 errors are ignored. | ||
func (m *RCControllerRefManager) ReleaseReplicationController(rc *kapi.ReplicationController) error { | ||
glog.V(4).Infof("patching ReplicationController %s/%s to remove its controllerRef to %s/%s:%s", | ||
rc.Namespace, rc.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName()) | ||
deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.Controller.GetUID(), rc.UID) | ||
err := m.rcControl.PatchReplicationController(rc.Namespace, rc.Name, []byte(deleteOwnerRefPatch)) | ||
if err != nil { | ||
if kerrors.IsNotFound(err) { | ||
// If the ReplicationController no longer exists, ignore it. | ||
return nil | ||
} | ||
if kerrors.IsInvalid(err) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. merge this into one if or make a switch There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mfojtik essentially this whole file is from upstream with a bit of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is fine then |
||
// Invalid error will be returned in two cases: 1. the ReplicationController | ||
// has no owner reference, 2. the uid of the ReplicationController doesn't | ||
// match, which means the ReplicationController is deleted and then recreated. | ||
// In both cases, the error can be ignored. | ||
return nil | ||
} | ||
} | ||
return err | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ import ( | |
|
||
kapierrors "k8s.io/apimachinery/pkg/api/errors" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/labels" | ||
"k8s.io/apimachinery/pkg/runtime" | ||
kutilerrors "k8s.io/apimachinery/pkg/util/errors" | ||
utilruntime "k8s.io/apimachinery/pkg/util/runtime" | ||
|
@@ -17,9 +18,11 @@ import ( | |
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" | ||
kcorelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion" | ||
"k8s.io/kubernetes/pkg/client/retry" | ||
kcontroller "k8s.io/kubernetes/pkg/controller" | ||
|
||
osclient "github.com/openshift/origin/pkg/client" | ||
oscache "github.com/openshift/origin/pkg/client/cache" | ||
oscontroller "github.com/openshift/origin/pkg/controller" | ||
deployapi "github.com/openshift/origin/pkg/deploy/api" | ||
deployutil "github.com/openshift/origin/pkg/deploy/util" | ||
) | ||
|
@@ -68,6 +71,8 @@ type DeploymentConfigController struct { | |
rcLister kcorelisters.ReplicationControllerLister | ||
// rcListerSynced makes sure the rc shared informer is synced before reconcling any deployment config. | ||
rcListerSynced func() bool | ||
// rcControl is used for adopting/releasing replication controllers. | ||
rcControl oscontroller.RCControlInterface | ||
|
||
// codec is used to build deployments from configs. | ||
codec runtime.Codec | ||
|
@@ -84,11 +89,29 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig) | |
return c.updateStatus(config, []*kapi.ReplicationController{}) | ||
} | ||
|
||
// Find all deployments owned by the deployment config. | ||
// List all ReplicationControllers to find also those we own but that no longer match our selector. | ||
// They will be orphaned by ClaimReplicationControllers(). | ||
rcList, err := c.rcLister.ReplicationControllers(config.Namespace).List(labels.Everything()) | ||
if err != nil { | ||
return fmt.Errorf("error while deploymentConfigController listing replication controllers: %v", err) | ||
} | ||
selector := deployutil.ConfigSelector(config.Name) | ||
existingDeployments, err := c.rcLister.ReplicationControllers(config.Namespace).List(selector) | ||
// If any adoptions are attempted, we should first recheck for deletion with | ||
// an uncached quorum read sometime after listing ReplicationControllers (see Kubernetes #42639). | ||
canAdoptFunc := kcontroller.RecheckDeletionTimestamp(func() (metav1.Object, error) { | ||
fresh, err := c.dn.DeploymentConfigs(config.Namespace).Get(config.Name, metav1.GetOptions{}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if fresh.UID != config.UID { | ||
return nil, fmt.Errorf("original DeploymentConfig %v/%v is gone: got uid %v, wanted %v", config.Namespace, config.Name, fresh.UID, config.UID) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. %s/%s and uuid's are %s also There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, upstream |
||
} | ||
return fresh, nil | ||
}) | ||
cm := oscontroller.NewRCControllerRefManager(c.rcControl, config, selector, deployutil.ControllerKind, canAdoptFunc) | ||
existingDeployments, err := cm.ClaimReplicationControllers(rcList) | ||
if err != nil { | ||
return err | ||
return fmt.Errorf("error while deploymentConfigController claiming replication controllers: %v", err) | ||
} | ||
|
||
// In case the deployment config has been marked for deletion, merely update its status with | ||
|
@@ -125,6 +148,15 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig) | |
if err != nil { | ||
return err | ||
} | ||
// We need to make sure we own that RC or adopt it if possible | ||
isOurs, err := cm.ClaimReplicationController(rc) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You have already claimed those RCs above, no? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Kargakis the one we get with the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure about having the RC diverge just a couple of Go code after the initial adoption and this is a cache lookup but maybe this doesn't hurt as a last line of defense? What happens if the cache copy is not yet updated with the owner reference you set above? Will the 2nd patch simply succeed because there is no diff with the server copy or is there an actual error returned? Now I am thinking that we never supported adopting RCs for DCs, before owner refs, so I am wondering if we are going to break users that have fiddled with RC labels. Maybe warn in the release notes since we are departing from the old way of selecting RCs. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Nevermind this one, rechecked and you merely add owner refs on top - should be fine. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This is in
I think that the second patch should result in noop (and succeed).
Well not doing this and patching a RC we don't own due to race condition would break 2. or 3. controller rule But that also gets me thinking that we can break it under some conditions. There is still race condition in the patches we use (taken from upstream). I think they should really use optimistic concurrency in form of resourceVersion to be completely correct. Hope that works with |
||
if err != nil { | ||
return fmt.Errorf("error while deploymentConfigController claiming the replication controller %s/%s: %v", rc.Namespace, rc.Name, err) | ||
} | ||
if !isOurs { | ||
return nil | ||
} | ||
|
||
copied, err := deployutil.DeploymentDeepCopy(rc) | ||
if err != nil { | ||
return err | ||
|
@@ -157,7 +189,7 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig) | |
return c.updateStatus(config, existingDeployments) | ||
} | ||
|
||
return c.reconcileDeployments(existingDeployments, config) | ||
return c.reconcileDeployments(existingDeployments, config, cm) | ||
} | ||
// If the config is paused we shouldn't create new deployments for it. | ||
if config.Spec.Paused { | ||
|
@@ -177,10 +209,26 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig) | |
} | ||
created, err := c.rn.ReplicationControllers(config.Namespace).Create(deployment) | ||
if err != nil { | ||
// If the deployment was already created, just move on. The cache could be | ||
// stale, or another process could have already handled this update. | ||
// We need to find out if our controller owns that deployment and report error if not | ||
if kapierrors.IsAlreadyExists(err) { | ||
return c.updateStatus(config, existingDeployments) | ||
rc, err := c.rcLister.ReplicationControllers(deployment.Namespace).Get(deployment.Name) | ||
if err != nil { | ||
return fmt.Errorf("error while deploymentConfigController getting the replication controller %s/%s: %v", rc.Namespace, rc.Name, err) | ||
} | ||
// We need to make sure we own that RC or adopt it if possible | ||
isOurs, err := cm.ClaimReplicationController(rc) | ||
if err != nil { | ||
return fmt.Errorf("error while deploymentConfigController claiming the replication controller: %v", err) | ||
} | ||
if isOurs { | ||
// If the deployment was already created, just move on. The cache could be | ||
// stale, or another process could have already handled this update. | ||
return c.updateStatus(config, existingDeployments) | ||
} else { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need for else There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know, I have changed it 2 times while writing it but somehow I wanted to explicitly state that this is for the case where it isn't ours :) and to be resilient to future changes |
||
err = fmt.Errorf("replication controller %s already exists and deployment config is not allowed to claim it.", deployment.Name) | ||
c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCreationFailed", "Couldn't deploy version %d: %v", config.Status.LatestVersion, err) | ||
return c.updateStatus(config, existingDeployments) | ||
} | ||
} | ||
c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCreationFailed", "Couldn't deploy version %d: %s", config.Status.LatestVersion, err) | ||
// We don't care about this error since we need to report the create failure. | ||
|
@@ -208,7 +256,7 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig) | |
// successful deployment, not necessarily the latest in terms of the config | ||
// version. The active deployment replica count should follow the config, and | ||
// all other deployments should be scaled to zero. | ||
func (c *DeploymentConfigController) reconcileDeployments(existingDeployments []*kapi.ReplicationController, config *deployapi.DeploymentConfig) error { | ||
func (c *DeploymentConfigController) reconcileDeployments(existingDeployments []*kapi.ReplicationController, config *deployapi.DeploymentConfig, cm *oscontroller.RCControllerRefManager) error { | ||
activeDeployment := deployutil.ActiveDeployment(existingDeployments) | ||
|
||
// Reconcile deployments. The active deployment follows the config, and all | ||
|
@@ -239,6 +287,18 @@ func (c *DeploymentConfigController) reconcileDeployments(existingDeployments [] | |
if err != nil { | ||
return err | ||
} | ||
// We need to make sure we own that RC or adopt it if possible | ||
isOurs, err := cm.ClaimReplicationController(rc) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment as above There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as well :) |
||
if err != nil { | ||
return fmt.Errorf("error while deploymentConfigController claiming the replication controller %s/%s: %v", rc.Namespace, rc.Name, err) | ||
} | ||
if !isOurs { | ||
return fmt.Errorf("deployment config %s/%s (%v) no longer owns replication controller %s/%s (%v)", | ||
config.Namespace, config.Name, config.UID, | ||
deployment.Namespace, deployment.Name, deployment.UID, | ||
) | ||
} | ||
|
||
copied, err = deployutil.DeploymentDeepCopy(rc) | ||
if err != nil { | ||
return err | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ import ( | |
kcontroller "k8s.io/kubernetes/pkg/controller" | ||
|
||
osclient "github.com/openshift/origin/pkg/client" | ||
oscontroller "github.com/openshift/origin/pkg/controller" | ||
deployapi "github.com/openshift/origin/pkg/deploy/api" | ||
) | ||
|
||
|
@@ -51,6 +52,10 @@ func NewDeploymentConfigController( | |
|
||
rcLister: rcInformer.Lister(), | ||
rcListerSynced: rcInformer.Informer().HasSynced, | ||
rcControl: oscontroller.RealRCControl{ | ||
KubeClient: internalKubeClientset, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i would create There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. upstream pattern as per #14322 (comment) |
||
Recorder: recorder, | ||
}, | ||
|
||
recorder: recorder, | ||
codec: codec, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need this exported?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do; otherwise you couldn't initialize
RealRCControl
infactory.go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tnozicka we can create initializer func()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could, although I don't think that it would bring any value here. Also I would rather leave the upstream code mostly unchanged
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I think for many of our controllers we have initializers, that is why I asked for one :-) But if this is copying the upstream pattern i'm ok with it.