Skip to content

Commit

Permalink
Retrieval: Do not buffer the samples if no sample limit configured
Browse files Browse the repository at this point in the history
Also, simplify and streamline the code a bit.
  • Loading branch information
beorn7 committed Jan 7, 2017
1 parent 3044828 commit 3610331
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 182 deletions.
173 changes: 95 additions & 78 deletions retrieval/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type scrapePool struct {
loops map[uint64]loop

// Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(context.Context, scraper, storage.SampleAppender, func(storage.SampleAppender) storage.SampleAppender, storage.SampleAppender, uint) loop
newLoop func(context.Context, scraper, storage.SampleAppender, model.LabelSet, *config.ScrapeConfig) loop
}

func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
Expand Down Expand Up @@ -179,7 +179,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
var (
t = sp.targets[fp]
s = &targetScraper{Target: t, client: sp.client}
newLoop = sp.newLoop(sp.ctx, s, sp.appender, sp.sampleMutator(t), sp.reportAppender(t), sp.config.SampleLimit)
newLoop = sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config)
)
wg.Add(1)

Expand Down Expand Up @@ -240,7 +240,7 @@ func (sp *scrapePool) sync(targets []*Target) {

if _, ok := sp.targets[hash]; !ok {
s := &targetScraper{Target: t, client: sp.client}
l := sp.newLoop(sp.ctx, s, sp.appender, sp.sampleMutator(t), sp.reportAppender(t), sp.config.SampleLimit)
l := sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config)

sp.targets[hash] = t
sp.loops[hash] = l
Expand Down Expand Up @@ -272,41 +272,6 @@ func (sp *scrapePool) sync(targets []*Target) {
wg.Wait()
}

// sampleMutator returns a function that'll take an appender and return an appender for mutated samples.
func (sp *scrapePool) sampleMutator(target *Target) func(storage.SampleAppender) storage.SampleAppender {
return func(app storage.SampleAppender) storage.SampleAppender {
// The relabelAppender has to be inside the label-modifying appenders
// so the relabeling rules are applied to the correct label set.
if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 {
app = relabelAppender{
SampleAppender: app,
relabelings: mrc,
}
}

if sp.config.HonorLabels {
app = honorLabelsAppender{
SampleAppender: app,
labels: target.Labels(),
}
} else {
app = ruleLabelsAppender{
SampleAppender: app,
labels: target.Labels(),
}
}
return app
}
}

// reportAppender returns an appender for reporting samples for the target.
func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender {
return ruleLabelsAppender{
SampleAppender: sp.appender,
labels: target.Labels(),
}
}

// A scraper retrieves samples and accepts a status report at the end.
type scraper interface {
scrape(ctx context.Context, ts time.Time) (model.Samples, error)
Expand Down Expand Up @@ -376,26 +341,32 @@ type scrapeLoop struct {

// Where samples are ultimately sent.
appender storage.SampleAppender
// Applies relabel rules and label handling.
mutator func(storage.SampleAppender) storage.SampleAppender
// For sending up and scrape_*.
reportAppender storage.SampleAppender
// Limit on number of samples that will be accepted.
sampleLimit uint

targetLabels model.LabelSet
metricRelabelConfigs []*config.RelabelConfig
honorLabels bool
sampleLimit uint

done chan struct{}
ctx context.Context
cancel func()
}

func newScrapeLoop(ctx context.Context, sc scraper, app storage.SampleAppender, mut func(storage.SampleAppender) storage.SampleAppender, reportApp storage.SampleAppender, sampleLimit uint) loop {
func newScrapeLoop(
ctx context.Context,
sc scraper,
appender storage.SampleAppender,
targetLabels model.LabelSet,
config *config.ScrapeConfig,
) loop {
sl := &scrapeLoop{
scraper: sc,
appender: app,
mutator: mut,
reportAppender: reportApp,
sampleLimit: sampleLimit,
done: make(chan struct{}),
scraper: sc,
appender: appender,
targetLabels: targetLabels,
metricRelabelConfigs: config.MetricRelabelConfigs,
honorLabels: config.HonorLabels,
sampleLimit: config.SampleLimit,
done: make(chan struct{}),
}
sl.ctx, sl.cancel = context.WithCancel(ctx)

Expand Down Expand Up @@ -426,8 +397,9 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {

if !sl.appender.NeedsThrottling() {
var (
start = time.Now()
scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout)
start = time.Now()
scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout)
numPostRelabelSamples = 0
)

// Only record after the first scrape.
Expand All @@ -438,11 +410,13 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
}

samples, err := sl.scraper.scrape(scrapeCtx, start)
err = sl.processScrapeResult(samples, err, start)
if err == nil {
numPostRelabelSamples, err = sl.append(samples)
}
if err != nil && errc != nil {
errc <- err
}

sl.report(start, time.Since(start), len(samples), numPostRelabelSamples, err)
last = start
} else {
targetSkippedScrapes.WithLabelValues(interval.String()).Inc()
Expand All @@ -461,36 +435,73 @@ func (sl *scrapeLoop) stop() {
<-sl.done
}

func (sl *scrapeLoop) processScrapeResult(samples model.Samples, scrapeErr error, start time.Time) error {
// Collect samples post-relabelling and label handling in a buffer.
buf := &bufferAppender{buffer: make(model.Samples, 0, len(samples))}
if scrapeErr == nil {
app := sl.mutator(buf)
for _, sample := range samples {
app.Append(sample)
// wrapAppender wraps a SampleAppender for relabeling. It returns the wrappend
// appender and an innermost countingAppender that counts the samples actually
// appended in the end.
func (sl *scrapeLoop) wrapAppender(app storage.SampleAppender) (storage.SampleAppender, *countingAppender) {
// Innermost appender is a countingAppender to count how many samples
// are left in the end.
countingAppender := &countingAppender{
SampleAppender: app,
}
app = countingAppender

// The relabelAppender has to be inside the label-modifying appenders so
// the relabeling rules are applied to the correct label set.
if len(sl.metricRelabelConfigs) > 0 {
app = relabelAppender{
SampleAppender: app,
relabelings: sl.metricRelabelConfigs,
}
}

if sl.sampleLimit > 0 && uint(len(buf.buffer)) > sl.sampleLimit {
scrapeErr = fmt.Errorf("%d samples exceeded limit of %d", len(buf.buffer), sl.sampleLimit)
targetScrapeSampleLimit.Inc()
} else {
// Send samples to storage.
sl.append(buf.buffer)
if sl.honorLabels {
app = honorLabelsAppender{
SampleAppender: app,
labels: sl.targetLabels,
}
} else {
app = ruleLabelsAppender{
SampleAppender: app,
labels: sl.targetLabels,
}
}

sl.report(start, time.Since(start), len(samples), len(buf.buffer), scrapeErr)
return scrapeErr
return app, countingAppender
}

func (sl *scrapeLoop) append(samples model.Samples) {
func (sl *scrapeLoop) append(samples model.Samples) (int, error) {
var (
numOutOfOrder = 0
numDuplicates = 0
app = sl.appender
countingApp *countingAppender
)

if sl.sampleLimit > 0 {
// We need to check for the sample limit, so append everything
// to a wrapped bufferAppender first. Then point samples to the
// result.
bufApp := &bufferAppender{buffer: make(model.Samples, 0, len(samples))}
var wrappedBufApp storage.SampleAppender
wrappedBufApp, countingApp = sl.wrapAppender(bufApp)
for _, s := range samples {
// Ignore errors as bufferedAppender always succeds.
wrappedBufApp.Append(s)
}
samples = bufApp.buffer
if uint(countingApp.count) > sl.sampleLimit {
targetScrapeSampleLimit.Inc()
return countingApp.count, fmt.Errorf(
"%d samples exceeded limit of %d", countingApp.count, sl.sampleLimit,
)
}
} else {
// No need to check for sample limit. Wrap sl.appender directly.
app, countingApp = sl.wrapAppender(sl.appender)
}

for _, s := range samples {
if err := sl.appender.Append(s); err != nil {
if err := app.Append(s); err != nil {
switch err {
case local.ErrOutOfOrderSample:
numOutOfOrder++
Expand All @@ -509,6 +520,7 @@ func (sl *scrapeLoop) append(samples model.Samples) {
if numDuplicates > 0 {
log.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
}
return countingApp.count, nil
}

func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSamples, postRelabelSamples int, err error) {
Expand Down Expand Up @@ -550,16 +562,21 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSam
Value: model.SampleValue(postRelabelSamples),
}

if err := sl.reportAppender.Append(healthSample); err != nil {
reportAppender := ruleLabelsAppender{
SampleAppender: sl.appender,
labels: sl.targetLabels,
}

if err := reportAppender.Append(healthSample); err != nil {
log.With("sample", healthSample).With("error", err).Warn("Scrape health sample discarded")
}
if err := sl.reportAppender.Append(durationSample); err != nil {
if err := reportAppender.Append(durationSample); err != nil {
log.With("sample", durationSample).With("error", err).Warn("Scrape duration sample discarded")
}
if err := sl.reportAppender.Append(countSample); err != nil {
if err := reportAppender.Append(countSample); err != nil {
log.With("sample", durationSample).With("error", err).Warn("Scrape sample count sample discarded")
}
if err := sl.reportAppender.Append(postRelabelSample); err != nil {
log.With("sample", durationSample).With("error", err).Warn("Scrape sample count post-relabelling sample discarded")
if err := reportAppender.Append(postRelabelSample); err != nil {
log.With("sample", durationSample).With("error", err).Warn("Scrape sample count post-relabeling sample discarded")
}
}
Loading

0 comments on commit 3610331

Please sign in to comment.