Deadlock detector to protect from Kafka driver (sarama-cluster) instability (#1087)

- Monitors the rate of messages consumed, checked once per minute.
- If a single partition hasn't made any progress, closes that partition to trigger a rebalance.
- If no partition is consuming any messages, issues a panic. This should cause the service management system (k8s, marathon, etc.) to reschedule the container. No messages should be lost. (A distilled sketch of this watchdog pattern follows below.)
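For orientation before reading the diff: the detection loop boils down to a goroutine with a ticker that checks, and then resets, an atomic message counter once per interval. Below is a minimal, self-contained sketch of that watchdog pattern (illustrative names only, not this commit's API; the real implementation is in deadlock_detector.go):

package main

import (
	"log"
	"sync/atomic"
	"time"
)

// watch fires onStall if msgCount stays at zero for a full interval;
// otherwise it resets the counter so each tick inspects a fresh window.
func watch(msgCount *uint64, interval time.Duration, onStall func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		if atomic.LoadUint64(msgCount) == 0 {
			onStall() // e.g. close the partition, or panic if that already failed
			return
		}
		atomic.StoreUint64(msgCount, 0)
	}
}

func main() {
	var consumed uint64
	go watch(&consumed, time.Minute, func() {
		log.Panic("no messages processed in the last interval")
	})
	// A real consume loop would call atomic.AddUint64(&consumed, 1) per message.
	select {}
}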
vprithvi authored Oct 9, 2018
1 parent a429d78 commit 7105fa9
Showing 5 changed files with 382 additions and 14 deletions.
cmd/ingester/app/consumer/consumer.go (46 changes: 35 additions & 11 deletions)
@@ -16,6 +16,7 @@ package consumer

 import (
 	"sync"
+	"time"
 
 	"github.com/Shopify/sarama"
 	sc "github.com/bsm/sarama-cluster"
@@ -42,6 +43,8 @@ type Consumer struct {
 	internalConsumer consumer.Consumer
 	processorFactory ProcessorFactory
 
+	deadlockDetector deadlockDetector
+
 	partitionIDToState map[int32]*consumerState
 }

@@ -52,17 +55,20 @@ type consumerState struct {

 // New is a constructor for a Consumer
 func New(params Params) (*Consumer, error) {
+	deadlockDetector := newDeadlockDetector(params.Factory, params.Logger, time.Minute)
 	return &Consumer{
 		metricsFactory:     params.Factory,
 		logger:             params.Logger,
 		internalConsumer:   params.InternalConsumer,
 		processorFactory:   params.ProcessorFactory,
+		deadlockDetector:   deadlockDetector,
 		partitionIDToState: make(map[int32]*consumerState),
 	}, nil
 }

 // Start begins consuming messages in a go routine
 func (c *Consumer) Start() {
+	c.deadlockDetector.start()
 	go func() {
 		c.logger.Info("Starting main loop")
 		for pc := range c.internalConsumer.Partitions() {
@@ -73,6 +79,7 @@ func (c *Consumer) Start() {
 				// to the cleanup process not completing
 				p.wg.Wait()
 			}
+			c.partitionMetrics(pc.Partition()).startCounter.Inc(1)
 			c.partitionIDToState[pc.Partition()] = &consumerState{partitionConsumer: pc}
 			go c.handleMessages(pc)
 			go c.handleErrors(pc.Partition(), pc.Errors())
@@ -86,6 +93,7 @@ func (c *Consumer) Close() error {
 		c.closePartition(p.partitionConsumer)
 		p.wg.Wait()
 	}
+	c.deadlockDetector.close()
 	c.logger.Info("Closing parent consumer")
 	return c.internalConsumer.Close()
 }
@@ -97,27 +105,43 @@ func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
 	defer c.closePartition(pc)
 
 	msgMetrics := c.newMsgMetrics(pc.Partition())
 
 	var msgProcessor processor.SpanProcessor
 
-	for msg := range pc.Messages() {
-		c.logger.Debug("Got msg", zap.Any("msg", msg))
-		msgMetrics.counter.Inc(1)
-		msgMetrics.offsetGauge.Update(msg.Offset)
-		msgMetrics.lagGauge.Update(pc.HighWaterMarkOffset() - msg.Offset - 1)
-
-		if msgProcessor == nil {
-			msgProcessor = c.processorFactory.new(pc.Partition(), msg.Offset-1)
-			defer msgProcessor.Close()
-		}
-
-		msgProcessor.Process(&saramaMessageWrapper{msg})
-	}
-	c.logger.Info("Finished handling messages", zap.Int32("partition", pc.Partition()))
+	deadlockDetector := c.deadlockDetector.startMonitoringForPartition(pc.Partition())
+	defer deadlockDetector.close()
+
+	for {
+		select {
+		case msg, ok := <-pc.Messages():
+			if !ok {
+				c.logger.Info("Message channel closed. ", zap.Int32("partition", pc.Partition()))
+				return
+			}
+			c.logger.Debug("Got msg", zap.Any("msg", msg))
+			msgMetrics.counter.Inc(1)
+			msgMetrics.offsetGauge.Update(msg.Offset)
+			msgMetrics.lagGauge.Update(pc.HighWaterMarkOffset() - msg.Offset - 1)
+			deadlockDetector.incrementMsgCount()
+
+			if msgProcessor == nil {
+				msgProcessor = c.processorFactory.new(pc.Partition(), msg.Offset-1)
+				defer msgProcessor.Close()
+			}
+
+			msgProcessor.Process(&saramaMessageWrapper{msg})
+		case <-deadlockDetector.closePartitionChannel():
+			c.logger.Info("Closing partition due to inactivity", zap.Int32("partition", pc.Partition()))
+			return
+		}
+	}
 }
 
 func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
 	c.logger.Info("Closing partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
 	partitionConsumer.Close() // blocks until messages channel is drained
+	c.partitionMetrics(partitionConsumer.Partition()).closeCounter.Inc(1)
 	c.logger.Info("Closed partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
 }

cmd/ingester/app/consumer/consumer_metrics.go (20 changes: 17 additions & 3 deletions)
@@ -30,8 +30,17 @@ type errMetrics struct {
 	errCounter metrics.Counter
 }
 
+type partitionMetrics struct {
+	startCounter metrics.Counter
+	closeCounter metrics.Counter
+}
+
+func (c *Consumer) namespace(partition int32) metrics.Factory {
+	return c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
+}
+
 func (c *Consumer) newMsgMetrics(partition int32) msgMetrics {
-	f := c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
+	f := c.namespace(partition)
 	return msgMetrics{
 		counter:     f.Counter("messages", nil),
 		offsetGauge: f.Gauge("current-offset", nil),
@@ -40,7 +49,12 @@ func (c *Consumer) newMsgMetrics(partition int32) msgMetrics {
 }
 
 func (c *Consumer) newErrMetrics(partition int32) errMetrics {
-	f := c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
-	return errMetrics{errCounter: f.Counter("errors", nil)}
+	return errMetrics{errCounter: c.namespace(partition).Counter("errors", nil)}
 }
 
+func (c *Consumer) partitionMetrics(partition int32) partitionMetrics {
+	f := c.namespace(partition)
+	return partitionMetrics{
+		closeCounter: f.Counter("partition-close", nil),
+		startCounter: f.Counter("partition-start", nil)}
+}
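As a reference for the naming scheme these helpers produce, here is a minimal sketch, assuming the jaeger-lib version vendored at the time, of how a namespaced counter surfaces in a metrics snapshot; the key format matches the assertion in TestHandleClosePartition below:

package main

import (
	"fmt"
	"strconv"

	"github.com/uber/jaeger-lib/metrics"
)

func main() {
	factory := metrics.NewLocalFactory(0)
	partition := int32(316)
	// Same construction as Consumer.namespace above.
	f := factory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
	f.Counter("partition-close", nil).Inc(1)

	counters, _ := factory.Snapshot()
	fmt.Println(counters["sarama-consumer.partition-close|partition=316"]) // prints 1
}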
cmd/ingester/app/consumer/consumer_test.go (32 changes: 32 additions & 0 deletions)
@@ -94,6 +94,7 @@ func newConsumer(
 		logger:             logger,
 		internalConsumer:   consumer,
 		partitionIDToState: make(map[int32]*consumerState),
+		deadlockDetector:   newDeadlockDetector(factory, logger, time.Second),
 
 		processorFactory: ProcessorFactory{
 			topic: topic,
@@ -173,6 +174,11 @@ func TestSaramaConsumerWrapper_start_Messages(t *testing.T) {
 		Tags:  partitionTag,
 		Value: 0,
 	})
+	testutils.AssertCounterMetrics(t, localFactory, testutils.ExpectedMetric{
+		Name:  "sarama-consumer.partition-start",
+		Tags:  partitionTag,
+		Value: 1,
+	})
 }

func TestSaramaConsumerWrapper_start_Errors(t *testing.T) {
@@ -210,3 +216,29 @@ func TestSaramaConsumerWrapper_start_Errors(t *testing.T) {

 	t.Fail()
 }
+
+func TestHandleClosePartition(t *testing.T) {
+	metricsFactory := metrics.NewLocalFactory(0)
+
+	mp := &pmocks.SpanProcessor{}
+	saramaConsumer := smocks.NewConsumer(t, &sarama.Config{})
+	mc := saramaConsumer.ExpectConsumePartition(topic, partition, msgOffset)
+	mc.ExpectErrorsDrainedOnClose()
+	saramaPartitionConsumer, e := saramaConsumer.ConsumePartition(topic, partition, msgOffset)
+	require.NoError(t, e)
+
+	undertest := newConsumer(metricsFactory, topic, mp, newSaramaClusterConsumer(saramaPartitionConsumer))
+	undertest.deadlockDetector = newDeadlockDetector(metricsFactory, undertest.logger, 200*time.Millisecond)
+	undertest.Start()
+	defer undertest.Close()
+
+	for i := 0; i < 10; i++ {
+		undertest.deadlockDetector.allPartitionsDeadlockDetector.incrementMsgCount() // Don't trigger panic on all partitions detector
+		time.Sleep(100 * time.Millisecond)
+		c, _ := metricsFactory.Snapshot()
+		if c["sarama-consumer.partition-close|partition=316"] == 1 {
+			return
+		}
+	}
+	assert.Fail(t, "Did not close partition")
+}
cmd/ingester/app/consumer/deadlock_detector.go (184 changes: 184 additions & 0 deletions, new file)
@@ -0,0 +1,184 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumer

import (
"runtime"
"strconv"
"sync/atomic"
"time"

"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
)

// deadlockDetector monitors the messages consumed and either signals for the partition to be closed by sending a
// message on closePartition, or triggers a panic if the close fails. It also triggers a panic if no messages are
// being consumed across all partitions.
//
// Closing the partition should result in a rebalance, which alleviates the condition. This means that rebalances can
// happen frequently if there is no traffic on the Kafka topic. This shouldn't affect normal operations.
//
// If the message sent on closePartition isn't processed within the next check interval, a panic is issued. This hack
// relies on a container management system (k8s, aurora, marathon, etc.) to reschedule the dead instance.
//
// This hack protects jaeger-ingester from issues described in https://github.com/jaegertracing/jaeger/issues/1052
type deadlockDetector struct {
metricsFactory metrics.Factory
logger *zap.Logger
interval time.Duration
allPartitionsDeadlockDetector *allPartitionsDeadlockDetector
panicFunc func(int32)
}

type partitionDeadlockDetector struct {
msgConsumed *uint64
logger *zap.Logger
partition int32
closePartition chan struct{}
done chan struct{}
incrementAllPartitionMsgCount func()
}

type allPartitionsDeadlockDetector struct {
msgConsumed *uint64
logger *zap.Logger
done chan struct{}
}

func newDeadlockDetector(metricsFactory metrics.Factory, logger *zap.Logger, interval time.Duration) deadlockDetector {
panicFunc := func(partition int32) {
metricsFactory.Counter("deadlockdetector.panic-issued", map[string]string{"partition": strconv.Itoa(int(partition))}).Inc(1)
time.Sleep(time.Second) // Allow time to flush metric

buf := make([]byte, 1<<20)
logger.Panic("No messages processed in the last check interval",
zap.Int32("partition", partition),
zap.String("stack", string(buf[:runtime.Stack(buf, true)])))
}

return deadlockDetector{
metricsFactory: metricsFactory,
logger: logger,
interval: interval,
panicFunc: panicFunc,
}
}

func (s *deadlockDetector) startMonitoringForPartition(partition int32) *partitionDeadlockDetector {
var msgConsumed uint64
w := &partitionDeadlockDetector{
msgConsumed: &msgConsumed,
partition: partition,
closePartition: make(chan struct{}, 1),
done: make(chan struct{}),
logger: s.logger,

incrementAllPartitionMsgCount: func() {
s.allPartitionsDeadlockDetector.incrementMsgCount()
},
}

go s.monitorForPartition(w, partition)

return w
}

func (s *deadlockDetector) monitorForPartition(w *partitionDeadlockDetector, partition int32) {
ticker := time.NewTicker(s.interval)
defer ticker.Stop()

for {
select {
case <-w.done:
s.logger.Info("Closing ticker routine", zap.Int32("partition", partition))
return
case <-ticker.C:
if atomic.LoadUint64(w.msgConsumed) == 0 {
select {
case w.closePartition <- struct{}{}:
s.metricsFactory.Counter("deadlockdetector.close-signalled", map[string]string{"partition": strconv.Itoa(int(partition))}).Inc(1)
s.logger.Warn("Signalling partition close due to inactivity", zap.Int32("partition", partition))
default:
// If closePartition is blocked, the consumer might have deadlocked - kill the process
s.panicFunc(partition)
return // For tests
}
} else {
atomic.StoreUint64(w.msgConsumed, 0)
}
}
}
}

// start monitors that the sum of messages consumed across all partitions is non-zero over the given interval.
// If it is zero while producers are writing messages to the topic, it means that sarama-cluster hasn't
// retrieved partition assignments. (This case will not be caught by startMonitoringForPartition because no
// partitions were retrieved.)
func (s *deadlockDetector) start() {
var msgConsumed uint64
detector := &allPartitionsDeadlockDetector{
msgConsumed: &msgConsumed,
done: make(chan struct{}),
logger: s.logger,
}

go func() {
s.logger.Debug("Starting global deadlock detector")
ticker := time.NewTicker(s.interval)
defer ticker.Stop()

for {
select {
case <-detector.done:
s.logger.Debug("Closing global ticker routine")
return
case <-ticker.C:
if atomic.LoadUint64(detector.msgConsumed) == 0 {
s.panicFunc(-1)
return // For tests
}
atomic.StoreUint64(detector.msgConsumed, 0)
}
}
}()

s.allPartitionsDeadlockDetector = detector
}

func (s *deadlockDetector) close() {
s.logger.Debug("Closing all partitions deadlock detector")
s.allPartitionsDeadlockDetector.done <- struct{}{}
}

func (s *allPartitionsDeadlockDetector) incrementMsgCount() {
atomic.AddUint64(s.msgConsumed, 1)
}

func (w *partitionDeadlockDetector) closePartitionChannel() chan struct{} {
return w.closePartition
}

func (w *partitionDeadlockDetector) close() {
w.logger.Debug("Closing deadlock detector", zap.Int32("partition", w.partition))
w.done <- struct{}{}
}

func (w *partitionDeadlockDetector) incrementMsgCount() {
w.incrementAllPartitionMsgCount()
atomic.AddUint64(w.msgConsumed, 1)
}
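Putting the pieces together: the intended lifecycle is one global detector per consumer plus one per-partition detector per message loop, with incrementMsgCount feeding both. The sketch below is a hypothetical wiring example (not part of this commit) that mirrors how Consumer.Start and handleMessages use this API:

package consumer

import (
	"time"

	"github.com/uber/jaeger-lib/metrics"
	"go.uber.org/zap"
)

// exampleWiring is a hypothetical helper for illustration only.
func exampleWiring(messages <-chan struct{}, logger *zap.Logger, mf metrics.Factory) {
	d := newDeadlockDetector(mf, logger, time.Minute)
	d.start()       // all-partitions watchdog: panics if nothing is consumed anywhere
	defer d.close() // stops the global ticker goroutine

	pd := d.startMonitoringForPartition(42)
	defer pd.close() // stops the per-partition ticker goroutine

	for {
		select {
		case _, ok := <-messages:
			if !ok {
				return
			}
			pd.incrementMsgCount() // feeds both the per-partition and global watchdogs
			// ... process the message here ...
		case <-pd.closePartitionChannel():
			logger.Info("partition idle; closing to force a rebalance")
			return
		}
	}
}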