Skip to content

Commit

Permalink
balancer: fix tests not properly updating subconn states (#6501)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Aug 4, 2023
1 parent 8ebe462 commit 4fe8d3d
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 111 deletions.
6 changes: 5 additions & 1 deletion balancer/weightedtarget/weightedtarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1211,7 +1211,11 @@ var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")
func init() {
stub.Register(initIdleBalancerName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
sc, err := bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
if err != nil {
return err
}
sc.Connect()
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
Expand Down
11 changes: 11 additions & 0 deletions internal/balancer/gracefulswitch/gracefulswitch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,11 @@ func (mb1 *mockBalancer) newSubConn(addrs []resolver.Address, opts balancer.NewS
if opts.StateListener == nil {
opts.StateListener = func(state balancer.SubConnState) { mb1.UpdateSubConnState(sc, state) }
}
defer func() {
if sc != nil {
sc.Connect()
}
}()
return mb1.cc.NewSubConn(addrs, opts)
}

Expand Down Expand Up @@ -1023,6 +1028,7 @@ func (vb *verifyBalancer) newSubConn(addrs []resolver.Address, opts balancer.New
if opts.StateListener == nil {
opts.StateListener = func(state balancer.SubConnState) { vb.UpdateSubConnState(sc, state) }
}
defer func() { sc.Connect() }()
return vb.cc.NewSubConn(addrs, opts)
}

Expand Down Expand Up @@ -1076,6 +1082,11 @@ func (bcb *buildCallbackBal) newSubConn(addrs []resolver.Address, opts balancer.
if opts.StateListener == nil {
opts.StateListener = func(state balancer.SubConnState) { bcb.UpdateSubConnState(sc, state) }
}
defer func() {
if sc != nil {
sc.Connect()
}
}()
return bcb.cc.NewSubConn(addrs, opts)
}

Expand Down
16 changes: 16 additions & 0 deletions internal/testutils/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
)

Expand All @@ -40,13 +41,26 @@ type TestSubConn struct {
id string
ConnectCh chan struct{}
stateListener func(balancer.SubConnState)
connectCalled *grpcsync.Event
}

// NewTestSubConn returns a newly initialized SubConn. Typically, subconns
// should be created via TestClientConn.NewSubConn instead, but can be useful
// for some tests.
func NewTestSubConn(id string) *TestSubConn {
return &TestSubConn{
ConnectCh: make(chan struct{}, 1),
connectCalled: grpcsync.NewEvent(),
id: id,
}
}

// UpdateAddresses is a no-op.
func (tsc *TestSubConn) UpdateAddresses([]resolver.Address) {}

// Connect is a no-op.
func (tsc *TestSubConn) Connect() {
tsc.connectCalled.Fire()
select {
case tsc.ConnectCh <- struct{}{}:
default:
Expand All @@ -60,6 +74,7 @@ func (tsc *TestSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.P

// UpdateState pushes the state to the listener, if one is registered.
func (tsc *TestSubConn) UpdateState(state balancer.SubConnState) {
<-tsc.connectCalled.Done()
if tsc.stateListener != nil {
tsc.stateListener(state)
return
Expand Down Expand Up @@ -109,6 +124,7 @@ func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubCon
id: fmt.Sprintf("sc%d", tcc.subConnIdx),
ConnectCh: make(chan struct{}, 1),
stateListener: o.StateListener,
connectCalled: grpcsync.NewEvent(),
}
tcc.subConnIdx++
tcc.logger.Logf("testClientConn: NewSubConn(%v, %+v) => %s", a, o, sc)
Expand Down
32 changes: 18 additions & 14 deletions xds/internal/balancer/clusterimpl/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,13 @@ func (s) TestDropByCategory(t *testing.T) {
}

sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// This should get the connecting picker.
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
t.Fatal(err.Error())
}

b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.

const rpcCount = 20
Expand Down Expand Up @@ -283,13 +283,13 @@ func (s) TestDropCircuitBreaking(t *testing.T) {
}

sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// This should get the connecting picker.
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
t.Fatal(err.Error())
}

b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
const rpcCount = 100
if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
Expand Down Expand Up @@ -375,7 +375,11 @@ func (s) TestPickerUpdateAfterClose(t *testing.T) {
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
// Create a subConn which will be used later on to test the race
// between UpdateSubConnState() and Close().
bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{})
sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{})
if err != nil {
return err
}
sc.Connect()
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, _ balancer.SubConn, _ balancer.SubConnState) {
Expand Down Expand Up @@ -410,7 +414,7 @@ func (s) TestPickerUpdateAfterClose(t *testing.T) {
// that we use as the child policy will not send a picker update until the
// parent policy is closed.
sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.Close()
close(closeCh)

Expand Down Expand Up @@ -449,7 +453,7 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) {
}

sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// This should get the connecting picker.
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
t.Fatal(err.Error())
Expand All @@ -464,7 +468,7 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) {
t.Fatalf("sc is created with addr with cluster name %v, %v, want cluster name %v", cn, ok, testClusterName)
}

b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil {
t.Fatal(err.Error())
Expand Down Expand Up @@ -524,13 +528,13 @@ func (s) TestReResolution(t *testing.T) {
}

sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// This should get the connecting picker.
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
t.Fatal(err.Error())
}

b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
// This should get the transient failure picker.
if err := cc.WaitForErrPicker(ctx); err != nil {
t.Fatal(err.Error())
Expand All @@ -543,13 +547,13 @@ func (s) TestReResolution(t *testing.T) {
t.Fatalf("timeout waiting for ResolveNow()")
}

b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil {
t.Fatal(err.Error())
}

b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
// This should get the transient failure picker.
if err := cc.WaitForErrPicker(ctx); err != nil {
t.Fatal(err.Error())
Expand Down Expand Up @@ -608,13 +612,13 @@ func (s) TestLoadReporting(t *testing.T) {
}

sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// This should get the connecting picker.
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
t.Fatal(err.Error())
}

b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
const successCount = 5
const errorCount = 5
Expand Down
36 changes: 20 additions & 16 deletions xds/internal/balancer/clustermanager/clustermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ func TestClusterPicks(t *testing.T) {
// Clear the attributes before adding to map.
addrs[0].BalancerAttributes = nil
m1[addrs[0]] = sc
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
}

p1 := <-cc.NewPickerCh
Expand Down Expand Up @@ -247,8 +247,8 @@ func TestConfigUpdateAddCluster(t *testing.T) {
// Clear the attributes before adding to map.
addrs[0].BalancerAttributes = nil
m1[addrs[0]] = sc
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
}

p1 := <-cc.NewPickerCh
Expand Down Expand Up @@ -313,8 +313,8 @@ func TestConfigUpdateAddCluster(t *testing.T) {
// Clear the attributes before adding to map.
addrs[0].BalancerAttributes = nil
m1[addrs[0]] = sc
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})

// Should have no more newSubConn.
select {
Expand Down Expand Up @@ -404,8 +404,8 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) {
// Clear the attributes before adding to map.
addrs[0].BalancerAttributes = nil
m1[addrs[0]] = sc
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
}

p1 := <-cc.NewPickerCh
Expand Down Expand Up @@ -488,8 +488,8 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) {
// Clear the attributes before adding to map.
addrs[0].BalancerAttributes = nil
m2[addrs[0]] = sc
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
}

p3 := <-cc.NewPickerCh
Expand Down Expand Up @@ -582,7 +582,11 @@ var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")
func init() {
stub.Register(initIdleBalancerName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
sc, err := bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
if err != nil {
return err
}
sc.Connect()
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
Expand Down Expand Up @@ -632,7 +636,7 @@ func TestInitialIdle(t *testing.T) {
// in the address is cleared.
for range wantAddrs {
sc := <-cc.NewSubConnCh
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Idle})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
}

if state1 := <-cc.NewStateCh; state1 != connectivity.Idle {
Expand Down Expand Up @@ -673,8 +677,8 @@ func TestClusterGracefulSwitch(t *testing.T) {
}

sc1 := <-cc.NewSubConnCh
rtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
p1 := <-cc.NewPickerCh
pi := balancer.PickInfo{
Ctx: SetPickedCluster(context.Background(), "csp:cluster"),
Expand Down Expand Up @@ -703,7 +707,7 @@ func TestClusterGracefulSwitch(t *testing.T) {
// Update the pick first balancers SubConn as CONNECTING. This will cause
// the pick first balancer to UpdateState() with CONNECTING, which shouldn't send
// a Picker update back, as the Graceful Switch process is not complete.
rtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
select {
Expand All @@ -716,7 +720,7 @@ func TestClusterGracefulSwitch(t *testing.T) {
// the pick first balancer to UpdateState() with READY, which should send a
// Picker update back, as the Graceful Switch process is complete. This
// Picker should always pick the pick first's created SubConn.
rtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
p2 := <-cc.NewPickerCh
testPick(t, p2, pi, sc2, nil)
// The Graceful Switch process completing for the child should cause the
Expand Down
Loading

0 comments on commit 4fe8d3d

Please sign in to comment.