Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: support a 1:1 mapping with acbws and addrConns #6302

Merged
merged 3 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
review comments
  • Loading branch information
dfawley committed May 22, 2023
commit 5021fdf80ad79afa2b70258cc7712e369f0623b9
2 changes: 1 addition & 1 deletion balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle")
}

if len(addrs) <= 0 {
if len(addrs) == 0 {
return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
}
ac, err := ccb.cc.newAddrConn(addrs, opts)
Expand Down
20 changes: 13 additions & 7 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"fmt"
"math"
"net/url"
"reflect"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1010,12 +1009,12 @@ func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
}

if ac.state == connectivity.Ready {
// try to find the connected address.
// Try to find the connected address.
for _, a := range addrs {
a.ServerName = ac.cc.getServerName(a)
if reflect.DeepEqual(ac.curAddr, a) {
// We are connected to a valid address, so do nothing bu update
// the addresses.
if a.Equal(ac.curAddr) {
// We are connected to a valid address, so do nothing but
// update the addresses.
ac.mu.Unlock()
return
}
Expand All @@ -1028,10 +1027,17 @@ func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
ac.cancel()
ac.ctx, ac.cancel = context.WithCancel(ac.cc.ctx)

curTr := ac.transport
// We have to defer here because GracefulClose => Close => onClose, which
// requires locking ac.mu.
defer ac.transport.GracefulClose()
ac.transport = nil

if len(addrs) == 0 {
ac.updateConnectivityState(connectivity.Idle, nil)
}

ac.mu.Unlock()
curTr.GracefulClose()

// Since we were connecting/connected, we should start a new connection
// attempt.
go ac.resetTransport()
Expand Down
17 changes: 11 additions & 6 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1273,14 +1273,19 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin
as.p = &parser{r: s}
ac.incrCallsStarted()
if desc != unaryStreamDesc {
// Listen on cc and stream contexts to cleanup when the user closes the
// ClientConn or cancels the stream context. In all other cases, an error
// should already be injected into the recv buffer by the transport, which
// the client will eventually receive, and then we will cancel the stream's
// context in clientStream.finish.
// Listen on stream context to cleanup when the stream context is
// canceled. Also listen for the addrConn's context in case the
// addrConn is closed or reconnects to a different address. In all
// other cases, an error should already be injected into the recv
// buffer by the transport, which the client will eventually receive,
// and then we will cancel the stream's context in
// addrConnStream.finish.
go func() {
ac.mu.Lock()
acCtx := ac.ctx
ac.mu.Unlock()
select {
case <-ac.ctx.Done():
case <-acCtx.Done():
as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
case <-ctx.Done():
as.finish(toRPCErr(ctx.Err()))
Expand Down