Skip to content

Commit

Permalink
xdsclient: switch more transport tests to e2e style (2/N) (#7693)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Oct 7, 2024
1 parent 9afb232 commit 98d1550
Show file tree
Hide file tree
Showing 11 changed files with 511 additions and 896 deletions.
2 changes: 2 additions & 0 deletions xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ type authorityArgs struct {
serializer *grpcsync.CallbackSerializer
resourceTypeGetter func(string) xdsresource.Type
watchExpiryTimeout time.Duration
backoff func(int) time.Duration // Backoff for ADS and LRS stream failures.
logger *grpclog.PrefixLogger
}

Expand All @@ -123,6 +124,7 @@ func newAuthority(args authorityArgs) (*authority, error) {
OnRecvHandler: ret.handleResourceUpdate,
OnErrorHandler: ret.newConnectionError,
OnSendHandler: ret.transportOnSendHandler,
Backoff: args.backoff,
Logger: args.logger,
NodeProto: args.bootstrapCfg.Node(),
})
Expand Down
16 changes: 13 additions & 3 deletions xds/internal/xdsclient/client_new.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/cache"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/xds/bootstrap"
Expand Down Expand Up @@ -53,16 +54,17 @@ const NameForServer = "#server"
// only when all references are released, and it is safe for the caller to
// invoke this close function multiple times.
func New(name string) (XDSClient, func(), error) {
return newRefCounted(name, defaultWatchExpiryTimeout, defaultIdleAuthorityDeleteTimeout)
return newRefCounted(name, defaultWatchExpiryTimeout, defaultIdleAuthorityDeleteTimeout, backoff.DefaultExponential.Backoff)
}

// newClientImpl returns a new xdsClient with the given config.
func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, idleAuthorityDeleteTimeout time.Duration) (*clientImpl, error) {
func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, idleAuthorityDeleteTimeout time.Duration, streamBackoff func(int) time.Duration) (*clientImpl, error) {
ctx, cancel := context.WithCancel(context.Background())
c := &clientImpl{
done: grpcsync.NewEvent(),
config: config,
watchExpiryTimeout: watchExpiryTimeout,
backoff: streamBackoff,
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerClose: cancel,
resourceTypes: newResourceTypeRegistry(),
Expand Down Expand Up @@ -90,6 +92,11 @@ type OptionsForTesting struct {
// AuthorityIdleTimeout is the timeout before idle authorities are deleted.
// If unspecified, uses the default value used in non-test code.
AuthorityIdleTimeout time.Duration

// StreamBackoffAfterFailure is the backoff function used to determine the
// backoff duration after stream failures. If unspecified, uses the default
// value used in non-test code.
StreamBackoffAfterFailure func(int) time.Duration
}

// NewForTesting returns an xDS client configured with the provided options.
Expand All @@ -111,11 +118,14 @@ func NewForTesting(opts OptionsForTesting) (XDSClient, func(), error) {
if opts.AuthorityIdleTimeout == 0 {
opts.AuthorityIdleTimeout = defaultIdleAuthorityDeleteTimeout
}
if opts.StreamBackoffAfterFailure == nil {
opts.StreamBackoffAfterFailure = defaultStreamBackoffFunc
}

if err := bootstrap.SetFallbackBootstrapConfig(opts.Contents); err != nil {
return nil, nil, err
}
client, cancel, err := newRefCounted(opts.Name, opts.WatchExpiryTimeout, opts.AuthorityIdleTimeout)
client, cancel, err := newRefCounted(opts.Name, opts.WatchExpiryTimeout, opts.AuthorityIdleTimeout, opts.StreamBackoffAfterFailure)
return client, func() { bootstrap.UnsetFallbackBootstrapConfigForTesting(); cancel() }, err
}

Expand Down
7 changes: 5 additions & 2 deletions xds/internal/xdsclient/client_refcounted.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync/atomic"
"time"

"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/xds/bootstrap"
)
Expand All @@ -37,6 +38,8 @@ var (
// overridden in tests to give them visibility into certain events.
xdsClientImplCreateHook = func(string) {}
xdsClientImplCloseHook = func(string) {}

defaultStreamBackoffFunc = backoff.DefaultExponential.Backoff
)

func clientRefCountedClose(name string) {
Expand All @@ -60,7 +63,7 @@ func clientRefCountedClose(name string) {
// newRefCounted creates a new reference counted xDS client implementation for
// name, if one does not exist already. If an xDS client for the given name
// exists, it gets a reference to it and returns it.
func newRefCounted(name string, watchExpiryTimeout, idleAuthorityTimeout time.Duration) (XDSClient, func(), error) {
func newRefCounted(name string, watchExpiryTimeout, idleAuthorityTimeout time.Duration, streamBackoff func(int) time.Duration) (XDSClient, func(), error) {
clientsMu.Lock()
defer clientsMu.Unlock()

Expand All @@ -74,7 +77,7 @@ func newRefCounted(name string, watchExpiryTimeout, idleAuthorityTimeout time.Du
if err != nil {
return nil, nil, fmt.Errorf("xds: failed to get xDS bootstrap config: %v", err)
}
c, err := newClientImpl(config, watchExpiryTimeout, idleAuthorityTimeout)
c, err := newClientImpl(config, watchExpiryTimeout, idleAuthorityTimeout, streamBackoff)
if err != nil {
return nil, nil, err
}
Expand Down
1 change: 1 addition & 0 deletions xds/internal/xdsclient/clientimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type clientImpl struct {
config *bootstrap.Config
logger *grpclog.PrefixLogger
watchExpiryTimeout time.Duration
backoff func(int) time.Duration // Backoff for ADS and LRS stream failures.
serializer *grpcsync.CallbackSerializer
serializerClose func()
resourceTypes *resourceTypeRegistry
Expand Down
1 change: 1 addition & 0 deletions xds/internal/xdsclient/clientimpl_authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (c *clientImpl) newAuthorityLocked(config *bootstrap.ServerConfig) (_ *auth
serializer: c.serializer,
resourceTypeGetter: c.resourceTypes.get,
watchExpiryTimeout: c.watchExpiryTimeout,
backoff: c.backoff,
logger: grpclog.NewPrefixLogger(logger, authorityPrefix(c, config.ServerURI())),
})
if err != nil {
Expand Down
Loading

0 comments on commit 98d1550

Please sign in to comment.