Skip to content

Commit

Permalink
promql: remove global flags
Browse files Browse the repository at this point in the history
  • Loading branch information
fabxc committed Jun 15, 2015
1 parent de66e32 commit fe301d7
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 29 deletions.
4 changes: 2 additions & 2 deletions promql/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
if err = contextDone(ctx, env); err != nil {
return nil, err
}
err = p.PreloadRange(fp, start.Add(-rangeDuration), end, *stalenessDelta)
err = p.PreloadRange(fp, start.Add(-rangeDuration), end, StalenessDelta)
if err != nil {
return nil, err
}
Expand All @@ -146,7 +146,7 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
if err = contextDone(ctx, env); err != nil {
return nil, err
}
err = p.PreloadRange(fp, start, end, *stalenessDelta)
err = p.PreloadRange(fp, start, end, StalenessDelta)
if err != nil {
return nil, err
}
Expand Down
39 changes: 27 additions & 12 deletions promql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package promql

import (
"flag"
"fmt"
"math"
"runtime"
Expand All @@ -30,12 +29,6 @@ import (
"github.com/prometheus/prometheus/util/stats"
)

var (
stalenessDelta = flag.Duration("query.staleness-delta", 300*time.Second, "Staleness delta allowance during expression evaluations.")
defaultQueryTimeout = flag.Duration("query.timeout", 2*time.Minute, "Maximum time a query may take before being aborted.")
maxConcurrentQueries = flag.Int("query.max-concurrency", 20, "Maximum number of queries executed concurrently.")
)

// SampleStream is a stream of Values belonging to an attached COWMetric.
type SampleStream struct {
Metric clientmodel.COWMetric `json:"metric"`
Expand Down Expand Up @@ -249,19 +242,37 @@ type Engine struct {
cancelQueries func()
// The gate limiting the maximum number of concurrent and waiting queries.
gate *queryGate

options *EngineOptions
}

// NewEngine returns a new engine.
func NewEngine(storage local.Storage) *Engine {
func NewEngine(storage local.Storage, o *EngineOptions) *Engine {
if o == nil {
o = DefaultEngineOptions
}
ctx, cancel := context.WithCancel(context.Background())
return &Engine{
storage: storage,
baseCtx: ctx,
cancelQueries: cancel,
gate: newQueryGate(*maxConcurrentQueries),
gate: newQueryGate(o.MaxConcurrentQueries),
options: o,
}
}

// EngineOptions contains configuration parameters for an Engine.
type EngineOptions struct {
MaxConcurrentQueries int
Timeout time.Duration
}

// DefaultEngineOptions are the default engine options.
var DefaultEngineOptions = &EngineOptions{
MaxConcurrentQueries: 20,
Timeout: 2 * time.Minute,
}

// Stop the engine and cancel all running queries.
func (ng *Engine) Stop() {
ng.cancelQueries()
Expand Down Expand Up @@ -328,7 +339,7 @@ func (ng *Engine) newTestQuery(stmts ...Statement) Query {
func (ng *Engine) exec(q *query) (Value, error) {
const env = "query execution"

ctx, cancel := context.WithTimeout(q.ng.baseCtx, *defaultQueryTimeout)
ctx, cancel := context.WithTimeout(q.ng.baseCtx, ng.options.Timeout)
q.cancel = cancel

queueTimer := q.stats.GetTimer(stats.ExecQueueTime).Start()
Expand Down Expand Up @@ -1107,6 +1118,10 @@ func shouldDropMetricName(op itemType) bool {
}
}

// StalenessDelta determines the time since the last sample after which a time
// series is considered stale.
var StalenessDelta = 5 * time.Minute

// chooseClosestSample chooses the closest sample of a list of samples
// surrounding a given target time. If samples are found both before and after
// the target time, the sample value is interpolated between these. Otherwise,
Expand All @@ -1119,7 +1134,7 @@ func chooseClosestSample(samples metric.Values, timestamp clientmodel.Timestamp)
// Samples before target time.
if delta < 0 {
// Ignore samples outside of staleness policy window.
if -delta > *stalenessDelta {
if -delta > StalenessDelta {
continue
}
// Ignore samples that are farther away than what we've seen before.
Expand All @@ -1133,7 +1148,7 @@ func chooseClosestSample(samples metric.Values, timestamp clientmodel.Timestamp)
// Samples after target time.
if delta >= 0 {
// Ignore samples outside of staleness policy window.
if delta > *stalenessDelta {
if delta > StalenessDelta {
continue
}
// Ignore samples that are farther away than samples we've seen before.
Expand Down
21 changes: 9 additions & 12 deletions promql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var noop = testStmt(func(context.Context) error {
})

func TestQueryConcurreny(t *testing.T) {
engine := NewEngine(nil)
engine := NewEngine(nil, nil)
defer engine.Stop()

block := make(chan struct{})
Expand All @@ -24,7 +24,7 @@ func TestQueryConcurreny(t *testing.T) {
return nil
})

for i := 0; i < *maxConcurrentQueries; i++ {
for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ {
q := engine.newTestQuery(f1)
go q.Exec()
select {
Expand Down Expand Up @@ -56,19 +56,16 @@ func TestQueryConcurreny(t *testing.T) {
}

// Terminate remaining queries.
for i := 0; i < *maxConcurrentQueries; i++ {
for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ {
block <- struct{}{}
}
}

func TestQueryTimeout(t *testing.T) {
*defaultQueryTimeout = 5 * time.Millisecond
defer func() {
// Restore default query timeout
*defaultQueryTimeout = 2 * time.Minute
}()

engine := NewEngine(nil)
engine := NewEngine(nil, &EngineOptions{
Timeout: 5 * time.Millisecond,
MaxConcurrentQueries: 20,
})
defer engine.Stop()

f1 := testStmt(func(context.Context) error {
Expand All @@ -90,7 +87,7 @@ func TestQueryTimeout(t *testing.T) {
}

func TestQueryCancel(t *testing.T) {
engine := NewEngine(nil)
engine := NewEngine(nil, nil)
defer engine.Stop()

// As for timeouts, cancellation is only checked at designated points. We ensure
Expand Down Expand Up @@ -132,7 +129,7 @@ func TestQueryCancel(t *testing.T) {
}

func TestEngineShutdown(t *testing.T) {
engine := NewEngine(nil)
engine := NewEngine(nil, nil)

handlerExecutions := 0
// Shutdown engine on first handler execution. Should handler execution ever become
Expand Down
2 changes: 1 addition & 1 deletion promql/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func (t *Test) clear() {
t.storage, closer = local.NewTestStorage(t, 1)

t.closeStorage = closer.Close
t.queryEngine = NewEngine(t.storage)
t.queryEngine = NewEngine(t.storage, nil)
}

func (t *Test) Close() {
Expand Down
2 changes: 1 addition & 1 deletion rules/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestAlertingRule(t *testing.T) {

storeMatrix(storage, testMatrix)

engine := promql.NewEngine(storage)
engine := promql.NewEngine(storage, nil)
defer engine.Stop()

expr, err := promql.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`)
Expand Down
2 changes: 1 addition & 1 deletion template/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestTemplateExpansion(t *testing.T) {
})
storage.WaitForIndexing()

engine := promql.NewEngine(storage)
engine := promql.NewEngine(storage, nil)

for i, s := range scenarios {
var result string
Expand Down

0 comments on commit fe301d7

Please sign in to comment.