Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Druid error handling #49

Merged
merged 2 commits into from
Jun 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 107 additions & 1 deletion druid.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package druid

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"regexp"
"strings"
"time"

Expand All @@ -28,7 +32,19 @@ const (

// Package-level defaults and helpers used by the retry machinery.
//
// The retry policy itself is not listed here: defaultRetry is declared as a
// function in this file, so a variable of the same name would be a
// redeclaration.
var (
	// defaultBackoff is the backoff strategy used when the caller does not
	// supply one.
	defaultBackoff = retryablehttp.DefaultBackoff

	// A regular expression to match the error returned by net/http when the
	// configured number of redirects is exhausted. This error isn't typed
	// specifically so we resort to matching on the error string.
	redirectsErrorRe = regexp.MustCompile(`stopped after \d+ redirects\z`)

	// A regular expression to match the error returned by net/http when the
	// scheme specified in the URL is invalid. This error isn't typed
	// specifically so we resort to matching on the error string.
	schemeErrorRe = regexp.MustCompile(`unsupported protocol scheme`)

	// We need to consume response bodies to maintain http connections, but
	// limit the size we consume to respReadLimit.
	respReadLimit = int64(4096)
)

type Client struct {
Expand All @@ -44,6 +60,7 @@ type clientOptions struct {
username string
password string
backoff retryablehttp.Backoff
errorHandler retryablehttp.ErrorHandler
retry retryablehttp.CheckRetry
retryWaitMin time.Duration
retryWaitMax time.Duration
Expand All @@ -52,10 +69,18 @@ type clientOptions struct {

// ClientOption is a function that adjusts a clientOptions value in place.
// NewClient accepts any number of them after the base URL.
type ClientOption func(*clientOptions)

// druidErrorReponse holds the JSON error payload decoded from a non-200
// response body. The JSON keys are spelled out explicitly instead of relying
// on encoding/json's case-insensitive fallback matching of field names.
//
// The type name keeps its historical spelling because other code in this
// file refers to it by that name.
type druidErrorReponse struct {
	// Error is the short error code, e.g. "SQL parse failed"; the default
	// retry policy switches on this value.
	Error string `json:"error"`
	// ErrorMessage is the free-form description accompanying Error.
	ErrorMessage string `json:"errorMessage"`
	// ErrorClass is decoded from the "errorClass" key.
	ErrorClass string `json:"errorClass"`
	// Host is decoded from the "host" key.
	Host string `json:"host"`
}

func NewClient(baseURL string, options ...ClientOption) (*Client, error) {
opts := &clientOptions{
httpClient: defaultHTTPClient(),
backoff: defaultBackoff,
errorHandler: defaultErrorHandler,
retry: defaultRetry,
retryWaitMin: defaultRetryWaitMin,
retryWaitMax: defaultRetryWaitMax,
Expand Down Expand Up @@ -159,6 +184,81 @@ func (c *Client) ExecuteRequest(method, path string, opt, result interface{}) (*
return c.Do(req, result)
}

// defaultRetry is the retry policy installed by default. It has the shape of
// a retryablehttp.CheckRetry callback: it reports whether the request should
// be retried and, optionally, an error describing the failure.
//
// Decision table:
//   - ctx already done: no retry, returns ctx.Err().
//   - transport error (err != nil): retry, except for exhausted redirects,
//     an unsupported protocol scheme or an unknown TLS certificate
//     authority, which are returned as-is without retrying.
//   - HTTP 200: no retry, no error.
//   - any other status: the body is read and decoded as a Druid error
//     payload. Errors that cannot succeed on a second attempt abort the
//     retry; everything else (including an unreadable or non-JSON body) is
//     retried. In both cases a non-nil error is returned.
func defaultRetry(ctx context.Context, resp *http.Response, err error) (bool, error) {
	// Never retry once the caller's context has been cancelled or timed out.
	if ctxErr := ctx.Err(); ctxErr != nil {
		return false, ctxErr
	}

	// As explained here https://golang.org/pkg/net/http/#Client.Do,
	// an error is returned if caused by client policy (such as
	// CheckRedirect), or failure to speak HTTP (such as a network
	// connectivity problem). A non-2xx status code doesn't cause an error.
	if err != nil {
		// NOTE(review): these are direct type assertions and therefore do
		// not see through additional layers of error wrapping.
		if v, ok := err.(*url.Error); ok {
			// Don't retry if the error was due to too many redirects or an
			// invalid protocol scheme.
			if redirectsErrorRe.MatchString(v.Error()) || schemeErrorRe.MatchString(v.Error()) {
				return false, v
			}

			// Don't retry if the error was due to TLS cert verification failure.
			if _, ok := v.Err.(x509.UnknownAuthorityError); ok {
				return false, v
			}
		}

		// Any other transport-level failure is considered transient.
		return true, nil
	}

	if resp.StatusCode == http.StatusOK {
		return false, nil
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return true, fmt.Errorf("Failed to read the response from Druid: %w", err)
	}

	var errResp druidErrorReponse
	if err := json.Unmarshal(body, &errResp); err != nil {
		return true, fmt.Errorf("Failed to read the response from Druid: %w", err)
	}

	// https://druid.apache.org/docs/latest/querying/querying.html#query-execution-failures
	switch errResp.Error {
	case "SQL parse failed",
		"Plan validation failed",
		"Unsupported operation",
		"Query cancelled",
		"Unknown exception":
		// When aborting the retry, the response body should be closed:
		// https://pkg.go.dev/github.com/hashicorp/go-retryablehttp#CheckRetry
		resp.Body.Close()
		return false, fmt.Errorf("Failed to query Druid: %+v", errResp)
	default:
		// Report the decoded Druid payload. There is no underlying Go error
		// at this point, so wrapping one with %w would only render as
		// "%!w(<nil>)" and hide the actual failure.
		return true, fmt.Errorf("Error response from Druid: %+v", errResp)
	}
}

// defaultErrorHandler is the error handler installed by default. It has the
// shape of a retryablehttp.ErrorHandler and is meant to be invoked once the
// retries are exhausted.
//
// resp is the last response and may be nil (for example when no attempt ever
// produced an HTTP response); err is the last error and may be nil as well;
// numTries is the number of attempts made. The response is returned
// unchanged, with its body drained and closed, together with an error that
// reports the number of attempts.
func defaultErrorHandler(resp *http.Response, err error, numTries int) (*http.Response, error) {
	// Drain and close the response body so the connection can be reused:
	// https://pkg.go.dev/github.com/hashicorp/go-retryablehttp#ErrorHandler
	// A nil response (or body) has nothing to drain; dereferencing it would
	// panic.
	if resp != nil && resp.Body != nil {
		defer resp.Body.Close()
		// Best effort: a failure to drain only costs connection reuse.
		_, _ = io.Copy(ioutil.Discard, io.LimitReader(resp.Body, respReadLimit))
	}

	// Without an underlying error, %w would render as "%!w(<nil>)"; report
	// the last HTTP status instead when one is available.
	if err == nil {
		if resp != nil {
			return resp, fmt.Errorf("Failed after %d attempt(s). Last status: %s", numTries, resp.Status)
		}
		return resp, fmt.Errorf("Failed after %d attempt(s)", numTries)
	}

	return resp, fmt.Errorf("Failed after %d attempt(s). Last error: %w", numTries, err)
}

func (c *Client) setBaseURL(urlStr string) error {
if !strings.HasSuffix(urlStr, "/") {
urlStr += "/"
Expand Down Expand Up @@ -209,6 +309,12 @@ func WithCustomRetry(retry retryablehttp.CheckRetry) ClientOption {
}
}

// WithCustomErrorHandler returns a ClientOption that stores h as the error
// handler in the client options, replacing whatever handler was set before.
func WithCustomErrorHandler(h retryablehttp.ErrorHandler) ClientOption {
	setHandler := func(o *clientOptions) {
		o.errorHandler = h
	}
	return setHandler
}

func WithHTTPClient(httpClient *http.Client) ClientOption {
return func(opts *clientOptions) {
opts.httpClient = httpClient
Expand Down
101 changes: 101 additions & 0 deletions druid_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package druid

import (
"context"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -34,3 +38,100 @@ func TestNewClientWithSkipVerify(t *testing.T) {
}

// TODO: at some point, use https://golang.org/src/crypto/tls/example_test.go as a reference to create a server with a bad certificate and test against it.

// TestDefaultRetry checks the retry decision and the returned error of
// defaultRetry for a 200 response and for each Druid error code exercised
// below. Every case runs as a named subtest so that a failure identifies the
// offending status/error combination.
func TestDefaultRetry(t *testing.T) {
	// errBodyFormat is the error payload template; %q renders the error code
	// as a quoted JSON string.
	const errBodyFormat = `{
"error": %q, "errorMessage" : "Something bad happened."
}`

	cases := []struct {
		name       string
		statusCode int
		druidError string // empty means the response has an empty body
		wantRetry  bool
		wantErr    bool
	}{
		{name: "OK", statusCode: 200, wantRetry: false, wantErr: false},
		{name: "SQL parse failed", statusCode: 400, druidError: "SQL parse failed", wantRetry: false, wantErr: true},
		{name: "Plan validation failed", statusCode: 400, druidError: "Plan validation failed", wantRetry: false, wantErr: true},
		{name: "Resource limit exceeded", statusCode: 400, druidError: "Resource limit exceeded", wantRetry: true, wantErr: true},
		{name: "Query capacity exceeded", statusCode: 429, druidError: "Query capacity exceeded", wantRetry: true, wantErr: true},
		{name: "Unsupported operation", statusCode: 501, druidError: "Unsupported operation", wantRetry: false, wantErr: true},
		{name: "Query timeout", statusCode: 504, druidError: "Query timeout", wantRetry: true, wantErr: true},
		{name: "Query cancelled", statusCode: 500, druidError: "Query cancelled", wantRetry: false, wantErr: true},
		{name: "Unknown exception", statusCode: 500, druidError: "Unknown exception", wantRetry: false, wantErr: true},
	}

	ctx := context.TODO()
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			var body string
			if tc.druidError != "" {
				body = fmt.Sprintf(errBodyFormat, tc.druidError)
			}
			resp := buildMockResp(tc.statusCode, body)

			retry, err := defaultRetry(ctx, &resp, nil)

			if tc.wantErr {
				assert.NotNil(t, err)
			} else {
				assert.Nil(t, err)
			}
			assert.Equal(t, tc.wantRetry, retry)
		})
	}
}

// buildMockResp returns an http.Response with the given status code, the
// matching "<code> <reason phrase>" status line and a body that yields the
// given string.
//
// The reason phrase comes from http.StatusText, so every status code known to
// net/http is supported and the status line always carries the numeric code.
// It panics when statusCode has no registered reason phrase.
func buildMockResp(statusCode int, body string) http.Response {
	reason := http.StatusText(statusCode)
	if reason == "" {
		panic(fmt.Errorf("Unsupported mock status code: %d", statusCode))
	}

	return http.Response{
		Status:     fmt.Sprintf("%d %s", statusCode, reason),
		StatusCode: statusCode,
		Body:       ioutil.NopCloser(strings.NewReader(body)),
	}
}