Skip to content

Commit

Permalink
storage: Contextify storage interfaces.
Browse files Browse the repository at this point in the history
This is based on prometheus#1997.

This adds contexts to the relevant Storage methods and already passes
PromQL's new per-query context into the storage's query methods.
The immediate motivation supporting multi-tenancy in Frankenstein, but
this could also be used by Prometheus's normal local storage to support
cancellations and timeouts at some point.
  • Loading branch information
juliusv committed Sep 19, 2016
1 parent ed5a0f0 commit c187308
Show file tree
Hide file tree
Showing 19 changed files with 121 additions and 102 deletions.
30 changes: 18 additions & 12 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,26 +104,27 @@ func Main() int {
}

var (
notifier = notifier.New(&cfg.notifier)
targetManager = retrieval.NewTargetManager(sampleAppender)
queryEngine = promql.NewEngine(localStorage, &cfg.queryEngine)
queryCtx, cancelQueries = context.WithCancel(context.Background())
notifier = notifier.New(&cfg.notifier)
targetManager = retrieval.NewTargetManager(sampleAppender)
queryEngine = promql.NewEngine(localStorage, &cfg.queryEngine)
ctx, cancelCtx = context.WithCancel(context.Background())
)

ruleManager := rules.NewManager(&rules.ManagerOptions{
SampleAppender: sampleAppender,
Notifier: notifier,
QueryEngine: queryEngine,
QueryCtx: queryCtx,
Context: ctx,
ExternalURL: cfg.web.ExternalURL,
})

flags := map[string]string{}
cfg.fs.VisitAll(func(f *flag.Flag) {
flags[f.Name] = f.Value.String()
})
cfg.web.Context = ctx
cfg.web.Storage = localStorage
cfg.web.QueryEngine = queryEngine
cfg.web.TargetManager = targetManager
cfg.web.RuleManager = ruleManager

version := &web.PrometheusVersion{
cfg.web.Version = &web.PrometheusVersion{
Version: version.Version,
Revision: version.Revision,
Branch: version.Branch,
Expand All @@ -132,7 +133,12 @@ func Main() int {
GoVersion: version.GoVersion,
}

webHandler := web.New(localStorage, queryEngine, queryCtx, targetManager, ruleManager, version, flags, &cfg.web)
cfg.web.Flags = map[string]string{}
cfg.fs.VisitAll(func(f *flag.Flag) {
cfg.web.Flags[f.Name] = f.Value.String()
})

webHandler := web.New(&cfg.web)

reloadables = append(reloadables, targetManager, ruleManager, webHandler, notifier)

Expand Down Expand Up @@ -205,7 +211,7 @@ func Main() int {

// Shutting down the query engine before the rule manager will cause pending queries
// to be canceled and ensures a quick shutdown of the rule manager.
defer cancelQueries()
defer cancelCtx()

go webHandler.Run()

Expand Down
7 changes: 5 additions & 2 deletions promql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func (ng *Engine) exec(ctx context.Context, q *query) (model.Value, error) {
// execEvalStmt evaluates the expression of an evaluation statement for the given time range.
func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (model.Value, error) {
prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start()
err := ng.populateIterators(s)
err := ng.populateIterators(ctx, s)
prepareTimer.Stop()
if err != nil {
return nil, err
Expand Down Expand Up @@ -463,19 +463,21 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
return resMatrix, nil
}

func (ng *Engine) populateIterators(s *EvalStmt) error {
func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) error {
var queryErr error
Inspect(s.Expr, func(node Node) bool {
switch n := node.(type) {
case *VectorSelector:
if s.Start.Equal(s.End) {
n.iterators, queryErr = ng.querier.QueryInstant(
ctx,
s.Start.Add(-n.Offset),
StalenessDelta,
n.LabelMatchers...,
)
} else {
n.iterators, queryErr = ng.querier.QueryRange(
ctx,
s.Start.Add(-n.Offset-StalenessDelta),
s.End.Add(-n.Offset),
n.LabelMatchers...,
Expand All @@ -486,6 +488,7 @@ func (ng *Engine) populateIterators(s *EvalStmt) error {
}
case *MatrixSelector:
n.iterators, queryErr = ng.querier.QueryRange(
ctx,
s.Start.Add(-n.Offset-n.Range),
s.End.Add(-n.Offset),
n.LabelMatchers...,
Expand Down
16 changes: 8 additions & 8 deletions promql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (

func TestQueryConcurrency(t *testing.T) {
engine := NewEngine(nil, nil)
ctx, cancelQueries := context.WithCancel(context.Background())
defer cancelQueries()
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()

block := make(chan struct{})
processing := make(chan struct{})
Expand Down Expand Up @@ -77,8 +77,8 @@ func TestQueryTimeout(t *testing.T) {
Timeout: 5 * time.Millisecond,
MaxConcurrentQueries: 20,
})
ctx, cancelQueries := context.WithCancel(context.Background())
defer cancelQueries()
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()

query := engine.newTestQuery(func(ctx context.Context) error {
time.Sleep(50 * time.Millisecond)
Expand All @@ -96,8 +96,8 @@ func TestQueryTimeout(t *testing.T) {

func TestQueryCancel(t *testing.T) {
engine := NewEngine(nil, nil)
ctx, cancelQueries := context.WithCancel(context.Background())
defer cancelQueries()
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()

// Cancel a running query before it completes.
block := make(chan struct{})
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestQueryCancel(t *testing.T) {

func TestEngineShutdown(t *testing.T) {
engine := NewEngine(nil, nil)
ctx, cancelQueries := context.WithCancel(context.Background())
ctx, cancelCtx := context.WithCancel(context.Background())

block := make(chan struct{})
processing := make(chan struct{})
Expand All @@ -167,7 +167,7 @@ func TestEngineShutdown(t *testing.T) {
}()

<-processing
cancelQueries()
cancelCtx()
block <- struct{}{}
<-processing

Expand Down
24 changes: 12 additions & 12 deletions promql/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ type Test struct {

cmds []testCommand

storage local.Storage
closeStorage func()
queryEngine *Engine
queryCtx context.Context
cancelQueries context.CancelFunc
storage local.Storage
closeStorage func()
queryEngine *Engine
context context.Context
cancelCtx context.CancelFunc
}

// NewTest returns an initialized empty Test.
Expand Down Expand Up @@ -82,9 +82,9 @@ func (t *Test) QueryEngine() *Engine {
return t.queryEngine
}

// Context returns the test's query context.
// Context returns the test's context.
func (t *Test) Context() context.Context {
return t.queryCtx
return t.context
}

// Storage returns the test's storage.
Expand Down Expand Up @@ -471,7 +471,7 @@ func (t *Test) exec(tc testCommand) error {

case *evalCmd:
q := t.queryEngine.newQuery(cmd.expr, cmd.start, cmd.end, cmd.interval)
res := q.Exec(t.queryCtx)
res := q.Exec(t.context)
if res.Err != nil {
if cmd.fail {
return nil
Expand All @@ -498,21 +498,21 @@ func (t *Test) clear() {
if t.closeStorage != nil {
t.closeStorage()
}
if t.cancelQueries != nil {
t.cancelQueries()
if t.cancelCtx != nil {
t.cancelCtx()
}

var closer testutil.Closer
t.storage, closer = local.NewTestStorage(t, 2)

t.closeStorage = closer.Close
t.queryEngine = NewEngine(t.storage, nil)
t.queryCtx, t.cancelQueries = context.WithCancel(context.Background())
t.context, t.cancelCtx = context.WithCancel(context.Background())
}

// Close closes resources associated with the Test.
func (t *Test) Close() {
t.cancelQueries()
t.cancelCtx()
t.closeStorage()
}

Expand Down
6 changes: 3 additions & 3 deletions rules/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,12 @@ const resolvedRetention = 15 * time.Minute

// eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly.
func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine, queryCtx context.Context, externalURLPath string) (model.Vector, error) {
func (r *AlertingRule) eval(ctx context.Context, ts model.Time, engine *promql.Engine, externalURLPath string) (model.Vector, error) {
query, err := engine.NewInstantQuery(r.vector.String(), ts)
if err != nil {
return nil, err
}
res, err := query.Exec(queryCtx).Vector()
res, err := query.Exec(ctx).Vector()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -185,12 +185,12 @@ func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine, queryCtx conte

expand := func(text model.LabelValue) model.LabelValue {
tmpl := template.NewTemplateExpander(
ctx,
defs+string(text),
"__alert_"+r.Name(),
tmplData,
ts,
engine,
queryCtx,
externalURLPath,
)
result, err := tmpl.Expand()
Expand Down
9 changes: 4 additions & 5 deletions rules/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ import (
"sync"
"time"

"golang.org/x/net/context"

html_template "html/template"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"

"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/notifier"
Expand Down Expand Up @@ -107,7 +106,7 @@ const (
type Rule interface {
Name() string
// eval evaluates the rule, including any associated recording or alerting actions.
eval(model.Time, *promql.Engine, context.Context, string) (model.Vector, error)
eval(context.Context, model.Time, *promql.Engine, string) (model.Vector, error)
// String returns a human-readable string representation of the rule.
String() string
// HTMLSnippet returns a human-readable string representation of the rule,
Expand Down Expand Up @@ -258,7 +257,7 @@ func (g *Group) eval() {

evalTotal.WithLabelValues(rtyp).Inc()

vector, err := rule.eval(now, g.opts.QueryEngine, g.opts.QueryCtx, g.opts.ExternalURL.Path)
vector, err := rule.eval(g.opts.Context, now, g.opts.QueryEngine, g.opts.ExternalURL.Path)
if err != nil {
// Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here.
Expand Down Expand Up @@ -343,7 +342,7 @@ type Manager struct {
type ManagerOptions struct {
ExternalURL *url.URL
QueryEngine *promql.Engine
QueryCtx context.Context
Context context.Context
Notifier *notifier.Notifier
SampleAppender storage.SampleAppender
}
Expand Down
2 changes: 1 addition & 1 deletion rules/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestAlertingRule(t *testing.T) {
for i, test := range tests {
evalTime := model.Time(0).Add(test.time)

res, err := rule.eval(evalTime, suite.QueryEngine(), suite.Context(), "")
res, err := rule.eval(suite.Context(), evalTime, suite.QueryEngine(), "")
if err != nil {
t.Fatalf("Error during alerting rule evaluation: %s", err)
}
Expand Down
4 changes: 2 additions & 2 deletions rules/recording.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ func (rule RecordingRule) Name() string {
}

// eval evaluates the rule and then overrides the metric names and labels accordingly.
func (rule RecordingRule) eval(timestamp model.Time, engine *promql.Engine, queryCtx context.Context, _ string) (model.Vector, error) {
func (rule RecordingRule) eval(ctx context.Context, timestamp model.Time, engine *promql.Engine, _ string) (model.Vector, error) {
query, err := engine.NewInstantQuery(rule.vector.String(), timestamp)
if err != nil {
return nil, err
}

var (
result = query.Exec(queryCtx)
result = query.Exec(ctx)
vector model.Vector
)
if result.Err != nil {
Expand Down
6 changes: 3 additions & 3 deletions rules/recording_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func TestRuleEval(t *testing.T) {
storage, closer := local.NewTestStorage(t, 2)
defer closer.Close()
engine := promql.NewEngine(storage, nil)
queryCtx, cancelQueries := context.WithCancel(context.Background())
defer cancelQueries()
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()

now := model.Now()

Expand Down Expand Up @@ -63,7 +63,7 @@ func TestRuleEval(t *testing.T) {

for _, test := range suite {
rule := NewRecordingRule(test.name, test.expr, test.labels)
result, err := rule.eval(now, engine, queryCtx, "")
result, err := rule.eval(ctx, now, engine, "")
if err != nil {
t.Fatalf("Error evaluating %s", test.name)
}
Expand Down
13 changes: 7 additions & 6 deletions storage/local/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"github.com/prometheus/common/model"
"golang.org/x/net/context"

"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/metric"
Expand All @@ -40,7 +41,7 @@ type Storage interface {

// Drop all time series associated with the given label matchers. Returns
// the number series that were dropped.
DropMetricsForLabelMatchers(...*metric.LabelMatcher) (int, error)
DropMetricsForLabelMatchers(context.Context, ...*metric.LabelMatcher) (int, error)
// Run the various maintenance loops in goroutines. Returns when the
// storage is ready to use. Keeps everything running in the background
// until Stop is called.
Expand All @@ -59,10 +60,10 @@ type Querier interface {
// QueryRange returns a list of series iterators for the selected
// time range and label matchers. The iterators need to be closed
// after usage.
QueryRange(from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error)
QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error)
// QueryInstant returns a list of series iterators for the selected
// instant and label matchers. The iterators need to be closed after usage.
QueryInstant(ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error)
QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error)
// MetricsForLabelMatchers returns the metrics from storage that satisfy
// the given sets of label matchers. Each set of matchers must contain at
// least one label matcher that does not match the empty string. Otherwise,
Expand All @@ -72,14 +73,14 @@ type Querier interface {
// storage to optimize the search. The storage MAY exclude metrics that
// have no samples in the specified interval from the returned map. In
// doubt, specify model.Earliest for from and model.Latest for through.
MetricsForLabelMatchers(from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error)
MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error)
// LastSampleForLabelMatchers returns the last samples that have been
// ingested for the time series matching the given set of label matchers.
// The label matching behavior is the same as in MetricsForLabelMatchers.
// All returned samples are between the specified cutoff time and now.
LastSampleForLabelMatchers(cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error)
LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error)
// Get all of the label values that are associated with a given label name.
LabelValuesForLabelName(model.LabelName) (model.LabelValues, error)
LabelValuesForLabelName(context.Context, model.LabelName) (model.LabelValues, error)
}

// SeriesIterator enables efficient access of sample values in a series. Its
Expand Down
Loading

0 comments on commit c187308

Please sign in to comment.