Rework the way to communicate backpressure (AKA suspended ingestion)
This gives up on the idea of communicating backpressure through the
Append() call (by either not returning, as it does now, or returning
an error, as suggested/explored elsewhere). Instead, I have added a
NeedsThrottling() call, which has the advantage that it can be checked
once before a whole _batch_ of Append() calls. Scrapes will happen
completely or not at all, and the same holds for rule group
evaluations, which is a highly desired behavior (as discussed
elsewhere). The code is even simpler now, as the whole ingestion
buffer could be removed.

Logging of throttled mode has been streamlined and will create at most
one message per minute.
beorn7 committed Feb 1, 2016
1 parent d9f836e commit ec08c9a
Showing 11 changed files with 188 additions and 105 deletions.
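
The core of the new contract, as a minimal self-contained sketch: the consumer asks the storage once per batch and then either ingests everything or nothing. The package, type, and function names below are illustrative stand-ins, not the actual Prometheus code; the real wiring appears in the diffs that follow.

```go
package backpressure

import "errors"

// Sample stands in for model.Sample; it carries no fields here because the
// sketch only illustrates control flow.
type Sample struct{}

// SampleAppender mirrors the interface this commit extends: Append ingests a
// single sample, NeedsThrottling reports whether ingestion should back off.
type SampleAppender interface {
	Append(*Sample)
	NeedsThrottling() bool
}

var errSkipped = errors.New("batch skipped: storage is throttled")

// ingestBatch illustrates the batch-level contract: the throttling check
// happens once, before any Append call, so a scrape or rule group evaluation
// is either ingested completely or skipped entirely.
func ingestBatch(app SampleAppender, batch []*Sample) error {
	if app.NeedsThrottling() {
		return errSkipped
	}
	for _, s := range batch {
		app.Append(s)
	}
	return nil
}
```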
4 changes: 2 additions & 2 deletions cmd/prometheus/config.go
@@ -107,15 +107,15 @@ func init() {
)
cfg.fs.IntVar(
&cfg.storage.MemoryChunks, "storage.local.memory-chunks", 1024*1024,
"How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.",
"How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily. Sample ingestion will be throttled if the configured value is exceeded by more than 10%.",
)
cfg.fs.DurationVar(
&cfg.storage.PersistenceRetentionPeriod, "storage.local.retention", 15*24*time.Hour,
"How long to retain samples in the local storage.",
)
cfg.fs.IntVar(
&cfg.storage.MaxChunksToPersist, "storage.local.max-chunks-to-persist", 512*1024,
"How many chunks can be waiting for persistence before sample ingestion will stop. Many chunks waiting to be persisted will increase the checkpoint size.",
"How many chunks can be waiting for persistence before sample ingestion will be throttled. Many chunks waiting to be persisted will increase the checkpoint size.",
)
cfg.fs.DurationVar(
&cfg.storage.CheckpointInterval, "storage.local.checkpoint-interval", 5*time.Minute,
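
The flag descriptions above spell out when ingestion is throttled: once the number of chunks in memory exceeds the configured limit by more than 10%, or once more chunks than storage.local.max-chunks-to-persist are waiting for persistence. The storage-side check itself is not part of the hunks rendered here; the following is only a sketch of that condition, with hypothetical type and field names.

```go
package local

// throttleState is a hypothetical holder for the two quantities the flags
// above configure limits for; the real storage/local implementation is not
// shown in this excerpt.
type throttleState struct {
	memChunks          int // chunks currently held in memory
	maxMemChunks       int // -storage.local.memory-chunks
	chunksToPersist    int // chunks currently waiting for persistence
	maxChunksToPersist int // -storage.local.max-chunks-to-persist
}

// needsThrottling reflects the documented conditions: the number of memory
// chunks exceeds the configured limit by more than 10%, or more chunks than
// allowed are waiting to be persisted.
func (t throttleState) needsThrottling() bool {
	return t.chunksToPersist > t.maxChunksToPersist ||
		float64(t.memChunks) > float64(t.maxMemChunks)*1.1
}
```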
15 changes: 8 additions & 7 deletions retrieval/helpers_test.go
@@ -14,8 +14,6 @@
package retrieval

import (
"time"

"github.com/prometheus/common/model"

"github.com/prometheus/prometheus/config"
@@ -26,14 +24,13 @@ type nopAppender struct{}
func (a nopAppender) Append(*model.Sample) {
}

type slowAppender struct{}

func (a slowAppender) Append(*model.Sample) {
time.Sleep(time.Millisecond)
func (a nopAppender) NeedsThrottling() bool {
return false
}

type collectResultAppender struct {
result model.Samples
result model.Samples
throttled bool
}

func (a *collectResultAppender) Append(s *model.Sample) {
@@ -45,6 +42,10 @@ func (a *collectResultAppender) Append(s *model.Sample) {
a.result = append(a.result, s)
}

func (a *collectResultAppender) NeedsThrottling() bool {
return a.throttled
}

// fakeTargetProvider implements a TargetProvider and allows manual injection
// of TargetGroups through the update channel.
type fakeTargetProvider struct {
89 changes: 31 additions & 58 deletions retrieval/target.go
@@ -48,7 +48,7 @@ const (
)

var (
errIngestChannelFull = errors.New("ingestion channel full")
errSkippedScrape = errors.New("scrape skipped due to throttled ingestion")

targetIntervalLength = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Expand All @@ -59,10 +59,19 @@ var (
},
[]string{interval},
)
targetSkippedScrapes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "target_skipped_scrapes_total",
Help: "Total number of scrapes that were skipped because the metric storage was throttled.",
},
[]string{interval},
)
)

func init() {
prometheus.MustRegister(targetIntervalLength)
prometheus.MustRegister(targetSkippedScrapes)
}

// TargetHealth describes the health state of a target.
@@ -151,8 +160,6 @@ type Target struct {
scraperStopping chan struct{}
// Closing scraperStopped signals that scraping has been stopped.
scraperStopped chan struct{}
// Channel to buffer ingested samples.
ingestedSamples chan model.Vector

// Mutex protects the members below.
sync.RWMutex
@@ -166,8 +173,6 @@ type Target struct {
baseLabels model.LabelSet
// Internal labels, such as scheme.
internalLabels model.LabelSet
// What is the deadline for the HTTP or HTTPS against this endpoint.
deadline time.Duration
// The time between two scrapes.
scrapeInterval time.Duration
// Whether the target's labels have precedence over the base labels
@@ -237,7 +242,6 @@ func (t *Target) Update(cfg *config.ScrapeConfig, baseLabels, metaLabels model.L
t.url.RawQuery = params.Encode()

t.scrapeInterval = time.Duration(cfg.ScrapeInterval)
t.deadline = time.Duration(cfg.ScrapeTimeout)

t.honorLabels = cfg.HonorLabels
t.metaLabels = metaLabels
@@ -361,6 +365,11 @@ func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {
targetIntervalLength.WithLabelValues(intervalStr).Observe(
float64(took) / float64(time.Second), // Sub-second precision.
)
if sampleAppender.NeedsThrottling() {
targetSkippedScrapes.WithLabelValues(intervalStr).Inc()
t.status.setLastError(errSkippedScrape)
continue
}
t.scrape(sampleAppender)
}
}
@@ -377,26 +386,6 @@ func (t *Target) StopScraper() {
log.Debugf("Scraper for target %v stopped.", t)
}

func (t *Target) ingest(s model.Vector) error {
t.RLock()
deadline := t.deadline
t.RUnlock()
// Since the regular case is that ingestedSamples is ready to receive,
// first try without setting a timeout so that we don't need to allocate
// a timer most of the time.
select {
case t.ingestedSamples <- s:
return nil
default:
select {
case t.ingestedSamples <- s:
return nil
case <-time.After(deadline / 10):
return errIngestChannelFull
}
}
}

const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`

func (t *Target) scrape(appender storage.SampleAppender) (err error) {
Expand All @@ -414,20 +403,20 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) {
// so the relabeling rules are applied to the correct label set.
if len(t.metricRelabelConfigs) > 0 {
appender = relabelAppender{
app: appender,
relabelings: t.metricRelabelConfigs,
SampleAppender: appender,
relabelings: t.metricRelabelConfigs,
}
}

if t.honorLabels {
appender = honorLabelsAppender{
app: appender,
labels: baseLabels,
SampleAppender: appender,
labels: baseLabels,
}
} else {
appender = ruleLabelsAppender{
app: appender,
labels: baseLabels,
SampleAppender: appender,
labels: baseLabels,
}
}

@@ -460,27 +449,11 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) {
},
}

t.ingestedSamples = make(chan model.Vector, ingestedSamplesCap)

go func() {
for {
// TODO(fabxc): Change the SampleAppender interface to return an error
// so we can proceed based on the status and don't leak goroutines trying
// to append a single sample after dropping all the other ones.
//
// This will also allow use to reuse this vector and save allocations.
var samples model.Vector
if err = sdec.Decode(&samples); err != nil {
break
}
if err = t.ingest(samples); err != nil {
break
}
var samples model.Vector
for {
if err = sdec.Decode(&samples); err != nil {
break
}
close(t.ingestedSamples)
}()

for samples := range t.ingestedSamples {
for _, s := range samples {
appender.Append(s)
}
@@ -495,7 +468,7 @@
// Merges the ingested sample's metric with the label set. On a collision the
// value of the ingested label is stored in a label prefixed with 'exported_'.
type ruleLabelsAppender struct {
app storage.SampleAppender
storage.SampleAppender
labels model.LabelSet
}

Expand All @@ -507,11 +480,11 @@ func (app ruleLabelsAppender) Append(s *model.Sample) {
s.Metric[ln] = lv
}

app.app.Append(s)
app.SampleAppender.Append(s)
}

type honorLabelsAppender struct {
app storage.SampleAppender
storage.SampleAppender
labels model.LabelSet
}

Expand All @@ -525,13 +498,13 @@ func (app honorLabelsAppender) Append(s *model.Sample) {
}
}

app.app.Append(s)
app.SampleAppender.Append(s)
}

// Applies a set of relabel configurations to the sample's metric
// before actually appending it.
type relabelAppender struct {
app storage.SampleAppender
storage.SampleAppender
relabelings []*config.RelabelConfig
}

Expand All @@ -547,7 +520,7 @@ func (app relabelAppender) Append(s *model.Sample) {
}
s.Metric = model.Metric(labels)

app.app.Append(s)
app.SampleAppender.Append(s)
}

// URL returns a copy of the target's URL.
21 changes: 13 additions & 8 deletions retrieval/target_test.go
@@ -139,12 +139,12 @@ func TestTargetScrapeUpdatesState(t *testing.T) {
}
}

func TestTargetScrapeWithFullChannel(t *testing.T) {
func TestTargetScrapeWithThrottledStorage(t *testing.T) {
server := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
for i := 0; i < 2*ingestedSamplesCap; i++ {
for i := 0; i < 10; i++ {
w.Write([]byte(
fmt.Sprintf("test_metric_%d{foo=\"bar\"} 123.456\n", i),
))
@@ -155,15 +155,21 @@
defer server.Close()

testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{"dings": "bums"})
// Affects full channel but not HTTP fetch
testTarget.deadline = 0

testTarget.scrape(slowAppender{})
go testTarget.RunScraper(&collectResultAppender{throttled: true})

// Enough time for a scrape to happen.
time.Sleep(20 * time.Millisecond)

testTarget.StopScraper()
// Wait for it to take effect.
time.Sleep(20 * time.Millisecond)

if testTarget.status.Health() != HealthBad {
t.Errorf("Expected target state %v, actual: %v", HealthBad, testTarget.status.Health())
}
if testTarget.status.LastError() != errIngestChannelFull {
t.Errorf("Expected target error %q, actual: %q", errIngestChannelFull, testTarget.status.LastError())
if testTarget.status.LastError() != errSkippedScrape {
t.Errorf("Expected target error %q, actual: %q", errSkippedScrape, testTarget.status.LastError())
}
}

@@ -450,7 +456,6 @@ func newTestTarget(targetURL string, deadline time.Duration, baseLabels model.La
Host: strings.TrimLeft(targetURL, "http://"),
Path: "/metrics",
},
deadline: deadline,
status: &TargetStatus{},
scrapeInterval: 1 * time.Millisecond,
httpClient: c,
1 change: 1 addition & 0 deletions retrieval/targetmanager.go
@@ -165,6 +165,7 @@ func (tm *TargetManager) Run() {
})

tm.running = true
log.Info("Target manager started.")
}

// handleUpdates receives target group updates and handles them in the
18 changes: 17 additions & 1 deletion rules/manager.go
@@ -66,9 +66,19 @@
iterationDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
Name: "evaluator_duration_seconds",
Help: "The duration for all evaluations to execute.",
Help: "The duration of rule group evaluations.",
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
})
iterationsSkipped = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "evaluator_iterations_skipped_total",
Help: "The total number of rule group evaluations skipped due to throttled metric storage.",
})
iterationsScheduled = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "evaluator_iterations_total",
Help: "The total number of scheduled rule group evaluations, whether skipped or executed.",
})
)

func init() {
@@ -78,6 +88,7 @@ func init() {
evalFailures.WithLabelValues(string(ruleTypeRecording))

prometheus.MustRegister(iterationDuration)
prometheus.MustRegister(iterationsSkipped)
prometheus.MustRegister(evalFailures)
prometheus.MustRegister(evalDuration)
}
@@ -133,6 +144,11 @@ func (g *Group) run() {
}

iter := func() {
iterationsScheduled.Inc()
if g.opts.SampleAppender.NeedsThrottling() {
iterationsSkipped.Inc()
return
}
start := time.Now()
g.eval()

3 changes: 3 additions & 0 deletions storage/local/interface.go
@@ -34,6 +34,9 @@ type Storage interface {
// from the provided Sample as those labels are considered equivalent to
// a label not present at all.
Append(*model.Sample)
// NeedsThrottling returns true if the Storage has too many chunks in memory
// already or has too many chunks waiting for persistence.
NeedsThrottling() bool
// NewPreloader returns a new Preloader which allows preloading and pinning
// series data into memory for use within a query.
NewPreloader() Preloader
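
The remaining storage/local changes, including the rate-limited logging of throttled mode mentioned in the commit message, are not rendered in this excerpt. Below is a generic sketch of the stated at-most-one-message-per-minute behavior, with hypothetical names and message text.

```go
package local

import (
	"time"

	"github.com/prometheus/common/log"
)

// throttlingLogger is a hypothetical sketch, not the actual implementation.
type throttlingLogger struct {
	lastLogged time.Time // when the last throttling message was emitted
}

// logThrottling warns about suspended ingestion, but emits at most one
// message per minute; the message text here is illustrative only.
func (l *throttlingLogger) logThrottling() {
	if time.Since(l.lastLogged) < time.Minute {
		return
	}
	l.lastLogged = time.Now()
	log.Warn("Storage needs throttling; scrapes and rule evaluations will be skipped.")
}
```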