Skip to content

Commit

Permalink
Add fanin tests and fix uncovered bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
juliusv committed Mar 20, 2017
1 parent 9b33cfc commit 94acd3f
Show file tree
Hide file tree
Showing 3 changed files with 647 additions and 18 deletions.
42 changes: 27 additions & 15 deletions storage/fanin/fanin.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,22 +97,25 @@ func (q querier) query(qFn func(q local.Querier) ([]local.SeriesIterator, error)
}

func (q querier) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error) {
// TODO: implement querying metrics from remote storage.
return q.local.MetricsForLabelMatchers(ctx, from, through, matcherSets...)
}

func (q querier) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
// TODO: implement querying last samples from remote storage.
return q.local.LastSampleForLabelMatchers(ctx, cutoff, matcherSets...)
}

func (q querier) LabelValuesForLabelName(ctx context.Context, ln model.LabelName) (model.LabelValues, error) {
// TODO: implement querying label values from remote storage.
return q.local.LabelValuesForLabelName(ctx, ln)
}

func (q querier) Close() error {
for _, q := range append([]local.Querier{q.local}, q.remotes...) {
if q.local != nil {
if err := q.local.Close(); err != nil {
return err
}
}

for _, q := range q.remotes {
if err := q.Close(); err != nil {
return err
}
Expand All @@ -132,22 +135,29 @@ type mergeIterator struct {

func (mit mergeIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair {
latest := model.ZeroSamplePair
for _, it := range append(mit.remote, mit.local) {
if it == nil {
// There might not be a local iterator for every remote series.
continue
}
if mit.local != nil {
latest = mit.local.ValueAtOrBeforeTime(t)
}

v := it.ValueAtOrBeforeTime(t)
if v.Timestamp.After(latest.Timestamp) {
latest = v
// We only need to look for a remote last sample if we don't have a local one
// at all. If we have a local one, by definition we would not care about earlier
// "last" samples, and we would not consider later ones as well, because we
// generally only consider remote samples that are older than the oldest
// local sample.
if latest == model.ZeroSamplePair {
for _, it := range mit.remote {
v := it.ValueAtOrBeforeTime(t)
if v.Timestamp.After(latest.Timestamp) {
latest = v
}
}
}

return latest
}

func (mit mergeIterator) RangeValues(interval metric.Interval) []model.SamplePair {
remoteCutoff := model.Earliest
remoteCutoff := model.Latest
var values []model.SamplePair
if mit.local != nil {
values = mit.local.RangeValues(interval)
Expand Down Expand Up @@ -177,7 +187,9 @@ func (mit mergeIterator) Metric() metric.Metric {
}

func (mit mergeIterator) Close() {
mit.local.Close()
if mit.local != nil {
mit.local.Close()
}
for _, it := range mit.remote {
it.Close()
}
Expand Down Expand Up @@ -213,6 +225,6 @@ func mergeSamples(a, b []model.SamplePair) []model.SamplePair {
}
}
result = append(result, a[i:]...)
result = append(result, b[i:]...)
result = append(result, b[j:]...)
return result
}
Loading

0 comments on commit 94acd3f

Please sign in to comment.