Skip to content

Commit

Permalink
Cleanup internal target data
Browse files Browse the repository at this point in the history
  • Loading branch information
fabxc committed Feb 13, 2016
1 parent e26e4b6 commit 65eba08
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 141 deletions.
259 changes: 141 additions & 118 deletions retrieval/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,32 +119,37 @@ type TargetStatus struct {
// LastError returns the error encountered during the last scrape.
func (ts *TargetStatus) LastError() error {
	ts.mu.RLock()
	err := ts.lastError
	ts.mu.RUnlock()
	return err
}

// LastScrape returns the time of the last scrape.
func (ts *TargetStatus) LastScrape() time.Time {
	ts.mu.RLock()
	last := ts.lastScrape
	ts.mu.RUnlock()
	return last
}

// Health returns the last known health state of the target.
func (ts *TargetStatus) Health() TargetHealth {
	ts.mu.RLock()
	h := ts.health
	ts.mu.RUnlock()
	return h
}

// setLastScrape records the time of the most recent scrape attempt.
func (ts *TargetStatus) setLastScrape(t time.Time) {
	ts.mu.Lock()
	ts.lastScrape = t
	ts.mu.Unlock()
}

func (ts *TargetStatus) setLastError(err error) {
ts.mu.Lock()
defer ts.mu.Unlock()

if err == nil {
ts.health = HealthGood
} else {
Expand All @@ -164,37 +169,26 @@ type Target struct {

// Mutex protects the members below.
sync.RWMutex
// The HTTP client used to scrape the target's endpoint.
httpClient *http.Client
// url is the URL to be scraped. Its host is immutable.
url *url.URL

scrapeConfig *config.ScrapeConfig

// Labels before any processing.
metaLabels model.LabelSet
// Any base labels that are added to this target and its metrics.
baseLabels model.LabelSet
// Internal labels, such as scheme.
internalLabels model.LabelSet
// The time between two scrapes.
scrapeInterval time.Duration
// Whether the target's labels have precedence over the base labels
// assigned by the scraping instance.
honorLabels bool
// Metric relabel configuration.
metricRelabelConfigs []*config.RelabelConfig
labels model.LabelSet

// The HTTP client used to scrape the target's endpoint.
httpClient *http.Client
}

// NewTarget creates a reasonably configured target for querying.
func NewTarget(cfg *config.ScrapeConfig, baseLabels, metaLabels model.LabelSet) (*Target, error) {
func NewTarget(cfg *config.ScrapeConfig, labels, metaLabels model.LabelSet) (*Target, error) {
t := &Target{
url: &url.URL{
Scheme: string(baseLabels[model.SchemeLabel]),
Host: string(baseLabels[model.AddressLabel]),
},
status: &TargetStatus{},
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
}
err := t.Update(cfg, baseLabels, metaLabels)
err := t.Update(cfg, labels, metaLabels)
return t, err
}

Expand All @@ -207,55 +201,21 @@ func (t *Target) Status() *TargetStatus {
// it belongs to.
func (t *Target) Update(cfg *config.ScrapeConfig, baseLabels, metaLabels model.LabelSet) error {
t.Lock()
defer t.Unlock()

httpClient, err := newHTTPClient(cfg)
if err != nil {
return fmt.Errorf("cannot create HTTP client: %v", err)
}
t.httpClient = httpClient

t.url.Scheme = string(baseLabels[model.SchemeLabel])
t.url.Path = string(baseLabels[model.MetricsPathLabel])

t.internalLabels = model.LabelSet{}
t.internalLabels[model.SchemeLabel] = baseLabels[model.SchemeLabel]
t.internalLabels[model.MetricsPathLabel] = baseLabels[model.MetricsPathLabel]
t.internalLabels[model.AddressLabel] = model.LabelValue(t.url.Host)
t.scrapeConfig = cfg
t.labels = baseLabels
t.metaLabels = metaLabels

params := url.Values{}
t.Unlock()

for k, v := range cfg.Params {
params[k] = make([]string, len(v))
copy(params[k], v)
}
for k, v := range baseLabels {
if strings.HasPrefix(string(k), model.ParamLabelPrefix) {
if len(params[string(k[len(model.ParamLabelPrefix):])]) > 0 {
params[string(k[len(model.ParamLabelPrefix):])][0] = string(v)
} else {
params[string(k[len(model.ParamLabelPrefix):])] = []string{string(v)}
}
t.internalLabels[model.ParamLabelPrefix+k[len(model.ParamLabelPrefix):]] = v
}
httpClient, err := t.client()
if err != nil {
return fmt.Errorf("cannot create HTTP client: %s", err)
}
t.url.RawQuery = params.Encode()

t.scrapeInterval = time.Duration(cfg.ScrapeInterval)
t.Lock()
t.httpClient = httpClient
t.Unlock()

t.honorLabels = cfg.HonorLabels
t.metaLabels = metaLabels
t.baseLabels = model.LabelSet{}
// All remaining internal labels will not be part of the label set.
for name, val := range baseLabels {
if !strings.HasPrefix(string(name), model.ReservedLabelPrefix) {
t.baseLabels[name] = val
}
}
if _, ok := t.baseLabels[model.InstanceLabel]; !ok {
t.baseLabels[model.InstanceLabel] = model.LabelValue(t.InstanceIdentifier())
}
t.metricRelabelConfigs = cfg.MetricRelabelConfigs
return nil
}

Expand Down Expand Up @@ -304,16 +264,105 @@ func newHTTPClient(cfg *config.ScrapeConfig) (*http.Client, error) {
}

// String implements fmt.Stringer; a target is identified by the
// address it is scraped at.
func (t *Target) String() string {
	// The stale pre-commit line `return t.url.Host` (diff residue) made
	// the method have two return statements; the post-commit version
	// delegates to the lock-protected host accessor.
	return t.host()
}

// client builds a fresh HTTP client from the target's current scrape
// configuration.
func (t *Target) client() (*http.Client, error) {
	t.RLock()
	cfg := t.scrapeConfig
	t.RUnlock()

	return newHTTPClient(cfg)
}

// interval returns the configured time between two scrapes.
func (t *Target) interval() time.Duration {
	t.RLock()
	iv := time.Duration(t.scrapeConfig.ScrapeInterval)
	t.RUnlock()
	return iv
}

// timeout returns the configured per-scrape timeout.
func (t *Target) timeout() time.Duration {
	t.RLock()
	to := time.Duration(t.scrapeConfig.ScrapeTimeout)
	t.RUnlock()
	return to
}

// scheme returns the URL scheme stored in the target's scheme label.
func (t *Target) scheme() string {
	t.RLock()
	s := string(t.labels[model.SchemeLabel])
	t.RUnlock()
	return s
}

// host returns the address stored in the target's address label.
func (t *Target) host() string {
	t.RLock()
	h := string(t.labels[model.AddressLabel])
	t.RUnlock()
	return h
}

// path returns the URL path stored in the target's metrics-path label.
func (t *Target) path() string {
	t.RLock()
	p := string(t.labels[model.MetricsPathLabel])
	t.RUnlock()
	return p
}

// URL returns a copy of the target's URL, assembled from its labels
// and the configured query parameters. Labels carrying the parameter
// prefix override the first value of the corresponding config param.
func (t *Target) URL() *url.URL {
	t.RLock()
	defer t.RUnlock()

	params := url.Values{}

	// Start from a deep copy of the configured parameters so label
	// overrides below cannot mutate the shared scrape config.
	for k, v := range t.scrapeConfig.Params {
		params[k] = append([]string(nil), v...)
	}
	for k, v := range t.labels {
		if !strings.HasPrefix(string(k), model.ParamLabelPrefix) {
			continue
		}
		name := string(k[len(model.ParamLabelPrefix):])

		// Override the first existing value, or create the parameter.
		if vals, ok := params[name]; ok && len(vals) > 0 {
			vals[0] = string(v)
		} else {
			params[name] = []string{string(v)}
		}
	}

	return &url.URL{
		Scheme:   string(t.labels[model.SchemeLabel]),
		Host:     string(t.labels[model.AddressLabel]),
		Path:     string(t.labels[model.MetricsPathLabel]),
		RawQuery: params.Encode(),
	}
}

// InstanceIdentifier returns the identifier for the target, i.e. the
// address it is scraped at.
func (t *Target) InstanceIdentifier() string {
	t.RLock()
	defer t.RUnlock()

	return string(t.labels[model.AddressLabel])
}

// fullLabels returns a copy of all target labels, with an instance
// label derived from the address added when none is set.
func (t *Target) fullLabels() model.LabelSet {
	t.RLock()
	lset := t.labels.Clone()
	addr := t.labels[model.AddressLabel]
	t.RUnlock()

	if _, ok := lset[model.InstanceLabel]; !ok {
		lset[model.InstanceLabel] = addr
	}
	return lset
}

// RunScraper implements Target.
func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {
defer close(t.scraperStopped)

t.RLock()
lastScrapeInterval := t.scrapeInterval
t.RUnlock()
lastScrapeInterval := t.interval()

log.Debugf("Starting scraper for target %v...", t)

Expand Down Expand Up @@ -353,15 +402,13 @@ func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {

intervalStr := lastScrapeInterval.String()

t.RLock()
// On changed scrape interval the new interval becomes effective
// after the next scrape.
if lastScrapeInterval != t.scrapeInterval {
if iv := t.interval(); iv != lastScrapeInterval {
ticker.Stop()
ticker = time.NewTicker(t.scrapeInterval)
lastScrapeInterval = t.scrapeInterval
ticker = time.NewTicker(iv)
lastScrapeInterval = iv
}
t.RUnlock()

targetIntervalLength.WithLabelValues(intervalStr).Observe(
float64(took) / float64(time.Second), // Sub-second precision.
Expand Down Expand Up @@ -389,27 +436,28 @@ func (t *Target) StopScraper() {

const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`

func (t *Target) scrape(appender storage.SampleAppender) (err error) {
start := time.Now()
baseLabels := t.BaseLabels()

func (t *Target) scrape(appender storage.SampleAppender) error {
var (
err error
start = time.Now()
baseLabels = t.BaseLabels()
)
defer func(appender storage.SampleAppender) {
t.status.setLastError(err)
recordScrapeHealth(appender, start, baseLabels, t.status.Health(), time.Since(start))
}(appender)

t.RLock()

// The relabelAppender has to be inside the label-modifying appenders
// so the relabeling rules are applied to the correct label set.
if len(t.metricRelabelConfigs) > 0 {
if len(t.scrapeConfig.MetricRelabelConfigs) > 0 {
appender = relabelAppender{
SampleAppender: appender,
relabelings: t.metricRelabelConfigs,
relabelings: t.scrapeConfig.MetricRelabelConfigs,
}
}

if t.honorLabels {
if t.scrapeConfig.HonorLabels {
appender = honorLabelsAppender{
SampleAppender: appender,
labels: baseLabels,
Expand All @@ -422,7 +470,6 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) {
}

httpClient := t.httpClient

t.RUnlock()

req, err := http.NewRequest("GET", t.URL().String(), nil)
Expand Down Expand Up @@ -538,55 +585,31 @@ func (app relabelAppender) Append(s *model.Sample) error {
return app.SampleAppender.Append(s)
}

// URL returns a copy of the target's URL.
func (t *Target) URL() *url.URL {
// BaseLabels returns a copy of the target's base labels.
func (t *Target) BaseLabels() model.LabelSet {
t.RLock()
defer t.RUnlock()

u := &url.URL{}
*u = *t.url
return u
}

// InstanceIdentifier returns the identifier for the target.
func (t *Target) InstanceIdentifier() string {
return t.url.Host
}

// fullLabels returns the base labels plus internal labels defining the target.
func (t *Target) fullLabels() model.LabelSet {
t.RLock()
defer t.RUnlock()
lset := make(model.LabelSet, len(t.baseLabels)+len(t.internalLabels))
for ln, lv := range t.baseLabels {
lset[ln] = lv
}
for k, v := range t.internalLabels {
lset[k] = v
lset := make(model.LabelSet, len(t.labels))
for ln, lv := range t.labels {
if !strings.HasPrefix(string(ln), model.ReservedLabelPrefix) {
lset[ln] = lv
}
}
return lset
}

// BaseLabels returns a copy of the target's base labels.
func (t *Target) BaseLabels() model.LabelSet {
t.RLock()
defer t.RUnlock()
lset := make(model.LabelSet, len(t.baseLabels))
for ln, lv := range t.baseLabels {
lset[ln] = lv
if _, ok := lset[model.InstanceLabel]; !ok {
lset[model.InstanceLabel] = t.labels[model.AddressLabel]
}

return lset
}

// MetaLabels returns a copy of the target's labels before any processing.
func (t *Target) MetaLabels() model.LabelSet {
	// The stale pre-commit copy loop (diff residue) left an unreachable
	// second return in this method; the post-commit version clones via
	// the label set's own Clone helper.
	t.RLock()
	defer t.RUnlock()

	return t.metaLabels.Clone()
}

func recordScrapeHealth(
Expand Down
Loading

0 comments on commit 65eba08

Please sign in to comment.