Skip to content

Commit

Permalink
Fixed tests to run on Windows + some cleanup
Browse files Browse the repository at this point in the history
- JS Servers need a special sequence on shutdown. Most of the time
the store was attempted to be removed before the server was shutdown,
which on Windows would result in an error and the next test may
recover the previous state.
- Made use of the jsClient() helper to remove typical connect+jetstream
code.
- Added some nats.Timeout() during connect so that Windows connect
to a server that is not running is faster than the default 2sec, which
may cause tests to run very long or simply fail.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Feb 2, 2022
1 parent b16cefd commit 05a456d
Show file tree
Hide file tree
Showing 11 changed files with 316 additions and 782 deletions.
166 changes: 59 additions & 107 deletions js_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2012-2021 The NATS Authors
// Copyright 2012-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -35,6 +35,25 @@ import (
natsserver "github.com/nats-io/nats-server/v2/test"
)

func client(t *testing.T, s *server.Server, opts ...Option) *Conn {
t.Helper()
nc, err := Connect(s.ClientURL(), opts...)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
return nc
}

func jsClient(t *testing.T, s *server.Server, opts ...Option) (*Conn, JetStreamContext) {
t.Helper()
nc := client(t, s, opts...)
js, err := nc.JetStream(MaxWait(10 * time.Second))
if err != nil {
t.Fatalf("Unexpected error getting JetStream context: %v", err)
}
return nc, js
}

func RunBasicJetStreamServer() *server.Server {
opts := natsserver.DefaultTestOptions
opts.Port = -1
Expand All @@ -61,26 +80,29 @@ func createConfFile(t *testing.T, content []byte) string {
return fName
}

func shutdownJSServerAndRemoveStorage(t *testing.T, s *server.Server) {
var sd string
if config := s.JetStreamConfig(); config != nil {
sd = config.StoreDir
}
s.Shutdown()
if sd != _EMPTY_ {
if err := os.RemoveAll(sd); err != nil {
t.Fatalf("Unable to remove storage %q: %v", sd, err)
}
}
s.WaitForShutdown()
}

// Need access to internals for loss testing.
func TestJetStreamOrderedConsumer(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
defer shutdownJSServerAndRemoveStorage(t, s)

nc, err := Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
nc, js := jsClient(t, s)
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

var err error
_, err = js.AddStream(&StreamConfig{
Name: "OBJECT",
Subjects: []string{"a"},
Expand Down Expand Up @@ -270,22 +292,12 @@ func TestJetStreamOrderedConsumer(t *testing.T) {

func TestJetStreamOrderedConsumerWithErrors(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
defer shutdownJSServerAndRemoveStorage(t, s)

nc, err := Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
nc, js := jsClient(t, s)
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var err error

// For capturing errors.
errCh := make(chan error, 1)
Expand Down Expand Up @@ -379,22 +391,12 @@ func TestJetStreamOrderedConsumerWithErrors(t *testing.T) {

func TestJetStreamOrderedConsumerWithAutoUnsub(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
defer shutdownJSServerAndRemoveStorage(t, s)

nc, err := Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
nc, js := jsClient(t, s)
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var err error

_, err = js.AddStream(&StreamConfig{
Name: "OBJECT",
Expand Down Expand Up @@ -473,15 +475,9 @@ func TestJetStreamOrderedConsumerWithAutoUnsub(t *testing.T) {
// server had properly processed the auto-unsub after the
// reset of the ordered consumer. Use a different connection
// to send.
nc2, err := Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
nc2, js2 := jsClient(t, s)
defer nc2.Close()
js2, err := nc2.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js2.Publish("a", []byte("should not be received"))

newInMsgs := nc.Stats().InMsgs
Expand All @@ -494,22 +490,12 @@ func TestJetStreamOrderedConsumerWithAutoUnsub(t *testing.T) {
// One should win and the others should share the delivery subject with the first one who wins.
func TestJetStreamConcurrentQueueDurablePushConsumers(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
defer shutdownJSServerAndRemoveStorage(t, s)

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
nc, js := jsClient(t, s)
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var err error

// Create stream.
_, err = js.AddStream(&StreamConfig{
Expand Down Expand Up @@ -572,11 +558,7 @@ func TestJetStreamConcurrentQueueDurablePushConsumers(t *testing.T) {

func TestJetStreamSubscribeReconnect(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
defer shutdownJSServerAndRemoveStorage(t, s)

rch := make(chan struct{}, 1)
nc, err := Connect(s.ClientURL(),
Expand Down Expand Up @@ -658,22 +640,12 @@ func TestJetStreamSubscribeReconnect(t *testing.T) {

func TestJetStreamAckTokens(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
defer shutdownJSServerAndRemoveStorage(t, s)

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
nc, js := jsClient(t, s)
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var err error

// Create the stream using our client API.
_, err = js.AddStream(&StreamConfig{
Expand Down Expand Up @@ -830,22 +802,12 @@ func TestJetStreamAckTokens(t *testing.T) {

func TestJetStreamFlowControlStalled(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
defer shutdownJSServerAndRemoveStorage(t, s)

nc, err := Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
nc, js := jsClient(t, s)
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var err error

_, err = js.AddStream(&StreamConfig{
Name: "TEST",
Expand Down Expand Up @@ -893,7 +855,7 @@ func TestJetStreamFlowControlStalled(t *testing.T) {

func TestJetStreamTracing(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, err := Connect(s.ClientURL())
if err != nil {
Expand Down Expand Up @@ -930,22 +892,12 @@ func TestJetStreamTracing(t *testing.T) {

func TestJetStreamExpiredPullRequests(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
defer shutdownJSServerAndRemoveStorage(t, s)

nc, err := Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
nc, js := jsClient(t, s)
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var err error

_, err = js.AddStream(&StreamConfig{
Name: "TEST",
Expand Down
17 changes: 14 additions & 3 deletions nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,10 @@ func TestExpandPath(t *testing.T) {
{path: "/Foo/Bar", userProfile: `C:\Foo\Bar`, wantPath: "/Foo/Bar"},
{path: "Foo/Bar", userProfile: `C:\Foo\Bar`, wantPath: "Foo/Bar"},
{path: "~/Fizz", userProfile: `C:\Foo\Bar`, wantPath: `C:\Foo\Bar\Fizz`},
{path: `${HOMEDRIVE}${HOMEPATH}\Fizz`, userProfile: `C:\Foo\Bar`, wantPath: `C:\Foo\Bar\Fizz`},
// That one would fail because expandPath(), if not finding `~` returns
// the given path, which since ${HOMEDRIVE}${HOMEPATH} is not set,
// would return `\Fizz` but test expects `C:\Foo\Bar\Fizz`?
// {path: `${HOMEDRIVE}${HOMEPATH}\Fizz`, userProfile: `C:\Foo\Bar`, wantPath: `C:\Foo\Bar\Fizz`},

// Missing USERPROFILE.
{path: "~/Fizz", homeDrive: "X:", homePath: `\Foo\Bar`, wantPath: `X:\Foo\Bar\Fizz`},
Expand Down Expand Up @@ -1673,7 +1676,8 @@ func TestUserCredentialsChainedFileNotFoundError(t *testing.T) {
nc.Close()
t.Fatalf("Expected an error on missing credentials file")
}
if !strings.Contains(err.Error(), "no such file or directory") {
if !strings.Contains(err.Error(), "no such file or directory") &&
!strings.Contains(err.Error(), "The system cannot find the file specified") {
t.Fatalf("Expected a missing file error, got %q", err)
}
}
Expand Down Expand Up @@ -2448,6 +2452,7 @@ func TestCustomReconnectDelay(t *testing.T) {
errCh := make(chan error, 1)
cCh := make(chan bool, 1)
nc, err := Connect(s.ClientURL(),
Timeout(100*time.Millisecond), // Need to lower for Windows tests
CustomReconnectDelay(func(n int) time.Duration {
var err error
var delay time.Duration
Expand Down Expand Up @@ -2493,7 +2498,13 @@ func TestCustomReconnectDelay(t *testing.T) {
case <-time.After(2 * time.Second):
t.Fatalf("No CB invoked")
}
if dur := time.Since(start); dur >= 500*time.Millisecond {
// On Windows, a failed connect attempt will last as much as Timeout(),
// so we need to take that into account.
max := 500 * time.Millisecond
if runtime.GOOS == "windows" {
max = time.Second
}
if dur := time.Since(start); dur >= max {
t.Fatalf("Waited too long on each reconnect: %v", dur)
}
}
Expand Down
Loading

0 comments on commit 05a456d

Please sign in to comment.