Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support pickle to graphite #528

Merged
merged 5 commits into from
Mar 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 105 additions & 37 deletions backends/graphite.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
* This backend supports the following config options:
*
* graphiteHost: Hostname of graphite server.
* graphitePort: Port to contact graphite server at.
* graphitePort: Port for the graphite text collector. Defaults to 2003.
* graphitePicklePort: Port for the graphite pickle collector. Defaults to 2004.
* graphiteProtocol: Either 'text' or 'pickle'. Defaults to 'text'.
*
* If graphiteHost is not specified, metrics are processed but discarded.
*/

var net = require('net');
Expand All @@ -23,6 +27,8 @@ var debug;
var flushInterval;
var graphiteHost;
var graphitePort;
var graphitePicklePort;
var graphiteProtocol;
var flush_counts;

// prefix configuration
Expand All @@ -45,48 +51,104 @@ var setsNamespace = [];

var graphiteStats = {};

var post_stats = function graphite_post_stats(statString) {
var post_stats = function graphite_post_stats(stats) {
var last_flush = graphiteStats.last_flush || 0;
var last_exception = graphiteStats.last_exception || 0;
var flush_time = graphiteStats.flush_time || 0;
var flush_length = graphiteStats.flush_length || 0;

if (graphiteHost) {
try {
var graphite = net.createConnection(graphitePort, graphiteHost);
var port = graphiteProtocol == 'pickle' ? graphitePicklePort : graphitePort;
var graphite = net.createConnection(port, graphiteHost);
graphite.addListener('error', function(connectionException){
if (debug) {
l.log(connectionException);
}
});
graphite.on('connect', function() {
var ts = Math.round(new Date().getTime() / 1000);
var ts_suffix = ' ' + ts + "\n";
var ts = Math.round(Date.now() / 1000);
var namespace = globalNamespace.concat(prefixStats).join(".");
statString += namespace + '.graphiteStats.last_exception' + globalSuffix + last_exception + ts_suffix;
statString += namespace + '.graphiteStats.last_flush' + globalSuffix + last_flush + ts_suffix;
statString += namespace + '.graphiteStats.flush_time' + globalSuffix + flush_time + ts_suffix;
statString += namespace + '.graphiteStats.flush_length' + globalSuffix + flush_length + ts_suffix;
stats.add(namespace + '.graphiteStats.last_exception' + globalSuffix, last_exception, ts);
stats.add(namespace + '.graphiteStats.last_flush' + globalSuffix, last_flush , ts);
stats.add(namespace + '.graphiteStats.flush_time' + globalSuffix, flush_time , ts);
stats.add(namespace + '.graphiteStats.flush_length' + globalSuffix, flush_length , ts);
var stats_payload = graphiteProtocol == 'pickle' ? stats.toPickle() : stats.toText();

var starttime = Date.now();
this.write(statString);
this.write(stats_payload);
this.end();

graphiteStats.flush_time = (Date.now() - starttime);
graphiteStats.flush_length = statString.length;
graphiteStats.last_flush = Math.round(new Date().getTime() / 1000);
graphiteStats.flush_length = stats_payload.length;
graphiteStats.last_flush = Math.round(Date.now() / 1000);
});
} catch(e){
if (debug) {
l.log(e);
}
graphiteStats.last_exception = Math.round(new Date().getTime() / 1000);
graphiteStats.last_exception = Math.round(Date.now() / 1000);
}
}
};

// The small subset of pickle (protocol 0) opcodes needed to emit a
// list of (key, (timestamp, value)) tuples.
var MARK = '(';    // push a marker onto the unpickler stack
var STOP = '.';    // end of the pickle
var LONG = 'L';    // long integer, decimal text terminated by newline
var STRING = 'S';  // quoted string terminated by newline
var APPEND = 'a';  // append the top stack item to the list below it
var LIST = 'l';    // build a list from everything above the last marker
var TUPLE = 't';   // build a tuple from everything above the last marker

// Escape text so it can sit between the single quotes of a pickle
// STRING opcode argument. That argument is a repr-style string: the
// unpickler decodes backslash escapes and treats the first newline as
// the end of the argument, so backslashes, single quotes and line
// breaks must be escaped or they corrupt the rest of the stream.
function pickleEscape(text) {
  return String(text)
    .replace(/\\/g, '\\\\')   // must run first so later escapes are not doubled
    .replace(/'/g, '\\\'')
    .replace(/\n/g, '\\n')
    .replace(/\r/g, '\\r');
}

// A single measurement for sending to graphite.
//
//   key:   full metric path, including any prefix/suffix.
//   value: the measured value.
//   ts:    timestamp in seconds since the epoch.
function Metric(key, value, ts) {
  var m = this;
  this.key = key;
  this.value = value;
  this.ts = ts;

  // return a string representation of this metric appropriate
  // for sending to the graphite text collector. does not include
  // a trailing newline.
  this.toText = function() {
    return m.key + " " + m.value + " " + m.ts;
  };

  // return this metric as a pickle fragment that appends the tuple
  // (key, (ts, value)) to the list currently being built. The value
  // is sent as a string and the timestamp as a long. The fragment is
  // not a complete pickle on its own: the caller supplies the list
  // opening and the final STOP.
  this.toPickle = function() {
    return MARK + STRING + '\'' + pickleEscape(m.key) + '\'\n' +
      MARK + LONG + m.ts + 'L\n' +
      STRING + '\'' + pickleEscape(m.value) + '\'\n' +
      TUPLE + TUPLE + APPEND;
  };
}

// A collection of measurements for sending to graphite. Metrics are
// accumulated with add() and then serialized in one go with either
// toText() or toPickle().
function Stats() {
  var s = this;
  this.metrics = [];

  // record one measurement; ts is in seconds since the epoch.
  this.add = function(key, value, ts) {
    s.metrics.push(new Metric(key, value, ts));
  };

  // return every metric in the graphite text format, one per line,
  // each line terminated by a newline.
  this.toText = function() {
    return s.metrics.map(function(m) { return m.toText(); }).join('\n') + '\n';
  };

  // return a Buffer holding the graphite pickle message: a 4-byte
  // big-endian length header followed by the pickled list of metrics.
  this.toPickle = function() {
    var body = MARK + LIST + s.metrics.map(function(m) { return m.toPickle(); }).join('') + STOP;

    // The first four bytes of the graphite pickle format
    // contain the length of the rest of the payload.
    // That length is in bytes, not characters: body.length counts
    // UTF-16 code units and under-counts any non-ASCII metric name,
    // which would truncate the payload and corrupt the header.
    var bodyLength = Buffer.byteLength(body, 'utf8');

    // We use Buffer because this is binary data.
    var buf = Buffer.alloc(4 + bodyLength);

    buf.writeUInt32BE(bodyLength, 0);
    buf.write(body, 4, bodyLength, 'utf8');

    return buf;
  };
}

var flush_stats = function graphite_flush(ts, metrics) {
var ts_suffix = ' ' + ts + "\n";
var starttime = Date.now();
var statString = '';
var numStats = 0;
var key;
var timer_data_key;
Expand All @@ -109,21 +171,26 @@ var flush_stats = function graphite_flush(ts, metrics) {
}
};

// Flatten all the different types of metrics into a single
// collection so we can serialize to either the graphite text
// or pickle format.
var stats = new Stats();

for (key in counters) {
var value = counters[key];
var valuePerSecond = counter_rates[key]; // pre-calculated "per second" rate
var keyName = sk(key);
var namespace = counterNamespace.concat(keyName);

if (legacyNamespace === true) {
statString += namespace.join(".") + globalSuffix + valuePerSecond + ts_suffix;
stats.add(namespace.join(".") + globalSuffix, valuePerSecond, ts);
if (flush_counts) {
statString += 'stats_counts.' + keyName + globalSuffix + value + ts_suffix;
stats.add('stats_counts.' + keyName + globalSuffix, value, ts);
}
} else {
statString += namespace.concat('rate').join(".") + globalSuffix + valuePerSecond + ts_suffix;
stats.add(namespace.concat('rate').join(".") + globalSuffix, valuePerSecond, ts);
if (flush_counts) {
statString += namespace.concat('count').join(".") + globalSuffix + value + ts_suffix;
stats.add(namespace.concat('count').join(".") + globalSuffix, value, ts);
}
}

Expand All @@ -136,14 +203,14 @@ var flush_stats = function graphite_flush(ts, metrics) {

for (timer_data_key in timer_data[key]) {
if (typeof(timer_data[key][timer_data_key]) === 'number') {
statString += the_key + '.' + timer_data_key + globalSuffix + timer_data[key][timer_data_key] + ts_suffix;
stats.add(the_key + '.' + timer_data_key + globalSuffix, timer_data[key][timer_data_key], ts);
} else {
for (var timer_data_sub_key in timer_data[key][timer_data_key]) {
if (debug) {
l.log(timer_data[key][timer_data_key][timer_data_sub_key].toString());
}
statString += the_key + '.' + timer_data_key + '.' + timer_data_sub_key + globalSuffix +
timer_data[key][timer_data_key][timer_data_sub_key] + ts_suffix;
stats.add(the_key + '.' + timer_data_key + '.' + timer_data_sub_key + globalSuffix,
timer_data[key][timer_data_key][timer_data_sub_key], ts);
}
}
}
Expand All @@ -152,32 +219,32 @@ var flush_stats = function graphite_flush(ts, metrics) {

for (key in gauges) {
var namespace = gaugesNamespace.concat(sk(key));
statString += namespace.join(".") + globalSuffix + gauges[key] + ts_suffix;
stats.add(namespace.join(".") + globalSuffix, gauges[key], ts);
numStats += 1;
}

for (key in sets) {
var namespace = setsNamespace.concat(sk(key));
statString += namespace.join(".") + '.count' + globalSuffix + sets[key].size() + ts_suffix;
stats.add(namespace.join(".") + '.count' + globalSuffix, sets[key].size(), ts);
numStats += 1;
}

var namespace = globalNamespace.concat(prefixStats);
if (legacyNamespace === true) {
statString += prefixStats + '.numStats' + globalSuffix + numStats + ts_suffix;
statString += 'stats.' + prefixStats + '.graphiteStats.calculationtime' + globalSuffix + (Date.now() - starttime) + ts_suffix;
stats.add(prefixStats + '.numStats' + globalSuffix, numStats, ts);
stats.add('stats.' + prefixStats + '.graphiteStats.calculationtime' + globalSuffix, (Date.now() - starttime), ts);
for (key in statsd_metrics) {
statString += 'stats.' + prefixStats + '.' + key + globalSuffix + statsd_metrics[key] + ts_suffix;
stats.add('stats.' + prefixStats + '.' + key + globalSuffix, statsd_metrics[key], ts);
}
} else {
statString += namespace.join(".") + '.numStats' + globalSuffix + numStats + ts_suffix;
statString += namespace.join(".") + '.graphiteStats.calculationtime' + globalSuffix + (Date.now() - starttime) + ts_suffix;
var namespace = globalNamespace.concat(prefixStats);
stats.add(namespace.join(".") + '.numStats' + globalSuffix, numStats, ts);
stats.add(namespace.join(".") + '.graphiteStats.calculationtime' + globalSuffix, (Date.now() - starttime) , ts);
for (key in statsd_metrics) {
var the_key = namespace.concat(key);
statString += the_key.join(".") + globalSuffix + statsd_metrics[key] + ts_suffix;
stats.add(the_key.join(".") + globalSuffix,+ statsd_metrics[key], ts);
}
}
post_stats(statString);
post_stats(stats);

if (debug) {
l.log("numStats: " + numStats);
Expand All @@ -194,7 +261,9 @@ exports.init = function graphite_init(startup_time, config, events, logger) {
debug = config.debug;
l = logger;
graphiteHost = config.graphiteHost;
graphitePort = config.graphitePort;
graphitePort = config.graphitePort || 2003;
graphitePicklePort = config.graphitePicklePort || 2004;
graphiteProtocol = config.graphiteProtocol || 'text';
config.graphite = config.graphite || {};
globalPrefix = config.graphite.globalPrefix;
prefixCounter = config.graphite.prefixCounter;
Expand All @@ -214,10 +283,9 @@ exports.init = function graphite_init(startup_time, config, events, logger) {
prefixStats = prefixStats !== undefined ? prefixStats : "statsd";
legacyNamespace = legacyNamespace !== undefined ? legacyNamespace : true;

// In order to unconditionally add this string, it either needs to be
// a single space if it was unset, OR surrounded by a . and a space if
// it was set.
globalSuffix = globalSuffix !== undefined ? '.' + globalSuffix + ' ' : ' ';
// In order to unconditionally add this string, it either needs to be an
// empty string if it was unset, OR prefixed by a . if it was set.
globalSuffix = globalSuffix !== undefined ? '.' + globalSuffix : '';

if (legacyNamespace === false) {
if (globalPrefix !== "") {
Expand Down
81 changes: 81 additions & 0 deletions docs/graphite_pickle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
Pickling for Graphite
=====================

The graphite statsd backend can optionally be configured to use pickle
for its over-the-wire protocol.

```javascript
{ graphiteHost: "your.graphite.host",
graphiteProtocol: "pickle" }
```

The default is to use the graphite text protocol, which can require
more CPU processing by the graphite endpoint.

The message format expected by the graphite pickle endpoint consists
of a header and payload.

The Payload
-----------

The message payload is a list of tuples. Each tuple contains the measurement
for a single metric name. The measurement is encoded as a second,
nested tuple containing timestamp and measured value.

This ends up looking like:

```python
[ ( "path.to.metric.name", ( timestamp, "value" ) ),
( "path.to.another.name", ( timestamp, "value" ) ) ]
```

The graphite receiver `carbon.protocols.MetricPickleReceiver` coerces
both the timestamp and measured value into `float`.

The timestamp must be seconds since epoch encoded as a number.

The measured value is encoded as a string. This may change in the
future.

We have chosen to not implement pickle's object memoization. This
simplifies what is sent across the wire. It is not likely any
optimization would result within a single poll cycle.

Here is some Python code showing how a given set of metrics can be
serialized more simply than `pickle.dumps` does, by leaving out the
memoization opcodes.

```python
import pickle

metrics = [ ( "a.b.c", ( 1234L, "5678" ) ), ( "d.e.f.g", ( 1234L, "9012" ) ) ]
pickle.dumps(metrics)
# "(lp0\n(S'a.b.c'\np1\n(L1234L\nS'5678'\np2\ntp3\ntp4\na(S'd.e.f.g'\np5\n(L1234L\nS'9012'\np6\ntp7\ntp8\na."

payload = "(l(S'a.b.c'\n(L1234L\nS'5678'\ntta(S'd.e.f.g'\n(L1234L\nS'9012'\ntta."
pickle.loads(payload)
# [('a.b.c', (1234L, '5678')), ('d.e.f.g', (1234L, '9012'))]
```

The trailing `L` for long fields is unnecessary, but we are adding the
character to match Python pickle output. It's a side-effect of
`repr(long(1234))`.

The Header
----------

The message header is an unsigned 32-bit integer sent over the wire as
four bytes in network (big-endian) byte order. This integer must describe
the length of the pickled payload in bytes.

Here is some sample code showing how to construct the message header
containing the payload length.

```python
import struct

payload_length = 81
header = struct.pack("!L", payload_length)
# '\x00\x00\x00Q'
```

The `Q` character is the ASCII character with decimal code 81 (`\x51` in hexadecimal), so the last byte of the header carries the payload length.
10 changes: 6 additions & 4 deletions exampleConfig.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
/*
Graphite Required Variables:
Graphite Required Variable:

(Leave these unset to avoid sending stats to Graphite.
Set debug flag and leave these unset to run in 'dry' debug mode -
(Leave this unset to avoid sending stats to Graphite.
Set debug flag and leave this unset to run in 'dry' debug mode -
useful for testing statsd clients without a Graphite server.)

graphiteHost: hostname or IP of Graphite server
graphitePort: port of Graphite server

Optional Variables:

graphitePort: port for the graphite text collector [default: 2003]
graphitePicklePort: port for the graphite pickle collector [default: 2004]
graphiteProtocol: either 'text' or 'pickle' [default: 'text']
backends: an array of backends to load. Each backend must exist
by name in the directory backends/. If not specified,
the default graphite backend will be loaded.
Expand Down
Loading