Skip to content

Commit

Permalink
bump(github.com/coreos/go-etcd/etcd): 7745cfd7f8e619cc9e6be450238e625…
Browse files Browse the repository at this point in the history
…3a57a227f
  • Loading branch information
brendandburns committed Jun 17, 2014
1 parent fe0cb7c commit b1531bb
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 46 deletions.
8 changes: 4 additions & 4 deletions third_party/src/github.com/coreos/go-etcd/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ type Client struct {
// If CheckRetry is nil, client will call the default one
// `DefaultCheckRetry`.
// Argument cluster is the etcd.Cluster object that these requests have been made on.
// Argument reqs is all of the http.Requests that have been made so far.
// Argument resps is all of the http.Responses from these requests.
// Argument numReqs is the number of http.Requests that have been made so far.
// Argument lastResp is the http.Responses from the last request.
// Argument err is the reason of the failure.
CheckRetry func(cluster *Cluster, reqs []http.Request,
resps []http.Response, err error) error
CheckRetry func(cluster *Cluster, numReqs int,
lastResp http.Response, err error) error
}

// NewClient create a basic client that is configured to be used
Expand Down
29 changes: 15 additions & 14 deletions third_party/src/github.com/coreos/go-etcd/etcd/debug.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package etcd

import (
"fmt"
"io/ioutil"
"log"
"strings"
Expand All @@ -21,31 +22,31 @@ type etcdLogger struct {
}

func (p *etcdLogger) Debug(args ...interface{}) {
args[0] = "DEBUG: " + args[0].(string)
p.log.Println(args)
msg := "DEBUG: " + fmt.Sprint(args)
p.log.Println(msg)
}

func (p *etcdLogger) Debugf(fmt string, args ...interface{}) {
args[0] = "DEBUG: " + args[0].(string)
func (p *etcdLogger) Debugf(f string, args ...interface{}) {
msg := "DEBUG: " + fmt.Sprintf(f, args)
// Append newline if necessary
if !strings.HasSuffix(fmt, "\n") {
fmt = fmt + "\n"
if !strings.HasSuffix(msg, "\n") {
msg = msg + "\n"
}
p.log.Printf(fmt, args)
p.log.Print(msg)
}

func (p *etcdLogger) Warning(args ...interface{}) {
args[0] = "WARNING: " + args[0].(string)
p.log.Println(args)
msg := "WARNING: " + fmt.Sprint(args)
p.log.Println(msg)
}

func (p *etcdLogger) Warningf(fmt string, args ...interface{}) {
func (p *etcdLogger) Warningf(f string, args ...interface{}) {
msg := "WARNING: " + fmt.Sprintf(f, args)
// Append newline if necessary
if !strings.HasSuffix(fmt, "\n") {
fmt = fmt + "\n"
if !strings.HasSuffix(msg, "\n") {
msg = msg + "\n"
}
args[0] = "WARNING: " + args[0].(string)
p.log.Printf(fmt, args)
p.log.Print(msg)
}

func init() {
Expand Down
28 changes: 28 additions & 0 deletions third_party/src/github.com/coreos/go-etcd/etcd/debug_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package etcd

import (
"testing"
)

type Foo struct{}
type Bar struct {
one string
two int
}

// Tests that logs don't panic with arbitrary interfaces
func TestDebug(t *testing.T) {
f := &Foo{}
b := &Bar{"asfd", 3}
for _, test := range []interface{}{
1234,
"asdf",
f,
b,
} {
logger.Debug(test)
logger.Debugf("something, %s", test)
logger.Warning(test)
logger.Warningf("something, %s", test)
}
}
62 changes: 34 additions & 28 deletions third_party/src/github.com/coreos/go-etcd/etcd/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
var err error
var respBody []byte

reqs := make([]http.Request, 0)
resps := make([]http.Response, 0)
var numReqs = 1

checkRetry := c.CheckRetry
if checkRetry == nil {
Expand Down Expand Up @@ -176,15 +175,24 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
}()
}

// if we connect to a follower, we will retry until we find a leader
// If we connect to a follower and consistency is required, retry until
// we connect to a leader
sleep := 25 * time.Millisecond
maxSleep := time.Second
for attempt := 0; ; attempt++ {
select {
case <-cancelled:
return nil, ErrRequestCancelled
default:
if attempt > 0 {
select {
case <-cancelled:
return nil, ErrRequestCancelled
case <-time.After(sleep):
sleep = sleep * 2
if sleep > maxSleep {
sleep = maxSleep
}
}
}

logger.Debug("begin attempt", attempt, "for", rr.RelativePath)
logger.Debug("Connecting to etcd: attempt", attempt+1, "for", rr.RelativePath)

if rr.Method == "GET" && c.config.Consistency == WEAK_CONSISTENCY {
// If it's a GET and consistency level is set to WEAK,
Expand Down Expand Up @@ -223,20 +231,26 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
reqLock.Unlock()

resp, err = c.httpClient.Do(req)
defer func() {
if resp != nil {
resp.Body.Close()
}
}()

// If the request was cancelled, return ErrRequestCancelled directly
select {
case <-cancelled:
return nil, ErrRequestCancelled
default:
}

reqs = append(reqs, *req)
numReqs++

// network error, change a machine!
if err != nil {
logger.Debug("network error:", err.Error())
resps = append(resps, http.Response{})
if checkErr := checkRetry(c.cluster, reqs, resps, err); checkErr != nil {
lastResp := http.Response{}
if checkErr := checkRetry(c.cluster, numReqs, lastResp, err); checkErr != nil {
return nil, checkErr
}

Expand All @@ -245,8 +259,6 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
}

// if there is no error, it should receive response
resps = append(resps, *resp)
defer resp.Body.Close()
logger.Debug("recv.response.from", httpPath)

if validHttpStatusCode[resp.StatusCode] {
Expand All @@ -270,13 +282,15 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
c.cluster.updateLeaderFromURL(u)
logger.Debug("recv.response.relocate", u.String())
}
resp.Body.Close()
continue
}

if checkErr := checkRetry(c.cluster, reqs, resps,
if checkErr := checkRetry(c.cluster, numReqs, *resp,
errors.New("Unexpected HTTP status code")); checkErr != nil {
return nil, checkErr
}
resp.Body.Close()
}

r := &RawResponse{
Expand All @@ -288,26 +302,18 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
return r, nil
}

// DefaultCheckRetry checks retry cases
// If it has retried 2 * machine number, stop to retry it anymore
// If resp is nil, sleep for 200ms
// DefaultCheckRetry defines the retrying behaviour for bad HTTP requests
// If we have retried 2 * machine number, stop retrying.
// If status code is InternalServerError, sleep for 200ms.
func DefaultCheckRetry(cluster *Cluster, reqs []http.Request,
resps []http.Response, err error) error {
func DefaultCheckRetry(cluster *Cluster, numReqs int, lastResp http.Response,
err error) error {

if len(reqs) >= 2*len(cluster.Machines) {
if numReqs >= 2*len(cluster.Machines) {
return newError(ErrCodeEtcdNotReachable,
"Tried to connect to each peer twice and failed", 0)
}

resp := &resps[len(resps)-1]

if resp == nil {
time.Sleep(time.Millisecond * 200)
return nil
}

code := resp.StatusCode
code := lastResp.StatusCode
if code == http.StatusInternalServerError {
time.Sleep(time.Millisecond * 200)

Expand Down

0 comments on commit b1531bb

Please sign in to comment.