Skip to content

Commit

Permalink
Merge pull request prometheus#1555 from prometheus/beorn7/cd
Browse files Browse the repository at this point in the history
Checkpoint fingerprint mappings only upon shutdown
  • Loading branch information
beorn7 committed Apr 14, 2016
2 parents f6c2984 + a90d645 commit 096a2ef
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 216 deletions.
55 changes: 29 additions & 26 deletions storage/local/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,20 @@ func newFPMapper(fpToSeries *seriesMap, p *persistence) (*fpMapper, error) {
return m, nil
}

// checkpoint persists the current mappings. The caller has to ensure that the
// provided mappings are not changed concurrently. This method is only called
// upon shutdown, when no samples are ingested anymore.
func (m *fpMapper) checkpoint() error {
return m.p.checkpointFPMappings(m.mappings)
}

// mapFP takes a raw fingerprint (as returned by Metrics.FastFingerprint) and
// returns a truly unique fingerprint. The caller must have locked the raw
// fingerprint.
//
// If an error is encountered, it is returned together with the unchanged raw
// fingerprint.
func (m *fpMapper) mapFP(fp model.Fingerprint, metric model.Metric) (model.Fingerprint, error) {
func (m *fpMapper) mapFP(fp model.Fingerprint, metric model.Metric) model.Fingerprint {
// First check if we are in the reserved FP space, in which case this is
// automatically a collision that has to be mapped.
if fp <= maxMappedFP {
Expand All @@ -92,7 +99,7 @@ func (m *fpMapper) mapFP(fp model.Fingerprint, metric model.Metric) (model.Finge
// FP exists in memory, but is it for the same metric?
if metric.Equal(s.metric) {
// Yupp. We are done.
return fp, nil
return fp
}
// Collision detected!
return m.maybeAddMapping(fp, metric)
Expand All @@ -110,28 +117,30 @@ func (m *fpMapper) mapFP(fp model.Fingerprint, metric model.Metric) (model.Finge
mappedFP, ok := mappedFPs[ms]
if ok {
// Historical mapping found, return the mapped FP.
return mappedFP, nil
return mappedFP
}
}
// If we are here, FP does not exist in memory and is either not mapped
// at all, or existing mappings for FP are not for m. Check if we have
// something for FP in the archive.
archivedMetric, err := m.p.archivedMetric(fp)
if err != nil {
return fp, err
if err != nil || archivedMetric == nil {
// Either the archive lookup has returend an error, or fp does
// not exist in the archive. In the former case, the storage has
// been marked as dirty already. We just carry on for as long as
// it goes, assuming that fp does not exist. In either case,
// since now we know (or assume) now that fp does not exist,
// neither in memory nor in archive, we can safely keep it
// unmapped.
return fp
}
if archivedMetric != nil {
// FP exists in archive, but is it for the same metric?
if metric.Equal(archivedMetric) {
// Yupp. We are done.
return fp, nil
}
// Collision detected!
return m.maybeAddMapping(fp, metric)
// FP exists in archive, but is it for the same metric?
if metric.Equal(archivedMetric) {
// Yupp. We are done.
return fp
}
// As fp does not exist, neither in memory nor in archive, we can safely
// keep it unmapped.
return fp, nil
// Collision detected!
return m.maybeAddMapping(fp, metric)
}

// maybeAddMapping is only used internally. It takes a detected collision and
Expand All @@ -140,7 +149,7 @@ func (m *fpMapper) mapFP(fp model.Fingerprint, metric model.Metric) (model.Finge
func (m *fpMapper) maybeAddMapping(
fp model.Fingerprint,
collidingMetric model.Metric,
) (model.Fingerprint, error) {
) model.Fingerprint {
ms := metricToUniqueString(collidingMetric)
m.mtx.RLock()
mappedFPs, ok := m.mappings[fp]
Expand All @@ -149,35 +158,29 @@ func (m *fpMapper) maybeAddMapping(
// fp is locked by the caller, so no further locking required.
mappedFP, ok := mappedFPs[ms]
if ok {
return mappedFP, nil // Existing mapping.
return mappedFP // Existing mapping.
}
// A new mapping has to be created.
mappedFP = m.nextMappedFP()
mappedFPs[ms] = mappedFP
m.mtx.Lock()
// Checkpoint mappings after each change.
err := m.p.checkpointFPMappings(m.mappings)
m.mtx.Unlock()
log.Infof(
"Collision detected for fingerprint %v, metric %v, mapping to new fingerprint %v.",
fp, collidingMetric, mappedFP,
)
return mappedFP, err
return mappedFP
}
// This is the first collision for fp.
mappedFP := m.nextMappedFP()
mappedFPs = map[string]model.Fingerprint{ms: mappedFP}
m.mtx.Lock()
m.mappings[fp] = mappedFPs
m.mappingsCounter.Inc()
// Checkpoint mappings after each change.
err := m.p.checkpointFPMappings(m.mappings)
m.mtx.Unlock()
log.Infof(
"Collision detected for fingerprint %v, metric %v, mapping to new fingerprint %v.",
fp, collidingMetric, mappedFP,
)
return mappedFP, err
return mappedFP
}

func (m *fpMapper) nextMappedFP() model.Fingerprint {
Expand Down
Loading

0 comments on commit 096a2ef

Please sign in to comment.