Skip to content

Commit

Permalink
New encoding for OpenTSDB tag values (and metric names).
Browse files Browse the repository at this point in the history
Change-Id: I0f4393f638c6e2bb2b2ce14e58e38b49ce456da8
  • Loading branch information
Bjoern Rabenstein committed Mar 21, 2014
1 parent 0a65b69 commit caf47b2
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 29 deletions.
31 changes: 16 additions & 15 deletions storage/remote/opentsdb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net/http"
"net/url"
"regexp"
"time"

"github.com/golang/glog"
clientmodel "github.com/prometheus/client_golang/model"

"github.com/prometheus/prometheus/utility"
Expand Down Expand Up @@ -41,26 +43,20 @@ func NewClient(url string, timeout time.Duration) *Client {
// StoreSamplesRequest is used for building a JSON request for storing samples
// via the OpenTSDB.
type StoreSamplesRequest struct {
Metric string `json:"metric"`
Timestamp int64 `json:"timestamp"`
Value clientmodel.SampleValue `json:"value"`
Tags map[string]string `json:"tags"`
}

// escapeTagValue escapes Prometheus label values to valid tag values for
// OpenTSDB.
func escapeTagValue(l clientmodel.LabelValue) string {
return illegalCharsRE.ReplaceAllString(string(l), "_")
Metric TagValue `json:"metric"`
Timestamp int64 `json:"timestamp"`
Value float64 `json:"value"`
Tags map[string]TagValue `json:"tags"`
}

// tagsFromMetric translates Prometheus metric into OpenTSDB tags.
func tagsFromMetric(m clientmodel.Metric) map[string]string {
tags := make(map[string]string, len(m)-1)
func tagsFromMetric(m clientmodel.Metric) map[string]TagValue {
tags := make(map[string]TagValue, len(m)-1)
for l, v := range m {
if l == clientmodel.MetricNameLabel {
continue
}
tags[string(l)] = escapeTagValue(v)
tags[string(l)] = TagValue(v)
}
return tags
}
Expand All @@ -69,11 +65,16 @@ func tagsFromMetric(m clientmodel.Metric) map[string]string {
func (c *Client) Store(samples clientmodel.Samples) error {
reqs := make([]StoreSamplesRequest, 0, len(samples))
for _, s := range samples {
metric := escapeTagValue(s.Metric[clientmodel.MetricNameLabel])
v := float64(s.Value)
if math.IsNaN(v) || math.IsInf(v, 0) {
glog.Warningf("cannot send value %d to OpenTSDB, skipping sample %#v", v, s)
continue
}
metric := TagValue(s.Metric[clientmodel.MetricNameLabel])
reqs = append(reqs, StoreSamplesRequest{
Metric: metric,
Timestamp: s.Timestamp.Unix(),
Value: s.Value,
Value: v,
Tags: tagsFromMetric(s.Metric),
})
}
Expand Down
62 changes: 48 additions & 14 deletions storage/remote/opentsdb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,62 @@
package opentsdb

import (
"bytes"
"encoding/json"
"reflect"
"testing"

clientmodel "github.com/prometheus/client_golang/model"
)

func TestTagsFromMetric(t *testing.T) {
input := clientmodel.Metric{
clientmodel.MetricNameLabel: "testmetric",
"test:label": "test:value",
var (
metric = clientmodel.Metric{
clientmodel.MetricNameLabel: "test:metric",
"testlabel": "test:value",
"many_chars": "abc!ABC:012-3!45ö67~89./",
}
expected := map[string]string{
"test:label": "test_value",
"many_chars": "abc_ABC_012-3_45_67_89./",
)

func TestTagsFromMetric(t *testing.T) {
expected := map[string]TagValue{
"testlabel": TagValue("test:value"),
"many_chars": TagValue("abc!ABC:012-3!45ö67~89./"),
}
actual := tagsFromMetric(metric)
if !reflect.DeepEqual(actual, expected) {
t.Errorf("Expected %#v, got %#v", expected, actual)
}
}

func TestMarshalStoreSamplesRequest(t *testing.T) {
request := StoreSamplesRequest{
Metric: TagValue("test:metric"),
Timestamp: 4711,
Value: 3.1415,
Tags: tagsFromMetric(metric),
}
expectedJSON := []byte(`{"metric":"test_.metric","timestamp":4711,"value":3.1415,"tags":{"many_chars":"abc_21ABC_.012-3_2145_C3_B667_7E89./","testlabel":"test_.value"}}`)

resultingJSON, err := json.Marshal(request)
if err != nil {
t.Fatalf("Marshal(request) resulted in err: %s", err)
}
if !bytes.Equal(resultingJSON, expectedJSON) {
t.Errorf(
"Marshal(request) => %q, want %q",
resultingJSON, expectedJSON,
)
}
actual := tagsFromMetric(input)
if len(actual) != len(expected) {
t.Fatalf("Expected %v, got %v", expected, actual)

var unmarshaledRequest StoreSamplesRequest
err = json.Unmarshal(expectedJSON, &unmarshaledRequest)
if err != nil {
t.Fatalf("Unarshal(expectedJSON, &unmarshaledRequest) resulted in err: %s", err)
}
for k, v := range expected {
if v != actual[k] {
t.Fatalf("Expected %s => %s, got %s => %s", k, v, k, actual[k])
}
if !reflect.DeepEqual(unmarshaledRequest, request) {
t.Errorf(
"Unarshal(expectedJSON, &unmarshaledRequest) => %#v, want %#v",
unmarshaledRequest, request,
)
}
}
144 changes: 144 additions & 0 deletions storage/remote/opentsdb/tagvalue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package opentsdb

import (
"bytes"
"fmt"

clientmodel "github.com/prometheus/client_golang/model"
)

// TagValue is a clientmodel.LabelValue that implements json.Marshaler and
// json.Unmarshaler. These implementations avoid characters illegal in
// OpenTSDB. See the MarshalJSON for details. TagValue is used for the values of
// OpenTSDB tags as well as for OpenTSDB metric names.
type TagValue clientmodel.LabelValue

// MarshalJSON marshals this TagValue into JSON that only contains runes allowed
// in OpenTSDB. It implements json.Marshaler. The runes allowed in OpenTSDB are
// all single-byte. This function encodes the arbitrary byte sequence found in
// this TagValue in the following way:
//
// - The string that underlies TagValue is scanned byte by byte.
//
// - If a byte represents a legal OpenTSDB rune with the exception of '_', that
// byte is directly copied to the resulting JSON byte slice.
//
// - If '_' is encountered, it is replaced by '__'.
//
// - If ':' is encountered, it is replaced by '_.'.
//
// - All other bytes are replaced by '_' followed by two bytes containing the
// uppercase ASCII representation of their hexadecimal value.
//
// This encoding allows to save arbitrary Go strings in OpenTSDB. That's
// required because Prometheus label values can contain anything, and even
// Prometheus metric names may (and often do) contain ':' (which is disallowed
// in OpenTSDB strings). The encoding uses '_' as an escape character and
// renders a ':' more or less recognizable as '_.'
//
// Examples:
//
// "foo-bar-42" -> "foo-bar-42"
//
// "foo_bar_42" -> "foo__bar__42"
//
// "http://example.org:8080" -> "http_.//example.org_.8080"
//
// "Björn's email: bjoern@soundcloud.com" ->
// "Bj_C3_B6rn_27s_20email_._20bjoern_40soundcloud.com"
//
// "日" -> "_E6_97_A5"
func (tv TagValue) MarshalJSON() ([]byte, error) {
length := len(tv)
// Need at least two more bytes than in tv.
result := bytes.NewBuffer(make([]byte, 1, length+2))
result.WriteByte('"')
for i := 0; i < length; i++ {
b := tv[i]
switch {
case (b >= '-' && b <= '9') || // '-', '.', '/', 0-9
(b >= 'A' && b <= 'Z') ||
(b >= 'a' && b <= 'z'):
result.WriteByte(b)
case b == '_':
result.WriteString("__")
case b == ':':
result.WriteString("_.")
default:
result.WriteString(fmt.Sprintf("_%X", b))
}
}
result.WriteByte('"')
return result.Bytes(), nil
}

// UnmarshalJSON unmarshals JSON strings coming from OpenTSDB into Go strings
// by applying the inverse of what is described for the MarshalJSON method.
func (tv *TagValue) UnmarshalJSON(json []byte) error {
escapeLevel := 0 // How many bytes after '_'.
var parsedByte byte

// Might need fewer bytes, but let's avoid realloc.
result := bytes.NewBuffer(make([]byte, 0, len(json)-2))

for i, b := range json {
if i == 0 {
if b != '"' {
return fmt.Errorf("expected '\"', got %q", b)
}
continue
}
if i == len(json)-1 {
if b != '"' {
return fmt.Errorf("expected '\"', got %q", b)
}
break
}
switch escapeLevel {
case 0:
if b == '_' {
escapeLevel = 1
continue
}
result.WriteByte(b)
case 1:
switch {
case b == '_':
result.WriteByte('_')
escapeLevel = 0
case b == '.':
result.WriteByte(':')
escapeLevel = 0
case b >= '0' && b <= '9':
parsedByte = (b - 48) << 4
escapeLevel = 2
case b >= 'A' && b <= 'F': // A-F
parsedByte = (b - 55) << 4
escapeLevel = 2
default:
return fmt.Errorf(
"illegal escape sequence at byte %d (%c)",
i, b,
)
}
case 2:
switch {
case b >= '0' && b <= '9':
parsedByte += b - 48
case b >= 'A' && b <= 'F': // A-F
parsedByte += b - 55
default:
return fmt.Errorf(
"illegal escape sequence at byte %d (%c)",
i, b,
)
}
result.WriteByte(parsedByte)
escapeLevel = 0
default:
panic("unexpected escape level")
}
}
*tv = TagValue(result.String())
return nil
}
51 changes: 51 additions & 0 deletions storage/remote/opentsdb/tagvalue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package opentsdb

import (
"bytes"
"encoding/json"
"testing"
)

var stringtests = []struct {
tv TagValue
json []byte
}{
{TagValue("foo-bar-42"), []byte(`"foo-bar-42"`)},
{TagValue("foo_bar_42"), []byte(`"foo__bar__42"`)},
{TagValue("http://example.org:8080"), []byte(`"http_.//example.org_.8080"`)},
{TagValue("Björn's email: bjoern@soundcloud.com"), []byte(`"Bj_C3_B6rn_27s_20email_._20bjoern_40soundcloud.com"`)},
{TagValue("日"), []byte(`"_E6_97_A5"`)},
}

func TestTagValueMarshaling(t *testing.T) {
for i, tt := range stringtests {
json, err := json.Marshal(tt.tv)
if err != nil {
t.Errorf("%d. Marshal(%q) returned err: %s", i, tt.tv, err)
} else {
if !bytes.Equal(json, tt.json) {
t.Errorf(
"%d. Marshal(%q) => %q, want %q",
i, tt.tv, json, tt.json,
)
}
}
}
}

func TestTagValueUnMarshaling(t *testing.T) {
for i, tt := range stringtests {
var tv TagValue
err := json.Unmarshal(tt.json, &tv)
if err != nil {
t.Errorf("%d. Unmarshal(%q, &str) returned err: %s", i, tt.json, err)
} else {
if tv != tt.tv {
t.Errorf(
"%d. Unmarshal(%q, &str) => str==%q, want %q",
i, tt.json, tv, tt.tv,
)
}
}
}
}

0 comments on commit caf47b2

Please sign in to comment.