Skip to content

Commit

Permalink
Return partial histograms
Browse files Browse the repository at this point in the history
Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed May 15, 2024
1 parent d8061d5 commit e6f9cd1
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 107 deletions.
8 changes: 8 additions & 0 deletions promql/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,14 @@ func BenchmarkNativeHistograms(b *testing.B) {
name: "sum rate with long rate interval",
query: "sum(rate(native_histogram_series[20m]))",
},
{
name: "histogram_count with short rate interval",
query: "histogram_count(sum(rate(native_histogram_series[2m])))",
},
{
name: "histogram_count with long rate interval",
query: "histogram_count(sum(rate(native_histogram_series[20m])))",
},
}

opts := promql.EngineOpts{
Expand Down
44 changes: 44 additions & 0 deletions promql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,11 @@ func checkAndExpandSeriesSet(ctx context.Context, expr parser.Expr) (annotations
return nil, nil
}
series, ws, err := expandSeriesSet(ctx, e.UnexpandedSeriesSet)
if e.SkipHistogramBuckets {
for i := range series {
series[i] = newHistogramStatsSeries(series[i])
}
}
e.Series = series
return ws, err
}
Expand Down Expand Up @@ -3179,6 +3184,8 @@ func unwrapStepInvariantExpr(e parser.Expr) parser.Expr {
// PreprocessExpr wraps all possible step invariant parts of the given expression with
// StepInvariantExpr. It also resolves the preprocessors.
func PreprocessExpr(expr parser.Expr, start, end time.Time) parser.Expr {
detectHistogramStatsDecoding(expr)

isStepInvariant := preprocessExprHelper(expr, start, end)
if isStepInvariant {
return newStepInvariantExpr(expr)
Expand Down Expand Up @@ -3313,8 +3320,45 @@ func setOffsetForAtModifier(evalTime int64, expr parser.Expr) {
})
}

func detectHistogramStatsDecoding(expr parser.Expr) {
var readHistogramStats bool
parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
switch n := node.(type) {
case *parser.VectorSelector:
n.SkipHistogramBuckets = readHistogramStats

case *parser.MatrixSelector:
vs := n.VectorSelector.(*parser.VectorSelector)
vs.SkipHistogramBuckets = readHistogramStats

case *parser.Call:
if n.Func.Name == "histogram_count" || n.Func.Name == "histogram_sum" {
readHistogramStats = true
return nil
}
if n.Func.Name == "histogram_quantile" || n.Func.Name == "histogram_fraction" {
readHistogramStats = false
return nil
}
}
return nil
})
}

func makeInt64Pointer(val int64) *int64 {
valp := new(int64)
*valp = val
return valp
}

type histogramStatsSeries struct {
storage.Series
}

func newHistogramStatsSeries(series storage.Series) *histogramStatsSeries {
return &histogramStatsSeries{Series: series}
}

func (s histogramStatsSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
return chunkenc.NewHistogramStatsIterator(s.Series.Iterator(it))
}
9 changes: 5 additions & 4 deletions promql/parser/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,11 @@ type VectorSelector struct {
// Offset is the offset used during the query execution
// which is calculated using the original offset, at modifier time,
// eval time, and subquery offsets in the AST tree.
Offset time.Duration
Timestamp *int64
StartOrEnd ItemType // Set when @ is used with start() or end()
LabelMatchers []*labels.Matcher
Offset time.Duration
Timestamp *int64
SkipHistogramBuckets bool // Set when do not need to decode native histogram buckets for evaluating a query.
StartOrEnd ItemType // Set when @ is used with start() or end()
LabelMatchers []*labels.Matcher

// The unexpanded seriesSet populated at query preparation time.
UnexpandedSeriesSet storage.SeriesSet
Expand Down
56 changes: 0 additions & 56 deletions tsdb/chunkenc/float_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -965,59 +965,3 @@ func (it *floatHistogramIterator) readXor(v *float64, leading, trailing *uint8)
}
return true
}

type histogramStat int

const (
statSum = iota
statCount = 1
)

type floatHistogramStatsDecoder struct {
it *floatHistogramIterator
stat histogramStat
}

func newFloatHistogramStatsDecoder(it *floatHistogramIterator, stat histogramStat) *floatHistogramStatsDecoder {
return &floatHistogramStatsDecoder{
it: it,
stat: stat,
}
}

func (f floatHistogramStatsDecoder) Next() ValueType {
if f.it.Next() == ValFloatHistogram {
return ValFloat
}
return ValNone
}

func (f floatHistogramStatsDecoder) Seek(t int64) ValueType {
if f.it.Seek(t) == ValFloatHistogram {
return ValFloat
}
return ValNone
}

func (f floatHistogramStatsDecoder) At() (int64, float64) {
switch f.stat {
case statCount:
return f.it.t, f.it.cnt.value
case statSum:
return f.it.t, f.it.sum.value
default:
panic(fmt.Sprintf("unknown stat type %d", f.stat))
}
}

func (f floatHistogramStatsDecoder) AtT() int64 { return f.it.t }

func (f floatHistogramStatsDecoder) Err() error { return f.it.err }

func (f floatHistogramStatsDecoder) AtHistogram(_ *histogram.Histogram) (int64, *histogram.Histogram) {
panic("cannot call floatHistogramStatsDecoder.AtHistogram")
}

func (f floatHistogramStatsDecoder) AtFloatHistogram(_ *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
panic("cannot call floatHistogramStatsDecoder.AtFloatHistogram")
}
47 changes: 0 additions & 47 deletions tsdb/chunkenc/float_histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,50 +976,3 @@ func TestFloatHistogramAppendOnlyErrors(t *testing.T) {
require.EqualError(t, err, "float histogram counter reset")
})
}

func TestFloatHistogramCountDecoding(t *testing.T) {
numHistograms := 20
chk, err := createHistogramChunk(numHistograms)
require.NoError(t, err)

decodedHistograms := make([]*histogram.FloatHistogram, 0)
decodedCounts := make([]float64, 0)
decodedSums := make([]float64, 0)

it := chk.Iterator(nil)
for it.Next() != ValNone {
_, fh := it.AtFloatHistogram(nil)
decodedHistograms = append(decodedHistograms, fh)
}

countIterator := newFloatHistogramStatsDecoder(chk.Iterator(nil).(*floatHistogramIterator), statCount)
for countIterator.Next() != ValNone {
_, count := countIterator.At()
decodedCounts = append(decodedCounts, count)
}
sumIterator := newFloatHistogramStatsDecoder(chk.Iterator(nil).(*floatHistogramIterator), statSum)
for sumIterator.Next() != ValNone {
_, sum := sumIterator.At()
decodedSums = append(decodedSums, sum)
}
for i := 0; i < len(decodedHistograms); i++ {
require.Equal(t, decodedHistograms[i].Count, decodedCounts[i])
require.Equal(t, decodedHistograms[i].Sum, decodedSums[i])
}
}

func createHistogramChunk(n int) (*FloatHistogramChunk, error) {
chunk := NewFloatHistogramChunk()
appender, err := chunk.Appender()
if err != nil {
return nil, err
}
fhAppender := appender.(*FloatHistogramAppender)

for i := 0; i < n; i++ {
if _, _, _, err := fhAppender.AppendFloatHistogram(nil, int64(i*1000), tsdbutil.GenerateTestFloatHistogram(i), true); err != nil {
return nil, err
}
}
return chunk, nil
}
68 changes: 68 additions & 0 deletions tsdb/chunkenc/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,3 +1121,71 @@ func resize[T any](items []T, n int) []T {
}
return items[:n]
}

type histogramStatsIterator struct {
Iterator
hReader *histogram.Histogram
fhReader *histogram.FloatHistogram
}

func NewHistogramStatsIterator(it Iterator) Iterator {
return histogramStatsIterator{
Iterator: it,
hReader: &histogram.Histogram{},
fhReader: &histogram.FloatHistogram{},
}
}

func (f histogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
var t int64
t, f.hReader = f.Iterator.AtHistogram(f.hReader)
if value.IsStaleNaN(f.hReader.Sum) {
return t, &histogram.Histogram{Sum: f.hReader.Sum}
}

if h == nil {
return t, &histogram.Histogram{
CounterResetHint: f.hReader.CounterResetHint,
Count: f.hReader.Count,
ZeroCount: f.hReader.ZeroCount,
Sum: f.hReader.Sum,
ZeroThreshold: f.hReader.ZeroThreshold,
Schema: f.hReader.Schema,
}
}

h.CounterResetHint = f.fhReader.CounterResetHint
h.Count = f.hReader.Count
h.ZeroCount = f.hReader.ZeroCount
h.Sum = f.hReader.Sum
h.ZeroThreshold = f.hReader.ZeroThreshold
h.Schema = f.hReader.Schema
return t, h
}

func (f histogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
var t int64
t, f.fhReader = f.Iterator.AtFloatHistogram(f.fhReader)
if value.IsStaleNaN(f.fhReader.Sum) {
return f.Iterator.AtT(), &histogram.FloatHistogram{Sum: f.fhReader.Sum}
}

if fh == nil {
return t, &histogram.FloatHistogram{
CounterResetHint: f.fhReader.CounterResetHint,
Count: f.fhReader.Count,
ZeroCount: f.fhReader.ZeroCount,
Sum: f.fhReader.Sum,
ZeroThreshold: f.fhReader.ZeroThreshold,
Schema: f.fhReader.Schema,
}
}

fh.CounterResetHint = f.fhReader.CounterResetHint
fh.Schema = f.fhReader.Schema
fh.ZeroThreshold = f.fhReader.ZeroThreshold
fh.ZeroCount = f.fhReader.ZeroCount
fh.Count = f.fhReader.Count
fh.Sum = f.fhReader.Sum
return t, fh
}
43 changes: 43 additions & 0 deletions tsdb/chunkenc/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,3 +1177,46 @@ func TestHistogramAppendOnlyErrors(t *testing.T) {
require.EqualError(t, err, "histogram counter reset")
})
}

func TestHistogramStatsDecoding(t *testing.T) {
numHistograms := 20
chk, err := createHistogramChunk(numHistograms)
require.NoError(t, err)

decodedHistograms := make([]*histogram.Histogram, 0)
decodedCounts := make([]uint64, 0)
decodedSums := make([]float64, 0)

it := chk.Iterator(nil)
for it.Next() != ValNone {
_, fh := it.AtHistogram(nil)
decodedHistograms = append(decodedHistograms, fh)
}

statsIterator := NewHistogramStatsIterator(chk.Iterator(nil).(*histogramIterator))
for statsIterator.Next() != ValNone {
_, h := statsIterator.AtHistogram(nil)
decodedCounts = append(decodedCounts, h.Count)
decodedSums = append(decodedSums, h.Sum)
}
for i := 0; i < len(decodedHistograms); i++ {
require.Equal(t, decodedHistograms[i].Count, decodedCounts[i])
require.Equal(t, decodedHistograms[i].Sum, decodedSums[i])
}
}

func createHistogramChunk(n int) (*HistogramChunk, error) {
chunk := NewHistogramChunk()
appender, err := chunk.Appender()
if err != nil {
return nil, err
}
fhAppender := appender.(*HistogramAppender)

for i := 0; i < n; i++ {
if _, _, _, err := fhAppender.AppendHistogram(nil, int64(i*1000), tsdbutil.GenerateTestHistogram(i), true); err != nil {
return nil, err
}
}
return chunk, nil
}

0 comments on commit e6f9cd1

Please sign in to comment.