From d8061d5666230793feeaacc43fc81c87258df170 Mon Sep 17 00:00:00 2001 From: Anthony Mirabella Date: Fri, 10 May 2024 04:41:21 -0400 Subject: [PATCH 1/8] Implement histogram statistics decoder This commit is a POC of how we can speed up histogram_count and histogram_sum functions on native histograms. The idea is to have separate decoders which can be used by the engine to only read count/sum values from histogram objects. This should help with reducing allocations when decoding histograms, as well as with speeding up aggregations like sum since they will be done on floats and not on histogram objects. Signed-off-by: Filip Petkovski --- tsdb/chunkenc/float_histogram.go | 56 +++++++++++++++++++++++++++ tsdb/chunkenc/float_histogram_test.go | 47 ++++++++++++++++++++++ 2 files changed, 103 insertions(+) diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 1eed46ca87f..57bfd8b8ddf 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -965,3 +965,59 @@ func (it *floatHistogramIterator) readXor(v *float64, leading, trailing *uint8) } return true } + +type histogramStat int + +const ( + statSum = iota + statCount = 1 +) + +type floatHistogramStatsDecoder struct { + it *floatHistogramIterator + stat histogramStat +} + +func newFloatHistogramStatsDecoder(it *floatHistogramIterator, stat histogramStat) *floatHistogramStatsDecoder { + return &floatHistogramStatsDecoder{ + it: it, + stat: stat, + } +} + +func (f floatHistogramStatsDecoder) Next() ValueType { + if f.it.Next() == ValFloatHistogram { + return ValFloat + } + return ValNone +} + +func (f floatHistogramStatsDecoder) Seek(t int64) ValueType { + if f.it.Seek(t) == ValFloatHistogram { + return ValFloat + } + return ValNone +} + +func (f floatHistogramStatsDecoder) At() (int64, float64) { + switch f.stat { + case statCount: + return f.it.t, f.it.cnt.value + case statSum: + return f.it.t, f.it.sum.value + default: + panic(fmt.Sprintf("unknown stat type 
%d", f.stat)) + } +} + +func (f floatHistogramStatsDecoder) AtT() int64 { return f.it.t } + +func (f floatHistogramStatsDecoder) Err() error { return f.it.err } + +func (f floatHistogramStatsDecoder) AtHistogram(_ *histogram.Histogram) (int64, *histogram.Histogram) { + panic("cannot call floatHistogramStatsDecoder.AtHistogram") +} + +func (f floatHistogramStatsDecoder) AtFloatHistogram(_ *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + panic("cannot call floatHistogramStatsDecoder.AtFloatHistogram") +} diff --git a/tsdb/chunkenc/float_histogram_test.go b/tsdb/chunkenc/float_histogram_test.go index 054c17f7d99..87cb96f3a3d 100644 --- a/tsdb/chunkenc/float_histogram_test.go +++ b/tsdb/chunkenc/float_histogram_test.go @@ -976,3 +976,50 @@ func TestFloatHistogramAppendOnlyErrors(t *testing.T) { require.EqualError(t, err, "float histogram counter reset") }) } + +func TestFloatHistogramCountDecoding(t *testing.T) { + numHistograms := 20 + chk, err := createHistogramChunk(numHistograms) + require.NoError(t, err) + + decodedHistograms := make([]*histogram.FloatHistogram, 0) + decodedCounts := make([]float64, 0) + decodedSums := make([]float64, 0) + + it := chk.Iterator(nil) + for it.Next() != ValNone { + _, fh := it.AtFloatHistogram(nil) + decodedHistograms = append(decodedHistograms, fh) + } + + countIterator := newFloatHistogramStatsDecoder(chk.Iterator(nil).(*floatHistogramIterator), statCount) + for countIterator.Next() != ValNone { + _, count := countIterator.At() + decodedCounts = append(decodedCounts, count) + } + sumIterator := newFloatHistogramStatsDecoder(chk.Iterator(nil).(*floatHistogramIterator), statSum) + for sumIterator.Next() != ValNone { + _, sum := sumIterator.At() + decodedSums = append(decodedSums, sum) + } + for i := 0; i < len(decodedHistograms); i++ { + require.Equal(t, decodedHistograms[i].Count, decodedCounts[i]) + require.Equal(t, decodedHistograms[i].Sum, decodedSums[i]) + } +} + +func createHistogramChunk(n int) 
(*FloatHistogramChunk, error) { + chunk := NewFloatHistogramChunk() + appender, err := chunk.Appender() + if err != nil { + return nil, err + } + fhAppender := appender.(*FloatHistogramAppender) + + for i := 0; i < n; i++ { + if _, _, _, err := fhAppender.AppendFloatHistogram(nil, int64(i*1000), tsdbutil.GenerateTestFloatHistogram(i), true); err != nil { + return nil, err + } + } + return chunk, nil +} From e6f9cd1b9534ec74a1521d0c4ec32f5ebb96bcf5 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Wed, 15 May 2024 07:55:39 +0200 Subject: [PATCH 2/8] Return partial histograms Signed-off-by: Filip Petkovski --- promql/bench_test.go | 8 ++++ promql/engine.go | 44 +++++++++++++++++ promql/parser/ast.go | 9 ++-- tsdb/chunkenc/float_histogram.go | 56 ---------------------- tsdb/chunkenc/float_histogram_test.go | 47 ------------------ tsdb/chunkenc/histogram.go | 68 +++++++++++++++++++++++++++ tsdb/chunkenc/histogram_test.go | 43 +++++++++++++++++ 7 files changed, 168 insertions(+), 107 deletions(-) diff --git a/promql/bench_test.go b/promql/bench_test.go index 9a85290915e..fb3b6ac74b5 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -323,6 +323,14 @@ func BenchmarkNativeHistograms(b *testing.B) { name: "sum rate with long rate interval", query: "sum(rate(native_histogram_series[20m]))", }, + { + name: "histogram_count with short rate interval", + query: "histogram_count(sum(rate(native_histogram_series[2m])))", + }, + { + name: "histogram_count with long rate interval", + query: "histogram_count(sum(rate(native_histogram_series[20m])))", + }, } opts := promql.EngineOpts{ diff --git a/promql/engine.go b/promql/engine.go index ea4bc1af85e..7be3e525887 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -980,6 +980,11 @@ func checkAndExpandSeriesSet(ctx context.Context, expr parser.Expr) (annotations return nil, nil } series, ws, err := expandSeriesSet(ctx, e.UnexpandedSeriesSet) + if e.SkipHistogramBuckets { + for i := range series { + series[i] = 
newHistogramStatsSeries(series[i]) + } + } e.Series = series return ws, err } @@ -3179,6 +3184,8 @@ func unwrapStepInvariantExpr(e parser.Expr) parser.Expr { // PreprocessExpr wraps all possible step invariant parts of the given expression with // StepInvariantExpr. It also resolves the preprocessors. func PreprocessExpr(expr parser.Expr, start, end time.Time) parser.Expr { + detectHistogramStatsDecoding(expr) + isStepInvariant := preprocessExprHelper(expr, start, end) if isStepInvariant { return newStepInvariantExpr(expr) @@ -3313,8 +3320,45 @@ func setOffsetForAtModifier(evalTime int64, expr parser.Expr) { }) } +func detectHistogramStatsDecoding(expr parser.Expr) { + var readHistogramStats bool + parser.Inspect(expr, func(node parser.Node, path []parser.Node) error { + switch n := node.(type) { + case *parser.VectorSelector: + n.SkipHistogramBuckets = readHistogramStats + + case *parser.MatrixSelector: + vs := n.VectorSelector.(*parser.VectorSelector) + vs.SkipHistogramBuckets = readHistogramStats + + case *parser.Call: + if n.Func.Name == "histogram_count" || n.Func.Name == "histogram_sum" { + readHistogramStats = true + return nil + } + if n.Func.Name == "histogram_quantile" || n.Func.Name == "histogram_fraction" { + readHistogramStats = false + return nil + } + } + return nil + }) +} + func makeInt64Pointer(val int64) *int64 { valp := new(int64) *valp = val return valp } + +type histogramStatsSeries struct { + storage.Series +} + +func newHistogramStatsSeries(series storage.Series) *histogramStatsSeries { + return &histogramStatsSeries{Series: series} +} + +func (s histogramStatsSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { + return chunkenc.NewHistogramStatsIterator(s.Series.Iterator(it)) +} diff --git a/promql/parser/ast.go b/promql/parser/ast.go index 379352599da..f0dc816e917 100644 --- a/promql/parser/ast.go +++ b/promql/parser/ast.go @@ -198,10 +198,11 @@ type VectorSelector struct { // Offset is the offset used during the query execution // 
which is calculated using the original offset, at modifier time, // eval time, and subquery offsets in the AST tree. - Offset time.Duration - Timestamp *int64 - StartOrEnd ItemType // Set when @ is used with start() or end() - LabelMatchers []*labels.Matcher + Offset time.Duration + Timestamp *int64 + SkipHistogramBuckets bool // Set when do not need to decode native histogram buckets for evaluating a query. + StartOrEnd ItemType // Set when @ is used with start() or end() + LabelMatchers []*labels.Matcher // The unexpanded seriesSet populated at query preparation time. UnexpandedSeriesSet storage.SeriesSet diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 57bfd8b8ddf..1eed46ca87f 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -965,59 +965,3 @@ func (it *floatHistogramIterator) readXor(v *float64, leading, trailing *uint8) } return true } - -type histogramStat int - -const ( - statSum = iota - statCount = 1 -) - -type floatHistogramStatsDecoder struct { - it *floatHistogramIterator - stat histogramStat -} - -func newFloatHistogramStatsDecoder(it *floatHistogramIterator, stat histogramStat) *floatHistogramStatsDecoder { - return &floatHistogramStatsDecoder{ - it: it, - stat: stat, - } -} - -func (f floatHistogramStatsDecoder) Next() ValueType { - if f.it.Next() == ValFloatHistogram { - return ValFloat - } - return ValNone -} - -func (f floatHistogramStatsDecoder) Seek(t int64) ValueType { - if f.it.Seek(t) == ValFloatHistogram { - return ValFloat - } - return ValNone -} - -func (f floatHistogramStatsDecoder) At() (int64, float64) { - switch f.stat { - case statCount: - return f.it.t, f.it.cnt.value - case statSum: - return f.it.t, f.it.sum.value - default: - panic(fmt.Sprintf("unknown stat type %d", f.stat)) - } -} - -func (f floatHistogramStatsDecoder) AtT() int64 { return f.it.t } - -func (f floatHistogramStatsDecoder) Err() error { return f.it.err } - -func (f 
floatHistogramStatsDecoder) AtHistogram(_ *histogram.Histogram) (int64, *histogram.Histogram) { - panic("cannot call floatHistogramStatsDecoder.AtHistogram") -} - -func (f floatHistogramStatsDecoder) AtFloatHistogram(_ *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { - panic("cannot call floatHistogramStatsDecoder.AtFloatHistogram") -} diff --git a/tsdb/chunkenc/float_histogram_test.go b/tsdb/chunkenc/float_histogram_test.go index 87cb96f3a3d..054c17f7d99 100644 --- a/tsdb/chunkenc/float_histogram_test.go +++ b/tsdb/chunkenc/float_histogram_test.go @@ -976,50 +976,3 @@ func TestFloatHistogramAppendOnlyErrors(t *testing.T) { require.EqualError(t, err, "float histogram counter reset") }) } - -func TestFloatHistogramCountDecoding(t *testing.T) { - numHistograms := 20 - chk, err := createHistogramChunk(numHistograms) - require.NoError(t, err) - - decodedHistograms := make([]*histogram.FloatHistogram, 0) - decodedCounts := make([]float64, 0) - decodedSums := make([]float64, 0) - - it := chk.Iterator(nil) - for it.Next() != ValNone { - _, fh := it.AtFloatHistogram(nil) - decodedHistograms = append(decodedHistograms, fh) - } - - countIterator := newFloatHistogramStatsDecoder(chk.Iterator(nil).(*floatHistogramIterator), statCount) - for countIterator.Next() != ValNone { - _, count := countIterator.At() - decodedCounts = append(decodedCounts, count) - } - sumIterator := newFloatHistogramStatsDecoder(chk.Iterator(nil).(*floatHistogramIterator), statSum) - for sumIterator.Next() != ValNone { - _, sum := sumIterator.At() - decodedSums = append(decodedSums, sum) - } - for i := 0; i < len(decodedHistograms); i++ { - require.Equal(t, decodedHistograms[i].Count, decodedCounts[i]) - require.Equal(t, decodedHistograms[i].Sum, decodedSums[i]) - } -} - -func createHistogramChunk(n int) (*FloatHistogramChunk, error) { - chunk := NewFloatHistogramChunk() - appender, err := chunk.Appender() - if err != nil { - return nil, err - } - fhAppender := 
appender.(*FloatHistogramAppender) - - for i := 0; i < n; i++ { - if _, _, _, err := fhAppender.AppendFloatHistogram(nil, int64(i*1000), tsdbutil.GenerateTestFloatHistogram(i), true); err != nil { - return nil, err - } - } - return chunk, nil -} diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index e12aec4dcd4..9bfc5d950be 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -1121,3 +1121,71 @@ func resize[T any](items []T, n int) []T { } return items[:n] } + +type histogramStatsIterator struct { + Iterator + hReader *histogram.Histogram + fhReader *histogram.FloatHistogram +} + +func NewHistogramStatsIterator(it Iterator) Iterator { + return histogramStatsIterator{ + Iterator: it, + hReader: &histogram.Histogram{}, + fhReader: &histogram.FloatHistogram{}, + } +} + +func (f histogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { + var t int64 + t, f.hReader = f.Iterator.AtHistogram(f.hReader) + if value.IsStaleNaN(f.hReader.Sum) { + return t, &histogram.Histogram{Sum: f.hReader.Sum} + } + + if h == nil { + return t, &histogram.Histogram{ + CounterResetHint: f.hReader.CounterResetHint, + Count: f.hReader.Count, + ZeroCount: f.hReader.ZeroCount, + Sum: f.hReader.Sum, + ZeroThreshold: f.hReader.ZeroThreshold, + Schema: f.hReader.Schema, + } + } + + h.CounterResetHint = f.fhReader.CounterResetHint + h.Count = f.hReader.Count + h.ZeroCount = f.hReader.ZeroCount + h.Sum = f.hReader.Sum + h.ZeroThreshold = f.hReader.ZeroThreshold + h.Schema = f.hReader.Schema + return t, h +} + +func (f histogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + var t int64 + t, f.fhReader = f.Iterator.AtFloatHistogram(f.fhReader) + if value.IsStaleNaN(f.fhReader.Sum) { + return f.Iterator.AtT(), &histogram.FloatHistogram{Sum: f.fhReader.Sum} + } + + if fh == nil { + return t, &histogram.FloatHistogram{ + CounterResetHint: f.fhReader.CounterResetHint, + Count: 
f.fhReader.Count, + ZeroCount: f.fhReader.ZeroCount, + Sum: f.fhReader.Sum, + ZeroThreshold: f.fhReader.ZeroThreshold, + Schema: f.fhReader.Schema, + } + } + + fh.CounterResetHint = f.fhReader.CounterResetHint + fh.Schema = f.fhReader.Schema + fh.ZeroThreshold = f.fhReader.ZeroThreshold + fh.ZeroCount = f.fhReader.ZeroCount + fh.Count = f.fhReader.Count + fh.Sum = f.fhReader.Sum + return t, fh +} diff --git a/tsdb/chunkenc/histogram_test.go b/tsdb/chunkenc/histogram_test.go index f7609c1936d..4e4bff33969 100644 --- a/tsdb/chunkenc/histogram_test.go +++ b/tsdb/chunkenc/histogram_test.go @@ -1177,3 +1177,46 @@ func TestHistogramAppendOnlyErrors(t *testing.T) { require.EqualError(t, err, "histogram counter reset") }) } + +func TestHistogramStatsDecoding(t *testing.T) { + numHistograms := 20 + chk, err := createHistogramChunk(numHistograms) + require.NoError(t, err) + + decodedHistograms := make([]*histogram.Histogram, 0) + decodedCounts := make([]uint64, 0) + decodedSums := make([]float64, 0) + + it := chk.Iterator(nil) + for it.Next() != ValNone { + _, fh := it.AtHistogram(nil) + decodedHistograms = append(decodedHistograms, fh) + } + + statsIterator := NewHistogramStatsIterator(chk.Iterator(nil).(*histogramIterator)) + for statsIterator.Next() != ValNone { + _, h := statsIterator.AtHistogram(nil) + decodedCounts = append(decodedCounts, h.Count) + decodedSums = append(decodedSums, h.Sum) + } + for i := 0; i < len(decodedHistograms); i++ { + require.Equal(t, decodedHistograms[i].Count, decodedCounts[i]) + require.Equal(t, decodedHistograms[i].Sum, decodedSums[i]) + } +} + +func createHistogramChunk(n int) (*HistogramChunk, error) { + chunk := NewHistogramChunk() + appender, err := chunk.Appender() + if err != nil { + return nil, err + } + fhAppender := appender.(*HistogramAppender) + + for i := 0; i < n; i++ { + if _, _, _, err := fhAppender.AppendHistogram(nil, int64(i*1000), tsdbutil.GenerateTestHistogram(i), true); err != nil { + return nil, err + } + } + return 
chunk, nil +} From 743ce408214047a45cabc76e28cd9cce8664a715 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Fri, 17 May 2024 15:00:05 +0200 Subject: [PATCH 3/8] Exclude unnecessary fields Signed-off-by: Filip Petkovski --- promql/engine.go | 34 +++++++++++++++++++--------------- tsdb/chunkenc/histogram.go | 14 +------------- 2 files changed, 20 insertions(+), 28 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 7be3e525887..235d6fc64a9 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -3320,26 +3320,30 @@ func setOffsetForAtModifier(evalTime int64, expr parser.Expr) { }) } +// detectHistogramStatsDecoding modifies the expression by setting the +// SkipHistogramBuckets field in those vector selectors for which it is safe to +// return only histogram statistics (sum and count), excluding histogram spans +// and buckets. The function can be treated as an optimization and does is not +// required for correctness. func detectHistogramStatsDecoding(expr parser.Expr) { - var readHistogramStats bool parser.Inspect(expr, func(node parser.Node, path []parser.Node) error { switch n := node.(type) { case *parser.VectorSelector: - n.SkipHistogramBuckets = readHistogramStats - - case *parser.MatrixSelector: - vs := n.VectorSelector.(*parser.VectorSelector) - vs.SkipHistogramBuckets = readHistogramStats - - case *parser.Call: - if n.Func.Name == "histogram_count" || n.Func.Name == "histogram_sum" { - readHistogramStats = true - return nil - } - if n.Func.Name == "histogram_quantile" || n.Func.Name == "histogram_fraction" { - readHistogramStats = false - return nil + for _, p := range path { + call, ok := p.(*parser.Call) + if !ok { + continue + } + if call.Func.Name == "histogram_count" || call.Func.Name == "histogram_sum" { + n.SkipHistogramBuckets = true + break + } + if call.Func.Name == "histogram_quantile" || call.Func.Name == "histogram_fraction" { + n.SkipHistogramBuckets = false + break + } } + return fmt.Errorf("stop") } return nil }) diff 
--git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index 9bfc5d950be..f6726bbf6b1 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -1147,19 +1147,13 @@ func (f histogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *his return t, &histogram.Histogram{ CounterResetHint: f.hReader.CounterResetHint, Count: f.hReader.Count, - ZeroCount: f.hReader.ZeroCount, Sum: f.hReader.Sum, - ZeroThreshold: f.hReader.ZeroThreshold, - Schema: f.hReader.Schema, } } - h.CounterResetHint = f.fhReader.CounterResetHint + h.CounterResetHint = f.hReader.CounterResetHint h.Count = f.hReader.Count - h.ZeroCount = f.hReader.ZeroCount h.Sum = f.hReader.Sum - h.ZeroThreshold = f.hReader.ZeroThreshold - h.Schema = f.hReader.Schema return t, h } @@ -1174,17 +1168,11 @@ func (f histogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) ( return t, &histogram.FloatHistogram{ CounterResetHint: f.fhReader.CounterResetHint, Count: f.fhReader.Count, - ZeroCount: f.fhReader.ZeroCount, Sum: f.fhReader.Sum, - ZeroThreshold: f.fhReader.ZeroThreshold, - Schema: f.fhReader.Schema, } } fh.CounterResetHint = f.fhReader.CounterResetHint - fh.Schema = f.fhReader.Schema - fh.ZeroThreshold = f.fhReader.ZeroThreshold - fh.ZeroCount = f.fhReader.ZeroCount fh.Count = f.fhReader.Count fh.Sum = f.fhReader.Sum return t, fh From c7111d0546bba701981db6e8ffab6c9075352817 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Fri, 17 May 2024 15:10:41 +0200 Subject: [PATCH 4/8] Fix lint Signed-off-by: Filip Petkovski --- promql/engine.go | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 235d6fc64a9..65f75f13932 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -3327,25 +3327,26 @@ func setOffsetForAtModifier(evalTime int64, expr parser.Expr) { // required for correctness. 
func detectHistogramStatsDecoding(expr parser.Expr) { parser.Inspect(expr, func(node parser.Node, path []parser.Node) error { - switch n := node.(type) { - case *parser.VectorSelector: - for _, p := range path { - call, ok := p.(*parser.Call) - if !ok { - continue - } - if call.Func.Name == "histogram_count" || call.Func.Name == "histogram_sum" { - n.SkipHistogramBuckets = true - break - } - if call.Func.Name == "histogram_quantile" || call.Func.Name == "histogram_fraction" { - n.SkipHistogramBuckets = false - break - } + n, ok := (node).(*parser.VectorSelector) + if !ok { + return nil + } + + for _, p := range path { + call, ok := p.(*parser.Call) + if !ok { + continue + } + if call.Func.Name == "histogram_count" || call.Func.Name == "histogram_sum" { + n.SkipHistogramBuckets = true + break + } + if call.Func.Name == "histogram_quantile" || call.Func.Name == "histogram_fraction" { + n.SkipHistogramBuckets = false + break } - return fmt.Errorf("stop") } - return nil + return fmt.Errorf("stop") }) } From b1623506b61deee6717b7b4b4f2897531d8be43a Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Sun, 26 May 2024 10:19:07 +0200 Subject: [PATCH 5/8] Detect histogram reset Signed-off-by: Filip Petkovski --- promql/engine.go | 2 +- promql/histogram_stats_iterator.go | 119 ++++++++++++++++++++++++ promql/histogram_stats_iterator_test.go | 108 +++++++++++++++++++++ tsdb/chunkenc/histogram.go | 56 ----------- tsdb/chunkenc/histogram_test.go | 43 --------- 5 files changed, 228 insertions(+), 100 deletions(-) create mode 100644 promql/histogram_stats_iterator.go create mode 100644 promql/histogram_stats_iterator_test.go diff --git a/promql/engine.go b/promql/engine.go index 65f75f13932..dd3c19e36cd 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -3365,5 +3365,5 @@ func newHistogramStatsSeries(series storage.Series) *histogramStatsSeries { } func (s histogramStatsSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { - return 
chunkenc.NewHistogramStatsIterator(s.Series.Iterator(it)) + return NewHistogramStatsIterator(s.Series.Iterator(it)) } diff --git a/promql/histogram_stats_iterator.go b/promql/histogram_stats_iterator.go new file mode 100644 index 00000000000..46680d27c10 --- /dev/null +++ b/promql/histogram_stats_iterator.go @@ -0,0 +1,119 @@ +package promql + +import ( + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/value" + "github.com/prometheus/prometheus/tsdb/chunkenc" +) + +type histogramStatsIterator struct { + chunkenc.Iterator + + hReader *histogram.Histogram + lastH *histogram.Histogram + + fhReader *histogram.FloatHistogram + lastFH *histogram.FloatHistogram +} + +func NewHistogramStatsIterator(it chunkenc.Iterator) chunkenc.Iterator { + return &histogramStatsIterator{ + Iterator: it, + hReader: &histogram.Histogram{}, + fhReader: &histogram.FloatHistogram{}, + } +} + +func (f *histogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { + var t int64 + t, f.hReader = f.Iterator.AtHistogram(f.hReader) + if value.IsStaleNaN(f.hReader.Sum) { + f.setLastH(f.hReader) + h = &histogram.Histogram{Sum: f.hReader.Sum} + return t, h + } + + if h == nil { + f.setLastH(f.hReader) + h = &histogram.Histogram{ + CounterResetHint: f.getResetHint(f.hReader), + Count: f.hReader.Count, + Sum: f.hReader.Sum, + } + return t, h + } + + h.CounterResetHint = f.getResetHint(f.hReader) + h.Count = f.hReader.Count + h.Sum = f.hReader.Sum + return t, h +} + +func (f *histogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + var t int64 + t, f.fhReader = f.Iterator.AtFloatHistogram(f.fhReader) + if value.IsStaleNaN(f.fhReader.Sum) { + f.setLastFH(f.fhReader) + return f.Iterator.AtT(), &histogram.FloatHistogram{Sum: f.fhReader.Sum} + } + + if fh == nil { + return t, &histogram.FloatHistogram{ + CounterResetHint: f.getFloatResetHint(f.fhReader.CounterResetHint), + Count: 
f.fhReader.Count, + Sum: f.fhReader.Sum, + } + } + + fh.CounterResetHint = f.getFloatResetHint(f.fhReader.CounterResetHint) + fh.Count = f.fhReader.Count + fh.Sum = f.fhReader.Sum + return t, fh +} + +func (f *histogramStatsIterator) setLastH(h *histogram.Histogram) { + if f.lastH == nil { + f.lastH = h.Copy() + } else { + h.CopyTo(f.lastH) + } +} + +func (f *histogramStatsIterator) setLastFH(fh *histogram.FloatHistogram) { + if f.lastFH == nil { + f.lastFH = fh.Copy() + } else { + fh.CopyTo(f.lastFH) + } +} + +func (f *histogramStatsIterator) getFloatResetHint(hint histogram.CounterResetHint) histogram.CounterResetHint { + if hint != histogram.UnknownCounterReset { + return hint + } + if f.lastFH == nil { + return histogram.NotCounterReset + } + + if f.fhReader.DetectReset(f.lastFH) { + return histogram.CounterReset + } else { + return histogram.NotCounterReset + } +} + +func (f *histogramStatsIterator) getResetHint(h *histogram.Histogram) histogram.CounterResetHint { + if h.CounterResetHint != histogram.UnknownCounterReset { + return h.CounterResetHint + } + if f.lastH == nil { + return histogram.NotCounterReset + } + + fh, prevFH := h.ToFloat(nil), f.lastH.ToFloat(nil) + if fh.DetectReset(prevFH) { + return histogram.CounterReset + } else { + return histogram.NotCounterReset + } +} diff --git a/promql/histogram_stats_iterator_test.go b/promql/histogram_stats_iterator_test.go new file mode 100644 index 00000000000..2703d41c74c --- /dev/null +++ b/promql/histogram_stats_iterator_test.go @@ -0,0 +1,108 @@ +package promql + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/tsdbutil" +) + +func TestHistogramStatsDecoding(t *testing.T) { + histograms := []*histogram.Histogram{ + tsdbutil.GenerateTestHistogram(0), + tsdbutil.GenerateTestHistogram(1), + 
tsdbutil.GenerateTestHistogram(2), + tsdbutil.GenerateTestHistogram(2), + } + histograms[0].CounterResetHint = histogram.NotCounterReset + histograms[1].CounterResetHint = histogram.UnknownCounterReset + histograms[2].CounterResetHint = histogram.CounterReset + histograms[3].CounterResetHint = histogram.UnknownCounterReset + + expectedHints := []histogram.CounterResetHint{ + histogram.NotCounterReset, + histogram.NotCounterReset, + histogram.CounterReset, + histogram.NotCounterReset, + } + + t.Run("histogram_stats", func(t *testing.T) { + decodedStats := make([]*histogram.Histogram, 0) + statsIterator := NewHistogramStatsIterator(newHistogramSeries(histograms).Iterator(nil)) + for statsIterator.Next() != chunkenc.ValNone { + _, h := statsIterator.AtHistogram(nil) + decodedStats = append(decodedStats, h) + } + for i := 0; i < len(histograms); i++ { + require.Equal(t, expectedHints[i], decodedStats[i].CounterResetHint) + require.Equal(t, histograms[i].Count, decodedStats[i].Count) + require.Equal(t, histograms[i].Sum, decodedStats[i].Sum) + } + }) + t.Run("float_histogram_stats", func(t *testing.T) { + decodedStats := make([]*histogram.FloatHistogram, 0) + statsIterator := NewHistogramStatsIterator(newHistogramSeries(histograms).Iterator(nil)) + for statsIterator.Next() != chunkenc.ValNone { + _, h := statsIterator.AtFloatHistogram(nil) + decodedStats = append(decodedStats, h) + } + for i := 0; i < len(histograms); i++ { + fh := histograms[i].ToFloat(nil) + require.Equal(t, expectedHints[i], decodedStats[i].CounterResetHint) + require.Equal(t, fh.Count, decodedStats[i].Count) + require.Equal(t, fh.Sum, decodedStats[i].Sum) + } + }) +} + +type histogramSeries struct { + histograms []*histogram.Histogram +} + +func newHistogramSeries(histograms []*histogram.Histogram) *histogramSeries { + return &histogramSeries{ + histograms: histograms, + } +} + +func (m histogramSeries) Labels() labels.Labels { return labels.EmptyLabels() } + +func (m histogramSeries) Iterator(_ 
chunkenc.Iterator) chunkenc.Iterator { + return &histogramIterator{ + i: -1, + histograms: m.histograms, + } +} + +type histogramIterator struct { + i int + histograms []*histogram.Histogram +} + +func (h *histogramIterator) Next() chunkenc.ValueType { + h.i++ + if h.i < len(h.histograms) { + return chunkenc.ValHistogram + } + return chunkenc.ValNone +} + +func (h *histogramIterator) Seek(t int64) chunkenc.ValueType { panic("not implemented") } + +func (h *histogramIterator) At() (int64, float64) { panic("not implemented") } + +func (h *histogramIterator) AtHistogram(_ *histogram.Histogram) (int64, *histogram.Histogram) { + return 0, h.histograms[h.i] +} + +func (h *histogramIterator) AtFloatHistogram(_ *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + return 0, h.histograms[h.i].ToFloat(nil) +} + +func (h *histogramIterator) AtT() int64 { return 0 } + +func (h *histogramIterator) Err() error { return nil } diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index f6726bbf6b1..e12aec4dcd4 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -1121,59 +1121,3 @@ func resize[T any](items []T, n int) []T { } return items[:n] } - -type histogramStatsIterator struct { - Iterator - hReader *histogram.Histogram - fhReader *histogram.FloatHistogram -} - -func NewHistogramStatsIterator(it Iterator) Iterator { - return histogramStatsIterator{ - Iterator: it, - hReader: &histogram.Histogram{}, - fhReader: &histogram.FloatHistogram{}, - } -} - -func (f histogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { - var t int64 - t, f.hReader = f.Iterator.AtHistogram(f.hReader) - if value.IsStaleNaN(f.hReader.Sum) { - return t, &histogram.Histogram{Sum: f.hReader.Sum} - } - - if h == nil { - return t, &histogram.Histogram{ - CounterResetHint: f.hReader.CounterResetHint, - Count: f.hReader.Count, - Sum: f.hReader.Sum, - } - } - - h.CounterResetHint = f.hReader.CounterResetHint - h.Count = 
f.hReader.Count - h.Sum = f.hReader.Sum - return t, h -} - -func (f histogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { - var t int64 - t, f.fhReader = f.Iterator.AtFloatHistogram(f.fhReader) - if value.IsStaleNaN(f.fhReader.Sum) { - return f.Iterator.AtT(), &histogram.FloatHistogram{Sum: f.fhReader.Sum} - } - - if fh == nil { - return t, &histogram.FloatHistogram{ - CounterResetHint: f.fhReader.CounterResetHint, - Count: f.fhReader.Count, - Sum: f.fhReader.Sum, - } - } - - fh.CounterResetHint = f.fhReader.CounterResetHint - fh.Count = f.fhReader.Count - fh.Sum = f.fhReader.Sum - return t, fh -} diff --git a/tsdb/chunkenc/histogram_test.go b/tsdb/chunkenc/histogram_test.go index 4e4bff33969..f7609c1936d 100644 --- a/tsdb/chunkenc/histogram_test.go +++ b/tsdb/chunkenc/histogram_test.go @@ -1177,46 +1177,3 @@ func TestHistogramAppendOnlyErrors(t *testing.T) { require.EqualError(t, err, "histogram counter reset") }) } - -func TestHistogramStatsDecoding(t *testing.T) { - numHistograms := 20 - chk, err := createHistogramChunk(numHistograms) - require.NoError(t, err) - - decodedHistograms := make([]*histogram.Histogram, 0) - decodedCounts := make([]uint64, 0) - decodedSums := make([]float64, 0) - - it := chk.Iterator(nil) - for it.Next() != ValNone { - _, fh := it.AtHistogram(nil) - decodedHistograms = append(decodedHistograms, fh) - } - - statsIterator := NewHistogramStatsIterator(chk.Iterator(nil).(*histogramIterator)) - for statsIterator.Next() != ValNone { - _, h := statsIterator.AtHistogram(nil) - decodedCounts = append(decodedCounts, h.Count) - decodedSums = append(decodedSums, h.Sum) - } - for i := 0; i < len(decodedHistograms); i++ { - require.Equal(t, decodedHistograms[i].Count, decodedCounts[i]) - require.Equal(t, decodedHistograms[i].Sum, decodedSums[i]) - } -} - -func createHistogramChunk(n int) (*HistogramChunk, error) { - chunk := NewHistogramChunk() - appender, err := chunk.Appender() - if err != nil 
{ - return nil, err - } - fhAppender := appender.(*HistogramAppender) - - for i := 0; i < n; i++ { - if _, _, _, err := fhAppender.AppendHistogram(nil, int64(i*1000), tsdbutil.GenerateTestHistogram(i), true); err != nil { - return nil, err - } - } - return chunk, nil -} From f6ec4560069b6de9f83c05781a88e65d0ac692ed Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Mon, 27 May 2024 14:46:49 +0200 Subject: [PATCH 6/8] Address comments and detect reset Signed-off-by: Filip Petkovski --- promql/engine.go | 2 +- promql/histogram_stats_iterator.go | 10 +++++++--- promql/parser/ast.go | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index dd3c19e36cd..5dce7059e80 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -3323,7 +3323,7 @@ func setOffsetForAtModifier(evalTime int64, expr parser.Expr) { // detectHistogramStatsDecoding modifies the expression by setting the // SkipHistogramBuckets field in those vector selectors for which it is safe to // return only histogram statistics (sum and count), excluding histogram spans -// and buckets. The function can be treated as an optimization and does is not +// and buckets. The function can be treated as an optimization and is not // required for correctness. 
func detectHistogramStatsDecoding(expr parser.Expr) { parser.Inspect(expr, func(node parser.Node, path []parser.Node) error { diff --git a/promql/histogram_stats_iterator.go b/promql/histogram_stats_iterator.go index 46680d27c10..8d4fcbad4b2 100644 --- a/promql/histogram_stats_iterator.go +++ b/promql/histogram_stats_iterator.go @@ -34,18 +34,19 @@ func (f *histogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *hi } if h == nil { - f.setLastH(f.hReader) h = &histogram.Histogram{ CounterResetHint: f.getResetHint(f.hReader), Count: f.hReader.Count, Sum: f.hReader.Sum, } + f.setLastH(f.hReader) return t, h } h.CounterResetHint = f.getResetHint(f.hReader) h.Count = f.hReader.Count h.Sum = f.hReader.Sum + f.setLastH(f.hReader) return t, h } @@ -54,20 +55,23 @@ func (f *histogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) t, f.fhReader = f.Iterator.AtFloatHistogram(f.fhReader) if value.IsStaleNaN(f.fhReader.Sum) { f.setLastFH(f.fhReader) - return f.Iterator.AtT(), &histogram.FloatHistogram{Sum: f.fhReader.Sum} + return t, &histogram.FloatHistogram{Sum: f.fhReader.Sum} } if fh == nil { - return t, &histogram.FloatHistogram{ + fh = &histogram.FloatHistogram{ CounterResetHint: f.getFloatResetHint(f.fhReader.CounterResetHint), Count: f.fhReader.Count, Sum: f.fhReader.Sum, } + f.setLastFH(f.fhReader) + return t, fh } fh.CounterResetHint = f.getFloatResetHint(f.fhReader.CounterResetHint) fh.Count = f.fhReader.Count fh.Sum = f.fhReader.Sum + f.setLastFH(f.fhReader) return t, fh } diff --git a/promql/parser/ast.go b/promql/parser/ast.go index f0dc816e917..830e8a2c5e4 100644 --- a/promql/parser/ast.go +++ b/promql/parser/ast.go @@ -200,7 +200,7 @@ type VectorSelector struct { // eval time, and subquery offsets in the AST tree. Offset time.Duration Timestamp *int64 - SkipHistogramBuckets bool // Set when do not need to decode native histogram buckets for evaluating a query. 
+ SkipHistogramBuckets bool // Set when decoding native histogram buckets is not needed for query evaluation. StartOrEnd ItemType // Set when @ is used with start() or end() LabelMatchers []*labels.Matcher From fb3818fd0c0eab1b9365e79762cf08d6b7e77243 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Tue, 28 May 2024 11:29:58 +0200 Subject: [PATCH 7/8] Add license and fix lint Signed-off-by: Filip Petkovski --- promql/histogram_stats_iterator.go | 19 +++++++++++++++---- promql/histogram_stats_iterator_test.go | 13 +++++++++++++ 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/promql/histogram_stats_iterator.go b/promql/histogram_stats_iterator.go index 8d4fcbad4b2..909f15e6e1b 100644 --- a/promql/histogram_stats_iterator.go +++ b/promql/histogram_stats_iterator.go @@ -1,3 +1,16 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package promql import ( @@ -101,9 +114,8 @@ func (f *histogramStatsIterator) getFloatResetHint(hint histogram.CounterResetHi if f.fhReader.DetectReset(f.lastFH) { return histogram.CounterReset - } else { - return histogram.NotCounterReset } + return histogram.NotCounterReset } func (f *histogramStatsIterator) getResetHint(h *histogram.Histogram) histogram.CounterResetHint { @@ -117,7 +129,6 @@ func (f *histogramStatsIterator) getResetHint(h *histogram.Histogram) histogram. 
fh, prevFH := h.ToFloat(nil), f.lastH.ToFloat(nil) if fh.DetectReset(prevFH) { return histogram.CounterReset - } else { - return histogram.NotCounterReset } + return histogram.NotCounterReset } diff --git a/promql/histogram_stats_iterator_test.go b/promql/histogram_stats_iterator_test.go index 2703d41c74c..b71a9d60292 100644 --- a/promql/histogram_stats_iterator_test.go +++ b/promql/histogram_stats_iterator_test.go @@ -1,3 +1,16 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ package promql import ( From aed3550a6f698b0cde0a4dda1d1e9c4cddca2448 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Thu, 6 Jun 2024 14:19:24 +0200 Subject: [PATCH 8/8] Address code review comments Signed-off-by: Filip Petkovski --- promql/histogram_stats_iterator.go | 74 +++++++++++++++++------------- 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/promql/histogram_stats_iterator.go b/promql/histogram_stats_iterator.go index 909f15e6e1b..dfafea5f8ca 100644 --- a/promql/histogram_stats_iterator.go +++ b/promql/histogram_stats_iterator.go @@ -22,69 +22,79 @@ import ( type histogramStatsIterator struct { chunkenc.Iterator - hReader *histogram.Histogram - lastH *histogram.Histogram + currentH *histogram.Histogram + lastH *histogram.Histogram - fhReader *histogram.FloatHistogram - lastFH *histogram.FloatHistogram + currentFH *histogram.FloatHistogram + lastFH *histogram.FloatHistogram } +// NewHistogramStatsIterator creates an iterator which returns histogram objects +// which have only their sum and count values populated. The iterator handles +// counter reset detection internally and sets the counter reset hint accordingly +// in each returned histogram object. func NewHistogramStatsIterator(it chunkenc.Iterator) chunkenc.Iterator { return &histogramStatsIterator{ - Iterator: it, - hReader: &histogram.Histogram{}, - fhReader: &histogram.FloatHistogram{}, + Iterator: it, + currentH: &histogram.Histogram{}, + currentFH: &histogram.FloatHistogram{}, } } +// AtHistogram returns the current timestamp/histogram pair. The counter reset +// detection is guaranteed to be correct only when the caller does not switch +// between AtHistogram and AtFloatHistogram calls.
func (f *histogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { var t int64 - t, f.hReader = f.Iterator.AtHistogram(f.hReader) - if value.IsStaleNaN(f.hReader.Sum) { - f.setLastH(f.hReader) - h = &histogram.Histogram{Sum: f.hReader.Sum} + t, f.currentH = f.Iterator.AtHistogram(f.currentH) + if value.IsStaleNaN(f.currentH.Sum) { + f.setLastH(f.currentH) + h = &histogram.Histogram{Sum: f.currentH.Sum} return t, h } if h == nil { h = &histogram.Histogram{ - CounterResetHint: f.getResetHint(f.hReader), - Count: f.hReader.Count, - Sum: f.hReader.Sum, + CounterResetHint: f.getResetHint(f.currentH), + Count: f.currentH.Count, + Sum: f.currentH.Sum, } - f.setLastH(f.hReader) + f.setLastH(f.currentH) return t, h } - h.CounterResetHint = f.getResetHint(f.hReader) - h.Count = f.hReader.Count - h.Sum = f.hReader.Sum - f.setLastH(f.hReader) + h.CounterResetHint = f.getResetHint(f.currentH) + h.Count = f.currentH.Count + h.Sum = f.currentH.Sum + f.setLastH(f.currentH) return t, h } +// AtFloatHistogram returns the current timestamp/float histogram pair. The counter +// reset detection is guaranteed to be correct only when the caller does not +// switch between AtHistogram and AtFloatHistogram calls.
func (f *histogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { var t int64 - t, f.fhReader = f.Iterator.AtFloatHistogram(f.fhReader) - if value.IsStaleNaN(f.fhReader.Sum) { - f.setLastFH(f.fhReader) - return t, &histogram.FloatHistogram{Sum: f.fhReader.Sum} + t, f.currentFH = f.Iterator.AtFloatHistogram(f.currentFH) + if value.IsStaleNaN(f.currentFH.Sum) { + f.setLastFH(f.currentFH) + return t, &histogram.FloatHistogram{Sum: f.currentFH.Sum} } if fh == nil { fh = &histogram.FloatHistogram{ - CounterResetHint: f.getFloatResetHint(f.fhReader.CounterResetHint), - Count: f.fhReader.Count, - Sum: f.fhReader.Sum, + CounterResetHint: f.getFloatResetHint(f.currentFH.CounterResetHint), + Count: f.currentFH.Count, + Sum: f.currentFH.Sum, } - f.setLastFH(f.fhReader) + f.setLastFH(f.currentFH) return t, fh } - fh.CounterResetHint = f.getFloatResetHint(f.fhReader.CounterResetHint) - fh.Count = f.fhReader.Count - fh.Sum = f.fhReader.Sum - f.setLastFH(f.fhReader) + fh.CounterResetHint = f.getFloatResetHint(f.currentFH.CounterResetHint) + fh.Count = f.currentFH.Count + fh.Sum = f.currentFH.Sum + f.setLastFH(f.currentFH) return t, fh } @@ -112,7 +122,7 @@ func (f *histogramStatsIterator) getFloatResetHint(hint histogram.CounterResetHi return histogram.NotCounterReset } - if f.fhReader.DetectReset(f.lastFH) { + if f.currentFH.DetectReset(f.lastFH) { return histogram.CounterReset } return histogram.NotCounterReset