Skip to content

Commit

Permalink
Fix race conditions in ManagerV2 tests
Browse files Browse the repository at this point in the history
This commit fixes a number of race conditions in the ManagerV2 unit
tests. Most of them were due to the use of Testify's Eventually
function to read some values while some callbacks from the manager
would also modify those values. The simplest solution was to use the
atomic values on those cases.

One test (TestInputReload) had a race condition between the test and
the manager itself, so it was removed. There is an integration tests
that covers the same functionality.
  • Loading branch information
belimawr committed Aug 2, 2023
1 parent c7d5920 commit d34c645
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 223 deletions.
208 changes: 0 additions & 208 deletions x-pack/libbeat/management/input_reload_test.go

This file was deleted.

31 changes: 16 additions & 15 deletions x-pack/libbeat/management/managerV2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -34,44 +35,44 @@ func TestManagerV2(t *testing.T) {
inputs := &reloadableList{}
r.MustRegisterInput(inputs)

configsSet := false
configsCleared := false
logLevelSet := false
fqdnEnabled := false
allStopped := false
configsSet := atomic.Bool{}
configsCleared := atomic.Bool{}
logLevelSet := atomic.Bool{}
fqdnEnabled := atomic.Bool{}
allStopped := atomic.Bool{}
onObserved := func(observed *proto.CheckinObserved, currentIdx int) {
if currentIdx == 1 {
oCfg := output.Config()
iCfgs := inputs.Configs()
if oCfg != nil && len(iCfgs) == 3 {
configsSet = true
configsSet.Store(true)
t.Logf("output and inputs configuration set")
}
} else if currentIdx == 2 {
oCfg := output.Config()
iCfgs := inputs.Configs()
if oCfg == nil || len(iCfgs) != 3 {
// should not happen (config no longer set)
configsSet = false
configsSet.Store(false)
t.Logf("output and inputs configuration cleared (should not happen)")
}
} else {
oCfg := output.Config()
iCfgs := inputs.Configs()
if oCfg == nil && len(iCfgs) == 0 {
configsCleared = true
configsCleared.Store(true)
}
if len(observed.Units) == 0 {
allStopped = true
allStopped.Store(true)
t.Logf("output and inputs configuration cleared (stopping)")
}
}
if logp.GetLevel() == zapcore.DebugLevel {
logLevelSet = true
logLevelSet.Store(true)
t.Logf("debug log level set")
}

fqdnEnabled = features.FQDN()
fqdnEnabled.Store(features.FQDN())
t.Logf("FQDN feature flag set to %v", fqdnEnabled)
}

Expand Down Expand Up @@ -218,7 +219,7 @@ func TestManagerV2(t *testing.T) {
defer m.Stop()

require.Eventually(t, func() bool {
return configsSet && configsCleared && logLevelSet && fqdnEnabled && allStopped
return configsSet.Load() && configsCleared.Load() && logLevelSet.Load() && fqdnEnabled.Load() && allStopped.Load()
}, 15*time.Second, 300*time.Millisecond)
}

Expand All @@ -242,7 +243,7 @@ func TestOutputError(t *testing.T) {
}
r.MustRegisterInput(inputs)

stateReached := false
stateReached := atomic.Bool{}
units := []*proto.UnitExpected{
{
Id: "output-unit",
Expand Down Expand Up @@ -300,7 +301,7 @@ func TestOutputError(t *testing.T) {
server := &mock.StubServerV2{
CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
if DoesStateMatch(observed, desiredState, 0) {
stateReached = true
stateReached.Store(true)
}
return &proto.CheckinExpected{
Units: units,
Expand Down Expand Up @@ -345,7 +346,7 @@ func TestOutputError(t *testing.T) {
defer m.Stop()

require.Eventually(t, func() bool {
return stateReached
return stateReached.Load()
}, 10*time.Second, 100*time.Millisecond, "desired state, output failed, was not reached")
}

Expand Down

0 comments on commit d34c645

Please sign in to comment.