Skip to content

Commit

Permalink
orca: create ORCA producer for LB policies to use to receive OOB load…
Browse files Browse the repository at this point in the history
… reports (grpc#5669)
  • Loading branch information
dfawley authored Nov 3, 2022
1 parent 36d14db commit e41e894
Show file tree
Hide file tree
Showing 8 changed files with 880 additions and 3 deletions.
23 changes: 23 additions & 0 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ type SubConn interface {
UpdateAddresses([]resolver.Address)
// Connect starts the connecting for this SubConn.
Connect()
// GetOrBuildProducer returns a reference to the existing Producer for this
// ProducerBuilder in this SubConn, or, if one does not currently exist,
// creates a new one and returns it. Returns a close function which must
// be called when the Producer is no longer needed.
GetOrBuildProducer(ProducerBuilder) (p Producer, close func())
}

// NewSubConnOptions contains options to create new SubConn.
Expand Down Expand Up @@ -371,3 +376,21 @@ type ClientConnState struct {
// ErrBadResolverState may be returned by UpdateClientConnState to indicate a
// problem with the provided name resolver data.
var ErrBadResolverState = errors.New("bad resolver state")

// A ProducerBuilder is a simple constructor for a Producer. It is used by the
// SubConn to create producers when needed.
type ProducerBuilder interface {
// Build creates a Producer. The first parameter is always a
// grpc.ClientConnInterface (a type to allow creating RPCs/streams on the
// associated SubConn), but is declared as interface{} to avoid a
// dependency cycle. Should also return a close function that will be
// called when all references to the Producer have been given up.
Build(grpcClientConnInterface interface{}) (p Producer, close func())
}

// A Producer is a type shared among potentially many consumers. It is
// associated with a SubConn, and an implementation will typically contain
// other methods to provide additional functionality, e.g. configuration or
// subscription registration.
type Producer interface {
}
4 changes: 4 additions & 0 deletions balancer/base/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {}

func (sc *testSubConn) Connect() {}

func (sc *testSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
return nil, nil
}

// testPickBuilder creates balancer.Picker for test.
type testPickBuilder struct {
validate func(info PickerBuildInfo)
Expand Down
71 changes: 68 additions & 3 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,20 @@
package grpc

import (
"context"
"fmt"
"strings"
"sync"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
)

// ccBalancerWrapper sits between the ClientConn and the Balancer.
Expand Down Expand Up @@ -305,7 +308,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
return nil, err
}
acbw := &acBalancerWrapper{ac: ac}
acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)}
acbw.ac.mu.Lock()
ac.acbw = acbw
acbw.ac.mu.Unlock()
Expand Down Expand Up @@ -359,8 +362,9 @@ func (ccb *ccBalancerWrapper) Target() string {
// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
mu sync.Mutex
ac *addrConn
mu sync.Mutex
ac *addrConn
producers map[balancer.ProducerBuilder]*refCountedProducer
}

func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
Expand Down Expand Up @@ -414,3 +418,64 @@ func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
defer acbw.mu.Unlock()
return acbw.ac
}

var errSubConnNotReady = status.Error(codes.Unavailable, "SubConn not currently connected")

// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
// ready, returns errSubConnNotReady.
func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
transport := acbw.ac.getReadyTransport()
if transport == nil {
return nil, errSubConnNotReady
}
return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...)
}

// Invoke performs a unary RPC. If the addrConn is not ready, returns
// errSubConnNotReady.
func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error {
cs, err := acbw.NewStream(ctx, unaryStreamDesc, method, opts...)
if err != nil {
return err
}
if err := cs.SendMsg(args); err != nil {
return err
}
return cs.RecvMsg(reply)
}

type refCountedProducer struct {
producer balancer.Producer
refs int // number of current refs to the producer
close func() // underlying producer's close function
}

func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) {
acbw.mu.Lock()
defer acbw.mu.Unlock()

// Look up existing producer from this builder.
pData := acbw.producers[pb]
if pData == nil {
// Not found; create a new one and add it to the producers map.
p, close := pb.Build(acbw)
pData = &refCountedProducer{producer: p, close: close}
acbw.producers[pb] = pData
}
// Account for this new reference.
pData.refs++

// Return a cleanup function wrapped in a OnceFunc to remove this reference
// and delete the refCountedProducer from the map if the total reference
// count goes to zero.
unref := func() {
acbw.mu.Lock()
pData.refs--
if pData.refs == 0 {
defer pData.close() // Run outside the acbw mutex
delete(acbw.producers, pb)
}
acbw.mu.Unlock()
}
return pData.producer, grpcsync.OnceFunc(unref)
}
5 changes: 5 additions & 0 deletions internal/testutils/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ func (tsc *TestSubConn) Connect() {
}
}

// GetOrBuildProducer is a no-op.
func (tsc *TestSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
return nil, nil
}

// String implements stringer to print human friendly error message.
func (tsc *TestSubConn) String() string {
return tsc.id
Expand Down
7 changes: 7 additions & 0 deletions orca/internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@
// avoid polluting the godoc of the top-level orca package.
package internal

import ibackoff "google.golang.org/grpc/internal/backoff"

// AllowAnyMinReportingInterval prevents clamping of the MinReportingInterval
// configured via ServiceOptions, to a minimum of 30s.
//
// For testing purposes only.
var AllowAnyMinReportingInterval interface{} // func(*ServiceOptions)

// DefaultBackoffFunc is used by the producer to control its backoff behavior.
//
// For testing purposes only.
var DefaultBackoffFunc = ibackoff.DefaultExponential.Backoff
Loading

0 comments on commit e41e894

Please sign in to comment.