Skip to content

Commit

Permalink
Merge pull request #26557 from AdoHe/patch_retry
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue

kubectl apply retry stale resource version

```release-note
kubectl apply: retry applying a patch if a version conflict error is encountered
```

[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/.github/PULL_REQUEST_TEMPLATE.md?pixel)]()

fixes #15493 
@pwittrock I just got my original implementation back, ptal.
  • Loading branch information
k8s-merge-robot authored Jun 13, 2016
2 parents 7180b35 + 69bcdc2 commit 0d02f8c
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 35 deletions.
6 changes: 3 additions & 3 deletions pkg/kubectl/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ type debugError interface {

// GetOriginalConfiguration retrieves the original configuration of the object
// from the annotation, or nil if no annotation was found.
func GetOriginalConfiguration(info *resource.Info) ([]byte, error) {
annots, err := info.Mapping.MetadataAccessor.Annotations(info.Object)
func GetOriginalConfiguration(mapping *meta.RESTMapping, obj runtime.Object) ([]byte, error) {
annots, err := mapping.MetadataAccessor.Annotations(obj)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -168,7 +168,7 @@ func GetModifiedConfiguration(info *resource.Info, annotate bool, codec runtime.
// UpdateApplyAnnotation calls CreateApplyAnnotation if the last applied
// configuration annotation is already present. Otherwise, it does nothing.
func UpdateApplyAnnotation(info *resource.Info, codec runtime.Encoder) error {
if original, err := GetOriginalConfiguration(info); err != nil || len(original) <= 0 {
if original, err := GetOriginalConfiguration(info.Mapping, info.Object); err != nil || len(original) <= 0 {
return err
}
return CreateApplyAnnotation(info, codec)
Expand Down
120 changes: 88 additions & 32 deletions pkg/kubectl/cmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ package cmd
import (
"fmt"
"io"
"time"

"github.com/jonboulle/clockwork"
"github.com/spf13/cobra"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/kubectl"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/resource"
Expand All @@ -38,6 +41,15 @@ type ApplyOptions struct {
Recursive bool
}

const (
// maxPatchRetry is the maximum number of conflicts retry for during a patch operation before returning failure
maxPatchRetry = 5
// backOffPeriod is the period to back off when apply patch resutls in error.
backOffPeriod = 1 * time.Second
// how many times we can retry before back off
triesBeforeBackOff = 1
)

const (
apply_long = `Apply a configuration to a resource by filename or stdin.
The resource will be created if it doesn't exist yet.
Expand Down Expand Up @@ -154,43 +166,16 @@ func RunApply(f *cmdutil.Factory, cmd *cobra.Command, out io.Writer, options *Ap
return nil
}

// Serialize the current configuration of the object from the server.
current, err := runtime.Encode(encoder, info.Object)
if err != nil {
return cmdutil.AddSourceToErr(fmt.Sprintf("serializing current configuration from:\n%v\nfor:", info), info.Source, err)
}

// Retrieve the original configuration of the object from the annotation.
original, err := kubectl.GetOriginalConfiguration(info)
if err != nil {
return cmdutil.AddSourceToErr(fmt.Sprintf("retrieving original configuration from:\n%v\nfor:", info), info.Source, err)
}

// Create the versioned struct from the original from the server for
// strategic patch.
// TODO: Move all structs in apply to use raw data. Can be done once
// builder has a RawResult method which delivers raw data instead of
// internal objects.
versionedObject, _, err := decoder.Decode(current, nil, nil)
if err != nil {
return cmdutil.AddSourceToErr(fmt.Sprintf("converting encoded server-side object back to versioned struct:\n%v\nfor:", info), info.Source, err)
}

// Compute a three way strategic merge patch to send to server.
patch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, versionedObject, true)
if err != nil {
format := "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfrom:\n%v\nfor:"
return cmdutil.AddSourceToErr(fmt.Sprintf(format, original, modified, current, info), info.Source, err)
}

helper := resource.NewHelper(info.Client, info.Mapping)
_, err = helper.Patch(info.Namespace, info.Name, api.StrategicMergePatchType, patch)
patcher := NewPatcher(encoder, decoder, info.Mapping, helper)

patchBytes, err := patcher.patch(info.Object, modified, info.Source, info.Namespace, info.Name)
if err != nil {
return cmdutil.AddSourceToErr(fmt.Sprintf("applying patch:\n%s\nto:\n%v\nfor:", patch, info), info.Source, err)
return cmdutil.AddSourceToErr(fmt.Sprintf("applying patch:\n%s\nto:\n%v\nfor:", patchBytes, info), info.Source, err)
}

if cmdutil.ShouldRecord(cmd, info) {
patch, err = cmdutil.ChangeResourcePatch(info, f.Command())
patch, err := cmdutil.ChangeResourcePatch(info, f.Command())
if err != nil {
return err
}
Expand All @@ -215,3 +200,74 @@ func RunApply(f *cmdutil.Factory, cmd *cobra.Command, out io.Writer, options *Ap

return nil
}

type patcher struct {
encoder runtime.Encoder
decoder runtime.Decoder

mapping *meta.RESTMapping
helper *resource.Helper

backOff clockwork.Clock
}

func NewPatcher(encoder runtime.Encoder, decoder runtime.Decoder, mapping *meta.RESTMapping, helper *resource.Helper) *patcher {
return &patcher{
encoder: encoder,
decoder: decoder,
mapping: mapping,
helper: helper,
backOff: clockwork.NewRealClock(),
}
}

func (p *patcher) patchSimple(obj runtime.Object, modified []byte, source, namespace, name string) ([]byte, error) {
// Serialize the current configuration of the object from the server.
current, err := runtime.Encode(p.encoder, obj)
if err != nil {
return nil, cmdutil.AddSourceToErr(fmt.Sprintf("serializing current configuration from:\n%v\nfor:", obj), source, err)
}

// Retrieve the original configuration of the object from the annotation.
original, err := kubectl.GetOriginalConfiguration(p.mapping, obj)
if err != nil {
return nil, cmdutil.AddSourceToErr(fmt.Sprintf("retrieving original configuration from:\n%v\nfor:", obj), source, err)
}

// Create the versioned struct from the original from the server for
// strategic patch.
// TODO: Move all structs in apply to use raw data. Can be done once
// builder has a RawResult method which delivers raw data instead of
// internal objects.
versionedObject, _, err := p.decoder.Decode(current, nil, nil)
if err != nil {
return nil, cmdutil.AddSourceToErr(fmt.Sprintf("converting encoded server-side object back to versioned struct:\n%v\nfor:", obj), source, err)
}

// Compute a three way strategic merge patch to send to server.
patch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, versionedObject, true)
if err != nil {
format := "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfor:"
return nil, cmdutil.AddSourceToErr(fmt.Sprintf(format, original, modified, current), source, err)
}

_, err = p.helper.Patch(namespace, name, api.StrategicMergePatchType, patch)
return patch, err
}

func (p *patcher) patch(current runtime.Object, modified []byte, source, namespace, name string) ([]byte, error) {
var getErr error
patchBytes, err := p.patchSimple(current, modified, source, namespace, name)
for i := 1; i <= maxPatchRetry && errors.IsConflict(err); i++ {
if i > triesBeforeBackOff {
p.backOff.Sleep(backOffPeriod)
}
current, getErr = p.helper.Get(namespace, name, false)
if getErr != nil {
return nil, getErr
}
patchBytes, err = p.patchSimple(current, modified, source, namespace, name)
}

return patchBytes, err
}
58 changes: 58 additions & 0 deletions pkg/kubectl/cmd/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cmd
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
Expand All @@ -29,7 +30,9 @@ import (

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/annotations"
kubeerr "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/fake"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/runtime"
Expand Down Expand Up @@ -213,6 +216,61 @@ func TestApplyObject(t *testing.T) {
}
}

func TestApplyRetry(t *testing.T) {
initTestErrorHandler(t)
nameRC, currentRC := readAndAnnotateReplicationController(t, filenameRC)
pathRC := "/namespaces/test/replicationcontrollers/" + nameRC

firstPatch := true
retry := false
getCount := 0
f, tf, codec := NewAPIFactory()
tf.Printer = &testPrinter{}
tf.Client = &fake.RESTClient{
Codec: codec,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch p, m := req.URL.Path, req.Method; {
case p == pathRC && m == "GET":
getCount++
bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC))
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: bodyRC}, nil
case p == pathRC && m == "PATCH":
if firstPatch {
firstPatch = false
statusErr := kubeerr.NewConflict(unversioned.GroupResource{Group: "", Resource: "rc"}, "test-rc", fmt.Errorf("the object has been modified. Please apply at first."))
bodyBytes, _ := json.Marshal(statusErr)
bodyErr := ioutil.NopCloser(bytes.NewReader(bodyBytes))
return &http.Response{StatusCode: http.StatusConflict, Header: defaultHeader(), Body: bodyErr}, nil
}
retry = true
validatePatchApplication(t, req)
bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC))
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: bodyRC}, nil
default:
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
return nil, nil
}
}),
}
tf.Namespace = "test"
buf := bytes.NewBuffer([]byte{})

cmd := NewCmdApply(f, buf)
cmd.Flags().Set("filename", filenameRC)
cmd.Flags().Set("output", "name")
cmd.Run(cmd, []string{})

if !retry || getCount != 2 {
t.Fatalf("apply didn't retry when get conflict error")
}

// uses the name from the file, not the response
expectRC := "replicationcontroller/" + nameRC + "\n"
if buf.String() != expectRC {
t.Fatalf("unexpected output: %s\nexpected: %s", buf.String(), expectRC)
}
}

func TestApplyNonExistObject(t *testing.T) {
nameRC, currentRC := readAndAnnotateReplicationController(t, filenameRC)
pathRC := "/namespaces/test/replicationcontrollers"
Expand Down

0 comments on commit 0d02f8c

Please sign in to comment.