Skip to content

Commit

Permalink
Separate storage implementation from interfaces.
Browse files Browse the repository at this point in the history
This was initially motivated by wanting to distribute the rule checker
tool under `tools/rule_checker`. However, this was not possible without
also distributing the LevelDB dynamic libraries because the tool
transitively depended on Levigo:

rule checker -> query layer -> tiered storage layer -> leveldb

This change separates external storage interfaces from the
implementation (tiered storage, leveldb storage, memory storage) by
putting them into separate packages:

- storage/metric: public, implementation-agnostic interfaces
- storage/metric/tiered: tiered storage implementation, including memory
                         and LevelDB storage.

I initially also considered splitting up the implementation into
separate packages for tiered storage, memory storage, and LevelDB
storage, but these are currently so intertwined that it would be another
major project in itself.

The query layers and most other parts of Prometheus now have notion of
the storage implementation anymore and just use whatever implementation
they get passed in via interfaces.

The rule_checker is now a static binary :)

Change-Id: I793bbf631a8648ca31790e7e772ecf9c2b92f7a0
  • Loading branch information
juliusv committed Apr 16, 2014
1 parent 3e969a8 commit 01f652c
Show file tree
Hide file tree
Showing 47 changed files with 748 additions and 678 deletions.
16 changes: 8 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/prometheus/prometheus/notification"
"github.com/prometheus/prometheus/retrieval"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/metric/tiered"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/storage/remote/opentsdb"
"github.com/prometheus/prometheus/web"
Expand Down Expand Up @@ -85,10 +85,10 @@ type prometheus struct {
ruleManager rules.RuleManager
targetManager retrieval.TargetManager
notifications chan notification.NotificationReqs
storage *metric.TieredStorage
storage *tiered.TieredStorage
remoteTSDBQueue *remote.TSDBQueueManager

curationState metric.CurationStateUpdater
curationState tiered.CurationStateUpdater

closeOnce sync.Once
}
Expand Down Expand Up @@ -124,13 +124,13 @@ func (p *prometheus) compact(olderThan time.Duration, groupSize int) error {
return nil
}

processor := metric.NewCompactionProcessor(&metric.CompactionProcessorOptions{
processor := tiered.NewCompactionProcessor(&tiered.CompactionProcessorOptions{
MaximumMutationPoolBatch: groupSize * 3,
MinimumGroupSize: groupSize,
})
defer processor.Close()

curator := metric.NewCurator(&metric.CuratorOptions{
curator := tiered.NewCurator(&tiered.CuratorOptions{
Stop: p.stopBackgroundOperations,

ViewQueue: p.storage.ViewQueue,
Expand Down Expand Up @@ -158,12 +158,12 @@ func (p *prometheus) delete(olderThan time.Duration, batchSize int) error {
return nil
}

processor := metric.NewDeletionProcessor(&metric.DeletionProcessorOptions{
processor := tiered.NewDeletionProcessor(&tiered.DeletionProcessorOptions{
MaximumMutationPoolBatch: batchSize,
})
defer processor.Close()

curator := metric.NewCurator(&metric.CuratorOptions{
curator := tiered.NewCurator(&tiered.CuratorOptions{
Stop: p.stopBackgroundOperations,

ViewQueue: p.storage.ViewQueue,
Expand Down Expand Up @@ -240,7 +240,7 @@ func main() {
glog.Fatalf("Error loading configuration from %s: %v", *configFile, err)
}

ts, err := metric.NewTieredStorage(uint(*diskAppendQueueCapacity), 100, *arenaFlushInterval, *arenaTTL, *metricsStoragePath)
ts, err := tiered.NewTieredStorage(uint(*diskAppendQueueCapacity), 100, *arenaFlushInterval, *arenaTTL, *metricsStoragePath)
if err != nil {
glog.Fatal("Error opening storage: ", err)
}
Expand Down
4 changes: 2 additions & 2 deletions rules/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ func (rule *AlertingRule) Name() string {
return rule.name
}

func (rule *AlertingRule) EvalRaw(timestamp clientmodel.Timestamp, storage *metric.TieredStorage) (ast.Vector, error) {
func (rule *AlertingRule) EvalRaw(timestamp clientmodel.Timestamp, storage metric.PreloadingPersistence) (ast.Vector, error) {
return ast.EvalVectorInstant(rule.vector, timestamp, storage, stats.NewTimerGroup())
}

func (rule *AlertingRule) Eval(timestamp clientmodel.Timestamp, storage *metric.TieredStorage) (ast.Vector, error) {
func (rule *AlertingRule) Eval(timestamp clientmodel.Timestamp, storage metric.PreloadingPersistence) (ast.Vector, error) {
// Get the raw value of the rule expression.
exprResult, err := rule.EvalRaw(timestamp, storage)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions rules/ast/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func labelsToKey(labels clientmodel.Metric) uint64 {
}

// EvalVectorInstant evaluates a VectorNode with an instant query.
func EvalVectorInstant(node VectorNode, timestamp clientmodel.Timestamp, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (vector Vector, err error) {
func EvalVectorInstant(node VectorNode, timestamp clientmodel.Timestamp, storage metric.PreloadingPersistence, queryStats *stats.TimerGroup) (vector Vector, err error) {
viewAdapter, err := viewAdapterForInstantQuery(node, timestamp, storage, queryStats)
if err != nil {
return
Expand All @@ -367,7 +367,7 @@ func EvalVectorInstant(node VectorNode, timestamp clientmodel.Timestamp, storage
}

// EvalVectorRange evaluates a VectorNode with a range query.
func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (Matrix, error) {
func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage metric.PreloadingPersistence, queryStats *stats.TimerGroup) (Matrix, error) {
// Explicitly initialize to an empty matrix since a nil Matrix encodes to
// null in JSON.
matrix := Matrix{}
Expand Down
2 changes: 1 addition & 1 deletion rules/ast/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TypedValueToJSON(data interface{}, typeStr string) string {
}

// EvalToString evaluates the given node into a string of the given format.
func EvalToString(node Node, timestamp clientmodel.Timestamp, format OutputFormat, storage *metric.TieredStorage, queryStats *stats.TimerGroup) string {
func EvalToString(node Node, timestamp clientmodel.Timestamp, format OutputFormat, storage metric.PreloadingPersistence, queryStats *stats.TimerGroup) string {
viewTimer := queryStats.GetTimer(stats.TotalViewBuildingTime).Start()
viewAdapter, err := viewAdapterForInstantQuery(node, timestamp, storage, queryStats)
viewTimer.Stop()
Expand Down
17 changes: 8 additions & 9 deletions rules/ast/query_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ type QueryAnalyzer struct {
IntervalRanges IntervalRangeMap
// The underlying storage to which the query will be applied. Needed for
// extracting timeseries fingerprint information during query analysis.
storage *metric.TieredStorage
storage metric.Persistence
}

// NewQueryAnalyzer returns a pointer to a newly instantiated
// QueryAnalyzer. The storage is needed to extract timeseries
// fingerprint information during query analysis.
func NewQueryAnalyzer(storage *metric.TieredStorage) *QueryAnalyzer {
func NewQueryAnalyzer(storage metric.Persistence) *QueryAnalyzer {
return &QueryAnalyzer{
FullRanges: FullRangeMap{},
IntervalRanges: IntervalRangeMap{},
Expand Down Expand Up @@ -104,14 +104,14 @@ func (analyzer *QueryAnalyzer) AnalyzeQueries(node Node) {
Walk(analyzer, node)
}

func viewAdapterForInstantQuery(node Node, timestamp clientmodel.Timestamp, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (*viewAdapter, error) {
func viewAdapterForInstantQuery(node Node, timestamp clientmodel.Timestamp, storage metric.PreloadingPersistence, queryStats *stats.TimerGroup) (*viewAdapter, error) {
analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start()
analyzer := NewQueryAnalyzer(storage)
analyzer.AnalyzeQueries(node)
analyzeTimer.Stop()

requestBuildTimer := queryStats.GetTimer(stats.ViewRequestBuildTime).Start()
viewBuilder := metric.NewViewRequestBuilder()
viewBuilder := storage.NewViewRequestBuilder()
for fingerprint, rangeDuration := range analyzer.FullRanges {
viewBuilder.GetMetricRange(&fingerprint, timestamp.Add(-rangeDuration), timestamp)
}
Expand All @@ -121,23 +121,22 @@ func viewAdapterForInstantQuery(node Node, timestamp clientmodel.Timestamp, stor
requestBuildTimer.Stop()

buildTimer := queryStats.GetTimer(stats.InnerViewBuildingTime).Start()
// BUG(julius): Clear Law of Demeter violation.
view, err := analyzer.storage.MakeView(viewBuilder, 60*time.Second, queryStats)
view, err := viewBuilder.Execute(60*time.Second, queryStats)
buildTimer.Stop()
if err != nil {
return nil, err
}
return NewViewAdapter(view, storage, queryStats), nil
}

func viewAdapterForRangeQuery(node Node, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage *metric.TieredStorage, queryStats *stats.TimerGroup) (*viewAdapter, error) {
func viewAdapterForRangeQuery(node Node, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage metric.PreloadingPersistence, queryStats *stats.TimerGroup) (*viewAdapter, error) {
analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start()
analyzer := NewQueryAnalyzer(storage)
analyzer.AnalyzeQueries(node)
analyzeTimer.Stop()

requestBuildTimer := queryStats.GetTimer(stats.ViewRequestBuildTime).Start()
viewBuilder := metric.NewViewRequestBuilder()
viewBuilder := storage.NewViewRequestBuilder()
for fingerprint, rangeDuration := range analyzer.FullRanges {
if interval < rangeDuration {
viewBuilder.GetMetricRange(&fingerprint, start.Add(-rangeDuration), end)
Expand All @@ -151,7 +150,7 @@ func viewAdapterForRangeQuery(node Node, start clientmodel.Timestamp, end client
requestBuildTimer.Stop()

buildTimer := queryStats.GetTimer(stats.InnerViewBuildingTime).Start()
view, err := analyzer.storage.MakeView(viewBuilder, time.Duration(60)*time.Second, queryStats)
view, err := viewBuilder.Execute(time.Duration(60)*time.Second, queryStats)
buildTimer.Stop()
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions rules/ast/view_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type viewAdapter struct {
stalenessPolicy StalenessPolicy
// AST-global storage to use for operations that are not supported by views
// (i.e. fingerprint->metric lookups).
storage *metric.TieredStorage
storage metric.Persistence
// The materialized view which contains all timeseries data required for
// executing a query.
view metric.View
Expand Down Expand Up @@ -184,7 +184,7 @@ func (v *viewAdapter) GetRangeValues(fingerprints clientmodel.Fingerprints, inte

// NewViewAdapter returns an initialized view adapter with a default
// staleness policy (based on the --defaultStalenessDelta flag).
func NewViewAdapter(view metric.View, storage *metric.TieredStorage, queryStats *stats.TimerGroup) *viewAdapter {
func NewViewAdapter(view metric.View, storage metric.Persistence, queryStats *stats.TimerGroup) *viewAdapter {
stalenessPolicy := StalenessPolicy{
DeltaAllowance: time.Duration(*defaultStalenessDelta) * time.Second,
}
Expand Down
2 changes: 1 addition & 1 deletion rules/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func getTestVectorFromTestMatrix(matrix ast.Matrix) ast.Vector {
return vector
}

func storeMatrix(storage metric.TieredStorage, matrix ast.Matrix) (err error) {
func storeMatrix(storage metric.Persistence, matrix ast.Matrix) (err error) {
pendingSamples := clientmodel.Samples{}
for _, sampleSet := range matrix {
for _, sample := range sampleSet.Values {
Expand Down
4 changes: 2 additions & 2 deletions rules/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type ruleManager struct {
done chan bool

interval time.Duration
storage *metric.TieredStorage
storage metric.PreloadingPersistence

results chan<- *extraction.Result
notifications chan<- notification.NotificationReqs
Expand All @@ -59,7 +59,7 @@ type ruleManager struct {

type RuleManagerOptions struct {
EvaluationInterval time.Duration
Storage *metric.TieredStorage
Storage metric.PreloadingPersistence

Notifications chan<- notification.NotificationReqs
Results chan<- *extraction.Result
Expand Down
4 changes: 2 additions & 2 deletions rules/recording.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ type RecordingRule struct {

func (rule RecordingRule) Name() string { return rule.name }

func (rule RecordingRule) EvalRaw(timestamp clientmodel.Timestamp, storage *metric.TieredStorage) (ast.Vector, error) {
func (rule RecordingRule) EvalRaw(timestamp clientmodel.Timestamp, storage metric.PreloadingPersistence) (ast.Vector, error) {
return ast.EvalVectorInstant(rule.vector, timestamp, storage, stats.NewTimerGroup())
}

func (rule RecordingRule) Eval(timestamp clientmodel.Timestamp, storage *metric.TieredStorage) (ast.Vector, error) {
func (rule RecordingRule) Eval(timestamp clientmodel.Timestamp, storage metric.PreloadingPersistence) (ast.Vector, error) {
// Get the raw value of the rule expression.
vector, err := rule.EvalRaw(timestamp, storage)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions rules/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ type Rule interface {
Name() string
// EvalRaw evaluates the rule's vector expression without triggering any
// other actions, like recording or alerting.
EvalRaw(timestamp clientmodel.Timestamp, storage *metric.TieredStorage) (ast.Vector, error)
EvalRaw(timestamp clientmodel.Timestamp, storage metric.PreloadingPersistence) (ast.Vector, error)
// Eval evaluates the rule, including any associated recording or alerting actions.
Eval(timestamp clientmodel.Timestamp, storage *metric.TieredStorage) (ast.Vector, error)
Eval(timestamp clientmodel.Timestamp, storage metric.PreloadingPersistence) (ast.Vector, error)
// ToDotGraph returns a Graphviz dot graph of the rule.
ToDotGraph() string
// String returns a human-readable string representation of the rule.
Expand Down
12 changes: 6 additions & 6 deletions rules/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

"github.com/prometheus/prometheus/rules/ast"
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/metric/tiered"
"github.com/prometheus/prometheus/utility/test"
)

Expand Down Expand Up @@ -53,7 +53,7 @@ func vectorComparisonString(expected []string, actual []string) string {
}

type testTieredStorageCloser struct {
storage *metric.TieredStorage
storage *tiered.TieredStorage
directory test.Closer
}

Expand All @@ -64,10 +64,10 @@ func (t testTieredStorageCloser) Close() {

// This is copied from storage/metric/helpers_test.go, which is unfortunate but
// presently required to make things work.
func NewTestTieredStorage(t test.Tester) (storage *metric.TieredStorage, closer test.Closer) {
func NewTestTieredStorage(t test.Tester) (storage *tiered.TieredStorage, closer test.Closer) {
var directory test.TemporaryDirectory
directory = test.NewTemporaryDirectory("test_tiered_storage", t)
storage, err := metric.NewTieredStorage(2500, 1000, 5*time.Second, 0*time.Second, directory.Path())
storage, err := tiered.NewTieredStorage(2500, 1000, 5*time.Second, 0*time.Second, directory.Path())

if err != nil {
if storage != nil {
Expand All @@ -91,12 +91,12 @@ func NewTestTieredStorage(t test.Tester) (storage *metric.TieredStorage, closer
return
}

func newTestStorage(t test.Tester) (storage *metric.TieredStorage, closer test.Closer) {
func newTestStorage(t test.Tester) (storage *tiered.TieredStorage, closer test.Closer) {
storage, closer = NewTestTieredStorage(t)
if storage == nil {
t.Fatal("storage == nil")
}
storeMatrix(*storage, testMatrix)
storeMatrix(storage, testMatrix)
return
}

Expand Down
1 change: 0 additions & 1 deletion storage/metric/.gitignore

This file was deleted.

Loading

0 comments on commit 01f652c

Please sign in to comment.