Skip to content

Commit

Permalink
Alternative approach: bundle metadata in TimeSeries protobuf
Browse files Browse the repository at this point in the history
Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>
  • Loading branch information
tpaschalis committed Apr 24, 2023
1 parent 49cccc6 commit 0093563
Show file tree
Hide file tree
Showing 9 changed files with 604 additions and 159 deletions.
5 changes: 1 addition & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,7 @@ type RemoteWriteConfig struct {
Name string `yaml:"name,omitempty"`
SendExemplars bool `yaml:"send_exemplars,omitempty"`
SendNativeHistograms bool `yaml:"send_native_histograms,omitempty"`
SendWALMetadata bool `yaml:"send_metadata,omitempty"` // TODO(@tpaschalis) Adding an extra field to enable us to remove the `metadata_config` struct in the future.

// We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types.
Expand Down Expand Up @@ -965,10 +966,6 @@ type MetadataConfig struct {
SendInterval model.Duration `yaml:"send_interval"`
// Maximum number of samples per send.
MaxSamplesPerSend int `yaml:"max_samples_per_send,omitempty"`
// SendFromWAL controls whether we send metadata from the WAL
// TODO (@tpaschalis) Maybe this should also be the feature flag that
// disables the current MetadataWatcher?
SendFromWAL bool `yaml:"send_from_wal,omitempty"`
}

// RemoteReadConfig is the configuration for reading from remote storage.
Expand Down
547 changes: 463 additions & 84 deletions prompb/types.pb.go

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions prompb/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,26 @@ message MetricMetadata {
string unit = 5;
}

message Metadata {
// The protobuf style guide recommends setting enum values as uppercase.
// Having them lowercase would save us an instruction every time we convert
// from textparse.MetricType.
// https://protobuf.dev/programming-guides/style/#enums
enum MetricType {
UNKNOWN = 0;
COUNTER = 1;
GAUGE = 2;
HISTOGRAM = 3;
GAUGEHISTOGRAM = 4;
SUMMARY = 5;
INFO = 6;
STATESET = 7;
}
MetricType type = 1;
string help = 2;
string unit = 3;
}

message Sample {
double value = 1;
// timestamp is in ms format, see model/timestamp/timestamp.go for
Expand Down Expand Up @@ -127,6 +147,7 @@ message TimeSeries {
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
repeated Metadata metadatas = 5 [(gogoproto.nullable) = false];
}

message Label {
Expand Down
24 changes: 24 additions & 0 deletions storage/remote/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/textparse"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
Expand Down Expand Up @@ -624,6 +625,14 @@ func exemplarProtoToExemplar(ep prompb.Exemplar) exemplar.Exemplar {
}
}

func metadataProtoToMetadata(mp prompb.Metadata) metadata.Metadata {
return metadata.Metadata{
Type: metricTypeFromProtoEquivalent(mp.Type),
Unit: mp.Unit,
Help: mp.Help,
}
}

// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an integer histogram and not a float histogram, or it panics.
Expand Down Expand Up @@ -799,6 +808,21 @@ func metricTypeToMetricTypeProto(t textparse.MetricType) prompb.MetricMetadata_M
return prompb.MetricMetadata_MetricType(v)
}

func metricTypeToProtoEquivalent(t textparse.MetricType) prompb.Metadata_MetricType {
mt := strings.ToUpper(string(t))
v, ok := prompb.Metadata_MetricType_value[mt]
if !ok {
return prompb.Metadata_UNKNOWN
}

return prompb.Metadata_MetricType(v)
}

func metricTypeFromProtoEquivalent(t prompb.Metadata_MetricType) textparse.MetricType {
mt := strings.ToLower(t.String())
return textparse.MetricType(mt) // TODO(@tpaschalis) a better way for this?
}

// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling
// snappy decompression.
func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) {
Expand Down
67 changes: 35 additions & 32 deletions storage/remote/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ type QueueManager struct {
relabelConfigs []*relabel.Config
sendExemplars bool
sendNativeHistograms bool
sendMetadata bool
watcher *wlog.Watcher
metadataWatcher *MetadataWatcher

Expand Down Expand Up @@ -459,6 +460,7 @@ func NewQueueManager(
sm ReadyScrapeManager,
enableExemplarRemoteWrite bool,
enableNativeHistogramRemoteWrite bool,
enableMetadataRemoteWrite bool,
) *QueueManager {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -481,6 +483,7 @@ func NewQueueManager(
storeClient: client,
sendExemplars: enableExemplarRemoteWrite,
sendNativeHistograms: enableNativeHistogramRemoteWrite,
sendMetadata: enableMetadataRemoteWrite,

seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels),
seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
Expand All @@ -500,7 +503,7 @@ func NewQueueManager(
highestRecvTimestamp: highestRecvTimestamp,
}

t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite)
t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite, enableMetadataRemoteWrite)
if t.mcfg.Send {
t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline)
}
Expand Down Expand Up @@ -774,7 +777,7 @@ outer:
}

func (t *QueueManager) AppendWALMetadata(ms []record.RefMetadata) bool {
if !t.mcfg.SendFromWAL {
if !t.sendMetadata {
return true
}

Expand All @@ -784,16 +787,16 @@ outer:
lbls, ok := t.seriesLabels[m.Ref]
if !ok {
t.metrics.droppedMetadataTotal.Inc()
// Track dropped exemplars in the same EWMA for sharding calc.
// Track dropped metadata in the same EWMA for sharding calc.
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[m.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped exemplar for series that was not explicitly dropped via relabelling", "ref", m.Ref)
level.Info(t.logger).Log("msg", "Dropped metadata for series that was not explicitly dropped via relabelling", "ref", m.Ref)
}
t.seriesMtx.Unlock()
continue
}
t.seriesMtx.Unlock()
// This will only loop if the queues are being resharded.

backoff := t.cfg.MinBackoff
for {
select {
Expand All @@ -802,12 +805,12 @@ outer:
default:
}
if t.shards.enqueue(m.Ref, timeSeries{
seriesLabels: lbls,
sType: tMetadata,
seriesLabels: lbls, // TODO (@tpaschalis) We take the labels here so we can refer to the metric's name on populateTimeSeries. There's probably a better way to do that.
metadata: &metadata.Metadata{
Type: record.ToTextparseMetricType(m.Type),
Help: m.Help,
Unit: m.Unit,
Type: record.ToTextparseMetricType(m.Type),
},
}) {
continue outer
Expand Down Expand Up @@ -1180,6 +1183,7 @@ func (s *shards) start(n int) {
s.samplesDroppedOnHardShutdown.Store(0)
s.exemplarsDroppedOnHardShutdown.Store(0)
s.histogramsDroppedOnHardShutdown.Store(0)
s.metadataDroppedOnHardShutdown.Store(0)
for i := 0; i < n; i++ {
go s.runShard(hardShutdownCtx, i, newQueues[i])
}
Expand Down Expand Up @@ -1416,6 +1420,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
pBuf = proto.NewBuffer(nil)
buf []byte
)
// TODO(@tpaschalis) Should we also raise the max if we have WAL metadata?
if s.qm.sendExemplars {
max += int(float64(max) * 0.1)
}
Expand All @@ -1429,11 +1434,6 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
}
}

// TODO(@tpaschalis) Since metadata might appear infrequently, I'm not sure
// whether it's cheaper to pre-allocate, or initialize the slice as empty
// and append whenever metadata is encountered.
pendingMetadata := make([]prompb.MetricMetadata, s.qm.mcfg.MaxSamplesPerSend)

timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
stop := func() {
if !timer.Stop() {
Expand Down Expand Up @@ -1471,31 +1471,30 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
if !ok {
return
}
nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := s.populateTimeSeries(batch, pendingData, pendingMetadata)
nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := s.populateTimeSeries(batch, pendingData)
queue.ReturnForReuse(batch)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
s.sendSamples(ctx, pendingData[:n], pendingMetadata[:nPendingMetadata], nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, pBuf, &buf)
n := nPendingSamples + nPendingExemplars + nPendingHistograms + nPendingMetadata
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, pBuf, &buf)

stop()
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))

case <-timer.C:
batch := queue.Batch()
if len(batch) > 0 {
nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := s.populateTimeSeries(batch, pendingData, pendingMetadata)
n := nPendingSamples + nPendingExemplars + nPendingHistograms
nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := s.populateTimeSeries(batch, pendingData)
n := nPendingSamples + nPendingExemplars + nPendingHistograms + nPendingMetadata
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum)
s.sendSamples(ctx, pendingData[:n], pendingMetadata[:nPendingMetadata], nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, pBuf, &buf)
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms, "metadata", nPendingMetadata)
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, pBuf, &buf)
}
queue.ReturnForReuse(batch)
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
}
}
}

func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, pendingMetadata []prompb.MetricMetadata) (int, int, int, int) {
func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries) (int, int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata int
for nPending, d := range batch {
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
Expand All @@ -1505,6 +1504,9 @@ func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.Tim
if s.qm.sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
}
if s.qm.sendMetadata {
pendingData[nPending].Metadatas = pendingData[nPending].Metadatas[:0]
}

// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
Expand All @@ -1531,19 +1533,20 @@ func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.Tim
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, FloatHistogramToHistogramProto(d.timestamp, d.floatHistogram))
nPendingHistograms++
case tMetadata:
pendingMetadata[nPendingMetadata].MetricFamilyName = d.seriesLabels.Get(labels.MetricName)
pendingMetadata[nPendingMetadata].Type = metricTypeToMetricTypeProto(d.metadata.Type)
pendingMetadata[nPendingMetadata].Help = d.metadata.Help
pendingMetadata[nPendingMetadata].Unit = d.metadata.Unit
pendingData[nPending].Metadatas = append(pendingData[nPending].Metadatas, prompb.Metadata{
Type: metricTypeToProtoEquivalent(d.metadata.Type),
Help: d.metadata.Help,
Unit: d.metadata.Unit,
})
nPendingMetadata++
}
}
return nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata
}

func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte) {
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte) {
begin := time.Now()
err := s.sendSamplesWithBackoff(ctx, samples, metadata, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf)
err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf)
if err != nil {
level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err)
s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount))
Expand All @@ -1570,12 +1573,9 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, m
}

// sendSamples to the remote storage with backoff for recoverable errors.
// TODO(@tpaschalis) If we're going to reuse this method for metadata as well,
// we need a better name.
// TODO(@tpaschalis) Add metadata-specific metrics and attributes.
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte) error {
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte) error {
// Build the WriteRequest with no metadata.
req, highest, err := buildWriteRequest(samples, metadata, pBuf, *buf)
req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf)
if err != nil {
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
Expand Down Expand Up @@ -1606,6 +1606,9 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
if histogramCount > 0 {
span.SetAttributes(attribute.Int("histograms", histogramCount))
}
if metadataCount > 0 {
span.SetAttributes(attribute.Int("metadata", metadataCount))
}

begin := time.Now()
s.qm.metrics.samplesTotal.Add(float64(sampleCount))
Expand Down
Loading

0 comments on commit 0093563

Please sign in to comment.