-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Deadlock detector to protect from Kafka driver (sarama-cluster) insta…
…bility (#1087) - Monitors the rate of messages consumed every minute. - If a single partition hasn't made any progress, close the partition to trigger a rebalance. - If all partitions are not consuming any messages, issue a panic. This should cause the service management system (k8s, marathon, etc) to reschedule the container. No messages should be lost.
- v2.3.0
- v2.2.0
- v2.1.0
- v2.0.0
- v2.0.0-rc2
- v2.0.0-rc1
- v2.0.0-rc0
- v1.66.0
- v1.65.0
- v1.64.0
- v1.63.0
- v1.62.0
- v1.61.0
- v1.60.0
- v1.59.0
- v1.58.1
- v1.58.0
- v1.57.0
- v1.56.0
- v1.55.0
- v1.54.0
- v1.53.0
- v1.52.0
- v1.51.0
- v1.50.0
- v1.49.0
- v1.48.0
- v1.47.0
- v1.46.0
- v1.45.0
- v1.44.0
- v1.43.0
- v1.42.0
- v1.41.0
- v1.40.0
- v1.39.0
- v1.38.1
- v1.38.0
- v1.37.0
- v1.36.0
- v1.35.2
- v1.35.1
- v1.35.0
- v1.34.1
- v1.34.0
- v1.33.0
- v1.32.0
- v1.31.0
- v1.30.0
- v1.29.0
- v1.28.0
- v1.27.0
- v1.26.0
- v1.25.0
- v1.24.0
- v1.23.0
- v1.22.0
- v1.21.0
- v1.20.0
- v1.19.2
- v1.19.1
- v1.19.0
- v1.18.1
- v1.18.0
- v1.17.1
- v1.17.0
- v1.16.0
- v1.15.1
- v1.15.0
- v1.14.0
- v1.13.1
- v1.13.0
- v1.12.0
- v1.11.0
- v1.10.1
- v1.10.0
- v1.9.0
- v1.8.2
- v1.8.1
- v1.8.0
Showing
5 changed files
with
382 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
// Copyright (c) 2018 The Jaeger Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package consumer | ||
|
||
import ( | ||
"runtime" | ||
"strconv" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/uber/jaeger-lib/metrics" | ||
"go.uber.org/zap" | ||
) | ||
|
||
// deadlockDetector monitors the messages consumed and wither signals for the partition to be closed by sending a | ||
// message on closePartition, or triggers a panic if the close fails. It triggers a panic if there are no messages | ||
// consumed across all partitions. | ||
// | ||
// Closing the partition should result in a rebalance, which alleviates the condition. This means that rebalances can | ||
// happen frequently if there is no traffic on the Kafka topic. This shouldn't affect normal operations. | ||
// | ||
// If the message send isn't processed within the next check interval, a panic is issued.This hack relies on a | ||
// container management system (k8s, aurora, marathon, etc) to reschedule | ||
// the dead instance. | ||
// | ||
// This hack protects jaeger-ingester from issues described in https://github.com/jaegertracing/jaeger/issues/1052 | ||
// | ||
type deadlockDetector struct { | ||
metricsFactory metrics.Factory | ||
logger *zap.Logger | ||
interval time.Duration | ||
allPartitionsDeadlockDetector *allPartitionsDeadlockDetector | ||
panicFunc func(int32) | ||
} | ||
|
||
type partitionDeadlockDetector struct { | ||
msgConsumed *uint64 | ||
logger *zap.Logger | ||
partition int32 | ||
closePartition chan struct{} | ||
done chan struct{} | ||
incrementAllPartitionMsgCount func() | ||
} | ||
|
||
type allPartitionsDeadlockDetector struct { | ||
msgConsumed *uint64 | ||
logger *zap.Logger | ||
done chan struct{} | ||
} | ||
|
||
func newDeadlockDetector(metricsFactory metrics.Factory, logger *zap.Logger, interval time.Duration) deadlockDetector { | ||
panicFunc := func(partition int32) { | ||
metricsFactory.Counter("deadlockdetector.panic-issued", map[string]string{"partition": strconv.Itoa(int(partition))}).Inc(1) | ||
time.Sleep(time.Second) // Allow time to flush metric | ||
|
||
buf := make([]byte, 1<<20) | ||
logger.Panic("No messages processed in the last check interval", | ||
zap.Int32("partition", partition), | ||
zap.String("stack", string(buf[:runtime.Stack(buf, true)]))) | ||
} | ||
|
||
return deadlockDetector{ | ||
metricsFactory: metricsFactory, | ||
logger: logger, | ||
interval: interval, | ||
panicFunc: panicFunc, | ||
} | ||
} | ||
|
||
func (s *deadlockDetector) startMonitoringForPartition(partition int32) *partitionDeadlockDetector { | ||
var msgConsumed uint64 | ||
w := &partitionDeadlockDetector{ | ||
msgConsumed: &msgConsumed, | ||
partition: partition, | ||
closePartition: make(chan struct{}, 1), | ||
done: make(chan struct{}), | ||
logger: s.logger, | ||
|
||
incrementAllPartitionMsgCount: func() { | ||
s.allPartitionsDeadlockDetector.incrementMsgCount() | ||
}, | ||
} | ||
|
||
go s.monitorForPartition(w, partition) | ||
|
||
return w | ||
} | ||
|
||
func (s *deadlockDetector) monitorForPartition(w *partitionDeadlockDetector, partition int32) { | ||
ticker := time.NewTicker(s.interval) | ||
defer ticker.Stop() | ||
|
||
for { | ||
select { | ||
case <-w.done: | ||
s.logger.Info("Closing ticker routine", zap.Int32("partition", partition)) | ||
return | ||
case <-ticker.C: | ||
if atomic.LoadUint64(w.msgConsumed) == 0 { | ||
select { | ||
case w.closePartition <- struct{}{}: | ||
s.metricsFactory.Counter("deadlockdetector.close-signalled", map[string]string{"partition": strconv.Itoa(int(partition))}).Inc(1) | ||
s.logger.Warn("Signalling partition close due to inactivity", zap.Int32("partition", partition)) | ||
default: | ||
// If closePartition is blocked, the consumer might have deadlocked - kill the process | ||
s.panicFunc(partition) | ||
return // For tests | ||
} | ||
} else { | ||
atomic.StoreUint64(w.msgConsumed, 0) | ||
} | ||
} | ||
} | ||
} | ||
|
||
// start monitors that the sum of messages consumed across all partitions is non zero for the given interval | ||
// If it is zero when there are producers producing messages on the topic, it means that sarama-cluster hasn't | ||
// retrieved partition assignments. (This case will not be caught by startMonitoringForPartition because no partitions | ||
// were retrieved). | ||
func (s *deadlockDetector) start() { | ||
var msgConsumed uint64 | ||
detector := &allPartitionsDeadlockDetector{ | ||
msgConsumed: &msgConsumed, | ||
done: make(chan struct{}), | ||
logger: s.logger, | ||
} | ||
|
||
go func() { | ||
s.logger.Debug("Starting global deadlock detector") | ||
ticker := time.NewTicker(s.interval) | ||
defer ticker.Stop() | ||
|
||
for { | ||
select { | ||
case <-detector.done: | ||
s.logger.Debug("Closing global ticker routine") | ||
return | ||
case <-ticker.C: | ||
if atomic.LoadUint64(detector.msgConsumed) == 0 { | ||
s.panicFunc(-1) | ||
return // For tests | ||
} | ||
atomic.StoreUint64(detector.msgConsumed, 0) | ||
} | ||
} | ||
}() | ||
|
||
s.allPartitionsDeadlockDetector = detector | ||
} | ||
|
||
func (s *deadlockDetector) close() { | ||
s.logger.Debug("Closing all partitions deadlock detector") | ||
s.allPartitionsDeadlockDetector.done <- struct{}{} | ||
} | ||
|
||
func (s *allPartitionsDeadlockDetector) incrementMsgCount() { | ||
atomic.AddUint64(s.msgConsumed, 1) | ||
} | ||
|
||
func (w *partitionDeadlockDetector) closePartitionChannel() chan struct{} { | ||
return w.closePartition | ||
} | ||
|
||
func (w *partitionDeadlockDetector) close() { | ||
w.logger.Debug("Closing deadlock detector", zap.Int32("partition", w.partition)) | ||
w.done <- struct{}{} | ||
} | ||
|
||
func (w *partitionDeadlockDetector) incrementMsgCount() { | ||
w.incrementAllPartitionMsgCount() | ||
atomic.AddUint64(w.msgConsumed, 1) | ||
} |
Oops, something went wrong.