Skip to content

Commit

Permalink
Backoff retry mcp client (istio#20114)
Browse files Browse the repository at this point in the history
* use backoff to reconnect to mcp server

* fix race

* fix ayj's comment
  • Loading branch information
hzxuzhonghu authored and Steven Dake committed Feb 21, 2020
1 parent b2e881b commit d56b5de
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 19 deletions.
37 changes: 19 additions & 18 deletions pkg/mcp/sink/client_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (
"io"
"time"

"istio.io/istio/pkg/mcp/status"
"github.com/cenkalti/backoff"

mcp "istio.io/api/mcp/v1alpha1"
"istio.io/istio/pkg/mcp/monitoring"

"istio.io/istio/pkg/mcp/status"
)

var (
Expand All @@ -33,43 +34,40 @@ var (
// Client implements the client for the MCP source service. The client is the
// sink and receives configuration from the server.
type Client struct {
// Client receives configuration using the ResourceSource RPC service
stream mcp.ResourceSource_EstablishResourceStreamClient

client mcp.ResourceSourceClient
*Sink
reporter monitoring.Reporter
}

// NewClient returns a new instance of Client.
func NewClient(client mcp.ResourceSourceClient, options *Options) *Client {
return &Client{
Sink: New(options),
reporter: options.Reporter,
client: client,
Sink: New(options),
client: client,
}
}

var reconnectTestProbe = func() {}

func (c *Client) Run(ctx context.Context) {
// The first attempt is immediate.
retryDelay := time.Nanosecond
var err error
var stream Stream

for {
backoffPolicy := backoff.NewExponentialBackOff()
backoffPolicy.InitialInterval = time.Nanosecond
backoffPolicy.MaxElapsedTime = 0
t := backoff.NewTicker(backoffPolicy)
// connect w/retry
for {
select {
case <-ctx.Done():
t.Stop()
return
case <-time.After(retryDelay):
case <-t.C:
}

// slow subsequent reconnection attempts down
retryDelay = reestablishStreamDelay

scope.Info("(re)trying to establish new MCP sink stream")
stream, err := c.client.EstablishResourceStream(ctx)
stream, err = c.client.EstablishResourceStream(ctx)

if reconnectTestProbe != nil {
reconnectTestProbe()
Expand All @@ -78,17 +76,20 @@ func (c *Client) Run(ctx context.Context) {
if err == nil {
c.reporter.RecordStreamCreateSuccess()
scope.Info("New MCP sink stream created")
c.stream = stream
break
}

scope.Errorf("Failed to create a new MCP sink stream: %v", err)
}

err := c.ProcessStream(c.stream)
// stop the ticker
t.Stop()

err := c.ProcessStream(stream)
if err != nil && err != io.EOF {
c.reporter.RecordRecvError(err, status.Code(err))
scope.Errorf("Error receiving MCP response: %v", err)
}
}

}
2 changes: 1 addition & 1 deletion pkg/mcp/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type Sink struct {
}

// New creates a new resource sink.
func New(options *Options) *Sink { // nolint: lll
func New(options *Options) *Sink {
nodeInfo := &mcp.SinkNode{
Id: options.ID,
Annotations: options.Metadata,
Expand Down

0 comments on commit d56b5de

Please sign in to comment.