Skip to content

Commit

Permalink
hotfix: rebalance should not shut down processor
Browse files Browse the repository at this point in the history
  • Loading branch information
frairon committed Dec 7, 2021
1 parent d56e4e5 commit 9827bf2
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 1 deletion.
2 changes: 1 addition & 1 deletion processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (g *Processor) rebalanceLoop(ctx context.Context) (rerr error) {

select {
case <-time.After(5 * time.Second):
case <-sessionCtx.Done():
case <-ctx.Done():
return nil
}
}
Expand Down
112 changes: 112 additions & 0 deletions systemtest/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,118 @@ func TestRebalance(t *testing.T) {
test.AssertNil(t, errg.Wait().ErrorOrNil())
}

// TestRebalanceSharePartitions runs two processors one after each other
// and asserts that they rebalance partitions appropriately
func TestRebalanceSharePartitions(t *testing.T) {
t.Parallel()
brokers := initSystemTest(t)

var (
group goka.Group = "goka-systemtest-rebalance-share-partitions"
inputStream string = string(group) + "-input"
numPartitions = 20
)

tmc := goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
cfg := goka.DefaultConfig()
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
test.AssertNil(t, err)

err = tm.EnsureStreamExists(inputStream, numPartitions)
test.AssertNil(t, err)

// start an emitter
cancelEmit, emitDone := runWithContext(func(ctx context.Context) error {
em, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.Int64))
test.AssertNil(t, err)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
defer em.Finish()
for i := 0; ; i++ {
select {
case <-ticker.C:
_, err := em.Emit(fmt.Sprintf("%d", i%100), int64(i))
if err != nil {
return nil
}
case <-ctx.Done():
return nil
}
}
})

createProc := func() *goka.Processor {
proc, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Input(goka.Stream(inputStream), new(codec.Int64), func(ctx goka.Context, msg interface{}) { ctx.SetValue(msg) }),
goka.Persist(new(codec.Int64)),
),
goka.WithRecoverAhead(),
goka.WithHotStandby(),
goka.WithTopicManagerBuilder(goka.TopicManagerBuilderWithTopicManagerConfig(tmc)),
goka.WithStorageBuilder(storage.MemoryBuilder()),
)

test.AssertNil(t, err)
return proc
}

activePassivePartitions := func(stats *goka.ProcessorStats) (active, passive int) {
for _, part := range stats.Group {
if part.TableStats != nil && part.TableStats.RunMode == 0 {
active++
} else {
passive++
}
}
return
}

p1, cancelP1, p1Done := runProc(createProc())
pollTimed(t, "p1 started", 10, p1.Recovered)

// p1 has all active partitions
p1Stats := p1.Stats()
p1Active, p1Passive := activePassivePartitions(p1Stats)
test.AssertEqual(t, p1Active, numPartitions)
test.AssertEqual(t, p1Passive, 0)

p2, cancelP2, p2Done := runProc(createProc())
pollTimed(t, "p2 started", 10, p2.Recovered)
pollTimed(t, "p1 still running", 10, p1.Recovered)

// now p1 and p2 share the partitions
p2Stats := p2.Stats()
p2Active, p2Passive := activePassivePartitions(p2Stats)
test.AssertEqual(t, p2Active, numPartitions/2)
test.AssertEqual(t, p2Passive, numPartitions/2)
p1Stats = p1.Stats()
p1Active, p1Passive = activePassivePartitions(p1Stats)
test.AssertEqual(t, p1Active, numPartitions/2)
test.AssertEqual(t, p1Passive, numPartitions/2)

// p1 is down
cancelP1()
test.AssertTrue(t, <-p1Done == nil)

// p2 should have all partitions
pollTimed(t, "p2 has all partitions", 10, func() bool {
p2Stats = p2.Stats()
p2Active, p2Passive := activePassivePartitions(p2Stats)
return p2Active == numPartitions && p2Passive == 0
})

cancelP2()
test.AssertTrue(t, <-p2Done == nil)

// stop emitter
cancelEmit()
test.AssertTrue(t, <-emitDone == nil)
}

func TestCallbackFail(t *testing.T) {
t.Parallel()
brokers := initSystemTest(t)
Expand Down
13 changes: 13 additions & 0 deletions systemtest/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ func pollTimed(t *testing.T, what string, secTimeout float64, pollers ...func()
t.Fatalf("waiting for %s timed out", what)
}

// runWithContext runs a context-aware function in another go-routine and returns a function to
// cancel the context and an error channel.
func runWithContext(fun func(ctx context.Context) error) (context.CancelFunc, chan error) {
ctx, cancel := context.WithCancel(context.Background())
done := make(chan error, 1)
go func() {
defer close(done)
done <- fun(ctx)
}()

return cancel, done
}

// runProc runs a processor in a go-routine and returns it along with the cancel-func and an error-channel being closed
// when the processor terminates (with an error that might have been returned)
func runProc(proc *goka.Processor) (*goka.Processor, context.CancelFunc, chan error) {
Expand Down

0 comments on commit 9827bf2

Please sign in to comment.