Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed May 19, 2023
1 parent b7d2520 commit eeceb9d
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 14 deletions.
2 changes: 1 addition & 1 deletion balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (ccb *ccBalancerWrapper) close() {
}

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
if len(addrs) <= 0 {
if len(addrs) == 0 {
return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
}
ac, err := ccb.cc.newAddrConn(addrs, opts)
Expand Down
20 changes: 13 additions & 7 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"fmt"
"math"
"net/url"
"reflect"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -876,12 +875,12 @@ func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
}

if ac.state == connectivity.Ready {
// try to find the connected address.
// Try to find the connected address.
for _, a := range addrs {
a.ServerName = ac.cc.getServerName(a)
if reflect.DeepEqual(ac.curAddr, a) {
// We are connected to a valid address, so do nothing bu update
// the addresses.
if a.Equal(ac.curAddr) {
// We are connected to a valid address, so do nothing but
// update the addresses.
ac.mu.Unlock()
return
}
Expand All @@ -894,10 +893,17 @@ func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
ac.cancel()
ac.ctx, ac.cancel = context.WithCancel(ac.cc.ctx)

curTr := ac.transport
// We have to defer here because GracefulClose => Close => onClose, which
// requires locking ac.mu.
defer ac.transport.GracefulClose()
ac.transport = nil

if len(addrs) == 0 {
ac.updateConnectivityState(connectivity.Idle, nil)
}

ac.mu.Unlock()
curTr.GracefulClose()

// Since we were connecting/connected, we should start a new connection
// attempt.
go ac.resetTransport()
Expand Down
17 changes: 11 additions & 6 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1268,14 +1268,19 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin
as.p = &parser{r: s}
ac.incrCallsStarted()
if desc != unaryStreamDesc {
// Listen on cc and stream contexts to cleanup when the user closes the
// ClientConn or cancels the stream context. In all other cases, an error
// should already be injected into the recv buffer by the transport, which
// the client will eventually receive, and then we will cancel the stream's
// context in clientStream.finish.
// Listen on stream context to cleanup when the stream context is
// canceled. Also listen for the addrConn's context in case the
// addrConn is closed or reconnects to a different address. In all
// other cases, an error should already be injected into the recv
// buffer by the transport, which the client will eventually receive,
// and then we will cancel the stream's context in
// addrConnStream.finish.
go func() {
ac.mu.Lock()
acCtx := ac.ctx
ac.mu.Unlock()
select {
case <-ac.ctx.Done():
case <-acCtx.Done():
as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
case <-ctx.Done():
as.finish(toRPCErr(ctx.Err()))
Expand Down

0 comments on commit eeceb9d

Please sign in to comment.