Skip to content

Commit

Permalink
Merge pull request prometheus#2047 from prometheus/write-relabel
Browse files Browse the repository at this point in the history
Add support for remote write relabelling.
  • Loading branch information
brian-brazil authored Oct 5, 2016
2 parents 9172728 + 7760564 commit 6e8f87a
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 31 deletions.
13 changes: 7 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ type Config struct {
RuleFiles []string `yaml:"rule_files,omitempty"`
ScrapeConfigs []*ScrapeConfig `yaml:"scrape_configs,omitempty"`

RemoteWriteConfig []RemoteWriteConfig `yaml:"remote_write,omitempty"`
RemoteWriteConfig RemoteWriteConfig `yaml:"remote_write,omitempty"`

// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
Expand Down Expand Up @@ -1075,11 +1075,12 @@ func (re Regexp) MarshalYAML() (interface{}, error) {

// RemoteWriteConfig is the configuration for remote storage.
type RemoteWriteConfig struct {
URL URL `yaml:"url,omitempty"`
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
TLSConfig TLSConfig `yaml:"tls_config,omitempty"`
ProxyURL URL `yaml:"proxy_url,omitempty"`
URL *URL `yaml:"url,omitempty"`
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
TLSConfig TLSConfig `yaml:"tls_config,omitempty"`
ProxyURL URL `yaml:"proxy_url,omitempty"`
WriteRelabelConfigs []*RelabelConfig `yaml:"write_relabel_configs,omitempty"`

// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
Expand Down
13 changes: 13 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ var expectedConf = &Config{
"testdata/my/*.rules",
},

RemoteWriteConfig: RemoteWriteConfig{
RemoteTimeout: model.Duration(30 * time.Second),
WriteRelabelConfigs: []*RelabelConfig{
{
SourceLabels: model.LabelNames{"__name__"},
Separator: ";",
Regex: MustNewRegexp("expensive.*"),
Replacement: "$1",
Action: RelabelDrop,
},
},
},

ScrapeConfigs: []*ScrapeConfig{
{
JobName: "prometheus",
Expand Down
6 changes: 6 additions & 0 deletions config/testdata/conf.good.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ rule_files:
- "/absolute/second.rules"
- "my/*.rules"

remote_write:
write_relabel_configs:
- source_labels: [__name__]
regex: expensive.*
action: drop

scrape_configs:
- job_name: prometheus

Expand Down
12 changes: 3 additions & 9 deletions storage/remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@ import (

// Client allows sending batches of Prometheus samples to an HTTP endpoint.
type Client struct {
index int // Used to differentiate metrics
url config.URL
client *http.Client
timeout time.Duration
}

// NewClient creates a new Client.
func NewClient(index int, conf config.RemoteWriteConfig) (*Client, error) {
func NewClient(conf config.RemoteWriteConfig) (*Client, error) {
tlsConfig, err := httputil.NewTLSConfig(conf.TLSConfig)
if err != nil {
return nil, err
Expand All @@ -56,8 +55,7 @@ func NewClient(index int, conf config.RemoteWriteConfig) (*Client, error) {
}

return &Client{
index: index,
url: conf.URL,
url: *conf.URL,
client: httputil.NewClient(rt),
timeout: time.Duration(conf.RemoteTimeout),
}, nil
Expand Down Expand Up @@ -117,10 +115,6 @@ func (c *Client) Store(samples model.Samples) error {
}

// Name identifies the client as a generic client.
//
// TODO: This client is going to be the only one soon - then this method
// will simply be removed in the restructuring and the whole "generic" naming
// will be gone for good.
func (c Client) Name() string {
return fmt.Sprintf("generic:%d:%s", c.index, c.url)
return "generic"
}
9 changes: 9 additions & 0 deletions storage/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
influx "github.com/influxdb/influxdb/client"

"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/relabel"
"github.com/prometheus/prometheus/storage/remote/graphite"
"github.com/prometheus/prometheus/storage/remote/influxdb"
"github.com/prometheus/prometheus/storage/remote/opentsdb"
Expand All @@ -33,6 +34,7 @@ import (
type Storage struct {
queues []*StorageQueueManager
externalLabels model.LabelSet
relabelConfigs []*config.RelabelConfig
mtx sync.RWMutex
}

Expand All @@ -42,6 +44,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
defer s.mtx.Unlock()

s.externalLabels = conf.GlobalConfig.ExternalLabels
s.relabelConfigs = conf.RemoteWriteConfig.WriteRelabelConfigs
return nil
}

Expand Down Expand Up @@ -116,8 +119,14 @@ func (s *Storage) Append(smpl *model.Sample) error {
snew.Metric[ln] = lv
}
}
snew.Metric = model.Metric(
relabel.Process(model.LabelSet(snew.Metric), s.relabelConfigs...))
s.mtx.RUnlock()

if snew.Metric == nil {
return nil
}

for _, q := range s.queues {
q.Append(&snew)
}
Expand Down
39 changes: 23 additions & 16 deletions storage/remote/remote_reloadable.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ import (
"github.com/prometheus/common/model"

"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/relabel"
)

// Storage collects multiple remote storage queues.
type ReloadableStorage struct {
mtx sync.RWMutex
externalLabels model.LabelSet
conf []config.RemoteWriteConfig
conf config.RemoteWriteConfig

queues []*StorageQueueManager
queue *StorageQueueManager
}

// New returns a new remote Storage.
Expand All @@ -42,31 +43,32 @@ func (s *ReloadableStorage) ApplyConfig(conf *config.Config) error {

// TODO: we should only stop & recreate queues which have changes,
// as this can be quite disruptive.
newQueues := []*StorageQueueManager{}
for i, c := range conf.RemoteWriteConfig {
c, err := NewClient(i, c)
var newQueue *StorageQueueManager

if conf.RemoteWriteConfig.URL != nil {
c, err := NewClient(conf.RemoteWriteConfig)
if err != nil {
return err
}
newQueues = append(newQueues, NewStorageQueueManager(c, nil))
newQueue = NewStorageQueueManager(c, nil)
}

for _, q := range s.queues {
q.Stop()
if s.queue != nil {
s.queue.Stop()
}
s.queues = newQueues
s.externalLabels = conf.GlobalConfig.ExternalLabels
s.queue = newQueue
s.conf = conf.RemoteWriteConfig
for _, q := range s.queues {
q.Start()
s.externalLabels = conf.GlobalConfig.ExternalLabels
if s.queue != nil {
s.queue.Start()
}
return nil
}

// Stop the background processing of the storage queues.
func (s *ReloadableStorage) Stop() {
for _, q := range s.queues {
q.Stop()
if s.queue != nil {
s.queue.Stop()
}
}

Expand All @@ -84,9 +86,14 @@ func (s *ReloadableStorage) Append(smpl *model.Sample) error {
snew.Metric[ln] = lv
}
}
snew.Metric = model.Metric(
relabel.Process(model.LabelSet(snew.Metric), s.conf.WriteRelabelConfigs...))

for _, q := range s.queues {
q.Append(&snew)
if snew.Metric == nil {
return nil
}
if s.queue != nil {
s.queue.Append(&snew)
}
return nil
}
Expand Down

0 comments on commit 6e8f87a

Please sign in to comment.