From d56b5de5dcf99b662040b2046a42eae58be2da5d Mon Sep 17 00:00:00 2001 From: Zhonghu Xu Date: Mon, 17 Feb 2020 15:21:41 +0800 Subject: [PATCH] Backoff retry mcp client (#20114) * use backoff to reconnect to mcp server * fix race * fix ayj's comment --- pkg/mcp/sink/client_sink.go | 37 +++++++++++++++++++------------------ pkg/mcp/sink/sink.go | 2 +- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/pkg/mcp/sink/client_sink.go b/pkg/mcp/sink/client_sink.go index 9f13b8fa1d95..784b578ef23c 100644 --- a/pkg/mcp/sink/client_sink.go +++ b/pkg/mcp/sink/client_sink.go @@ -19,10 +19,11 @@ import ( "io" "time" - "istio.io/istio/pkg/mcp/status" + "github.com/cenkalti/backoff" mcp "istio.io/api/mcp/v1alpha1" - "istio.io/istio/pkg/mcp/monitoring" + + "istio.io/istio/pkg/mcp/status" ) var ( @@ -33,43 +34,40 @@ var ( // Client implements the client for the MCP source service. The client is the // sink and receives configuration from the server. type Client struct { - // Client receives configuration using the ResourceSource RPC service - stream mcp.ResourceSource_EstablishResourceStreamClient - client mcp.ResourceSourceClient *Sink - reporter monitoring.Reporter } // NewClient returns a new instance of Client. func NewClient(client mcp.ResourceSourceClient, options *Options) *Client { return &Client{ - Sink: New(options), - reporter: options.Reporter, - client: client, + Sink: New(options), + client: client, } } var reconnectTestProbe = func() {} func (c *Client) Run(ctx context.Context) { - // The first attempt is immediate. - retryDelay := time.Nanosecond + var err error + var stream Stream for { + backoffPolicy := backoff.NewExponentialBackOff() + backoffPolicy.InitialInterval = time.Nanosecond + backoffPolicy.MaxElapsedTime = 0 + t := backoff.NewTicker(backoffPolicy) // connect w/retry for { select { case <-ctx.Done(): + t.Stop() return - case <-time.After(retryDelay): + case <-t.C: } - // slow subsequent reconnection attempts down - retryDelay = reestablishStreamDelay - scope.Info("(re)trying to establish new MCP sink stream") - stream, err := c.client.EstablishResourceStream(ctx) + stream, err = c.client.EstablishResourceStream(ctx) if reconnectTestProbe != nil { reconnectTestProbe() @@ -78,17 +76,20 @@ func (c *Client) Run(ctx context.Context) { if err == nil { c.reporter.RecordStreamCreateSuccess() scope.Info("New MCP sink stream created") - c.stream = stream break } scope.Errorf("Failed to create a new MCP sink stream: %v", err) } - err := c.ProcessStream(c.stream) + // stop the ticker + t.Stop() + + err := c.ProcessStream(stream) if err != nil && err != io.EOF { c.reporter.RecordRecvError(err, status.Code(err)) scope.Errorf("Error receiving MCP response: %v", err) } } + } diff --git a/pkg/mcp/sink/sink.go b/pkg/mcp/sink/sink.go index 8085a6d86e85..959a22ca20d4 100644 --- a/pkg/mcp/sink/sink.go +++ b/pkg/mcp/sink/sink.go @@ -54,7 +54,7 @@ type Sink struct { } // New creates a new resource sink. -func New(options *Options) *Sink { // nolint: lll +func New(options *Options) *Sink { nodeInfo := &mcp.SinkNode{ Id: options.ID, Annotations: options.Metadata,