Skip to content

Commit

Permalink
promql: Allow per-query contexts.
Browse files Browse the repository at this point in the history
For Weaveworks' Frankenstein, we need to support multitenancy. In
Frankenstein, we initially solved this without modifying the promql
package at all: we constructed a new promql.Engine for every
query and injected a storage implementation into that engine which would
be primed to only collect data for a given user.

This is problematic to upstream, however. Prometheus assumes that there
is only one engine: the query concurrency gate is part of the engine,
and the engine contains one central cancellable context to shut down all
queries. Also, creating a new engine for every query seems like overkill.

Thus, we want to be able to pass per-query contexts into a single engine.

This change gets rid of the promql.Engine's built-in base context and
allows passing in a per-query context instead. Central cancellation of
all queries is still possible by deriving all passed-in contexts from
one central one, but this is now the responsibility of the caller. The
central query context is now created in main() and passed into the
relevant components (web handler / API, rule manager).

In a next step, the per-query context would have to be passed to the
storage implementation, so that the storage can implement multi-tenancy
or other features based on the contextual information.
  • Loading branch information
juliusv committed Sep 19, 2016
1 parent c9c2663 commit ed5a0f0
Show file tree
Hide file tree
Showing 14 changed files with 90 additions and 66 deletions.
14 changes: 9 additions & 5 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/version"
"golang.org/x/net/context"

"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/promql"
Expand Down Expand Up @@ -102,15 +104,17 @@ func Main() int {
}

var (
notifier = notifier.New(&cfg.notifier)
targetManager = retrieval.NewTargetManager(sampleAppender)
queryEngine = promql.NewEngine(localStorage, &cfg.queryEngine)
notifier = notifier.New(&cfg.notifier)
targetManager = retrieval.NewTargetManager(sampleAppender)
queryEngine = promql.NewEngine(localStorage, &cfg.queryEngine)
queryCtx, cancelQueries = context.WithCancel(context.Background())
)

ruleManager := rules.NewManager(&rules.ManagerOptions{
SampleAppender: sampleAppender,
Notifier: notifier,
QueryEngine: queryEngine,
QueryCtx: queryCtx,
ExternalURL: cfg.web.ExternalURL,
})

Expand All @@ -128,7 +132,7 @@ func Main() int {
GoVersion: version.GoVersion,
}

webHandler := web.New(localStorage, queryEngine, targetManager, ruleManager, version, flags, &cfg.web)
webHandler := web.New(localStorage, queryEngine, queryCtx, targetManager, ruleManager, version, flags, &cfg.web)

reloadables = append(reloadables, targetManager, ruleManager, webHandler, notifier)

Expand Down Expand Up @@ -201,7 +205,7 @@ func Main() int {

// Shutting down the query engine before the rule manager will cause pending queries
// to be canceled and ensures a quick shutdown of the rule manager.
defer queryEngine.Stop()
defer cancelQueries()

go webHandler.Run()

Expand Down
31 changes: 9 additions & 22 deletions promql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (e ErrQueryCanceled) Error() string { return fmt.Sprintf("query was cancele
// it is associated with.
type Query interface {
// Exec processes the query and
Exec() *Result
Exec(ctx context.Context) *Result
// Statement returns the parsed statement of the query.
Statement() Statement
// Stats returns statistics about the lifetime of the query.
Expand Down Expand Up @@ -192,8 +192,8 @@ func (q *query) Cancel() {
}

// Exec implements the Query interface.
func (q *query) Exec() *Result {
res, err := q.ng.exec(q)
func (q *query) Exec(ctx context.Context) *Result {
res, err := q.ng.exec(ctx, q)
return &Result{Err: err, Value: res}
}

Expand All @@ -220,13 +220,8 @@ func contextDone(ctx context.Context, env string) error {
type Engine struct {
// The querier on which the engine operates.
querier local.Querier

// The base context for all queries and its cancellation function.
baseCtx context.Context
cancelQueries func()
// The gate limiting the maximum number of concurrent and waiting queries.
gate *queryGate

gate *queryGate
options *EngineOptions
}

Expand All @@ -235,13 +230,10 @@ func NewEngine(querier local.Querier, o *EngineOptions) *Engine {
if o == nil {
o = DefaultEngineOptions
}
ctx, cancel := context.WithCancel(context.Background())
return &Engine{
querier: querier,
baseCtx: ctx,
cancelQueries: cancel,
gate: newQueryGate(o.MaxConcurrentQueries),
options: o,
querier: querier,
gate: newQueryGate(o.MaxConcurrentQueries),
options: o,
}
}

Expand All @@ -257,11 +249,6 @@ var DefaultEngineOptions = &EngineOptions{
Timeout: 2 * time.Minute,
}

// Stop the engine and cancel all running queries.
func (ng *Engine) Stop() {
ng.cancelQueries()
}

// NewInstantQuery returns an evaluation query for the given expression at the given time.
func (ng *Engine) NewInstantQuery(qs string, ts model.Time) (Query, error) {
expr, err := ParseExpr(qs)
Expand Down Expand Up @@ -326,8 +313,8 @@ func (ng *Engine) newTestQuery(f func(context.Context) error) Query {
//
// At this point per query only one EvalStmt is evaluated. Alert and record
// statements are not handled by the Engine.
func (ng *Engine) exec(q *query) (model.Value, error) {
ctx, cancel := context.WithTimeout(q.ng.baseCtx, ng.options.Timeout)
func (ng *Engine) exec(ctx context.Context, q *query) (model.Value, error) {
ctx, cancel := context.WithTimeout(ctx, ng.options.Timeout)
q.cancel = cancel

queueTimer := q.stats.GetTimer(stats.ExecQueueTime).Start()
Expand Down
30 changes: 17 additions & 13 deletions promql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import (

func TestQueryConcurrency(t *testing.T) {
engine := NewEngine(nil, nil)
defer engine.Stop()
ctx, cancelQueries := context.WithCancel(context.Background())
defer cancelQueries()

block := make(chan struct{})
processing := make(chan struct{})
Expand All @@ -36,7 +37,7 @@ func TestQueryConcurrency(t *testing.T) {

for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ {
q := engine.newTestQuery(f)
go q.Exec()
go q.Exec(ctx)
select {
case <-processing:
// Expected.
Expand All @@ -46,7 +47,7 @@ func TestQueryConcurrency(t *testing.T) {
}

q := engine.newTestQuery(f)
go q.Exec()
go q.Exec(ctx)

select {
case <-processing:
Expand Down Expand Up @@ -76,14 +77,15 @@ func TestQueryTimeout(t *testing.T) {
Timeout: 5 * time.Millisecond,
MaxConcurrentQueries: 20,
})
defer engine.Stop()
ctx, cancelQueries := context.WithCancel(context.Background())
defer cancelQueries()

query := engine.newTestQuery(func(ctx context.Context) error {
time.Sleep(50 * time.Millisecond)
return contextDone(ctx, "test statement execution")
})

res := query.Exec()
res := query.Exec(ctx)
if res.Err == nil {
t.Fatalf("expected timeout error but got none")
}
Expand All @@ -94,7 +96,8 @@ func TestQueryTimeout(t *testing.T) {

func TestQueryCancel(t *testing.T) {
engine := NewEngine(nil, nil)
defer engine.Stop()
ctx, cancelQueries := context.WithCancel(context.Background())
defer cancelQueries()

// Cancel a running query before it completes.
block := make(chan struct{})
Expand All @@ -109,7 +112,7 @@ func TestQueryCancel(t *testing.T) {
var res *Result

go func() {
res = query1.Exec()
res = query1.Exec(ctx)
processing <- struct{}{}
}()

Expand All @@ -131,14 +134,15 @@ func TestQueryCancel(t *testing.T) {
})

query2.Cancel()
res = query2.Exec()
res = query2.Exec(ctx)
if res.Err != nil {
t.Fatalf("unexpeceted error on executing query2: %s", res.Err)
t.Fatalf("unexpected error on executing query2: %s", res.Err)
}
}

func TestEngineShutdown(t *testing.T) {
engine := NewEngine(nil, nil)
ctx, cancelQueries := context.WithCancel(context.Background())

block := make(chan struct{})
processing := make(chan struct{})
Expand All @@ -158,12 +162,12 @@ func TestEngineShutdown(t *testing.T) {

var res *Result
go func() {
res = query1.Exec()
res = query1.Exec(ctx)
processing <- struct{}{}
}()

<-processing
engine.Stop()
cancelQueries()
block <- struct{}{}
<-processing

Expand All @@ -181,9 +185,9 @@ func TestEngineShutdown(t *testing.T) {

// The second query is started after the engine shut down. It must
// be canceled immediately.
res2 := query2.Exec()
res2 := query2.Exec(ctx)
if res2.Err == nil {
t.Fatalf("expected error on querying shutdown engine but got none")
t.Fatalf("expected error on querying with canceled context but got none")
}
if _, ok := res2.Err.(ErrQueryCanceled); !ok {
t.Fatalf("expected cancelation error, got %q", res2.Err)
Expand Down
23 changes: 16 additions & 7 deletions promql/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/prometheus/common/model"
"golang.org/x/net/context"

"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/local"
Expand All @@ -49,9 +50,11 @@ type Test struct {

cmds []testCommand

storage local.Storage
closeStorage func()
queryEngine *Engine
storage local.Storage
closeStorage func()
queryEngine *Engine
queryCtx context.Context
cancelQueries context.CancelFunc
}

// NewTest returns an initialized empty Test.
Expand Down Expand Up @@ -79,6 +82,11 @@ func (t *Test) QueryEngine() *Engine {
return t.queryEngine
}

// Context returns the test's query context.
func (t *Test) Context() context.Context {
return t.queryCtx
}

// Storage returns the test's storage.
func (t *Test) Storage() local.Storage {
return t.storage
Expand Down Expand Up @@ -463,7 +471,7 @@ func (t *Test) exec(tc testCommand) error {

case *evalCmd:
q := t.queryEngine.newQuery(cmd.expr, cmd.start, cmd.end, cmd.interval)
res := q.Exec()
res := q.Exec(t.queryCtx)
if res.Err != nil {
if cmd.fail {
return nil
Expand All @@ -490,20 +498,21 @@ func (t *Test) clear() {
if t.closeStorage != nil {
t.closeStorage()
}
if t.queryEngine != nil {
t.queryEngine.Stop()
if t.cancelQueries != nil {
t.cancelQueries()
}

var closer testutil.Closer
t.storage, closer = local.NewTestStorage(t, 2)

t.closeStorage = closer.Close
t.queryEngine = NewEngine(t.storage, nil)
t.queryCtx, t.cancelQueries = context.WithCancel(context.Background())
}

// Close closes resources associated with the Test.
func (t *Test) Close() {
t.queryEngine.Stop()
t.cancelQueries()
t.closeStorage()
}

Expand Down
7 changes: 5 additions & 2 deletions rules/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"sync"
"time"

"golang.org/x/net/context"

html_template "html/template"

"github.com/prometheus/common/log"
Expand Down Expand Up @@ -146,12 +148,12 @@ const resolvedRetention = 15 * time.Minute

// eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly.
func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine, externalURLPath string) (model.Vector, error) {
func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine, queryCtx context.Context, externalURLPath string) (model.Vector, error) {
query, err := engine.NewInstantQuery(r.vector.String(), ts)
if err != nil {
return nil, err
}
res, err := query.Exec().Vector()
res, err := query.Exec(queryCtx).Vector()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -188,6 +190,7 @@ func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine, externalURLPat
tmplData,
ts,
engine,
queryCtx,
externalURLPath,
)
result, err := tmpl.Expand()
Expand Down
7 changes: 5 additions & 2 deletions rules/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"sync"
"time"

"golang.org/x/net/context"

html_template "html/template"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -105,7 +107,7 @@ const (
type Rule interface {
Name() string
// eval evaluates the rule, including any associated recording or alerting actions.
eval(model.Time, *promql.Engine, string) (model.Vector, error)
eval(model.Time, *promql.Engine, context.Context, string) (model.Vector, error)
// String returns a human-readable string representation of the rule.
String() string
// HTMLSnippet returns a human-readable string representation of the rule,
Expand Down Expand Up @@ -256,7 +258,7 @@ func (g *Group) eval() {

evalTotal.WithLabelValues(rtyp).Inc()

vector, err := rule.eval(now, g.opts.QueryEngine, g.opts.ExternalURL.Path)
vector, err := rule.eval(now, g.opts.QueryEngine, g.opts.QueryCtx, g.opts.ExternalURL.Path)
if err != nil {
// Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here.
Expand Down Expand Up @@ -341,6 +343,7 @@ type Manager struct {
type ManagerOptions struct {
ExternalURL *url.URL
QueryEngine *promql.Engine
QueryCtx context.Context
Notifier *notifier.Notifier
SampleAppender storage.SampleAppender
}
Expand Down
2 changes: 1 addition & 1 deletion rules/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestAlertingRule(t *testing.T) {
for i, test := range tests {
evalTime := model.Time(0).Add(test.time)

res, err := rule.eval(evalTime, suite.QueryEngine(), "")
res, err := rule.eval(evalTime, suite.QueryEngine(), suite.Context(), "")
if err != nil {
t.Fatalf("Error during alerting rule evaluation: %s", err)
}
Expand Down
Loading

0 comments on commit ed5a0f0

Please sign in to comment.