Skip to content

Commit

Permalink
New grpclb implementation (#1558)
Browse files Browse the repository at this point in the history
The new grpclb supports fallback to backends if remote balancer is unavailable
  • Loading branch information
menghanl authored Nov 27, 2017
1 parent 10873b3 commit 2ef021f
Show file tree
Hide file tree
Showing 13 changed files with 996 additions and 863 deletions.
4 changes: 4 additions & 0 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ type PickOptions struct{}
type DoneInfo struct {
// Err is the rpc error the RPC finished with. It could be nil.
Err error
// BytesSent indicates if any bytes have been sent to the server.
BytesSent bool
// BytesReceived indicates if any byte has been received from the server.
BytesReceived bool
}

var (
Expand Down
27 changes: 14 additions & 13 deletions call.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,11 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
err = sendRequest(ctx, cc.dopts, cc.dopts.cp, c, callHdr, stream, t, args, topts)
if err != nil {
if done != nil {
updateRPCInfoInContext(ctx, rpcInfo{
bytesSent: true,
bytesReceived: stream.BytesReceived(),
done(balancer.DoneInfo{
Err: err,
BytesSent: true,
BytesReceived: stream.BytesReceived(),
})
done(balancer.DoneInfo{Err: err})
}
// Retry a non-failfast RPC when
// i) the server started to drain before this RPC was initiated.
Expand All @@ -301,11 +301,11 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
err = recvResponse(ctx, cc.dopts, t, c, stream, reply)
if err != nil {
if done != nil {
updateRPCInfoInContext(ctx, rpcInfo{
bytesSent: true,
bytesReceived: stream.BytesReceived(),
done(balancer.DoneInfo{
Err: err,
BytesSent: true,
BytesReceived: stream.BytesReceived(),
})
done(balancer.DoneInfo{Err: err})
}
if !c.failFast && stream.Unprocessed() {
// In these cases, the server did not receive the data, but we still
Expand All @@ -323,12 +323,13 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true)
}
t.CloseStream(stream, nil)
err = stream.Status().Err()
if done != nil {
updateRPCInfoInContext(ctx, rpcInfo{
bytesSent: true,
bytesReceived: stream.BytesReceived(),
done(balancer.DoneInfo{
Err: err,
BytesSent: true,
BytesReceived: stream.BytesReceived(),
})
done(balancer.DoneInfo{Err: err})
}
if !c.failFast && stream.Unprocessed() {
// In these cases, the server did not receive the data, but we still
Expand All @@ -339,6 +340,6 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
continue
}
}
return stream.Status().Err()
return err
}
}
22 changes: 18 additions & 4 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ type dialOptions struct {
callOptions []CallOption
// This is to support v1 balancer.
balancerBuilder balancer.Builder
// This is to support grpclb.
resolverBuilder resolver.Builder
}

const (
Expand Down Expand Up @@ -204,6 +206,13 @@ func WithBalancerBuilder(b balancer.Builder) DialOption {
}
}

// withResolverBuilder is only for grpclb.
func withResolverBuilder(b resolver.Builder) DialOption {
return func(o *dialOptions) {
o.resolverBuilder = b
}
}

// WithServiceConfig returns a DialOption which has a channel to read the service configuration.
// DEPRECATED: service config should be received through name resolver, as specified here.
// https://github.com/grpc/grpc/blob/master/doc/service_config.md
Expand Down Expand Up @@ -283,18 +292,23 @@ func WithTimeout(d time.Duration) DialOption {
}
}

func withContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
return func(o *dialOptions) {
o.copts.Dialer = f
}
}

// WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
// If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
// Temporary() method to decide if it should try to reconnect to the network address.
func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
return func(o *dialOptions) {
o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
return withContextDialer(
func(ctx context.Context, addr string) (net.Conn, error) {
if deadline, ok := ctx.Deadline(); ok {
return f(addr, deadline.Sub(time.Now()))
}
return f(addr, 0)
}
}
})
}

// WithStatsHandler returns a DialOption that specifies the stats handler
Expand Down
Loading

0 comments on commit 2ef021f

Please sign in to comment.