Commit acd72ae

Merge pull request prometheus#2591 from prometheus/beorn7/storage
storage: Several optimizations of checkpointing
beorn7 authored Apr 7, 2017
2 parents cffb1ac + f20b84e commit acd72ae
Showing 4 changed files with 95 additions and 25 deletions.
4 changes: 2 additions & 2 deletions cmd/prometheus/config.go
@@ -171,11 +171,11 @@ func init() {
)
cfg.fs.DurationVar(
&cfg.storage.CheckpointInterval, "storage.local.checkpoint-interval", 5*time.Minute,
"The period at which the in-memory metrics and the chunks not yet persisted to series files are checkpointed.",
"The time to wait between checkpoints of in-memory metrics and chunks not yet persisted to series files. Note that a checkpoint is never triggered before at least as much time has passed as the last checkpoint took.",
)
cfg.fs.IntVar(
&cfg.storage.CheckpointDirtySeriesLimit, "storage.local.checkpoint-dirty-series-limit", 5000,
"If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints.",
"If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints. Also note that a checkpoint is never triggered before at least as much time has passed as the last checkpoint took.",
)
cfg.fs.Var(
&cfg.storage.SyncStrategy, "storage.local.series-sync-strategy",
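
The two flag descriptions above interact: checkpoints normally run every checkpoint-interval, can be pulled forward when too many series are dirty, but (new in this change) never start before at least as much time has passed as the previous checkpoint took. The following is a minimal sketch of that scheduling rule, not the actual Prometheus code; nextCheckpointWait and all of its parameters are made-up names for illustration.

```go
// Sketch of the checkpoint scheduling rule described by the two flags above.
package main

import (
	"fmt"
	"time"
)

// nextCheckpointWait returns how long to wait before the next checkpoint may start,
// given the configured interval, how long the previous checkpoint took, how much
// time has passed since it finished, and the dirty-series count vs. its limit.
func nextCheckpointWait(interval, lastTook, sinceLast time.Duration, dirtySeries, dirtyLimit int) time.Duration {
	// Enforce the minimum gap: at least as long as the previous checkpoint took.
	if sinceLast < lastTook {
		return lastTook - sinceLast
	}
	// Too many dirty series: checkpoint as soon as the minimum gap allows.
	if dirtySeries >= dirtyLimit {
		return 0
	}
	// Otherwise wait out the remainder of the regular interval.
	if sinceLast < interval {
		return interval - sinceLast
	}
	return 0
}

func main() {
	// 30s since a checkpoint that took 90s: wait another minute despite 6000 dirty series.
	fmt.Println(nextCheckpointWait(5*time.Minute, 90*time.Second, 30*time.Second, 6000, 5000)) // 1m0s
}
```
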
19 changes: 16 additions & 3 deletions storage/local/persistence.go
@@ -15,6 +15,7 @@ package local

import (
"bufio"
"context"
"encoding/binary"
"fmt"
"io"
@@ -626,7 +627,9 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
// NOTE: Above, varint encoding is used consistently although uvarint would have
// made more sense in many cases. This was simply a glitch while designing the
// format.
func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) {
func (p *persistence) checkpointSeriesMapAndHeads(
ctx context.Context, fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker,
) (err error) {
log.Info("Checkpointing in-memory metrics and chunks...")
p.checkpointing.Set(1)
defer p.checkpointing.Set(0)
@@ -637,11 +640,16 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
}

defer func() {
syncErr := f.Sync()
closeErr := f.Close()
defer os.Remove(p.headsTempFileName()) // Just in case it was left behind.

if err != nil {
// If we already had an error, do not bother to sync,
// just close, ignoring any further error.
f.Close()
return
}
syncErr := f.Sync()
closeErr := f.Close()
err = syncErr
if err != nil {
return
@@ -683,6 +691,11 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap

var realNumberOfSeries uint64
for m := range iter {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
func() { // Wrapped in function to use defer for unlocking the fp.
fpLocker.Lock(m.fp)
defer fpLocker.Unlock(m.fp)
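
The checkpointSeriesMapAndHeads change above threads a context.Context through the checkpoint and polls ctx.Done() once per series, so a shutdown can abort a long-running checkpoint instead of waiting for it to finish. Below is a minimal, self-contained sketch of that cooperative-cancellation pattern; checkpointAll, processItem, and items are hypothetical stand-ins, not Prometheus code.

```go
// Sketch of cooperative cancellation in a long-running loop, as used above.
package main

import (
	"context"
	"fmt"
)

// checkpointAll processes items one by one and bails out as soon as ctx is cancelled.
func checkpointAll(ctx context.Context, items []string) error {
	for _, it := range items {
		select {
		case <-ctx.Done():
			return ctx.Err() // context.Canceled or context.DeadlineExceeded
		default:
		}
		processItem(it)
	}
	return nil
}

func processItem(string) {} // placeholder for the per-series checkpoint work

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel up front, as the new test below does
	fmt.Println(checkpointAll(ctx, []string{"a", "b"})) // context canceled
}
```
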
24 changes: 23 additions & 1 deletion storage/local/persistence_test.go
@@ -15,6 +15,7 @@ package local

import (
"bufio"
"context"
"errors"
"os"
"path/filepath"
@@ -547,6 +548,27 @@ func TestPersistLoadDropChunksType1(t *testing.T) {
testPersistLoadDropChunks(t, 1)
}

func TestCancelCheckpoint(t *testing.T) {
p, closer := newTestPersistence(t, 2)
defer closer.Close()

fpLocker := newFingerprintLocker(10)
sm := newSeriesMap()
s, _ := newMemorySeries(m1, nil, time.Time{})
sm.put(m1.FastFingerprint(), s)
sm.put(m2.FastFingerprint(), s)
sm.put(m3.FastFingerprint(), s)
sm.put(m4.FastFingerprint(), s)
sm.put(m5.FastFingerprint(), s)

ctx, cancel := context.WithCancel(context.Background())
// Cancel right now to avoid races.
cancel()
if err := p.checkpointSeriesMapAndHeads(ctx, sm, fpLocker); err != context.Canceled {
t.Fatalf("expected error %v, got %v", context.Canceled, err)
}
}

func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encoding) {
p, closer := newTestPersistence(t, encoding)
defer closer.Close()
@@ -584,7 +606,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encoding) {
sm.put(m4.FastFingerprint(), s4)
sm.put(m5.FastFingerprint(), s5)

if err := p.checkpointSeriesMapAndHeads(sm, fpLocker); err != nil {
if err := p.checkpointSeriesMapAndHeads(context.Background(), sm, fpLocker); err != nil {
t.Fatal(err)
}

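
TestCancelCheckpoint above cancels the context before the call even starts, so the very first ctx.Done() check fires deterministically and the test cannot race with the checkpoint's progress. For comparison, here is a hedged sketch of the other common shape of such a test, cancelling via a deadline while the work is still running; slowWork is a hypothetical stand-in and not part of the Prometheus test suite.

```go
// A sketch variant of the cancellation test, using a deadline instead of an
// up-front cancel. Placed in a _test.go file of the same package.
package local

import (
	"context"
	"testing"
	"time"
)

func TestCancelMidway(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	if err := slowWork(ctx); err != context.DeadlineExceeded {
		t.Fatalf("expected %v, got %v", context.DeadlineExceeded, err)
	}
}

// slowWork loops until its context is done, simulating a long checkpoint.
func slowWork(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Millisecond):
			// one unit of simulated checkpoint work
		}
	}
}
```
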
73 changes: 54 additions & 19 deletions storage/local/storage.go
@@ -462,7 +462,9 @@ func (s *MemorySeriesStorage) Stop() error {
<-s.evictStopped

// One final checkpoint of the series map and the head chunks.
if err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker); err != nil {
if err := s.persistence.checkpointSeriesMapAndHeads(
context.Background(), s.fpToSeries, s.fpLocker,
); err != nil {
return err
}
if err := s.mapper.checkpoint(); err != nil {
@@ -1421,44 +1423,71 @@ func (s *MemorySeriesStorage) cycleThroughArchivedFingerprints() chan model.Fingerprint {

func (s *MemorySeriesStorage) loop() {
checkpointTimer := time.NewTimer(s.checkpointInterval)
checkpointMinTimer := time.NewTimer(0)

var dirtySeriesCount int64

defer func() {
checkpointTimer.Stop()
checkpointMinTimer.Stop()
log.Info("Maintenance loop stopped.")
close(s.loopStopped)
}()

memoryFingerprints := s.cycleThroughMemoryFingerprints()
archivedFingerprints := s.cycleThroughArchivedFingerprints()

checkpointCtx, checkpointCancel := context.WithCancel(context.Background())
checkpointNow := make(chan struct{}, 1)

doCheckpoint := func() time.Duration {
start := time.Now()
// We clear this before the checkpoint so that dirtySeriesCount
// is an upper bound.
atomic.StoreInt64(&dirtySeriesCount, 0)
s.dirtySeries.Set(0)
select {
case <-checkpointNow:
// Signal cleared.
default:
// No signal pending.
}
err := s.persistence.checkpointSeriesMapAndHeads(
checkpointCtx, s.fpToSeries, s.fpLocker,
)
if err == context.Canceled {
log.Info("Checkpoint canceled.")
} else if err != nil {
s.persistErrors.Inc()
log.Errorln("Error while checkpointing:", err)
}
return time.Since(start)
}

// Checkpoints can happen concurrently with maintenance so even with heavy
// checkpointing there will still be sufficient progress on maintenance.
checkpointLoopStopped := make(chan struct{})
go func() {
for {
select {
case <-s.loopStopping:
case <-checkpointCtx.Done():
checkpointLoopStopped <- struct{}{}
return
case <-checkpointTimer.C:
// We clear this before the checkpoint so that dirtySeriesCount
// is an upper bound.
atomic.StoreInt64(&dirtySeriesCount, 0)
s.dirtySeries.Set(0)
err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
if err != nil {
s.persistErrors.Inc()
log.Errorln("Error while checkpointing:", err)
}
// If a checkpoint takes longer than checkpointInterval, unluckily timed
// combination with the Reset(0) call below can lead to a case where a
// time is lurking in C leading to repeated checkpointing without break.
case <-checkpointMinTimer.C:
var took time.Duration
select {
case <-checkpointTimer.C: // Get rid of the lurking time.
default:
case <-checkpointCtx.Done():
checkpointLoopStopped <- struct{}{}
return
case <-checkpointTimer.C:
took = doCheckpoint()
case <-checkpointNow:
if !checkpointTimer.Stop() {
<-checkpointTimer.C
}
took = doCheckpoint()
}
checkpointMinTimer.Reset(took)
checkpointTimer.Reset(s.checkpointInterval)
}
}
@@ -1468,6 +1497,7 @@
loop:
for {
select {
case <-s.loopStopping:
checkpointCancel()
break loop
case fp := <-memoryFingerprints:
if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {
@@ -1478,10 +1508,15 @@ loop:
// would be counterproductive, as it would slow down chunk persisting even more,
// while in a situation like that, where we are clearly lacking speed of disk
// maintenance, the best we can do for crash recovery is to persist chunks as
// quickly as possible. So only checkpoint if the urgency score is < 1.
// quickly as possible. So only checkpoint if we are not in rushed mode.
if _, rushed := s.getPersistenceUrgencyScore(); !rushed &&
dirty >= int64(s.checkpointDirtySeriesLimit) {
checkpointTimer.Reset(0)
select {
case checkpointNow <- struct{}{}:
// Signal sent.
default:
// Signal already pending.
}
}
}
case fp := <-archivedFingerprints:
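
The storage.go rewrite above replaces the old single-timer checkpoint logic (and its "lurking time in C" workaround) with a dedicated checkpoint goroutine: checkpointMinTimer enforces that the next checkpoint starts no sooner than the previous one took, checkpointTimer still fires every checkpointInterval, checkpointNow is a one-slot channel for non-blocking "checkpoint early" requests from the dirty-series path, and checkpointCancel aborts an in-flight checkpoint on shutdown. The following is a minimal sketch of the same scheme under those assumptions; checkpointLoop and doCheckpoint here are illustrative stand-ins, not the MemorySeriesStorage code.

```go
// Sketch of the checkpoint scheduling scheme introduced in storage.go above.
package main

import (
	"context"
	"time"
)

// checkpointLoop waits for the minimum gap (the duration of the previous checkpoint),
// then for either the regular interval or an explicit "checkpoint now" request, and
// stops when ctx is cancelled.
func checkpointLoop(
	ctx context.Context,
	interval time.Duration,
	now <-chan struct{},
	doCheckpoint func(context.Context) time.Duration,
) {
	minTimer := time.NewTimer(0)             // gap since the last checkpoint
	intervalTimer := time.NewTimer(interval) // regular checkpoint interval
	defer minTimer.Stop()
	defer intervalTimer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-minTimer.C: // minimum gap has passed; wait for a trigger
			var took time.Duration
			select {
			case <-ctx.Done():
				return
			case <-intervalTimer.C:
				took = doCheckpoint(ctx)
			case <-now: // early trigger, e.g. too many dirty series
				if !intervalTimer.Stop() {
					<-intervalTimer.C // drain a value that already fired
				}
				took = doCheckpoint(ctx)
			}
			minTimer.Reset(took) // next checkpoint no sooner than this one took
			intervalTimer.Reset(interval)
		}
	}
}

func main() {
	now := make(chan struct{}, 1)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go checkpointLoop(ctx, 5*time.Minute, now, func(context.Context) time.Duration {
		return 2 * time.Second // pretend the checkpoint took 2s
	})
	now <- struct{}{} // request an early checkpoint
	time.Sleep(100 * time.Millisecond)
}
```
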
