Skip to content

Commit

Permalink
base: update base balancer for new APIs (grpc#6503)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Aug 4, 2023
1 parent 6c0c69e commit e9a4e94
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 7 deletions.
14 changes: 12 additions & 2 deletions balancer/base/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,12 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
addrsSet.Set(a, nil)
if _, ok := b.subConns.Get(a); !ok {
// a is a new address (not existing in b.subConns).
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
var sc balancer.SubConn
opts := balancer.NewSubConnOptions{
HealthCheckEnabled: b.config.HealthCheck,
StateListener: func(scs balancer.SubConnState) { b.updateSubConnState(sc, scs) },
}
sc, err := b.cc.NewSubConn([]resolver.Address{a}, opts)
if err != nil {
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
Expand All @@ -124,7 +129,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
sc.Shutdown()
b.subConns.Delete(a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
// The entry will be deleted in updateSubConnState.
}
}
// If resolver state contains no addresses, return an error so ClientConn
Expand Down Expand Up @@ -177,7 +182,12 @@ func (b *baseBalancer) regeneratePicker() {
b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
}

// UpdateSubConnState is a nop because a StateListener is always set in NewSubConn.
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
logger.Errorf("base.baseBalancer: UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}

func (b *baseBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
if logger.V(2) {
logger.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
Expand Down
28 changes: 23 additions & 5 deletions balancer/base/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package base

import (
"context"
"testing"
"time"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
Expand All @@ -38,7 +40,9 @@ func (c *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewS

func (c *testClientConn) UpdateState(balancer.State) {}

type testSubConn struct{}
type testSubConn struct {
updateState func(balancer.SubConnState)
}

func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {}

Expand All @@ -61,7 +65,11 @@ func (p *testPickBuilder) Build(info PickerBuildInfo) balancer.Picker {
}

func TestBaseBalancerReserveAttributes(t *testing.T) {
var v = func(info PickerBuildInfo) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
validated := make(chan struct{}, 1)
v := func(info PickerBuildInfo) {
defer func() { validated <- struct{}{} }()
for _, sc := range info.ReadySCs {
if sc.Address.Addr == "1.1.1.1" {
if sc.Address.Attributes == nil {
Expand All @@ -80,8 +88,8 @@ func TestBaseBalancerReserveAttributes(t *testing.T) {
}
pickBuilder := &testPickBuilder{validate: v}
b := (&baseBuilder{pickerBuilder: pickBuilder}).Build(&testClientConn{
newSubConn: func(addrs []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) {
return &testSubConn{}, nil
newSubConn: func(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
return &testSubConn{updateState: opts.StateListener}, nil
},
}, balancer.BuildOptions{}).(*baseBalancer)

Expand All @@ -93,8 +101,18 @@ func TestBaseBalancerReserveAttributes(t *testing.T) {
},
},
})
select {
case <-validated:
case <-ctx.Done():
t.Fatalf("timed out waiting for UpdateClientConnState to call picker.Build")
}

for sc := range b.scStates {
b.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready, ConnectionError: nil})
sc.(*testSubConn).updateState(balancer.SubConnState{ConnectivityState: connectivity.Ready, ConnectionError: nil})
select {
case <-validated:
case <-ctx.Done():
t.Fatalf("timed out waiting for UpdateClientConnState to call picker.Build")
}
}
}

0 comments on commit e9a4e94

Please sign in to comment.