Skip to content

Commit

Permalink
Merge pull request #528 from aronatkins/pickle
Browse files Browse the repository at this point in the history
support pickle to graphite
  • Loading branch information
coykitten committed Mar 28, 2016
2 parents 30c2b6b + f5b66c5 commit ff845eb
Show file tree
Hide file tree
Showing 5 changed files with 422 additions and 44 deletions.
142 changes: 105 additions & 37 deletions backends/graphite.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
* This backend supports the following config options:
*
* graphiteHost: Hostname of graphite server.
* graphitePort: Port to contact graphite server at.
* graphitePort: Port for the graphite text collector. Defaults to 2003.
* graphitePicklePort: Port for the graphite pickle collector. Defaults to 2004.
* graphiteProtocol: Either 'text' or 'pickle'. Defaults to 'text'.
*
* If graphiteHost is not specified, metrics are processed but discarded.
*/

var net = require('net');
Expand All @@ -23,6 +27,8 @@ var debug;
var flushInterval;
var graphiteHost;
var graphitePort;
var graphitePicklePort;
var graphiteProtocol;
var flush_counts;

// prefix configuration
Expand All @@ -45,48 +51,104 @@ var setsNamespace = [];

var graphiteStats = {};

var post_stats = function graphite_post_stats(statString) {
var post_stats = function graphite_post_stats(stats) {
var last_flush = graphiteStats.last_flush || 0;
var last_exception = graphiteStats.last_exception || 0;
var flush_time = graphiteStats.flush_time || 0;
var flush_length = graphiteStats.flush_length || 0;

if (graphiteHost) {
try {
var graphite = net.createConnection(graphitePort, graphiteHost);
var port = graphiteProtocol == 'pickle' ? graphitePicklePort : graphitePort;
var graphite = net.createConnection(port, graphiteHost);
graphite.addListener('error', function(connectionException){
if (debug) {
l.log(connectionException);
}
});
graphite.on('connect', function() {
var ts = Math.round(new Date().getTime() / 1000);
var ts_suffix = ' ' + ts + "\n";
var ts = Math.round(Date.now() / 1000);
var namespace = globalNamespace.concat(prefixStats).join(".");
statString += namespace + '.graphiteStats.last_exception' + globalSuffix + last_exception + ts_suffix;
statString += namespace + '.graphiteStats.last_flush' + globalSuffix + last_flush + ts_suffix;
statString += namespace + '.graphiteStats.flush_time' + globalSuffix + flush_time + ts_suffix;
statString += namespace + '.graphiteStats.flush_length' + globalSuffix + flush_length + ts_suffix;
stats.add(namespace + '.graphiteStats.last_exception' + globalSuffix, last_exception, ts);
stats.add(namespace + '.graphiteStats.last_flush' + globalSuffix, last_flush , ts);
stats.add(namespace + '.graphiteStats.flush_time' + globalSuffix, flush_time , ts);
stats.add(namespace + '.graphiteStats.flush_length' + globalSuffix, flush_length , ts);
var stats_payload = graphiteProtocol == 'pickle' ? stats.toPickle() : stats.toText();

var starttime = Date.now();
this.write(statString);
this.write(stats_payload);
this.end();

graphiteStats.flush_time = (Date.now() - starttime);
graphiteStats.flush_length = statString.length;
graphiteStats.last_flush = Math.round(new Date().getTime() / 1000);
graphiteStats.flush_length = stats_payload.length;
graphiteStats.last_flush = Math.round(Date.now() / 1000);
});
} catch(e){
if (debug) {
l.log(e);
}
graphiteStats.last_exception = Math.round(new Date().getTime() / 1000);
graphiteStats.last_exception = Math.round(Date.now() / 1000);
}
}
};

// Minimally necessary pickle opcodes.
var MARK = '(',
STOP = '.',
LONG = 'L',
STRING = 'S',
APPEND = 'a',
LIST = 'l',
TUPLE = 't';

// A single measurement for sending to graphite.
function Metric(key, value, ts) {
var m = this;
this.key = key;
this.value = value;
this.ts = ts;

// return a string representation of this metric appropriate
// for sending to the graphite collector. does not include
// a trailing newline.
this.toText = function() {
return m.key + " " + m.value + " " + m.ts;
};

this.toPickle = function() {
return MARK + STRING + '\'' + m.key + '\'\n' + MARK + LONG + m.ts + 'L\n' + STRING + '\'' + m.value + '\'\n' + TUPLE + TUPLE + APPEND;
};
}

// A collection of measurements for sending to graphite.
function Stats() {
var s = this;
this.metrics = [];
this.add = function(key, value, ts) {
s.metrics.push(new Metric(key, value, ts));
};

this.toText = function() {
return s.metrics.map(function(m) { return m.toText(); }).join('\n') + '\n';
};

this.toPickle = function() {
var body = MARK + LIST + s.metrics.map(function(m) { return m.toPickle(); }).join('') + STOP;

// The first four bytes of the graphite pickle format
// contain the length of the rest of the payload.
// We use Buffer because this is binary data.
var buf = new Buffer(4 + body.length);

buf.writeUInt32BE(body.length,0);
buf.write(body,4);

return buf;
};
}

var flush_stats = function graphite_flush(ts, metrics) {
var ts_suffix = ' ' + ts + "\n";
var starttime = Date.now();
var statString = '';
var numStats = 0;
var key;
var timer_data_key;
Expand All @@ -109,21 +171,26 @@ var flush_stats = function graphite_flush(ts, metrics) {
}
};

// Flatten all the different types of metrics into a single
// collection so we can allow serialization to either the graphite
// text and pickle formats.
var stats = new Stats();

for (key in counters) {
var value = counters[key];
var valuePerSecond = counter_rates[key]; // pre-calculated "per second" rate
var keyName = sk(key);
var namespace = counterNamespace.concat(keyName);

if (legacyNamespace === true) {
statString += namespace.join(".") + globalSuffix + valuePerSecond + ts_suffix;
stats.add(namespace.join(".") + globalSuffix, valuePerSecond, ts);
if (flush_counts) {
statString += 'stats_counts.' + keyName + globalSuffix + value + ts_suffix;
stats.add('stats_counts.' + keyName + globalSuffix, value, ts);
}
} else {
statString += namespace.concat('rate').join(".") + globalSuffix + valuePerSecond + ts_suffix;
stats.add(namespace.concat('rate').join(".") + globalSuffix, valuePerSecond, ts);
if (flush_counts) {
statString += namespace.concat('count').join(".") + globalSuffix + value + ts_suffix;
stats.add(namespace.concat('count').join(".") + globalSuffix, value, ts);
}
}

Expand All @@ -136,14 +203,14 @@ var flush_stats = function graphite_flush(ts, metrics) {

for (timer_data_key in timer_data[key]) {
if (typeof(timer_data[key][timer_data_key]) === 'number') {
statString += the_key + '.' + timer_data_key + globalSuffix + timer_data[key][timer_data_key] + ts_suffix;
stats.add(the_key + '.' + timer_data_key + globalSuffix, timer_data[key][timer_data_key], ts);
} else {
for (var timer_data_sub_key in timer_data[key][timer_data_key]) {
if (debug) {
l.log(timer_data[key][timer_data_key][timer_data_sub_key].toString());
}
statString += the_key + '.' + timer_data_key + '.' + timer_data_sub_key + globalSuffix +
timer_data[key][timer_data_key][timer_data_sub_key] + ts_suffix;
stats.add(the_key + '.' + timer_data_key + '.' + timer_data_sub_key + globalSuffix,
timer_data[key][timer_data_key][timer_data_sub_key], ts);
}
}
}
Expand All @@ -152,32 +219,32 @@ var flush_stats = function graphite_flush(ts, metrics) {

for (key in gauges) {
var namespace = gaugesNamespace.concat(sk(key));
statString += namespace.join(".") + globalSuffix + gauges[key] + ts_suffix;
stats.add(namespace.join(".") + globalSuffix, gauges[key], ts);
numStats += 1;
}

for (key in sets) {
var namespace = setsNamespace.concat(sk(key));
statString += namespace.join(".") + '.count' + globalSuffix + sets[key].size() + ts_suffix;
stats.add(namespace.join(".") + '.count' + globalSuffix, sets[key].size(), ts);
numStats += 1;
}

var namespace = globalNamespace.concat(prefixStats);
if (legacyNamespace === true) {
statString += prefixStats + '.numStats' + globalSuffix + numStats + ts_suffix;
statString += 'stats.' + prefixStats + '.graphiteStats.calculationtime' + globalSuffix + (Date.now() - starttime) + ts_suffix;
stats.add(prefixStats + '.numStats' + globalSuffix, numStats, ts);
stats.add('stats.' + prefixStats + '.graphiteStats.calculationtime' + globalSuffix, (Date.now() - starttime), ts);
for (key in statsd_metrics) {
statString += 'stats.' + prefixStats + '.' + key + globalSuffix + statsd_metrics[key] + ts_suffix;
stats.add('stats.' + prefixStats + '.' + key + globalSuffix, statsd_metrics[key], ts);
}
} else {
statString += namespace.join(".") + '.numStats' + globalSuffix + numStats + ts_suffix;
statString += namespace.join(".") + '.graphiteStats.calculationtime' + globalSuffix + (Date.now() - starttime) + ts_suffix;
var namespace = globalNamespace.concat(prefixStats);
stats.add(namespace.join(".") + '.numStats' + globalSuffix, numStats, ts);
stats.add(namespace.join(".") + '.graphiteStats.calculationtime' + globalSuffix, (Date.now() - starttime) , ts);
for (key in statsd_metrics) {
var the_key = namespace.concat(key);
statString += the_key.join(".") + globalSuffix + statsd_metrics[key] + ts_suffix;
stats.add(the_key.join(".") + globalSuffix,+ statsd_metrics[key], ts);
}
}
post_stats(statString);
post_stats(stats);

if (debug) {
l.log("numStats: " + numStats);
Expand All @@ -194,7 +261,9 @@ exports.init = function graphite_init(startup_time, config, events, logger) {
debug = config.debug;
l = logger;
graphiteHost = config.graphiteHost;
graphitePort = config.graphitePort;
graphitePort = config.graphitePort || 2003;
graphitePicklePort = config.graphitePicklePort || 2004;
graphiteProtocol = config.graphiteProtocol || 'text';
config.graphite = config.graphite || {};
globalPrefix = config.graphite.globalPrefix;
prefixCounter = config.graphite.prefixCounter;
Expand All @@ -214,10 +283,9 @@ exports.init = function graphite_init(startup_time, config, events, logger) {
prefixStats = prefixStats !== undefined ? prefixStats : "statsd";
legacyNamespace = legacyNamespace !== undefined ? legacyNamespace : true;

// In order to unconditionally add this string, it either needs to be
// a single space if it was unset, OR surrounded by a . and a space if
// it was set.
globalSuffix = globalSuffix !== undefined ? '.' + globalSuffix + ' ' : ' ';
// In order to unconditionally add this string, it either needs to be an
// empty string if it was unset, OR prefixed by a . if it was set.
globalSuffix = globalSuffix !== undefined ? '.' + globalSuffix : '';

if (legacyNamespace === false) {
if (globalPrefix !== "") {
Expand Down
81 changes: 81 additions & 0 deletions docs/graphite_pickle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
Pickling for Graphite
=====================

The graphite statsd backend can optionally be configured to use pickle
for its over-the-wire protocol.

```javascript
{ graphiteHost: "your.graphite.host",
graphiteProtocol: "pickle" }
```

The default is to use the graphite text protocol, which can require
more CPU processing by the graphite endpoint.

The message format expected by the graphite pickle endpoint consists
of a header and payload.

The Payload
-----------

The message payload is a list of tuples. Each tuple contains the measurement
for a single metric name. The measurement is encoded as a second,
nested tuple containing timestamp and measured value.

This ends up looking like:

```python
[ ( "path.to.metric.name", ( timestamp, "value" ) ),
( "path.to.another.name", ( timestamp, "value" ) ) ]
```

The graphite receiver `carbon.protocols.MetricPickleReceiver` coerces
both the timestamp and measured value into `float`.

The timestamp must be seconds since epoch encoded as a number.

The measured value is encoded as a string. This may change in the
future.

We have chosen to not implement pickle's object memoization. This
simplifies what is sent across the wire. It is not likely any
optimization would result within a single poll cycle.

Here is some Python code showing how a given set of metrics can be
serialized in a more simple way.

```python
import pickle

metrics = [ ( "a.b.c", ( 1234L, "5678" ) ), ( "d.e.f.g", ( 1234L, "9012" ) ) ]
pickle.dumps(metrics)
# "(lp0\n(S'a.b.c'\np1\n(L1234L\nS'5678'\np2\ntp3\ntp4\na(S'd.e.f.g'\np5\n(L1234L\nS'9012'\np6\ntp7\ntp8\na."

payload = "(l(S'a.b.c'\n(L1234L\nS'5678'\ntta(S'd.e.f.g'\n(L1234L\nS'9012'\ntta."
pickle.loads(payload)
# [('a.b.c', (1234L, '5678')), ('d.e.f.g', (1234L, '9012'))]
```

The trailing `L` for long fields is unnecessary, but we are adding the
character to match Python pickle output. It's a side-effect of
`repr(long(1234))`.

The Header
----------

The message header is a 32-bit integer sent over the wire as
four-bytes. This integer must describe the length of the pickled
payload.

Here is some sample code showing how to construct the message header
containing the payload length.

```python
import struct

payload_length = 81
header = struct.pack("!L", payload_length)
# '\x00\x00\x00Q'
```

The `Q` character is equivalent to `\x81` (ASCII encoding).
10 changes: 6 additions & 4 deletions exampleConfig.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
/*
Graphite Required Variables:
Graphite Required Variable:
(Leave these unset to avoid sending stats to Graphite.
Set debug flag and leave these unset to run in 'dry' debug mode -
(Leave this unset to avoid sending stats to Graphite.
Set debug flag and leave this unset to run in 'dry' debug mode -
useful for testing statsd clients without a Graphite server.)
graphiteHost: hostname or IP of Graphite server
graphitePort: port of Graphite server
Optional Variables:
graphitePort: port for the graphite text collector [default: 2003]
graphitePicklePort: port for the graphite pickle collector [default: 2004]
graphiteProtocol: either 'text' or 'pickle' [default: 'text']
backends: an array of backends to load. Each backend must exist
by name in the directory backends/. If not specified,
the default graphite backend will be loaded.
Expand Down
Loading

0 comments on commit ff845eb

Please sign in to comment.