Add various persistence related metrics (prometheus#2333)
Add metrics around checkpointing and persistence

* Add a metric to indicate whether checkpointing is in progress,
and another to track total checkpoint time and count.

This breaks the existing prometheus_local_storage_checkpoint_duration_seconds
gauge by renaming it to prometheus_local_storage_checkpoint_last_duration_seconds,
as the original name is better suited to the new summary.
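
A sketch of how the new series might be queried (assuming the local_storage
prefix used in the console template below, and the _sum/_count series that a
client_golang summary exposes):

    # average checkpoint duration over the last hour
      rate(prometheus_local_storage_checkpoint_duration_seconds_sum[1h])
    /
      rate(prometheus_local_storage_checkpoint_duration_seconds_count[1h])

    # 1 while a checkpoint is in flight, 0 otherwise
    prometheus_local_storage_checkpointing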

* Add metric for last checkpoint size.
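
A minimal sketch combining the two new gauges to approximate the write
throughput of the most recent checkpoint, in bytes per second:

      prometheus_local_storage_checkpoint_last_size_bytes
    /
      prometheus_local_storage_checkpoint_last_duration_seconds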

* Add metric for series/chunks processed by checkpoints.

For long checkpoints it'd be useful to see how they're progressing.
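
For example, a hedged way to watch a checkpoint in flight (assuming the
summary's _sum series and matching job/instance labels on both metrics):

    # chunks written to the checkpoint per second, only while checkpointing
      rate(prometheus_local_storage_checkpoint_series_chunks_written_sum[5m])
    and
      prometheus_local_storage_checkpointing == 1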

* Add metric for dirty series
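
As a sketch, the fraction of in-memory series that are currently dirty
(assuming the existing memory_series gauge alongside the new gauge):

      prometheus_local_storage_memory_dirty_series
    /
      prometheus_local_storage_memory_series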

* Add metric for number of chunks persisted per series.

You can get the number of chunks from chunk_ops,
but not the matching number of series. This helps determine
the size of the writes being made.
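
A sketch of the intended use (assuming the histogram's _sum/_count/_bucket
series):

    # average chunks persisted per series write
      rate(prometheus_local_storage_series_chunks_persisted_sum[5m])
    /
      rate(prometheus_local_storage_series_chunks_persisted_count[5m])

    # 90th percentile of chunks persisted per series write
    histogram_quantile(0.9,
      rate(prometheus_local_storage_series_chunks_persisted_bucket[5m]))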

* Add metric for chunks queued for persistence

The chunks-created metric counts both chunks that will need persistence
and chunks read in for queries. This new counter only includes chunks
created for persistence.
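
A hedged sketch of the persistence backlog growth rate, assuming the
existing chunk_ops_total counter exposes a type="persist" label value:

      rate(prometheus_local_storage_queued_chunks_to_persist_total[5m])
    - ignoring(type)
      rate(prometheus_local_storage_chunk_ops_total{type="persist"}[5m])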

* Address code review comments on the new persistence metrics.
brian-brazil authored Jan 11, 2017
1 parent 6ce9783 commit 1dcb763
Showing 3 changed files with 96 additions and 12 deletions.
consoles/prometheus-overview.html (2 changes: 1 addition & 1 deletion)
@@ -46,7 +46,7 @@
</tr>
<tr>
<td>Checkpoint Duration</td>
<td>{{ template "prom_query_drilldown" (args (printf "prometheus_local_storage_checkpoint_duration_seconds{job='prometheus',instance='%s'}" .Params.instance) "" "humanizeDuration") }}</td>
<td>{{ template "prom_query_drilldown" (args (printf "prometheus_local_storage_checkpoint_last_duration_seconds{job='prometheus',instance='%s'}" .Params.instance) "" "humanizeDuration") }}</td>
</tr>

<tr>
storage/local/persistence.go (83 changes: 72 additions & 11 deletions)
@@ -110,13 +110,18 @@ type persistence struct {
indexingStopped chan struct{}
indexingFlush chan chan int

indexingQueueLength prometheus.Gauge
indexingQueueCapacity prometheus.Metric
indexingBatchSizes prometheus.Summary
indexingBatchDuration prometheus.Summary
checkpointDuration prometheus.Gauge
dirtyCounter prometheus.Counter
startedDirty prometheus.Gauge
indexingQueueLength prometheus.Gauge
indexingQueueCapacity prometheus.Metric
indexingBatchSizes prometheus.Summary
indexingBatchDuration prometheus.Summary
checkpointDuration prometheus.Summary
checkpointLastDuration prometheus.Gauge
checkpointLastSize prometheus.Gauge
checkpointChunksWritten prometheus.Summary
dirtyCounter prometheus.Counter
startedDirty prometheus.Gauge
checkpointing prometheus.Gauge
seriesChunksPersisted prometheus.Histogram

dirtyMtx sync.Mutex // Protects dirty and becameDirty.
dirty bool // true if persistence was started in dirty state.
@@ -247,11 +252,31 @@ func newPersistence(
Help: "Quantiles for batch indexing duration in seconds.",
},
),
checkpointDuration: prometheus.NewGauge(prometheus.GaugeOpts{
checkpointLastDuration: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "checkpoint_duration_seconds",
Help: "The duration in seconds it took to checkpoint in-memory metrics and head chunks.",
Name: "checkpoint_last_duration_seconds",
Help: "The duration in seconds it took to last checkpoint open chunks and chunks yet to be persisted.",
}),
checkpointDuration: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Objectives: map[float64]float64{},
Name: "checkpoint_duration_seconds",
Help: "The duration in seconds taken for checkpointing open chunks and chunks yet to be persisted",
}),
checkpointLastSize: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "checkpoint_last_size_bytes",
Help: "The size of the last checkpoint of open chunks and chunks yet to be persisted",
}),
checkpointChunksWritten: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Objectives: map[float64]float64{},
Name: "checkpoint_series_chunks_written",
Help: "The number of chunk written per series while checkpointing open chunks and chunks yet to be persisted.",
}),
dirtyCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
@@ -265,6 +290,21 @@
Name: "started_dirty",
Help: "Whether the local storage was found to be dirty (and crash recovery occurred) during Prometheus startup.",
}),
checkpointing: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "checkpointing",
Help: "1 if the storage is checkpointing, 0 otherwise.",
}),
seriesChunksPersisted: prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "series_chunks_persisted",
Help: "The number of chunks persisted per series.",
// Even with 4 bytes per sample, you're not going to get more than 85
// chunks in 6 hours for a time series with 1s resolution.
Buckets: []float64{1, 2, 4, 8, 16, 32, 64, 128},
}),
dirty: dirty,
pedanticChecks: pedanticChecks,
dirtyFileName: dirtyPath,
@@ -310,8 +350,13 @@ func (p *persistence) Describe(ch chan<- *prometheus.Desc) {
p.indexingBatchSizes.Describe(ch)
p.indexingBatchDuration.Describe(ch)
ch <- p.checkpointDuration.Desc()
ch <- p.checkpointLastDuration.Desc()
ch <- p.checkpointLastSize.Desc()
ch <- p.checkpointChunksWritten.Desc()
ch <- p.checkpointing.Desc()
ch <- p.dirtyCounter.Desc()
ch <- p.startedDirty.Desc()
ch <- p.seriesChunksPersisted.Desc()
}

// Collect implements prometheus.Collector.
@@ -323,8 +368,13 @@ func (p *persistence) Collect(ch chan<- prometheus.Metric) {
p.indexingBatchSizes.Collect(ch)
p.indexingBatchDuration.Collect(ch)
ch <- p.checkpointDuration
ch <- p.checkpointLastDuration
ch <- p.checkpointLastSize
ch <- p.checkpointChunksWritten
ch <- p.checkpointing
ch <- p.dirtyCounter
ch <- p.startedDirty
ch <- p.seriesChunksPersisted
}

// isDirty returns the dirty flag in a goroutine-safe way.
@@ -559,6 +609,8 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
//
func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) {
log.Info("Checkpointing in-memory metrics and chunks...")
p.checkpointing.Set(1)
defer p.checkpointing.Set(0)
begin := time.Now()
f, err := os.OpenFile(p.headsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
if err != nil {
@@ -581,7 +633,8 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
}
err = os.Rename(p.headsTempFileName(), p.headsFileName())
duration := time.Since(begin)
p.checkpointDuration.Set(duration.Seconds())
p.checkpointDuration.Observe(duration.Seconds())
p.checkpointLastDuration.Set(duration.Seconds())
log.Infof("Done checkpointing in-memory metrics and chunks in %v.", duration)
}()

@@ -677,6 +730,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
return
}
}
p.checkpointChunksWritten.Observe(float64(len(m.series.chunkDescs) - m.series.persistWatermark))
}
// Series is checkpointed now, so declare it clean. In case the entire
// checkpoint fails later on, this is fine, as the storage's series
@@ -704,6 +758,11 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
return err
}
}
info, err := f.Stat()
if err != nil {
return err
}
p.checkpointLastSize.Set(float64(info.Size()))
return err
}

@@ -1520,6 +1579,7 @@ func (p *persistence) writeChunks(w io.Writer, chunks []chunk.Chunk) error {
// would only put back the original buf.
p.bufPool.Put(b)
}()
numChunks := len(chunks)

for batchSize := chunkMaxBatchSize; len(chunks) > 0; chunks = chunks[batchSize:] {
if batchSize > len(chunks) {
@@ -1543,6 +1603,7 @@ func (p *persistence) writeChunks(w io.Writer, chunks []chunk.Chunk) error {
return err
}
}
p.seriesChunksPersisted.Observe(float64(numChunks))
return nil
}

storage/local/storage.go (23 changes: 23 additions & 0 deletions)
@@ -178,7 +178,9 @@ type MemorySeriesStorage struct {
quarantineStopping, quarantineStopped chan struct{}

persistErrors prometheus.Counter
queuedChunksToPersist prometheus.Counter
numSeries prometheus.Gauge
dirtySeries prometheus.Gauge
seriesOps *prometheus.CounterVec
ingestedSamplesCount prometheus.Counter
discardedSamplesCount *prometheus.CounterVec
@@ -240,12 +242,24 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage {
Name: "persist_errors_total",
Help: "The total number of errors while persisting chunks.",
}),
queuedChunksToPersist: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "queued_chunks_to_persist_total",
Help: "The total number of chunks queued for persistence.",
}),
numSeries: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "memory_series",
Help: "The current number of series in memory.",
}),
dirtySeries: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "memory_dirty_series",
Help: "The current number of series that would require a disk seek during crash recovery.",
}),
seriesOps: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
@@ -1260,6 +1274,7 @@ loop:
log.Errorln("Error while checkpointing:", err)
} else {
dirtySeriesCount = 0
s.dirtySeries.Set(0)
}
// If a checkpoint takes longer than checkpointInterval, unluckily timed
// combination with the Reset(0) call below can lead to a case where a
@@ -1272,6 +1287,7 @@ loop:
case fp := <-memoryFingerprints:
if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {
dirtySeriesCount++
s.dirtySeries.Inc()
// Check if we have enough "dirty" series so that we need an early checkpoint.
// However, if we are already behind persisting chunks, creating a checkpoint
// would be counterproductive, as it would slow down chunk persisting even more,
@@ -1531,6 +1547,9 @@ func (s *MemorySeriesStorage) getNumChunksToPersist() int {
// negative 'by' to decrement.
func (s *MemorySeriesStorage) incNumChunksToPersist(by int) {
atomic.AddInt64(&s.numChunksToPersist, int64(by))
if by > 0 {
s.queuedChunksToPersist.Add(float64(by))
}
}

// calculatePersistenceUrgencyScore calculates and returns an urgency score for
@@ -1734,9 +1753,11 @@ func (s *MemorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
s.mapper.Describe(ch)

ch <- s.persistErrors.Desc()
ch <- s.queuedChunksToPersist.Desc()
ch <- maxChunksToPersistDesc
ch <- numChunksToPersistDesc
ch <- s.numSeries.Desc()
ch <- s.dirtySeries.Desc()
s.seriesOps.Describe(ch)
ch <- s.ingestedSamplesCount.Desc()
s.discardedSamplesCount.Describe(ch)
@@ -1753,6 +1774,7 @@ func (s *MemorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
s.mapper.Collect(ch)

ch <- s.persistErrors
ch <- s.queuedChunksToPersist
ch <- prometheus.MustNewConstMetric(
maxChunksToPersistDesc,
prometheus.GaugeValue,
Expand All @@ -1764,6 +1786,7 @@ func (s *MemorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
float64(s.getNumChunksToPersist()),
)
ch <- s.numSeries
ch <- s.dirtySeries
s.seriesOps.Collect(ch)
ch <- s.ingestedSamplesCount
s.discardedSamplesCount.Collect(ch)
