add recover-forever option #464

Open: wants to merge 2 commits into master (showing changes from 1 commit)
11 changes: 11 additions & 0 deletions options.go
@@ -147,6 +147,7 @@ type poptions struct {
backoffResetTime time.Duration
hotStandby bool
recoverAhead bool
recoverForever bool
producerDefaultHeaders Headers

builders struct {
@@ -274,6 +275,16 @@ func WithRecoverAhead() ProcessorOption {
}
}

// WithRecoverForever configures the processor to recover its joins and the processor table forever, without ever joining a group.
// This option is highly experimental and is typically used in cluster-migration scenarios where mirror-maker cannot synchronize
// the consumer-group offsets.
// If this option is passed, the processor never finishes recovering, so `proc.Recovered()` will always return `false`.
func WithRecoverForever() ProcessorOption {
return func(o *poptions, gg *GroupGraph) {
o.recoverForever = true
}
}

// WithGroupGraphHook allows a function to obtain the group graph when a processor is started.
func WithGroupGraphHook(hook func(gg *GroupGraph)) ProcessorOption {
return func(o *poptions, gg *GroupGraph) {
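For readers skimming the diff, here is a minimal usage sketch of the new option. It is not part of this change; the topic and group names are illustrative, and brokers and ctx are assumed to be set up as in the system test further below.

proc, err := goka.NewProcessor(brokers,
	goka.DefineGroup(
		goka.Group("example-group"),
		goka.Input(goka.Stream("example-input"), new(codec.String), func(ctx goka.Context, msg interface{}) {
			// never invoked: in recover-forever mode the processor does not join the group
		}),
		goka.Persist(new(codec.String)),
	),
	goka.WithRecoverForever(),
)
if err != nil {
	return err
}
// Run keeps the group table (and any joins) caught up until ctx is cancelled;
// per the option's doc comment, proc.Recovered() stays false for the whole run.
return proc.Run(ctx)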
23 changes: 21 additions & 2 deletions partition_processor.go
@@ -36,6 +36,9 @@ const (
runModePassive
// the processor only recovers once and then stops. This is used for recover-ahead-option
runModeRecoverOnly
// the processor recovers forever and never stops. This is used for the RecoverForever option to keep the processor tables up to date
// without actually processing anything.
runModeRecoverForever
)

type visit struct {
@@ -211,7 +214,15 @@ func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error {
setupErrg.Go(func() error {
pp.log.Debugf("catching up table")
defer pp.log.Debugf("catching up table done")
return pp.table.SetupAndRecover(setupCtx, false)

err := pp.table.SetupAndRecover(setupCtx, false)
if err != nil {
return err
}
if pp.runMode == runModeRecoverForever {
return pp.table.CatchupForever(setupCtx, false)
}
return nil
})
}

@@ -229,7 +240,15 @@ func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error {
pp.joins[join.Topic()] = table

setupErrg.Go(func() error {
return table.SetupAndRecover(setupCtx, false)
err := table.SetupAndRecover(setupCtx, false)
if err != nil {
return err
}

if pp.runMode == runModeRecoverForever {
return table.CatchupForever(setupCtx, false)
}
return nil
})
}

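Both hunks above follow the same two-phase pattern, condensed here for clarity (assuming, as the mode name suggests, that CatchupForever blocks and keeps applying table updates until the context closes):

// per partition table (group table and each join):
if err := table.SetupAndRecover(setupCtx, false); err != nil {
	return err // one-shot recovery up to the current offsets failed
}
if pp.runMode == runModeRecoverForever {
	// keep consuming table updates until setupCtx is cancelled
	return table.CatchupForever(setupCtx, false)
}
return nil // all other run modes stop here and proceed as before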
26 changes: 21 additions & 5 deletions processor.go
@@ -323,6 +323,11 @@ func (g *Processor) Run(ctx context.Context) (rerr error) {
return fmt.Errorf("error waiting for start up tables: %w", err)
}

if ctx.Err() != nil {
g.log.Printf("Shutting down processor before it starts, context was cancelled")
return nil
}

// run the main rebalance-consume-loop
errg.Go(func() error {
return g.rebalanceLoop(ctx)
@@ -466,17 +471,22 @@ func (g *Processor) waitForStartupTables(ctx context.Context) error {
}
g.mTables.RUnlock()

// If we recover ahead, we'll also start all partition processors once in recover-only-mode
// If we recover ahead (or forever), we'll also start all partition processors once in recover-only-mode (or recover-forever-mode)
// and do the same boilerplate to keep the waitmap up to date.
if g.opts.recoverAhead {
if g.opts.recoverAhead || g.opts.recoverForever {

mode := runModeRecoverOnly
if g.opts.recoverForever {
mode = runModeRecoverForever
}
partitions, err := g.findStatefulPartitions()
if err != nil {
return fmt.Errorf("error finding dependent partitions: %w", err)
}
for _, part := range partitions {
part := part
pproc, err := g.createPartitionProcessor(ctx, part, runModeRecoverOnly, func(msg *message, meta string) {
panic("a partition processor in recover-only-mode never commits a message")
pproc, err := g.createPartitionProcessor(ctx, part, mode, func(msg *message, meta string) {
panic("a partition processor in recover-only-mode/recover-forever-mode never commits a message")
})
if err != nil {
return fmt.Errorf("Error creating partition processor for recover-ahead %s/%d: %v", g.Graph().Group(), part, err)
@@ -517,7 +527,13 @@ func (g *Processor) waitForStartupTables(ctx context.Context) error {
select {
// the context has closed, no point in waiting
case <-ctx.Done():
g.log.Debugf("Stopping to wait for views to get up, context closed")
g.log.Debugf("Stopping to wait for tables to get up, context closed")

// if we're in recover-forever-mode, this is the normal shutdown path, so don't return an error here.
if g.opts.recoverForever {
return nil
}

return fmt.Errorf("context closed while waiting for startup tables to become ready")

// the error group is done, which means
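Since a recover-forever processor never reports Recovered() == true, cancelling the context is its intended shutdown, which is why the hunk above returns nil instead of an error in that case. A caller-side sketch of the expected lifecycle, mirroring the system test below (proc is assumed to be built with WithRecoverForever):

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

done := make(chan error, 1)
go func() { done <- proc.Run(ctx) }()

// ... the processor keeps its tables caught up; do not gate anything on proc.Recovered() ...

cancel() // normal shutdown in recover-forever mode
if err := <-done; err != nil {
	// plain context cancellation should not surface here; anything else is a real failure
	return err
}
return nil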
143 changes: 137 additions & 6 deletions systemtest/processor_test.go
@@ -6,6 +6,7 @@ import (
"log"
"os"
"strings"
"sync/atomic"
"testing"
"time"

@@ -241,9 +242,7 @@ func TestRecoverAhead(t *testing.T) {
return proc2.Run(ctx)
})

pollTimed(t, "procs 1&2 recovered", func() bool {
return true
}, proc1.Recovered, proc2.Recovered)
pollTimed(t, "procs 1&2 recovered", proc1.Recovered, proc2.Recovered)

// check the storages that were initialized by the processors:
// both have 2 storages each, because all tables have only 1 partition
@@ -259,9 +258,6 @@
// wait until the keys are present
pollTimed(t, "key-values are present",

func() bool {
return true
},
func() bool {
has, _ := tableStorage1.Has("key1")
return has
@@ -297,6 +293,141 @@ func TestRecoverAhead(t *testing.T) {
require.NoError(t, errg.Wait().ErrorOrNil())
}

func TestRecoverForever(t *testing.T) {
brokers := initSystemTest(t)
var (
group = goka.Group(fmt.Sprintf("goka-systemtest-recoverforever-%d", time.Now().Unix()))
inputStream = fmt.Sprintf("%s-input", group)
table = string(goka.GroupTable(group))
)

tmc := goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
cfg := goka.DefaultConfig()
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
goka.ReplaceGlobalConfig(cfg)

tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
require.NoError(t, err)

err = tm.EnsureStreamExists(inputStream, 1)
require.NoError(t, err)
err = tm.EnsureTableExists(string(table), 1)
require.NoError(t, err)

// emit something into the state table (like simulating a processor ctx.SetValue()).
// The test processor should pick this value up while recovering its group table.
tableEmitter, err := goka.NewEmitter(brokers, goka.Stream(table), new(codec.String))
require.NoError(t, err)
require.NoError(t, tableEmitter.EmitSync("key1", "tableval1"))
require.NoError(t, tableEmitter.Finish())

// emit an input-message
inputEmitter, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.String))
require.NoError(t, err)
require.NoError(t, inputEmitter.EmitSync("key1", "input-value"))
require.NoError(t, inputEmitter.Finish())

storageTracker := newStorageTracker()

var (
processed atomic.Int64
itemsRecovered atomic.Int64
)

createProc := func(recoverForever bool) *goka.Processor {
opts := []goka.ProcessorOption{

goka.WithUpdateCallback(func(ctx goka.UpdateContext, s storage.Storage, key string, value []byte) error {
itemsRecovered.Add(1)
return goka.DefaultUpdate(ctx, s, key, value)
}),
goka.WithStorageBuilder(storageTracker.Build),
}
if recoverForever {
opts = append(opts, goka.WithRecoverForever())
}
proc, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) {
processed.Add(1)
}),
goka.Persist(new(codec.String)),
),
opts...,
)
require.NoError(t, err)
return proc
}

proc1 := createProc(true)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errg, ctx := multierr.NewErrGroup(ctx)

errg.Go(func() error {
return proc1.Run(ctx)
})

pollTimed(t, "procs 1 storage initialized", func() bool {
return len(storageTracker.storages) == 1
})

// get the corresponding storage for the group-table partition
tableStorage1 := storageTracker.storages[storageTracker.key(string(table), 0)]

// wait until the keys are present
pollTimed(t, "key-values are present",
func() bool {
has, _ := tableStorage1.Has("key1")
return has
},
)

// check the table-values
val1, _ := tableStorage1.Get("key1")
require.Equal(t, "tableval1", string(val1))

// stop everything and wait until it's shut down
cancel()
require.NoError(t, errg.Wait().ErrorOrNil())

require.EqualValues(t, 0, processed.Load())
require.EqualValues(t, 1, itemsRecovered.Load())

// run processor a second time, without recover forever
itemsRecovered.Store(0)
proc2 := createProc(false)
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
errg, ctx = multierr.NewErrGroup(ctx)
errg.Go(func() error {
return proc2.Run(ctx)
})

pollTimed(t, "procs 2 storage initialized", func() bool {
return len(storageTracker.storages) == 1
})

pollTimed(t, "procs recovered", proc2.Recovered)

// wait until the input is actually processed
pollTimed(t, "input processed", func() bool {
return processed.Load() == 1
})

// at this point we know the processor started, recovered, and consumed the one message in the input stream.
// Now make sure the processor did not recover again (because it did already in the first run)
require.EqualValues(t, 0, itemsRecovered.Load())

// stop everything and wait until it's shut down
cancel()
require.NoError(t, errg.Wait().ErrorOrNil())
}

// TestRebalance runs some processors to test rebalance. It's merely a
// runs-without-errors test, not a real functional test.
func TestRebalance(t *testing.T) {