Skip to content

Commit

Permalink
Merge pull request prometheus#2005 from redbaron/microoptimise-matching
Browse files Browse the repository at this point in the history
Microoptimise matching
  • Loading branch information
beorn7 authored Oct 5, 2016
2 parents 6e8f87a + e6db9f8 commit 1e2f03f
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 68 deletions.
183 changes: 130 additions & 53 deletions storage/local/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,19 +410,19 @@ func (s *MemorySeriesStorage) WaitForIndexing() {

// LastSampleForLabelMatchers implements Storage.
func (s *MemorySeriesStorage) LastSampleForLabelMatchers(_ context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
fps := map[model.Fingerprint]struct{}{}
mergedFPs := map[model.Fingerprint]struct{}{}
for _, matchers := range matcherSets {
fpToMetric, err := s.metricsForLabelMatchers(cutoff, model.Latest, matchers...)
fps, err := s.fpsForLabelMatchers(cutoff, model.Latest, matchers...)
if err != nil {
return nil, err
}
for fp := range fpToMetric {
fps[fp] = struct{}{}
for fp := range fps {
mergedFPs[fp] = struct{}{}
}
}

res := make(model.Vector, 0, len(fps))
for fp := range fps {
res := make(model.Vector, 0, len(mergedFPs))
for fp := range mergedFPs {
s.fpLocker.Lock(fp)

series, ok := s.fpToSeries.get(fp)
Expand Down Expand Up @@ -480,13 +480,13 @@ func (bit *boundedIterator) Close() {

// QueryRange implements Storage.
func (s *MemorySeriesStorage) QueryRange(_ context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...)
fpSeriesPairs, err := s.seriesForLabelMatchers(from, through, matchers...)
if err != nil {
return nil, err
}
iterators := make([]SeriesIterator, 0, len(fpToMetric))
for fp := range fpToMetric {
it := s.preloadChunksForRange(fp, from, through)
iterators := make([]SeriesIterator, 0, len(fpSeriesPairs))
for _, pair := range fpSeriesPairs {
it := s.preloadChunksForRange(pair, from, through)
iterators = append(iterators, it)
}
return iterators, nil
Expand All @@ -497,13 +497,13 @@ func (s *MemorySeriesStorage) QueryInstant(_ context.Context, ts model.Time, sta
from := ts.Add(-stalenessDelta)
through := ts

fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...)
fpSeriesPairs, err := s.seriesForLabelMatchers(from, through, matchers...)
if err != nil {
return nil, err
}
iterators := make([]SeriesIterator, 0, len(fpToMetric))
for fp := range fpToMetric {
it := s.preloadChunksForInstant(fp, from, through)
iterators := make([]SeriesIterator, 0, len(fpSeriesPairs))
for _, pair := range fpSeriesPairs {
it := s.preloadChunksForInstant(pair, from, through)
iterators = append(iterators, it)
}
return iterators, nil
Expand Down Expand Up @@ -558,55 +558,55 @@ func (s *MemorySeriesStorage) MetricsForLabelMatchers(
return metrics, nil
}

func (s *MemorySeriesStorage) metricsForLabelMatchers(
from, through model.Time,
// candidateFPsForLabelMatchers returns candidate FPs for given matchers and remaining matchers to be checked.
func (s *MemorySeriesStorage) candidateFPsForLabelMatchers(
matchers ...*metric.LabelMatcher,
) (map[model.Fingerprint]metric.Metric, error) {
) (map[model.Fingerprint]struct{}, []*metric.LabelMatcher, error) {
sort.Sort(metric.LabelMatchers(matchers))

if len(matchers) == 0 || matchers[0].MatchesEmptyString() {
// No matchers at all or even the best matcher matches the empty string.
return nil, nil
return nil, nil, nil
}

var (
matcherIdx int
remainingFPs map[model.Fingerprint]struct{}
candidateFPs map[model.Fingerprint]struct{}
)

// Equal matchers.
for ; matcherIdx < len(matchers) && (remainingFPs == nil || len(remainingFPs) > fpEqualMatchThreshold); matcherIdx++ {
for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpEqualMatchThreshold); matcherIdx++ {
m := matchers[matcherIdx]
if m.Type != metric.Equal || m.MatchesEmptyString() {
break
}
remainingFPs = s.fingerprintsForLabelPair(
candidateFPs = s.fingerprintsForLabelPair(
model.LabelPair{
Name: m.Name,
Value: m.Value,
},
nil,
remainingFPs,
candidateFPs,
)
if len(remainingFPs) == 0 {
return nil, nil
if len(candidateFPs) == 0 {
return nil, nil, nil
}
}

// Other matchers.
for ; matcherIdx < len(matchers) && (remainingFPs == nil || len(remainingFPs) > fpOtherMatchThreshold); matcherIdx++ {
for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpOtherMatchThreshold); matcherIdx++ {
m := matchers[matcherIdx]
if m.MatchesEmptyString() {
break
}

lvs, err := s.LabelValuesForLabelName(context.TODO(), m.Name)
if err != nil {
return nil, err
return nil, nil, err
}
lvs = m.Filter(lvs)
if len(lvs) == 0 {
return nil, nil
return nil, nil, nil
}
fps := map[model.Fingerprint]struct{}{}
for _, lv := range lvs {
Expand All @@ -616,29 +616,104 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers(
Value: lv,
},
fps,
remainingFPs,
candidateFPs,
)
}
remainingFPs = fps
if len(remainingFPs) == 0 {
return nil, nil
candidateFPs = fps
if len(candidateFPs) == 0 {
return nil, nil, nil
}
}
return candidateFPs, matchers[matcherIdx:], nil
}

result := map[model.Fingerprint]metric.Metric{}
for fp := range remainingFPs {
func (s *MemorySeriesStorage) seriesForLabelMatchers(
from, through model.Time,
matchers ...*metric.LabelMatcher,
) ([]fingerprintSeriesPair, error) {
candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...)
if err != nil {
return nil, err
}

result := []fingerprintSeriesPair{}
FPLoop:
for fp := range candidateFPs {
s.fpLocker.Lock(fp)
if met, _, ok := s.metricForRange(fp, from, through); ok {
result[fp] = metric.Metric{Metric: met}
series := s.seriesForRange(fp, from, through)
s.fpLocker.Unlock(fp)

if series == nil {
continue FPLoop
}

for _, m := range matchersToCheck {
if !m.Match(series.metric[m.Name]) {
continue FPLoop
}
}
result = append(result, fingerprintSeriesPair{fp, series})
}
return result, nil
}

func (s *MemorySeriesStorage) fpsForLabelMatchers(
from, through model.Time,
matchers ...*metric.LabelMatcher,
) (map[model.Fingerprint]struct{}, error) {
candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...)
if err != nil {
return nil, err
}

FPLoop:
for fp := range candidateFPs {
s.fpLocker.Lock(fp)
met, _, ok := s.metricForRange(fp, from, through)
s.fpLocker.Unlock(fp)

if !ok {
delete(candidateFPs, fp)
continue FPLoop
}

for _, m := range matchersToCheck {
if !m.Match(met[m.Name]) {
delete(candidateFPs, fp)
continue FPLoop
}
}
}
for _, m := range matchers[matcherIdx:] {
for fp, met := range result {
if !m.Match(met.Metric[m.Name]) {
delete(result, fp)
return candidateFPs, nil
}

func (s *MemorySeriesStorage) metricsForLabelMatchers(
from, through model.Time,
matchers ...*metric.LabelMatcher,
) (map[model.Fingerprint]metric.Metric, error) {

candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...)
if err != nil {
return nil, err
}

result := map[model.Fingerprint]metric.Metric{}
FPLoop:
for fp := range candidateFPs {
s.fpLocker.Lock(fp)
met, _, ok := s.metricForRange(fp, from, through)
s.fpLocker.Unlock(fp)

if !ok {
continue FPLoop
}

for _, m := range matchersToCheck {
if !m.Match(met[m.Name]) {
continue FPLoop
}
}
result[fp] = metric.Metric{Metric: met}
}
return result, nil
}
Expand Down Expand Up @@ -696,14 +771,14 @@ func (s *MemorySeriesStorage) LabelValuesForLabelName(_ context.Context, labelNa

// DropMetricsForLabelMatchers implements Storage.
func (s *MemorySeriesStorage) DropMetricsForLabelMatchers(_ context.Context, matchers ...*metric.LabelMatcher) (int, error) {
fpToMetric, err := s.metricsForLabelMatchers(model.Earliest, model.Latest, matchers...)
fps, err := s.fpsForLabelMatchers(model.Earliest, model.Latest, matchers...)
if err != nil {
return 0, err
}
for fp := range fpToMetric {
for fp := range fps {
s.purgeSeries(fp, nil, nil)
}
return len(fpToMetric), nil
return len(fps), nil
}

var (
Expand Down Expand Up @@ -864,7 +939,7 @@ func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
return series, nil
}

// seriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant.
// seriesForRange is a helper method for seriesForLabelMatchers.
//
// The caller must have locked the fp.
func (s *MemorySeriesStorage) seriesForRange(
Expand All @@ -883,16 +958,17 @@ func (s *MemorySeriesStorage) seriesForRange(
}

func (s *MemorySeriesStorage) preloadChunksForRange(
fp model.Fingerprint,
pair fingerprintSeriesPair,
from model.Time, through model.Time,
) SeriesIterator {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)

series := s.seriesForRange(fp, from, through)
fp, series := pair.fp, pair.series
if series == nil {
return nopIter
}

s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)

iter, err := series.preloadChunksForRange(fp, from, through, s)
if err != nil {
s.quarantineSeries(fp, series.metric, err)
Expand All @@ -902,16 +978,17 @@ func (s *MemorySeriesStorage) preloadChunksForRange(
}

func (s *MemorySeriesStorage) preloadChunksForInstant(
fp model.Fingerprint,
pair fingerprintSeriesPair,
from model.Time, through model.Time,
) SeriesIterator {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)

series := s.seriesForRange(fp, from, through)
fp, series := pair.fp, pair.series
if series == nil {
return nopIter
}

s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)

iter, err := series.preloadChunksForInstant(fp, from, through, s)
if err != nil {
s.quarantineSeries(fp, series.metric, err)
Expand Down
Loading

0 comments on commit 1e2f03f

Please sign in to comment.