Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kubectl apply retry stale resource version #26557

Merged
merged 1 commit into from
Jun 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/kubectl/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ type debugError interface {

// GetOriginalConfiguration retrieves the original configuration of the object
// from the annotation, or nil if no annotation was found.
func GetOriginalConfiguration(info *resource.Info) ([]byte, error) {
annots, err := info.Mapping.MetadataAccessor.Annotations(info.Object)
func GetOriginalConfiguration(mapping *meta.RESTMapping, obj runtime.Object) ([]byte, error) {
annots, err := mapping.MetadataAccessor.Annotations(obj)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -168,7 +168,7 @@ func GetModifiedConfiguration(info *resource.Info, annotate bool, codec runtime.
// UpdateApplyAnnotation calls CreateApplyAnnotation if the last applied
// configuration annotation is already present. Otherwise, it does nothing.
func UpdateApplyAnnotation(info *resource.Info, codec runtime.Encoder) error {
if original, err := GetOriginalConfiguration(info); err != nil || len(original) <= 0 {
if original, err := GetOriginalConfiguration(info.Mapping, info.Object); err != nil || len(original) <= 0 {
return err
}
return CreateApplyAnnotation(info, codec)
Expand Down
120 changes: 88 additions & 32 deletions pkg/kubectl/cmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ package cmd
import (
"fmt"
"io"
"time"

"github.com/jonboulle/clockwork"
"github.com/spf13/cobra"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/kubectl"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/resource"
Expand All @@ -38,6 +41,15 @@ type ApplyOptions struct {
Recursive bool
}

const (
// maxPatchRetry is the maximum number of conflicts retry for during a patch operation before returning failure
maxPatchRetry = 5
// backOffPeriod is the period to back off when apply patch resutls in error.
backOffPeriod = 1 * time.Second
// how many times we can retry before back off
triesBeforeBackOff = 1
)

const (
apply_long = `Apply a configuration to a resource by filename or stdin.
The resource will be created if it doesn't exist yet.
Expand Down Expand Up @@ -154,43 +166,16 @@ func RunApply(f *cmdutil.Factory, cmd *cobra.Command, out io.Writer, options *Ap
return nil
}

// Serialize the current configuration of the object from the server.
current, err := runtime.Encode(encoder, info.Object)
if err != nil {
return cmdutil.AddSourceToErr(fmt.Sprintf("serializing current configuration from:\n%v\nfor:", info), info.Source, err)
}

// Retrieve the original configuration of the object from the annotation.
original, err := kubectl.GetOriginalConfiguration(info)
if err != nil {
return cmdutil.AddSourceToErr(fmt.Sprintf("retrieving original configuration from:\n%v\nfor:", info), info.Source, err)
}

// Create the versioned struct from the original from the server for
// strategic patch.
// TODO: Move all structs in apply to use raw data. Can be done once
// builder has a RawResult method which delivers raw data instead of
// internal objects.
versionedObject, _, err := decoder.Decode(current, nil, nil)
if err != nil {
return cmdutil.AddSourceToErr(fmt.Sprintf("converting encoded server-side object back to versioned struct:\n%v\nfor:", info), info.Source, err)
}

// Compute a three way strategic merge patch to send to server.
patch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, versionedObject, true)
if err != nil {
format := "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfrom:\n%v\nfor:"
return cmdutil.AddSourceToErr(fmt.Sprintf(format, original, modified, current, info), info.Source, err)
}

helper := resource.NewHelper(info.Client, info.Mapping)
_, err = helper.Patch(info.Namespace, info.Name, api.StrategicMergePatchType, patch)
patcher := NewPatcher(encoder, decoder, info.Mapping, helper)

patchBytes, err := patcher.patch(info.Object, modified, info.Source, info.Namespace, info.Name)
if err != nil {
return cmdutil.AddSourceToErr(fmt.Sprintf("applying patch:\n%s\nto:\n%v\nfor:", patch, info), info.Source, err)
return cmdutil.AddSourceToErr(fmt.Sprintf("applying patch:\n%s\nto:\n%v\nfor:", patchBytes, info), info.Source, err)
}

if cmdutil.ShouldRecord(cmd, info) {
patch, err = cmdutil.ChangeResourcePatch(info, f.Command())
patch, err := cmdutil.ChangeResourcePatch(info, f.Command())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my own knowledge, why is updating the change cause done in a separate call instead of the same call as patching the object itself?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's because of different change source, I am not sure. Maybe @janetkuo has more opinions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should at least add a TODO here to figure out if it should also be retried. Since it isn't updated atomically with the changes to the original object, there may be some complications figuring out what to do if there is a conflict at this point.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just talked with Janet. Plz add a TODO to do the annotation as part of the original change.

if err != nil {
return err
}
Expand All @@ -215,3 +200,74 @@ func RunApply(f *cmdutil.Factory, cmd *cobra.Command, out io.Writer, options *Ap

return nil
}

type patcher struct {
encoder runtime.Encoder
decoder runtime.Decoder

mapping *meta.RESTMapping
helper *resource.Helper

backOff clockwork.Clock
}

func NewPatcher(encoder runtime.Encoder, decoder runtime.Decoder, mapping *meta.RESTMapping, helper *resource.Helper) *patcher {
return &patcher{
encoder: encoder,
decoder: decoder,
mapping: mapping,
helper: helper,
backOff: clockwork.NewRealClock(),
}
}

func (p *patcher) patchSimple(obj runtime.Object, modified []byte, source, namespace, name string) ([]byte, error) {
// Serialize the current configuration of the object from the server.
current, err := runtime.Encode(p.encoder, obj)
if err != nil {
return nil, cmdutil.AddSourceToErr(fmt.Sprintf("serializing current configuration from:\n%v\nfor:", obj), source, err)
}

// Retrieve the original configuration of the object from the annotation.
original, err := kubectl.GetOriginalConfiguration(p.mapping, obj)
if err != nil {
return nil, cmdutil.AddSourceToErr(fmt.Sprintf("retrieving original configuration from:\n%v\nfor:", obj), source, err)
}

// Create the versioned struct from the original from the server for
// strategic patch.
// TODO: Move all structs in apply to use raw data. Can be done once
// builder has a RawResult method which delivers raw data instead of
// internal objects.
versionedObject, _, err := p.decoder.Decode(current, nil, nil)
if err != nil {
return nil, cmdutil.AddSourceToErr(fmt.Sprintf("converting encoded server-side object back to versioned struct:\n%v\nfor:", obj), source, err)
}

// Compute a three way strategic merge patch to send to server.
patch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, versionedObject, true)
if err != nil {
format := "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfor:"
return nil, cmdutil.AddSourceToErr(fmt.Sprintf(format, original, modified, current), source, err)
}

_, err = p.helper.Patch(namespace, name, api.StrategicMergePatchType, patch)
return patch, err
}

func (p *patcher) patch(current runtime.Object, modified []byte, source, namespace, name string) ([]byte, error) {
var getErr error
patchBytes, err := p.patchSimple(current, modified, source, namespace, name)
for i := 1; i <= maxPatchRetry && errors.IsConflict(err); i++ {
if i > triesBeforeBackOff {
p.backOff.Sleep(backOffPeriod)
}
current, getErr = p.helper.Get(namespace, name, false)
if getErr != nil {
return nil, getErr
}
patchBytes, err = p.patchSimple(current, modified, source, namespace, name)
}

return patchBytes, err
}
58 changes: 58 additions & 0 deletions pkg/kubectl/cmd/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cmd
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
Expand All @@ -29,7 +30,9 @@ import (

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/annotations"
kubeerr "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/fake"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/runtime"
Expand Down Expand Up @@ -213,6 +216,61 @@ func TestApplyObject(t *testing.T) {
}
}

func TestApplyRetry(t *testing.T) {
initTestErrorHandler(t)
nameRC, currentRC := readAndAnnotateReplicationController(t, filenameRC)
pathRC := "/namespaces/test/replicationcontrollers/" + nameRC

firstPatch := true
retry := false
getCount := 0
f, tf, codec := NewAPIFactory()
tf.Printer = &testPrinter{}
tf.Client = &fake.RESTClient{
Codec: codec,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch p, m := req.URL.Path, req.Method; {
case p == pathRC && m == "GET":
getCount++
bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC))
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: bodyRC}, nil
case p == pathRC && m == "PATCH":
if firstPatch {
firstPatch = false
statusErr := kubeerr.NewConflict(unversioned.GroupResource{Group: "", Resource: "rc"}, "test-rc", fmt.Errorf("the object has been modified. Please apply at first."))
bodyBytes, _ := json.Marshal(statusErr)
bodyErr := ioutil.NopCloser(bytes.NewReader(bodyBytes))
return &http.Response{StatusCode: http.StatusConflict, Header: defaultHeader(), Body: bodyErr}, nil
}
retry = true
validatePatchApplication(t, req)
bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC))
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: bodyRC}, nil
default:
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
return nil, nil
}
}),
}
tf.Namespace = "test"
buf := bytes.NewBuffer([]byte{})

cmd := NewCmdApply(f, buf)
cmd.Flags().Set("filename", filenameRC)
cmd.Flags().Set("output", "name")
cmd.Run(cmd, []string{})

if !retry || getCount != 2 {
t.Fatalf("apply didn't retry when get conflict error")
}

// uses the name from the file, not the response
expectRC := "replicationcontroller/" + nameRC + "\n"
if buf.String() != expectRC {
t.Fatalf("unexpected output: %s\nexpected: %s", buf.String(), expectRC)
}
}

func TestApplyNonExistObject(t *testing.T) {
nameRC, currentRC := readAndAnnotateReplicationController(t, filenameRC)
pathRC := "/namespaces/test/replicationcontrollers"
Expand Down