Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support pickle to graphite. #312

Closed
wants to merge 6 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -75,6 +75,10 @@ eliminate race conditions but it may be possible to encounter a stuck state. If
doing dev work, a `killall statsd` will kill any stray test servers in the
background (don't do this on a production machine!).

Be sure to install the test dependencies:

npm install nodeunit temp underscore

Tests can be executed with `./run_tests.sh`.


143 changes: 107 additions & 36 deletions backends/graphite.js
Original file line number Diff line number Diff line change
@@ -11,7 +11,11 @@
* This backend supports the following config options:
*
* graphiteHost: Hostname of graphite server.
* graphitePort: Port to contact graphite server at.
* graphitePort: Port for the graphite text collector. Defaults to 2003.
* graphitePicklePort: Port for the graphite pickle collector. Defaults to 2004.
* graphiteProtocol: Either 'text' or 'pickle'. Defaults to 'text'.
*
* If graphiteHost is not specified, metrics are processed but discarded.
*/

var net = require('net'),
@@ -24,6 +28,8 @@ var debug;
var flushInterval;
var graphiteHost;
var graphitePort;
var graphitePicklePort;
var graphiteProtocol;

// prefix configuration
var globalPrefix;
@@ -43,47 +49,105 @@ var setsNamespace = [];

var graphiteStats = {};

var post_stats = function graphite_post_stats(statString) {
var post_stats = function graphite_post_stats(stats) {
var last_flush = graphiteStats.last_flush || 0;
var last_exception = graphiteStats.last_exception || 0;
var flush_time = graphiteStats.flush_time || 0;
var flush_length = graphiteStats.flush_length || 0;

if (graphiteHost) {
try {
var graphite = net.createConnection(graphitePort, graphiteHost);
var port = graphiteProtocol == 'pickle' ? graphitePicklePort : graphitePort;
var graphite = net.createConnection(port, graphiteHost);
graphite.addListener('error', function(connectionException){
if (debug) {
l.log(connectionException);
}
});
graphite.on('connect', function() {
var ts = Math.round(new Date().getTime() / 1000);
var ts_suffix = ' ' + ts + "\n";
var ts = Math.round(Date.now() / 1000);
var namespace = globalNamespace.concat(prefixStats).join(".");
statString += namespace + '.graphiteStats.last_exception ' + last_exception + ts_suffix;
statString += namespace + '.graphiteStats.last_flush ' + last_flush + ts_suffix;
statString += namespace + '.graphiteStats.flush_time ' + flush_time + ts_suffix;
statString += namespace + '.graphiteStats.flush_length ' + flush_length + ts_suffix;
stats.add(namespace + '.graphiteStats.last_exception', last_exception, ts);
stats.add(namespace + '.graphiteStats.last_flush', last_flush, ts);
stats.add(namespace + '.graphiteStats.flush_time', flush_time, ts);
stats.add(namespace + '.graphiteStats.flush_length', flush_length, ts);

var stats_payload = graphiteProtocol == 'pickle' ? stats.toPickle() : stats.toText();

var starttime = Date.now();
this.write(statString);
this.write(stats_payload);
this.end();

graphiteStats.flush_time = (Date.now() - starttime);
graphiteStats.flush_length = statString.length;
graphiteStats.last_flush = Math.round(new Date().getTime() / 1000);
graphiteStats.flush_length = stats_payload.length;
graphiteStats.last_flush = Math.round(Date.now() / 1000);
});
} catch(e){
if (debug) {
l.log(e);
}
graphiteStats.last_exception = Math.round(new Date().getTime() / 1000);
graphiteStats.last_exception = Math.round(Date.now() / 1000);
}
}
};

// Minimally necessary pickle opcodes.
var MARK = '(',
STOP = '.',
LONG = 'L',
STRING = 'S',
APPEND = 'a',
LIST = 'l',
TUPLE = 't';

// A single measurement for sending to graphite.
function Metric(key, value, ts) {
var m = this;
this.key = key;
this.value = value;
this.ts = ts;

// return a string representation of this metric appropriate
// for sending to the graphite collector. does not include
// a trailing newline.
this.toText = function() {
return m.key + " " + m.value + " " + m.ts;
};

this.toPickle = function() {
return MARK + STRING + '\'' + m.key + '\'\n' + MARK + LONG + m.ts + 'L\n' + STRING + '\'' + m.value + '\'\n' + TUPLE + TUPLE + APPEND;
};
}

// A collection of measurements for sending to graphite.
function Stats() {
var s = this;
this.metrics = [];
this.add = function(key, value, ts) {
s.metrics.push(new Metric(key, value, ts));
};

this.toText = function() {
return s.metrics.map(function(m) { return m.toText(); }).join('\n') + '\n';
};

this.toPickle = function() {
var body = MARK + LIST + s.metrics.map(function(m) { return m.toPickle(); }).join('') + STOP;

// The first four bytes of the graphite pickle format
// contain the length of the rest of the payload.
// We use Buffer because this is binary data.
var buf = new Buffer(4 + body.length);

buf.writeUInt32BE(body.length,0);
buf.write(body,4);

return buf;
};
}

var flush_stats = function graphite_flush(ts, metrics) {
var ts_suffix = ' ' + ts + "\n";
var starttime = Date.now();
var statString = '';
var numStats = 0;
var key;
var timer_data_key;
@@ -95,17 +159,22 @@ var flush_stats = function graphite_flush(ts, metrics) {
var timer_data = metrics.timer_data;
var statsd_metrics = metrics.statsd_metrics;

// Flatten all the different types of metrics into a single
// collection so we can allow serialization to either the graphite
// text and pickle formats.
var stats = new Stats();

for (key in counters) {
var namespace = counterNamespace.concat(key);
var value = counters[key];
var valuePerSecond = counter_rates[key]; // pre-calculated "per second" rate

if (legacyNamespace === true) {
statString += namespace.join(".") + ' ' + valuePerSecond + ts_suffix;
statString += 'stats_counts.' + key + ' ' + value + ts_suffix;
stats.add(namespace.join("."), valuePerSecond, ts);
stats.add('stats_counts.' + key, value, ts);
} else {
statString += namespace.concat('rate').join(".") + ' ' + valuePerSecond + ts_suffix;
statString += namespace.concat('count').join(".") + ' ' + value + ts_suffix;
stats.add(namespace.concat('rate').join("."), valuePerSecond, ts);
stats.add(namespace.concat('count').join("."), value, ts);
}

numStats += 1;
@@ -116,48 +185,48 @@ var flush_stats = function graphite_flush(ts, metrics) {
var the_key = namespace.join(".");
for (timer_data_key in timer_data[key]) {
if (typeof(timer_data[key][timer_data_key]) === 'number') {
statString += the_key + '.' + timer_data_key + ' ' + timer_data[key][timer_data_key] + ts_suffix;
stats.add(the_key + '.' + timer_data_key, timer_data[key][timer_data_key], ts);
} else {
for (var timer_data_sub_key in timer_data[key][timer_data_key]) {
if (debug) {
l.log(timer_data[key][timer_data_key][timer_data_sub_key].toString());
}
statString += the_key + '.' + timer_data_key + '.' + timer_data_sub_key + ' ' +
timer_data[key][timer_data_key][timer_data_sub_key] + ts_suffix;
stats.add(the_key + '.' + timer_data_key + '.' + timer_data_sub_key,
timer_data[key][timer_data_key][timer_data_sub_key], ts);
}
}
}
numStats += 1;
}

for (key in gauges) {
var namespace = gaugesNamespace.concat(key);
statString += namespace.join(".") + ' ' + gauges[key] + ts_suffix;
stats.add(gaugesNamespace.concat(key).join("."), gauges[key], ts);
numStats += 1;
}

for (key in sets) {
var namespace = setsNamespace.concat(key);
statString += namespace.join(".") + '.count ' + sets[key].values().length + ts_suffix;
stats.add(setsNamespace.concat([key, 'count']).join("."),
sets[key].values().length, ts);
numStats += 1;
}

var namespace = globalNamespace.concat(prefixStats);
if (legacyNamespace === true) {
statString += prefixStats + '.numStats ' + numStats + ts_suffix;
statString += 'stats.' + prefixStats + '.graphiteStats.calculationtime ' + (Date.now() - starttime) + ts_suffix;
stats.add(prefixStats + '.numStats', numStats, ts);
stats.add('stats.' + prefixStats + '.graphiteStats.calculationtime',
(Date.now() - starttime), ts);
for (key in statsd_metrics) {
statString += 'stats.' + prefixStats + '.' + key + ' ' + statsd_metrics[key] + ts_suffix;
stats.add('stats.' + prefixStats + '.' + key, statsd_metrics[key], ts);
}
} else {
statString += namespace.join(".") + '.numStats ' + numStats + ts_suffix;
statString += namespace.join(".") + '.graphiteStats.calculationtime ' + (Date.now() - starttime) + ts_suffix;
var namespace = globalNamespace.concat(prefixStats);
stats.add(namespace.concat('numStats').join("."), numStats, ts);
stats.add(namespace.concat('graphiteStats.calculationtime').join("."),
(Date.now() - starttime), ts);
for (key in statsd_metrics) {
var the_key = namespace.concat(key);
statString += the_key.join(".") + ' ' + statsd_metrics[key] + ts_suffix;
stats.add(namespace.concat(key).join("."), statsd_metrics[key], ts);
}
}
post_stats(statString);
post_stats(stats);
if (debug) {
l.log("numStats: " + numStats);
}
@@ -173,7 +242,9 @@ exports.init = function graphite_init(startup_time, config, events) {
l = new logger.Logger(config.log || {});
debug = config.debug;
graphiteHost = config.graphiteHost;
graphitePort = config.graphitePort;
graphitePort = config.graphitePort || 2003;
graphitePicklePort = config.graphitePicklePort || 2004;
graphiteProtocol = config.graphiteProtocol || 'text';
config.graphite = config.graphite || {};
globalPrefix = config.graphite.globalPrefix;
prefixCounter = config.graphite.prefixCounter;
81 changes: 81 additions & 0 deletions docs/graphite_pickle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
Pickling for Graphite
=====================

The graphite statsd backend can optionally be configured to use pickle
for its over-the-wire protocol.

```javascript
{ graphiteHost: "your.graphite.host",
graphiteProtocol: "pickle" }
```

The default is to use the graphite text protocol, which can require
more CPU processing by the graphite endpoint.

The message format expected by the graphite pickle endpoint consists
of a header and payload.

The Payload
-----------

The message payload is a list of tuples. Each tuple contains the measurement
for a single metric name. The measurement is encoded as a second,
nested tuple containing timestamp and measured value.

This ends up looking like:

```python
[ ( "path.to.metric.name", ( timestamp, "value" ) ),
( "path.to.another.name", ( timestamp, "value" ) ) ]
```

The graphite receiver `carbon.protocols.MetricPickleReceiver` coerces
both the timestamp and measured value into `float`.

The timestamp must be seconds since epoch encoded as a number.

The measured value is encoded as a string. This may change in the
future.

We have chosen to not implement pickle's object memoization. This
simplifies what is sent across the wire. It is not likely any
optimization would result within a single poll cycle.

Here is some Python code showing how a given set of metrics can be
serialized in a more simple way.

```python
import pickle

metrics = [ ( "a.b.c", ( 1234L, "5678" ) ), ( "d.e.f.g", ( 1234L, "9012" ) ) ]
pickle.dumps(metrics)
# "(lp0\n(S'a.b.c'\np1\n(L1234L\nS'5678'\np2\ntp3\ntp4\na(S'd.e.f.g'\np5\n(L1234L\nS'9012'\np6\ntp7\ntp8\na."

payload = "(l(S'a.b.c'\n(L1234L\nS'5678'\ntta(S'd.e.f.g'\n(L1234L\nS'9012'\ntta."
pickle.loads(payload)
# [('a.b.c', (1234L, '5678')), ('d.e.f.g', (1234L, '9012'))]
```

The trailing `L` for long fields is unnecessary, but we are adding the
character to match Python pickle output. It's a side-effect of
`repr(long(1234))`.

The Header
----------

The message header is a 32-bit integer sent over the wire as
four-bytes. This integer must describe the length of the pickled
payload.

Here is some sample code showing how to construct the message header
containing the payload length.

```python
import struct

payload_length = 81
header = struct.pack("!L", payload_length)
# '\x00\x00\x00Q'
```

The `Q` character is equivalent to `\x81` (ASCII encoding).
6 changes: 4 additions & 2 deletions exampleConfig.js
Original file line number Diff line number Diff line change
@@ -4,17 +4,19 @@ Required Variables:

port: StatsD listening port [default: 8125]

Graphite Required Variables:
Graphite Required Variable:

(Leave these unset to avoid sending stats to Graphite.
Set debug flag and leave these unset to run in 'dry' debug mode -
useful for testing statsd clients without a Graphite server.)

graphiteHost: hostname or IP of Graphite server
graphitePort: port of Graphite server

Optional Variables:

graphitePort: port for the graphite text collector [default: 2003]
graphitePicklePort: port for the graphite pickle collector [default: 2004]
graphiteProtocol: either 'text' or 'pickle' [default: 'text']
backends: an array of backends to load. Each backend must exist
by name in the directory backends/. If not specified,
the default graphite backend will be loaded.
Loading
Oops, something went wrong.