Skip to content

Commit

Permalink
orca: update example and interop to use StateListener (#6529)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Aug 11, 2023
1 parent c2bc22c commit 03d32b9
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 21 deletions.
16 changes: 10 additions & 6 deletions examples/features/orca/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,14 @@ func (o *orcaLB) UpdateClientConnState(ccs balancer.ClientConnState) error {
}

// Create one SubConn for the address and connect it.
sc, err := o.cc.NewSubConn(addrs, balancer.NewSubConnOptions{})
var sc balancer.SubConn
sc, err := o.cc.NewSubConn(addrs, balancer.NewSubConnOptions{
StateListener: func(scs balancer.SubConnState) {
if scs.ConnectivityState == connectivity.Ready {
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &picker{sc}})
}
},
})
if err != nil {
return fmt.Errorf("orcaLB: error creating SubConn: %v", err)
}
Expand All @@ -123,11 +130,8 @@ func (o *orcaLB) UpdateClientConnState(ccs balancer.ClientConnState) error {

func (o *orcaLB) ResolverError(error) {}

func (o *orcaLB) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
if scs.ConnectivityState == connectivity.Ready {
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &picker{sc}})
}
}
// TODO: unused; remove when no longer required.
func (o *orcaLB) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {}

func (o *orcaLB) Close() {}

Expand Down
29 changes: 14 additions & 15 deletions interop/orcalb.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (o *orcab) UpdateClientConnState(s balancer.ClientConnState) error {
return fmt.Errorf("resolver produced no addresses")
}
var err error
o.sc, err = o.cc.NewSubConn(s.ResolverState.Addresses, balancer.NewSubConnOptions{})
o.sc, err = o.cc.NewSubConn(s.ResolverState.Addresses, balancer.NewSubConnOptions{StateListener: o.updateSubConnState})
if err != nil {
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("error creating subconn: %v", err))})
return nil
Expand All @@ -82,20 +82,20 @@ func (o *orcab) ResolverError(err error) {
}
}

func (o *orcab) UpdateSubConnState(sc balancer.SubConn, scState balancer.SubConnState) {
if o.sc != sc {
logger.Errorf("received subconn update for unknown subconn: %v vs %v", o.sc, sc)
return
}
switch scState.ConnectivityState {
func (o *orcab) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}

func (o *orcab) updateSubConnState(state balancer.SubConnState) {
switch state.ConnectivityState {
case connectivity.Ready:
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &scPicker{sc: sc, o: o}})
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &orcaPicker{o: o}})
case connectivity.TransientFailure:
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("all subchannels in transient failure: %v", scState.ConnectionError))})
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("all subchannels in transient failure: %v", state.ConnectionError))})
case connectivity.Connecting:
// Ignore; picker already set to "connecting".
case connectivity.Idle:
sc.Connect()
o.sc.Connect()
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
case connectivity.Shutdown:
// Ignore; we are closing but handle that in Close instead.
Expand All @@ -113,12 +113,11 @@ func (o *orcab) OnLoadReport(r *v3orcapb.OrcaLoadReport) {
o.report = r
}

type scPicker struct {
sc balancer.SubConn
o *orcab
type orcaPicker struct {
o *orcab
}

func (p *scPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
func (p *orcaPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
doneCB := func(di balancer.DoneInfo) {
if lr, _ := di.ServerLoad.(*v3orcapb.OrcaLoadReport); lr != nil &&
(lr.CpuUtilization != 0 || lr.MemUtilization != 0 || len(lr.Utilization) > 0 || len(lr.RequestCost) > 0) {
Expand All @@ -134,7 +133,7 @@ func (p *scPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
}
}
}
return balancer.PickResult{SubConn: p.sc, Done: doneCB}, nil
return balancer.PickResult{SubConn: p.o.sc, Done: doneCB}, nil
}

func setContextCMR(ctx context.Context, lr *v3orcapb.OrcaLoadReport) {
Expand Down

0 comments on commit 03d32b9

Please sign in to comment.