Commit f64c231

Allow checkpoints and maintenance to happen concurrently. (prometheus#2321)

This is essential on larger Prometheus servers, as otherwise
checkpoints prevent sufficient persisting of chunks to disk.
brian-brazil authored Jan 13, 2017
1 parent 1dcb763 commit f64c231
Showing 1 changed file with 35 additions and 20 deletions.
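In essence, the change moves checkpointing out of the maintenance select loop and into a dedicated goroutine, and turns the dirty-series counter into an atomic so both goroutines can touch it safely. Below is a minimal, self-contained sketch of that pattern, not Prometheus code: the names (checkpoint, the intervals, the stand-in maintenance work) are placeholders chosen for illustration.

// Sketch of the pattern introduced by this commit: periodic checkpointing
// runs in its own goroutine so the maintenance loop keeps making progress
// even while a checkpoint is in flight. All names and timings are made up.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	const (
		checkpointInterval         = 200 * time.Millisecond
		checkpointDirtySeriesLimit = 5
	)

	var dirtySeriesCount int64 // shared by both goroutines, hence accessed atomically

	checkpointTimer := time.NewTimer(checkpointInterval)
	stopping := make(chan struct{})
	checkpointLoopStopped := make(chan struct{})

	checkpoint := func() {
		// Clear the counter before checkpointing so it stays an upper bound.
		atomic.StoreInt64(&dirtySeriesCount, 0)
		fmt.Println("checkpointing...")
		time.Sleep(50 * time.Millisecond) // stand-in for writing the checkpoint
	}

	// Checkpoint loop, concurrent with the maintenance loop below.
	go func() {
		defer close(checkpointLoopStopped)
		for {
			select {
			case <-stopping:
				return
			case <-checkpointTimer.C:
				checkpoint()
				// Drain a value that may have arrived while checkpointing,
				// then restart the timer for a full interval.
				select {
				case <-checkpointTimer.C:
				default:
				}
				checkpointTimer.Reset(checkpointInterval)
			}
		}
	}()

	// Maintenance loop: does its own work and only nudges the checkpoint loop.
	deadline := time.After(1 * time.Second)
	for done := false; !done; {
		select {
		case <-deadline:
			done = true
		default:
			time.Sleep(10 * time.Millisecond) // stand-in for maintaining one series
			if dirty := atomic.AddInt64(&dirtySeriesCount, 1); dirty >= checkpointDirtySeriesLimit {
				checkpointTimer.Reset(0) // request an early checkpoint
			}
		}
	}

	close(stopping)
	<-checkpointLoopStopped
	checkpointTimer.Stop()
}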
55 changes: 35 additions & 20 deletions storage/local/storage.go
@@ -1252,7 +1252,7 @@ func (s *MemorySeriesStorage) cycleThroughArchivedFingerprints() chan model.Fingerprint
 func (s *MemorySeriesStorage) loop() {
 	checkpointTimer := time.NewTimer(s.checkpointInterval)
 
-	dirtySeriesCount := 0
+	var dirtySeriesCount int64
 
 	defer func() {
 		checkpointTimer.Stop()
@@ -1263,38 +1263,52 @@ func (s *MemorySeriesStorage) loop() {
 	memoryFingerprints := s.cycleThroughMemoryFingerprints()
 	archivedFingerprints := s.cycleThroughArchivedFingerprints()
 
+	// Checkpoints can happen concurrently with maintenance so even with heavy
+	// checkpointing there will still be sufficient progress on maintenance.
+	checkpointLoopStopped := make(chan struct{})
+	go func() {
+		for {
+			select {
+			case <-s.loopStopping:
+				checkpointLoopStopped <- struct{}{}
+				return
+			case <-checkpointTimer.C:
+				// We clear this before the checkpoint so that dirtySeriesCount
+				// is an upper bound.
+				atomic.StoreInt64(&dirtySeriesCount, 0)
+				s.dirtySeries.Set(0)
+				err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
+				if err != nil {
+					log.Errorln("Error while checkpointing:", err)
+				}
+				// If a checkpoint takes longer than checkpointInterval, unluckily timed
+				// combination with the Reset(0) call below can lead to a case where a
+				// time is lurking in C leading to repeated checkpointing without break.
+				select {
+				case <-checkpointTimer.C: // Get rid of the lurking time.
+				default:
+				}
+				checkpointTimer.Reset(s.checkpointInterval)
+			}
+		}
+	}()
+
 loop:
 	for {
 		select {
 		case <-s.loopStopping:
 			break loop
-		case <-checkpointTimer.C:
-			err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
-			if err != nil {
-				log.Errorln("Error while checkpointing:", err)
-			} else {
-				dirtySeriesCount = 0
-				s.dirtySeries.Set(0)
-			}
-			// If a checkpoint takes longer than checkpointInterval, unluckily timed
-			// combination with the Reset(0) call below can lead to a case where a
-			// time is lurking in C leading to repeated checkpointing without break.
-			select {
-			case <-checkpointTimer.C: // Get rid of the lurking time.
-			default:
-			}
-			checkpointTimer.Reset(s.checkpointInterval)
 		case fp := <-memoryFingerprints:
 			if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {
-				dirtySeriesCount++
-				s.dirtySeries.Inc()
+				dirty := atomic.AddInt64(&dirtySeriesCount, 1)
+				s.dirtySeries.Set(float64(dirty))
 				// Check if we have enough "dirty" series so that we need an early checkpoint.
 				// However, if we are already behind persisting chunks, creating a checkpoint
 				// would be counterproductive, as it would slow down chunk persisting even more,
 				// while in a situation like that, where we are clearly lacking speed of disk
 				// maintenance, the best we can do for crash recovery is to persist chunks as
 				// quickly as possible. So only checkpoint if the urgency score is < 1.
-				if dirtySeriesCount >= s.checkpointDirtySeriesLimit &&
+				if dirty >= int64(s.checkpointDirtySeriesLimit) &&
 					s.calculatePersistenceUrgencyScore() < 1 {
 					checkpointTimer.Reset(0)
 				}
@@ -1308,6 +1322,7 @@ loop:
 	}
 	for range archivedFingerprints {
 	}
+	<-checkpointLoopStopped
 }
 
 // maintainMemorySeries maintains a series that is in memory (i.e. not
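The "time is lurking in C" comment in the diff refers to a general property of time.Timer: a tick that fires while the goroutine is busy stays buffered in the timer's channel, and Reset does not remove it, so the next receive would succeed immediately and cause back-to-back checkpoints. A small standalone sketch of that behaviour and the drain-before-Reset idiom used above; the timings here are arbitrary and unrelated to Prometheus.

// Illustration of the "lurking" timer value and the drain-before-Reset idiom.
package main

import (
	"fmt"
	"time"
)

func main() {
	t := time.NewTimer(10 * time.Millisecond)

	<-t.C                             // the timer fires once and C is drained
	t.Reset(0)                        // ask it to fire again immediately
	time.Sleep(50 * time.Millisecond) // simulate a long checkpoint; the fire is now buffered in t.C

	// Without this drain, the next receive from t.C would return instantly,
	// leading to repeated work without a break between intervals.
	select {
	case <-t.C:
		fmt.Println("drained a lurking value")
	default:
		fmt.Println("nothing lurking")
	}
	t.Reset(100 * time.Millisecond)

	start := time.Now()
	<-t.C
	fmt.Printf("next fire after %v (a full interval, not immediately)\n",
		time.Since(start).Round(10*time.Millisecond))
}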
