Skip to content

Commit

Permalink
Merge pull request prometheus#1759 from prometheus/separate-querier
Browse files Browse the repository at this point in the history
Separate query interface out of local.Storage.
  • Loading branch information
fabxc authored Jun 23, 2016
2 parents 0e281f5 + b7b6717 commit 6f6dddb
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 35 deletions.
2 changes: 1 addition & 1 deletion promql/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
// from the storage. It is bound to a context that allows cancellation and timing out.
type Analyzer struct {
// The storage from which to query data.
Storage local.Storage
Storage local.Querier
// The expression being analyzed.
Expr Expr
// The time range for evaluation of Expr.
Expand Down
4 changes: 2 additions & 2 deletions promql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func contextDone(ctx context.Context, env string) error {
// It is connected to a storage.
type Engine struct {
// The storage on which the engine operates.
storage local.Storage
storage local.Querier

// The base context for all queries and its cancellation function.
baseCtx context.Context
Expand All @@ -230,7 +230,7 @@ type Engine struct {
}

// NewEngine returns a new engine.
func NewEngine(storage local.Storage, o *EngineOptions) *Engine {
func NewEngine(storage local.Querier, o *EngineOptions) *Engine {
if o == nil {
o = DefaultEngineOptions
}
Expand Down
55 changes: 30 additions & 25 deletions storage/local/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,45 @@ package local
import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/metric"
)

// Storage ingests and manages samples, along with various indexes. All methods
// are goroutine-safe. Storage implements storage.SampleAppender.
type Storage interface {
prometheus.Collector
// Append stores a sample in the Storage. Multiple samples for the same
// fingerprint need to be submitted in chronological order, from oldest
// to newest. When Append has returned, the appended sample might not be
// queryable immediately. (Use WaitForIndexing to wait for complete
// processing.) The implementation might remove labels with empty value
// from the provided Sample as those labels are considered equivalent to
// a label not present at all.
Append(*model.Sample) error
// NeedsThrottling returns true if the Storage has too many chunks in memory
Querier

// This SampleAppender needs multiple samples for the same fingerprint to be
// submitted in chronological order, from oldest to newest. When Append has
// returned, the appended sample might not be queryable immediately. (Use
// WaitForIndexing to wait for complete processing.) The implementation might
// remove labels with empty value from the provided Sample as those labels
// are considered equivalent to a label not present at all.
//
// Appending is throttled if the Storage has too many chunks in memory
// already or has too many chunks waiting for persistence.
NeedsThrottling() bool
storage.SampleAppender

// Drop all time series associated with the given fingerprints.
DropMetricsForFingerprints(...model.Fingerprint)
// Run the various maintenance loops in goroutines. Returns when the
// storage is ready to use. Keeps everything running in the background
// until Stop is called.
Start() error
// Stop shuts down the Storage gracefully, flushes all pending
// operations, stops all maintenance loops,and frees all resources.
Stop() error
// WaitForIndexing returns once all samples in the storage are
// indexed. Indexing is needed for FingerprintsForLabelMatchers and
// LabelValuesForLabelName and may lag behind.
WaitForIndexing()
}

// Querier allows querying a time series storage.
type Querier interface {
// NewPreloader returns a new Preloader which allows preloading and pinning
// series data into memory for use within a query.
NewPreloader() Preloader
Expand All @@ -56,19 +74,6 @@ type Storage interface {
LastSampleForFingerprint(model.Fingerprint) model.Sample
// Get all of the label values that are associated with a given label name.
LabelValuesForLabelName(model.LabelName) model.LabelValues
// Drop all time series associated with the given fingerprints.
DropMetricsForFingerprints(...model.Fingerprint)
// Run the various maintenance loops in goroutines. Returns when the
// storage is ready to use. Keeps everything running in the background
// until Stop is called.
Start() error
// Stop shuts down the Storage gracefully, flushes all pending
// operations, stops all maintenance loops,and frees all resources.
Stop() error
// WaitForIndexing returns once all samples in the storage are
// indexed. Indexing is needed for FingerprintsForLabelMatchers and
// LabelValuesForLabelName and may lag behind.
WaitForIndexing()
}

// SeriesIterator enables efficient access of sample values in a series. Its
Expand Down
2 changes: 1 addition & 1 deletion storage/local/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ type MemorySeriesStorageOptions struct {

// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
// has to be called to start the storage.
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *memorySeriesStorage {
s := &memorySeriesStorage{
fpLocker: newFingerprintLocker(o.NumMutexes),

Expand Down
8 changes: 4 additions & 4 deletions storage/local/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ func TestLoop(t *testing.T) {
storage.Append(s)
}
storage.WaitForIndexing()
series, _ := storage.(*memorySeriesStorage).fpToSeries.get(model.Metric{}.FastFingerprint())
series, _ := storage.fpToSeries.get(model.Metric{}.FastFingerprint())
cdsBefore := len(series.chunkDescs)
time.Sleep(fpMaxWaitDuration + time.Second) // TODO(beorn7): Ugh, need to wait for maintenance to kick in.
cdsAfter := len(series.chunkDescs)
Expand Down Expand Up @@ -1497,12 +1497,12 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) {
for _, sample := range samples[start:middle] {
s.Append(sample)
}
verifyStorageRandom(b, s.(*memorySeriesStorage), samples[:middle])
verifyStorageRandom(b, s, samples[:middle])
for _, sample := range samples[middle:end] {
s.Append(sample)
}
verifyStorageRandom(b, s.(*memorySeriesStorage), samples[:end])
verifyStorageSequential(b, s.(*memorySeriesStorage), samples)
verifyStorageRandom(b, s, samples[:end])
verifyStorageSequential(b, s, samples)
}
}

Expand Down
4 changes: 2 additions & 2 deletions storage/local/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage,
SyncStrategy: Adaptive,
}
storage := NewMemorySeriesStorage(o)
storage.(*memorySeriesStorage).archiveHighWatermark = model.Latest
storage.archiveHighWatermark = model.Latest
if err := storage.Start(); err != nil {
directory.Close()
t.Fatalf("Error creating storage: %s", err)
Expand All @@ -63,5 +63,5 @@ func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage,
directory: directory,
}

return storage.(*memorySeriesStorage), closer
return storage, closer
}

0 comments on commit 6f6dddb

Please sign in to comment.