Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Microoptimise matching #2005

Merged
merged 4 commits into from
Oct 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 130 additions & 53 deletions storage/local/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,19 +415,19 @@ func (s *MemorySeriesStorage) WaitForIndexing() {

// LastSampleForLabelMatchers implements Storage.
func (s *MemorySeriesStorage) LastSampleForLabelMatchers(_ context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
fps := map[model.Fingerprint]struct{}{}
mergedFPs := map[model.Fingerprint]struct{}{}
for _, matchers := range matcherSets {
fpToMetric, err := s.metricsForLabelMatchers(cutoff, model.Latest, matchers...)
fps, err := s.fpsForLabelMatchers(cutoff, model.Latest, matchers...)
if err != nil {
return nil, err
}
for fp := range fpToMetric {
fps[fp] = struct{}{}
for fp := range fps {
mergedFPs[fp] = struct{}{}
}
}

res := make(model.Vector, 0, len(fps))
for fp := range fps {
res := make(model.Vector, 0, len(mergedFPs))
for fp := range mergedFPs {
s.fpLocker.Lock(fp)

series, ok := s.fpToSeries.get(fp)
Expand Down Expand Up @@ -485,13 +485,13 @@ func (bit *boundedIterator) Close() {

// QueryRange implements Storage.
func (s *MemorySeriesStorage) QueryRange(_ context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...)
fpSeriesPairs, err := s.seriesForLabelMatchers(from, through, matchers...)
if err != nil {
return nil, err
}
iterators := make([]SeriesIterator, 0, len(fpToMetric))
for fp := range fpToMetric {
it := s.preloadChunksForRange(fp, from, through)
iterators := make([]SeriesIterator, 0, len(fpSeriesPairs))
for _, pair := range fpSeriesPairs {
it := s.preloadChunksForRange(pair, from, through)
iterators = append(iterators, it)
}
return iterators, nil
Expand All @@ -502,13 +502,13 @@ func (s *MemorySeriesStorage) QueryInstant(_ context.Context, ts model.Time, sta
from := ts.Add(-stalenessDelta)
through := ts

fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...)
fpSeriesPairs, err := s.seriesForLabelMatchers(from, through, matchers...)
if err != nil {
return nil, err
}
iterators := make([]SeriesIterator, 0, len(fpToMetric))
for fp := range fpToMetric {
it := s.preloadChunksForInstant(fp, from, through)
iterators := make([]SeriesIterator, 0, len(fpSeriesPairs))
for _, pair := range fpSeriesPairs {
it := s.preloadChunksForInstant(pair, from, through)
iterators = append(iterators, it)
}
return iterators, nil
Expand Down Expand Up @@ -563,55 +563,55 @@ func (s *MemorySeriesStorage) MetricsForLabelMatchers(
return metrics, nil
}

func (s *MemorySeriesStorage) metricsForLabelMatchers(
from, through model.Time,
// candidateFPsForLabelMatchers returns candidate FPs for given matchers and remaining matchers to be checked.
func (s *MemorySeriesStorage) candidateFPsForLabelMatchers(
matchers ...*metric.LabelMatcher,
) (map[model.Fingerprint]metric.Metric, error) {
) (map[model.Fingerprint]struct{}, []*metric.LabelMatcher, error) {
sort.Sort(metric.LabelMatchers(matchers))

if len(matchers) == 0 || matchers[0].MatchesEmptyString() {
// No matchers at all or even the best matcher matches the empty string.
return nil, nil
return nil, nil, nil
}

var (
matcherIdx int
remainingFPs map[model.Fingerprint]struct{}
candidateFPs map[model.Fingerprint]struct{}
)

// Equal matchers.
for ; matcherIdx < len(matchers) && (remainingFPs == nil || len(remainingFPs) > fpEqualMatchThreshold); matcherIdx++ {
for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpEqualMatchThreshold); matcherIdx++ {
m := matchers[matcherIdx]
if m.Type != metric.Equal || m.MatchesEmptyString() {
break
}
remainingFPs = s.fingerprintsForLabelPair(
candidateFPs = s.fingerprintsForLabelPair(
model.LabelPair{
Name: m.Name,
Value: m.Value,
},
nil,
remainingFPs,
candidateFPs,
)
if len(remainingFPs) == 0 {
return nil, nil
if len(candidateFPs) == 0 {
return nil, nil, nil
}
}

// Other matchers.
for ; matcherIdx < len(matchers) && (remainingFPs == nil || len(remainingFPs) > fpOtherMatchThreshold); matcherIdx++ {
for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpOtherMatchThreshold); matcherIdx++ {
m := matchers[matcherIdx]
if m.MatchesEmptyString() {
break
}

lvs, err := s.LabelValuesForLabelName(context.TODO(), m.Name)
if err != nil {
return nil, err
return nil, nil, err
}
lvs = m.Filter(lvs)
if len(lvs) == 0 {
return nil, nil
return nil, nil, nil
}
fps := map[model.Fingerprint]struct{}{}
for _, lv := range lvs {
Expand All @@ -621,29 +621,104 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers(
Value: lv,
},
fps,
remainingFPs,
candidateFPs,
)
}
remainingFPs = fps
if len(remainingFPs) == 0 {
return nil, nil
candidateFPs = fps
if len(candidateFPs) == 0 {
return nil, nil, nil
}
}
return candidateFPs, matchers[matcherIdx:], nil
}

result := map[model.Fingerprint]metric.Metric{}
for fp := range remainingFPs {
func (s *MemorySeriesStorage) seriesForLabelMatchers(
from, through model.Time,
matchers ...*metric.LabelMatcher,
) ([]fingerprintSeriesPair, error) {
candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...)
if err != nil {
return nil, err
}

result := []fingerprintSeriesPair{}
FPLoop:
for fp := range candidateFPs {
s.fpLocker.Lock(fp)
if met, _, ok := s.metricForRange(fp, from, through); ok {
result[fp] = metric.Metric{Metric: met}
series := s.seriesForRange(fp, from, through)
s.fpLocker.Unlock(fp)

if series == nil {
continue FPLoop
}

for _, m := range matchersToCheck {
if !m.Match(series.metric[m.Name]) {
continue FPLoop
}
}
result = append(result, fingerprintSeriesPair{fp, series})
}
return result, nil
}

func (s *MemorySeriesStorage) fpsForLabelMatchers(
from, through model.Time,
matchers ...*metric.LabelMatcher,
) (map[model.Fingerprint]struct{}, error) {
candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...)
if err != nil {
return nil, err
}

FPLoop:
for fp := range candidateFPs {
s.fpLocker.Lock(fp)
met, _, ok := s.metricForRange(fp, from, through)
s.fpLocker.Unlock(fp)

if !ok {
delete(candidateFPs, fp)
continue FPLoop
}

for _, m := range matchersToCheck {
if !m.Match(met[m.Name]) {
delete(candidateFPs, fp)
continue FPLoop
}
}
}
for _, m := range matchers[matcherIdx:] {
for fp, met := range result {
if !m.Match(met.Metric[m.Name]) {
delete(result, fp)
return candidateFPs, nil
}

func (s *MemorySeriesStorage) metricsForLabelMatchers(
from, through model.Time,
matchers ...*metric.LabelMatcher,
) (map[model.Fingerprint]metric.Metric, error) {

candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...)
if err != nil {
return nil, err
}

result := map[model.Fingerprint]metric.Metric{}
FPLoop:
for fp := range candidateFPs {
s.fpLocker.Lock(fp)
met, _, ok := s.metricForRange(fp, from, through)
s.fpLocker.Unlock(fp)

if !ok {
continue FPLoop
}

for _, m := range matchersToCheck {
if !m.Match(met[m.Name]) {
continue FPLoop
}
}
result[fp] = metric.Metric{Metric: met}
}
return result, nil
}
Expand Down Expand Up @@ -701,14 +776,14 @@ func (s *MemorySeriesStorage) LabelValuesForLabelName(_ context.Context, labelNa

// DropMetricsForLabelMatchers implements Storage.
func (s *MemorySeriesStorage) DropMetricsForLabelMatchers(_ context.Context, matchers ...*metric.LabelMatcher) (int, error) {
fpToMetric, err := s.metricsForLabelMatchers(model.Earliest, model.Latest, matchers...)
fps, err := s.fpsForLabelMatchers(model.Earliest, model.Latest, matchers...)
if err != nil {
return 0, err
}
for fp := range fpToMetric {
for fp := range fps {
s.purgeSeries(fp, nil, nil)
}
return len(fpToMetric), nil
return len(fps), nil
}

var (
Expand Down Expand Up @@ -869,7 +944,7 @@ func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
return series, nil
}

// seriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant.
// seriesForRange is a helper method for seriesForLabelMatchers.
//
// The caller must have locked the fp.
func (s *MemorySeriesStorage) seriesForRange(
Expand All @@ -888,16 +963,17 @@ func (s *MemorySeriesStorage) seriesForRange(
}

func (s *MemorySeriesStorage) preloadChunksForRange(
fp model.Fingerprint,
pair fingerprintSeriesPair,
from model.Time, through model.Time,
) SeriesIterator {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)

series := s.seriesForRange(fp, from, through)
fp, series := pair.fp, pair.series
if series == nil {
return nopIter
}

s.fpLocker.Lock(fp)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed in the other PR, the performance gain of not going with defer is probably negligible in this context, and will go away with Go1.8 anyway. Better use the cleaner code here (i.e. unlock with defer as before).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, unlock with defer.

defer s.fpLocker.Unlock(fp)

iter, err := series.preloadChunksForRange(fp, from, through, s)
if err != nil {
s.quarantineSeries(fp, series.metric, err)
Expand All @@ -907,16 +983,17 @@ func (s *MemorySeriesStorage) preloadChunksForRange(
}

func (s *MemorySeriesStorage) preloadChunksForInstant(
fp model.Fingerprint,
pair fingerprintSeriesPair,
from model.Time, through model.Time,
) SeriesIterator {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)

series := s.seriesForRange(fp, from, through)
fp, series := pair.fp, pair.series
if series == nil {
return nopIter
}

s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)

iter, err := series.preloadChunksForInstant(fp, from, through, s)
if err != nil {
s.quarantineSeries(fp, series.metric, err)
Expand Down
Loading