Skip to content

Commit

Permalink
xds: Fail xDS Server Serve() if called after Stop() or GracefulStop() (
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq authored Jun 27, 2023
1 parent 7eb5727 commit 575a936
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 0 deletions.
7 changes: 7 additions & 0 deletions xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ func (s *GRPCServer) GetServiceInfo() map[string]grpc.ServiceInfo {
func (s *GRPCServer) initXDSClient() error {
s.clientMu.Lock()
defer s.clientMu.Unlock()
if s.quit.HasFired() {
return grpc.ErrServerStopped
}

if s.xdsC != nil {
return nil
Expand Down Expand Up @@ -332,6 +335,8 @@ func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) {
// corresponding pending RPCs on the client side will get notified by connection
// errors.
func (s *GRPCServer) Stop() {
s.clientMu.Lock()
defer s.clientMu.Unlock()
s.quit.Fire()
s.gs.Stop()
if s.xdsC != nil {
Expand All @@ -343,6 +348,8 @@ func (s *GRPCServer) Stop() {
// from accepting new connections and RPCs and blocks until all the pending RPCs
// are finished.
func (s *GRPCServer) GracefulStop() {
s.clientMu.Lock()
defer s.clientMu.Unlock()
s.quit.Fire()
s.gs.GracefulStop()
if s.xdsC != nil {
Expand Down
66 changes: 66 additions & 0 deletions xds/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net"
"reflect"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -315,6 +316,24 @@ func (p *fakeProvider) Close() {
p.Distributor.Stop()
}

// setupClientOverride sets up an override for new xdsClient creation.
func setupClientOverride(t *testing.T) func() {
origNewXDSClient := newXDSClient
newXDSClient = func() (xdsclient.XDSClient, func(), error) {
c := fakeclient.NewClient()
c.SetBootstrapConfig(&bootstrap.Config{
XDSServer: xdstestutils.ServerConfigForAddress(t, "server-address"),
NodeProto: xdstestutils.EmptyNodeProtoV3,
ServerListenerResourceNameTemplate: testServerListenerResourceNameTemplate,
CertProviderConfigs: certProviderConfigs,
})
return c, func() {}, nil
}
return func() {
newXDSClient = origNewXDSClient
}
}

// setupOverrides sets up overrides for bootstrap config, new xdsClient creation
// and new gRPC.Server creation.
func setupOverrides(t *testing.T) (*fakeGRPCServer, *testutils.Channel, func()) {
Expand Down Expand Up @@ -890,3 +909,50 @@ func verifyCertProviderNotCreated() error {
}
return nil
}

// TestServeReturnsErrorAfterClose tests that the xds Server returns
// grpc.ErrServerStopped if Serve is called after Close on the server.
func (s) TestServeReturnsErrorAfterClose(t *testing.T) {
cancel := setupClientOverride(t)
defer cancel()
server := NewGRPCServer()

lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}

server.Stop()
err = server.Serve(lis)
if err == nil || !strings.Contains(err.Error(), grpc.ErrServerStopped.Error()) {
t.Fatalf("server erorred with wrong error, want: %v, got :%v", grpc.ErrServerStopped, err)
}
}

// TestServeAndCloseDoNotRace tests that Serve and Close on the xDS Server do
// not race and leak the xDS Client. A leak would be found by the leak checker.
func (s) TestServeAndCloseDoNotRace(t *testing.T) {
cleanup := setupClientOverride(t)
defer cleanup()

lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}

wg := sync.WaitGroup{}
for i := 0; i < 100; i++ {
server := NewGRPCServer()
wg.Add(1)
go func() {
server.Serve(lis)
wg.Done()
}()
wg.Add(1)
go func() {
server.Stop()
wg.Done()
}()
}
wg.Wait()
}

0 comments on commit 575a936

Please sign in to comment.