Skip to content

Commit

Permalink
Merge pull request prometheus#420 from prometheus/metric-cow
Browse files Browse the repository at this point in the history
Introduce copy-on-write for metrics in AST.
beorn7 committed Dec 16, 2014
2 parents e2fddd6 + 00a2a93 commit d72d49f
Showing 13 changed files with 395 additions and 235 deletions.
20 changes: 11 additions & 9 deletions rules/alerting.go
Original file line number Diff line number Diff line change
@@ -80,7 +80,7 @@ type Alert struct {
}

// sample returns a Sample suitable for recording the alert.
func (a Alert) sample(timestamp clientmodel.Timestamp, value clientmodel.SampleValue) *clientmodel.Sample {
func (a Alert) sample(timestamp clientmodel.Timestamp, value clientmodel.SampleValue) *ast.Sample {
recordedMetric := clientmodel.Metric{}
for label, value := range a.Labels {
recordedMetric[label] = value
@@ -90,8 +90,11 @@ func (a Alert) sample(timestamp clientmodel.Timestamp, value clientmodel.SampleV
recordedMetric[AlertNameLabel] = clientmodel.LabelValue(a.Name)
recordedMetric[AlertStateLabel] = clientmodel.LabelValue(a.State.String())

return &clientmodel.Sample{
Metric: recordedMetric,
return &ast.Sample{
Metric: clientmodel.COWMetric{
Metric: recordedMetric,
Copied: true,
},
Value: value,
Timestamp: timestamp,
}
@@ -145,18 +148,17 @@ func (rule *AlertingRule) Eval(timestamp clientmodel.Timestamp, storage local.St
// or update the expression value for existing elements.
resultFingerprints := utility.Set{}
for _, sample := range exprResult {
fp := new(clientmodel.Fingerprint)
fp.LoadFromMetric(sample.Metric)
resultFingerprints.Add(*fp)
fp := sample.Metric.Metric.Fingerprint()
resultFingerprints.Add(fp)

if alert, ok := rule.activeAlerts[*fp]; !ok {
if alert, ok := rule.activeAlerts[fp]; !ok {
labels := clientmodel.LabelSet{}
labels.MergeFromMetric(sample.Metric)
labels.MergeFromMetric(sample.Metric.Metric)
labels = labels.Merge(rule.Labels)
if _, ok := labels[clientmodel.MetricNameLabel]; ok {
delete(labels, clientmodel.MetricNameLabel)
}
rule.activeAlerts[*fp] = &Alert{
rule.activeAlerts[fp] = &Alert{
Name: rule.name,
Labels: labels,
State: Pending,
104 changes: 60 additions & 44 deletions rules/ast/ast.go
Original file line number Diff line number Diff line change
@@ -34,17 +34,30 @@ var stalenessDelta = flag.Duration("query.staleness-delta", 300*time.Second, "St
// ----------------------------------------------------------------------------
// Raw data value types.

// SampleStream is a stream of Values belonging to an attached COWMetric.
type SampleStream struct {
Metric clientmodel.COWMetric
Values metric.Values
}

// Sample is a single sample belonging to a COWMetric.
type Sample struct {
Metric clientmodel.COWMetric
Value clientmodel.SampleValue
Timestamp clientmodel.Timestamp
}

// Vector is basically only an alias for clientmodel.Samples, but the
// contract is that in a Vector, all Samples have the same timestamp.
type Vector clientmodel.Samples
type Vector []*Sample

// Matrix is a slice of SampleSets that implements sort.Interface and
// Matrix is a slice of SampleStreams that implements sort.Interface and
// has a String method.
// BUG(julius): Pointerize this.
type Matrix []metric.SampleSet
type Matrix []SampleStream

type groupedAggregation struct {
labels clientmodel.Metric
labels clientmodel.COWMetric
value clientmodel.SampleValue
groupCount int
}
@@ -191,7 +204,7 @@ type (
labelMatchers metric.LabelMatchers
// The series iterators are populated at query analysis time.
iterators map[clientmodel.Fingerprint]local.SeriesIterator
metrics map[clientmodel.Fingerprint]clientmodel.Metric
metrics map[clientmodel.Fingerprint]clientmodel.COWMetric
// Fingerprints are populated from label matchers at query analysis time.
fingerprints clientmodel.Fingerprints
}
@@ -231,7 +244,7 @@ type (
labelMatchers metric.LabelMatchers
// The series iterators are populated at query analysis time.
iterators map[clientmodel.Fingerprint]local.SeriesIterator
metrics map[clientmodel.Fingerprint]clientmodel.Metric
metrics map[clientmodel.Fingerprint]clientmodel.COWMetric
// Fingerprints are populated from label matchers at query analysis time.
fingerprints clientmodel.Fingerprints
interval time.Duration
@@ -400,44 +413,43 @@ func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmod

// TODO implement watchdog timer for long-running queries.
evalTimer := queryStats.GetTimer(stats.InnerEvalTime).Start()
sampleSets := map[uint64]*metric.SampleSet{}
for t := start; t.Before(end); t = t.Add(interval) {
sampleStreams := map[uint64]*SampleStream{}
for t := start; !t.After(end); t = t.Add(interval) {
vector := node.Eval(t)
for _, sample := range vector {
samplePair := metric.SamplePair{
Value: sample.Value,
Timestamp: sample.Timestamp,
}
groupingKey := labelsToKey(sample.Metric)
if sampleSets[groupingKey] == nil {
sampleSets[groupingKey] = &metric.SampleSet{
groupingKey := labelsToKey(sample.Metric.Metric)
if sampleStreams[groupingKey] == nil {
sampleStreams[groupingKey] = &SampleStream{
Metric: sample.Metric,
Values: metric.Values{samplePair},
}
} else {
sampleSets[groupingKey].Values = append(sampleSets[groupingKey].Values, samplePair)
sampleStreams[groupingKey].Values = append(sampleStreams[groupingKey].Values, samplePair)
}
}
}
evalTimer.Stop()

appendTimer := queryStats.GetTimer(stats.ResultAppendTime).Start()
for _, sampleSet := range sampleSets {
matrix = append(matrix, *sampleSet)
for _, sampleStream := range sampleStreams {
matrix = append(matrix, *sampleStream)
}
appendTimer.Stop()

return matrix, nil
}

func labelIntersection(metric1, metric2 clientmodel.Metric) clientmodel.Metric {
intersection := clientmodel.Metric{}
for label, value := range metric1 {
if metric2[label] == value {
intersection[label] = value
func labelIntersection(metric1, metric2 clientmodel.COWMetric) clientmodel.COWMetric {
for label, value := range metric1.Metric {
if metric2.Metric[label] != value {
metric1.Delete(label)
}
}
return intersection
return metric1
}

func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[uint64]*groupedAggregation, timestamp clientmodel.Timestamp) Vector {
@@ -451,7 +463,7 @@ func (node *VectorAggregation) groupedAggregationsToVector(aggregations map[uint
default:
// For other aggregations, we already have the right value.
}
sample := &clientmodel.Sample{
sample := &Sample{
Metric: aggregation.labels,
Value: aggregation.value,
Timestamp: timestamp,
@@ -467,7 +479,7 @@ func (node *VectorAggregation) Eval(timestamp clientmodel.Timestamp) Vector {
vector := node.vector.Eval(timestamp)
result := map[uint64]*groupedAggregation{}
for _, sample := range vector {
groupingKey := node.labelsToGroupingKey(sample.Metric)
groupingKey := node.labelsToGroupingKey(sample.Metric.Metric)
if groupedResult, ok := result[groupingKey]; ok {
if node.keepExtraLabels {
groupedResult.labels = labelIntersection(groupedResult.labels, sample.Metric)
@@ -493,14 +505,18 @@ func (node *VectorAggregation) Eval(timestamp clientmodel.Timestamp) Vector {
panic("Unknown aggregation type")
}
} else {
m := clientmodel.Metric{}
var m clientmodel.COWMetric
if node.keepExtraLabels {
m = sample.Metric
delete(m, clientmodel.MetricNameLabel)
m.Delete(clientmodel.MetricNameLabel)
} else {
m = clientmodel.COWMetric{
Metric: clientmodel.Metric{},
Copied: true,
}
for _, l := range node.groupBy {
if v, ok := sample.Metric[l]; ok {
m[l] = v
if v, ok := sample.Metric.Metric[l]; ok {
m.Set(l, v)
}
}
}
@@ -524,8 +540,8 @@ func (node *VectorSelector) Eval(timestamp clientmodel.Timestamp) Vector {
sampleCandidates := it.GetValueAtTime(timestamp)
samplePair := chooseClosestSample(sampleCandidates, timestamp)
if samplePair != nil {
samples = append(samples, &clientmodel.Sample{
Metric: node.metrics[fp], // TODO: need copy here because downstream can modify!
samples = append(samples, &Sample{
Metric: node.metrics[fp],
Value: samplePair.Value,
Timestamp: timestamp,
})
@@ -737,7 +753,7 @@ func (node *VectorArithExpr) Eval(timestamp clientmodel.Timestamp) Vector {
if keep {
rhsSample.Value = value
if node.opType.shouldDropMetric() {
delete(rhsSample.Metric, clientmodel.MetricNameLabel)
rhsSample.Metric.Delete(clientmodel.MetricNameLabel)
}
result = append(result, rhsSample)
}
@@ -751,7 +767,7 @@ func (node *VectorArithExpr) Eval(timestamp clientmodel.Timestamp) Vector {
if keep {
lhsSample.Value = value
if node.opType.shouldDropMetric() {
delete(lhsSample.Metric, clientmodel.MetricNameLabel)
lhsSample.Metric.Delete(clientmodel.MetricNameLabel)
}
result = append(result, lhsSample)
}
@@ -762,12 +778,12 @@ func (node *VectorArithExpr) Eval(timestamp clientmodel.Timestamp) Vector {
rhs := node.rhs.(VectorNode).Eval(timestamp)
for _, lhsSample := range lhs {
for _, rhsSample := range rhs {
if labelsEqual(lhsSample.Metric, rhsSample.Metric) {
if labelsEqual(lhsSample.Metric.Metric, rhsSample.Metric.Metric) {
value, keep := evalVectorBinop(node.opType, lhsSample.Value, rhsSample.Value)
if keep {
lhsSample.Value = value
if node.opType.shouldDropMetric() {
delete(lhsSample.Metric, clientmodel.MetricNameLabel)
lhsSample.Metric.Delete(clientmodel.MetricNameLabel)
}
result = append(result, lhsSample)
}
@@ -788,21 +804,21 @@ func (node *MatrixSelector) Eval(timestamp clientmodel.Timestamp) Matrix {
}

//// timer := v.stats.GetTimer(stats.GetRangeValuesTime).Start()
sampleSets := []metric.SampleSet{}
sampleStreams := []SampleStream{}
for fp, it := range node.iterators {
samplePairs := it.GetRangeValues(*interval)
if len(samplePairs) == 0 {
continue
}

sampleSet := metric.SampleSet{
Metric: node.metrics[fp], // TODO: need copy here because downstream can modify!
sampleStream := SampleStream{
Metric: node.metrics[fp],
Values: samplePairs,
}
sampleSets = append(sampleSets, sampleSet)
sampleStreams = append(sampleStreams, sampleStream)
}
//// timer.Stop()
return sampleSets
return sampleStreams
}

// EvalBoundaries implements the MatrixNode interface and returns the
@@ -814,21 +830,21 @@ func (node *MatrixSelector) EvalBoundaries(timestamp clientmodel.Timestamp) Matr
}

//// timer := v.stats.GetTimer(stats.GetBoundaryValuesTime).Start()
sampleSets := []metric.SampleSet{}
sampleStreams := []SampleStream{}
for fp, it := range node.iterators {
samplePairs := it.GetBoundaryValues(*interval)
if len(samplePairs) == 0 {
continue
}

sampleSet := metric.SampleSet{
Metric: node.metrics[fp], // TODO: make copy of metric.
sampleStream := SampleStream{
Metric: node.metrics[fp],
Values: samplePairs,
}
sampleSets = append(sampleSets, sampleSet)
sampleStreams = append(sampleStreams, sampleStream)
}
//// timer.Stop()
return sampleSets
return sampleStreams
}

// Len implements sort.Interface.
@@ -874,7 +890,7 @@ func NewVectorSelector(m metric.LabelMatchers) *VectorSelector {
return &VectorSelector{
labelMatchers: m,
iterators: map[clientmodel.Fingerprint]local.SeriesIterator{},
metrics: map[clientmodel.Fingerprint]clientmodel.Metric{},
metrics: map[clientmodel.Fingerprint]clientmodel.COWMetric{},
}
}

@@ -967,7 +983,7 @@ func NewMatrixSelector(vector *VectorSelector, interval time.Duration) *MatrixSe
labelMatchers: vector.labelMatchers,
interval: interval,
iterators: map[clientmodel.Fingerprint]local.SeriesIterator{},
metrics: map[clientmodel.Fingerprint]clientmodel.Metric{},
metrics: map[clientmodel.Fingerprint]clientmodel.COWMetric{},
}
}

Loading
Oops, something went wrong.

0 comments on commit d72d49f

Please sign in to comment.