Skip to content

Commit

Permalink
grpclb: some minor cleanups (#6634)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Sep 15, 2023
1 parent 1880bd6 commit 94d8074
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 44 deletions.
39 changes: 23 additions & 16 deletions balancer/grpclb/grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/internal/resolver/dns"
"google.golang.org/grpc/resolver"

Expand Down Expand Up @@ -145,20 +148,21 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal
manualResolver: r,
subConns: make(map[resolver.Address]balancer.SubConn),
scStates: make(map[balancer.SubConn]connectivity.State),
picker: &errPicker{err: balancer.ErrNoSubConnAvailable},
picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
clientStats: newRPCStats(),
backoff: backoff.DefaultExponential, // TODO: make backoff configurable.
}
lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[grpclb %p] ", lb))

var err error
if opt.CredsBundle != nil {
lb.grpclbClientConnCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBalancer)
if err != nil {
logger.Warningf("lbBalancer: client connection creds NewWithMode failed: %v", err)
lb.logger.Warningf("Failed to create credentials used for connecting to grpclb: %v", err)
}
lb.grpclbBackendCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBackendFromBalancer)
if err != nil {
logger.Warningf("lbBalancer: backend creds NewWithMode failed: %v", err)
lb.logger.Warningf("Failed to create credentials used for connecting to backends returned by grpclb: %v", err)
}
}

Expand All @@ -170,6 +174,7 @@ type lbBalancer struct {
dialTarget string // user's dial target
target string // same as dialTarget unless overridden in service config
opt balancer.BuildOptions
logger *internalgrpclog.PrefixLogger

usePickFirst bool

Expand Down Expand Up @@ -236,12 +241,12 @@ type lbBalancer struct {
// Caller must hold lb.mu.
func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
if lb.state == connectivity.TransientFailure {
lb.picker = &errPicker{err: fmt.Errorf("all SubConns are in TransientFailure, last connection error: %v", lb.connErr)}
lb.picker = base.NewErrPicker(fmt.Errorf("all SubConns are in TransientFailure, last connection error: %v", lb.connErr))
return
}

if lb.state == connectivity.Connecting {
lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
lb.picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable)
return
}

Expand All @@ -268,7 +273,7 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
//
// This doesn't seem to be necessary after the connecting check above.
// Kept for safety.
lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
lb.picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable)
return
}
if lb.inFallback {
Expand Down Expand Up @@ -322,21 +327,21 @@ func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
// UpdateSubConnState is unused; NewSubConn's options always specifies
// updateSubConnState as the listener.
func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
logger.Errorf("grpclb: UpdateSubConnState(%v, %+v) called unexpectedly", sc, scs)
lb.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, scs)
}

func (lb *lbBalancer) updateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
s := scs.ConnectivityState
if logger.V(2) {
logger.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
if lb.logger.V(2) {
lb.logger.Infof("SubConn state change: %p, %v", sc, s)
}
lb.mu.Lock()
defer lb.mu.Unlock()

oldS, ok := lb.scStates[sc]
if !ok {
if logger.V(2) {
logger.Infof("lbBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
if lb.logger.V(2) {
lb.logger.Infof("Received state change for an unknown SubConn: %p, %v", sc, s)
}
return
}
Expand Down Expand Up @@ -441,8 +446,8 @@ func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) {
if lb.usePickFirst == newUsePickFirst {
return
}
if logger.V(2) {
logger.Infof("lbBalancer: switching mode, new usePickFirst: %+v", newUsePickFirst)
if lb.logger.V(2) {
lb.logger.Infof("Switching mode. Is pick_first used for backends? %v", newUsePickFirst)
}
lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst)
}
Expand All @@ -453,8 +458,8 @@ func (lb *lbBalancer) ResolverError(error) {
}

func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
if logger.V(2) {
logger.Infof("lbBalancer: UpdateClientConnState: %+v", ccs)
if lb.logger.V(2) {
lb.logger.Infof("UpdateClientConnState: %s", pretty.ToJSON(ccs))
}
gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig)
lb.handleServiceConfig(gc)
Expand Down Expand Up @@ -482,7 +487,9 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
} else if lb.ccRemoteLB == nil {
// First time receiving resolved addresses, create a cc to remote
// balancers.
lb.newRemoteBalancerCCWrapper()
if err := lb.newRemoteBalancerCCWrapper(); err != nil {
return err
}
// Start the fallback goroutine.
go lb.fallbackToBackendsAfter(lb.fallbackTimeout)
}
Expand Down
9 changes: 0 additions & 9 deletions balancer/grpclb/grpclb_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,6 @@ func (s *rpcStats) knownReceived() {
atomic.AddInt64(&s.numCallsFinished, 1)
}

type errPicker struct {
// Pick always returns this err.
err error
}

func (p *errPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
return balancer.PickResult{}, p.err
}

// rrPicker does roundrobin on subConns. It's typically used when there's no
// response from remote balancer, and grpclb falls back to the resolved
// backends.
Expand Down
49 changes: 31 additions & 18 deletions balancer/grpclb/grpclb_remote_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,37 @@ import (
"time"

"github.com/golang/protobuf/proto"
timestamppb "github.com/golang/protobuf/ptypes/timestamp"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/backoff"
imetadata "google.golang.org/grpc/internal/metadata"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"

timestamppb "github.com/golang/protobuf/ptypes/timestamp"
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
)

func serverListEqual(a, b []*lbpb.Server) bool {
if len(a) != len(b) {
return false
}
for i := 0; i < len(a); i++ {
if !proto.Equal(a[i], b[i]) {
return false
}
}
return true
}

// processServerList updates balancer's internal state, create/remove SubConns
// and regenerates picker using the received serverList.
func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
if logger.V(2) {
logger.Infof("lbBalancer: processing server list: %+v", l)
if lb.logger.V(2) {
lb.logger.Infof("Processing server list: %#v", l)
}
lb.mu.Lock()
defer lb.mu.Unlock()
Expand All @@ -55,9 +67,9 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
lb.serverListReceived = true

// If the new server list == old server list, do nothing.
if cmp.Equal(lb.fullServerList, l.Servers, cmp.Comparer(proto.Equal)) {
if logger.V(2) {
logger.Infof("lbBalancer: new serverlist same as the previous one, ignoring")
if serverListEqual(lb.fullServerList, l.Servers) {
if lb.logger.V(2) {
lb.logger.Infof("Ignoring new server list as it is the same as the previous one")
}
return
}
Expand All @@ -78,9 +90,8 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
ipStr = fmt.Sprintf("[%s]", ipStr)
}
addr := imetadata.Set(resolver.Address{Addr: fmt.Sprintf("%s:%d", ipStr, s.Port)}, md)
if logger.V(2) {
logger.Infof("lbBalancer: server list entry[%d]: ipStr:|%s|, port:|%d|, load balancer token:|%v|",
i, ipStr, s.Port, s.LoadBalanceToken)
if lb.logger.V(2) {
lb.logger.Infof("Server list entry:|%d|, ipStr:|%s|, port:|%d|, load balancer token:|%v|", i, ipStr, s.Port, s.LoadBalanceToken)
}
backendAddrs = append(backendAddrs, addr)
}
Expand Down Expand Up @@ -149,7 +160,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
// This bypasses the cc wrapper with SubConn cache.
sc, err := lb.cc.ClientConn.NewSubConn(backendAddrs, opts)
if err != nil {
logger.Warningf("grpclb: failed to create new SubConn: %v", err)
lb.logger.Warningf("Failed to create new SubConn: %v", err)
return
}
sc.Connect()
Expand All @@ -174,7 +185,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
opts.StateListener = func(scs balancer.SubConnState) { lb.updateSubConnState(sc, scs) }
sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, opts)
if err != nil {
logger.Warningf("grpclb: failed to create new SubConn: %v", err)
lb.logger.Warningf("Failed to create new SubConn: %v", err)
continue
}
lb.subConns[addrWithoutAttrs] = sc // Use the addr without MD as key for the map.
Expand Down Expand Up @@ -217,7 +228,7 @@ type remoteBalancerCCWrapper struct {
wg sync.WaitGroup
}

func (lb *lbBalancer) newRemoteBalancerCCWrapper() {
func (lb *lbBalancer) newRemoteBalancerCCWrapper() error {
var dopts []grpc.DialOption
if creds := lb.opt.DialCreds; creds != nil {
dopts = append(dopts, grpc.WithTransportCredentials(creds))
Expand Down Expand Up @@ -248,9 +259,10 @@ func (lb *lbBalancer) newRemoteBalancerCCWrapper() {
//
// The grpclb server addresses will set field ServerName, and creds will
// receive ServerName as authority.
cc, err := grpc.DialContext(context.Background(), lb.manualResolver.Scheme()+":///grpclb.subClientConn", dopts...)
target := lb.manualResolver.Scheme() + ":///grpclb.subClientConn"
cc, err := grpc.Dial(target, dopts...)
if err != nil {
logger.Fatalf("failed to dial: %v", err)
return fmt.Errorf("grpc.Dial(%s): %v", target, err)
}
ccw := &remoteBalancerCCWrapper{
cc: cc,
Expand All @@ -261,6 +273,7 @@ func (lb *lbBalancer) newRemoteBalancerCCWrapper() {
lb.ccRemoteLB = ccw
ccw.wg.Add(1)
go ccw.watchRemoteBalancer()
return nil
}

// close closed the ClientConn to remote balancer, and waits until all
Expand Down Expand Up @@ -408,9 +421,9 @@ func (ccw *remoteBalancerCCWrapper) watchRemoteBalancer() {
default:
if err != nil {
if err == errServerTerminatedConnection {
logger.Info(err)
ccw.lb.logger.Infof("Call to remote balancer failed: %v", err)
} else {
logger.Warning(err)
ccw.lb.logger.Warningf("Call to remote balancer failed: %v", err)
}
}
}
Expand Down
1 change: 0 additions & 1 deletion interop/observability/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ require (
github.com/envoyproxy/protoc-gen-validate v1.0.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
Expand Down

0 comments on commit 94d8074

Please sign in to comment.