Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support pickle to graphite. #312

Closed
wants to merge 6 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
pickling support.
aronatkins committed Jul 10, 2013
commit 2c6b83b0d9bcb19466b3da419bb6c2cc06f9ccfb
25 changes: 24 additions & 1 deletion backends/graphite.js
Original file line number Diff line number Diff line change
@@ -91,6 +91,15 @@ var post_stats = function graphite_post_stats(stats) {
}
};

// The small subset of pickle (protocol 0) opcodes needed to emit a list of
// nested tuples containing strings and longs.
var MARK = '(';    // start of a list/tuple group
var STOP = '.';    // end of the pickle stream
var LONG = 'L';    // long integer, written as decimal text
var STRING = 'S';  // quoted string literal
var APPEND = 'a';  // append the top item to the list beneath it
var LIST = 'l';    // build a list from the items since the last MARK
var TUPLE = 't';   // build a tuple from the items since the last MARK

// A single measurement for sending to graphite.
function Metric(key, value, ts) {
var m = this;
@@ -104,6 +113,10 @@ function Metric(key, value, ts) {
this.toText = function() {
return m.key + " " + m.value + " " + m.ts;
};

// Serialize this measurement as a pickle fragment equivalent to the Python
// tuple (key, (ts, 'value')), followed by APPEND so that it is added to the
// list opened by the enclosing payload.
this.toPickle = function() {
  // STRING arguments are quoted literals terminated by a newline. Escape
  // backslashes, single quotes and line breaks so that a key or value can
  // never end the literal early and smuggle extra opcodes into the stream.
  var quote = function(text) {
    var escaped = String(text)
      .replace(/\\/g, '\\\\')
      .replace(/'/g, '\\\'')
      .replace(/\n/g, '\\n')
      .replace(/\r/g, '\\r');
    return '\'' + escaped + '\'';
  };

  return MARK + STRING + quote(m.key) + '\n' +
    MARK + LONG + m.ts + 'L\n' +
    STRING + quote(m.value) + '\n' +
    TUPLE + TUPLE + APPEND;
};
}

// A collection of measurements for sending to graphite.
@@ -119,7 +132,17 @@ function Stats() {
};

// Build the complete graphite pickle message for every collected metric.
// Returns a Buffer: a 4-byte big-endian length header followed by the
// pickled list of metrics.
this.toPickle = function() {
  var body = MARK + LIST + s.metrics.map(function(m) { return m.toPickle(); }).join('') + STOP;

  // The first four bytes of the graphite pickle format
  // contain the length of the rest of the payload.
  // Measure that length in encoded bytes rather than string characters:
  // buf.write() emits UTF-8, so a non-ASCII metric name occupies more bytes
  // than body.length reports.
  var bodyLength = Buffer.byteLength(body);

  // We use Buffer because this is binary data.
  var buf = new Buffer(4 + bodyLength);

  buf.writeUInt32BE(bodyLength, 0);
  buf.write(body, 4);

  return buf;
};
}

81 changes: 81 additions & 0 deletions docs/graphite_pickle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
Pickling for Graphite
=====================

The graphite statsd backend can optionally be configured to use pickle
for its over-the-wire protocol.

```javascript
{ graphiteHost: "your.graphite.host",
graphiteProtocol: "pickle" }
```

The default is to use the graphite text protocol, which can require
more CPU processing by the graphite endpoint.

The message format expected by the graphite pickle endpoint consists
of a header and payload.

The Payload
-----------

The message payload is a list of tuples. Each tuple contains the measurement
for a single metric name. The measurement is encoded as a second,
nested tuple containing timestamp and measured value.

This ends up looking like:

```python
[ ( "path.to.metric.name", ( timestamp, "value" ) ),
( "path.to.another.name", ( timestamp, "value" ) ) ]
```

The graphite receiver `carbon.protocols.MetricPickleReceiver` coerces
both the timestamp and measured value into `float`.

The timestamp must be seconds since epoch encoded as a number.

The measured value is encoded as a string. This may change in the
future.

We have chosen to not implement pickle's object memoization. This
simplifies what is sent across the wire. It is not likely any
optimization would result within a single poll cycle.

Here is some Python code showing how a given set of metrics can be
serialized in a simpler form, without the memoization opcodes.

```python
import pickle

metrics = [ ( "a.b.c", ( 1234L, "5678" ) ), ( "d.e.f.g", ( 1234L, "9012" ) ) ]
pickle.dumps(metrics)
# "(lp0\n(S'a.b.c'\np1\n(L1234L\nS'5678'\np2\ntp3\ntp4\na(S'd.e.f.g'\np5\n(L1234L\nS'9012'\np6\ntp7\ntp8\na."

payload = "(l(S'a.b.c'\n(L1234L\nS'5678'\ntta(S'd.e.f.g'\n(L1234L\nS'9012'\ntta."
pickle.loads(payload)
# [('a.b.c', (1234L, '5678')), ('d.e.f.g', (1234L, '9012'))]
```

The trailing `L` for long fields is unnecessary, but we are adding the
character to match Python pickle output. It's a side-effect of
`repr(long(1234))`.

The Header
----------

The message header is an unsigned 32-bit integer sent over the wire as
four bytes in network (big-endian) byte order. This integer must describe
the length of the pickled payload in bytes.

Here is some sample code showing how to construct the message header
containing the payload length.

```python
import struct

payload_length = 81
header = struct.pack("!L", payload_length)
# '\x00\x00\x00Q'
```

The `Q` character is the ASCII character with decimal code 81 (`\x51`), so the header above encodes a payload length of 81.
2 changes: 1 addition & 1 deletion exampleConfig.js
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@ Optional Variables:

graphitePort: port for the graphite text collector [default: 2003]
graphitePicklePort: port for the graphite pickle collector [default: 2004]
graphiteProtocol: either 'text' or 'pickle' [default: text]
graphiteProtocol: either 'text' or 'pickle' [default: 'text']
backends: an array of backends to load. Each backend must exist
by name in the directory backends/. If not specified,
the default graphite backend will be loaded.
226 changes: 226 additions & 0 deletions test/graphite_pickle_tests.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
var fs = require('fs'),
net = require('net'),
temp = require('temp'),
cp = require('child_process'),
util = require('util'),
urlparse = require('url').parse,
_ = require('underscore'),
dgram = require('dgram'),
qsparse = require('querystring').parse,
http = require('http');

var spawn = cp.spawn;

// Write `text` to a fresh temporary config file, then hand the file's path
// to `worker` together with the pass-through arguments `cb` and `obj`.
// Any error while opening or closing the temporary file is thrown.
var writeconfig = function(text, worker, cb, obj) {
  var onOpened = function(openErr, tmpFile) {
    if (openErr) {
      throw openErr;
    }

    var onClosed = function(closeErr) {
      if (closeErr) {
        throw closeErr;
      }
      worker(tmpFile.path, cb, obj);
    };

    fs.writeSync(tmpFile.fd, text);
    fs.close(tmpFile.fd, onClosed);
  };

  temp.open({suffix: '-statsdconf.js'}, onOpened);
};

// Send `data` as a single datagram through `sock` to host:port.
// `cb` is invoked with no arguments once the send completes; a send error
// is thrown.
var statsd_send = function(data, sock, host, port, cb) {
  // Declared locally: assigning without `var` would leak a global and
  // throws a ReferenceError in strict-mode code.
  var send_data = new Buffer(data);
  sock.send(send_data, 0, send_data.length, port, host, function(err, bytes) {
    if (err) {
      throw err;
    }
    cb();
  });
};

// Keep collecting data from every connection made to `server` until
// `timeout` milliseconds have elapsed AND no connection is still streaming,
// so that no data chunk is missed. `cb` then receives one Buffer holding
// everything received, in arrival order.
var collect_for = function(server, timeout, cb) {
  // We have binary data arriving over the wire. Avoid strings.
  var received = new Buffer(0);
  var in_flight = 0;
  var timed_out = false;

  // Detach from the server before reporting, so that connections arriving
  // later cannot trigger a second callback.
  var finish = function() {
    server.removeListener('connection', collector);
    cb(received);
  };

  var collector = function(conn) {
    in_flight += 1;
    conn.on('data', function(data) { received = Buffer.concat([received, data]); });
    conn.on('end', function() {
      in_flight -= 1;
      if ((in_flight < 1) && timed_out) {
        finish();
      }
    });
  };

  setTimeout(function() {
    timed_out = true;
    // If a connection is still open, its 'end' handler finishes instead.
    if (in_flight < 1) {
      finish();
    }
  }, timeout);

  server.on('connection', collector);
};

// A python script that converts from the graphite pickle-based
// wire protocol into JSON written to stdout. It reads the file named by its
// first argument, uses the 4-byte length header to slice out one batch and
// unpickles it. Written to run under both Python 2 and Python 3: cPickle is
// used when available and print is called as a function.
var script =
  "import sys\n" +
  "import struct\n" +
  "import json\n" +
  "try:\n" +
  "    import cPickle as pickle\n" +
  "except ImportError:\n" +
  "    import pickle\n" +
  "payload = open(sys.argv[1], 'rb').read()\n" +
  "pack_format = '!L'\n" +
  "header_length = struct.calcsize(pack_format)\n" +
  "payload_length, = struct.unpack(pack_format, payload[:header_length])\n" +
  "batch_length = header_length + payload_length\n" +
  "metrics = pickle.loads(payload[header_length:batch_length])\n" +
  "print(json.dumps(metrics))\n";

// Write our binary payload and unpickling script to disk
// then process the unserialized results.
//
// payload: Buffer holding one graphite pickle message (header + pickle).
// cb:      called with an array of single-key objects, one per metric,
//          each mapping the metric name to its measured value.
// Any failure (temp file, python exit status, JSON parsing) is thrown.
var unpickle = function(payload, cb) {
  temp.open({suffix: '-payload.pickle'}, function(err, payload_info) {
    if (err) throw err;

    // the header may contain null characters. explicit length is necessary.
    fs.writeSync(payload_info.fd, payload, 0, payload.length);
    fs.close(payload_info.fd, function(err) {
      if (err) throw err;

      temp.open({suffix: '-unpickle.py'}, function(err, unpickle_info) {
        if (err) throw err;

        fs.writeSync(unpickle_info.fd, script);
        fs.close(unpickle_info.fd, function(err) {
          if (err) throw err;

          // Pass the paths as separate arguments instead of building a
          // shell command line, so temp directories containing spaces or
          // shell metacharacters cannot break the invocation.
          var args = [unpickle_info.path, payload_info.path];
          cp.execFile('python', args, function(err, stdout, stderr) {
            if (err) throw err;
            var metrics = JSON.parse(stdout);
            // Transform the output into the same list of dictionaries
            // used by the other graphite_* tests so our tests look
            // the same. Each entry is [name, [timestamp, value]].
            var hashes = _.map(metrics, function(m) {
              var data = {};
              data[m[0]] = m[1][1];
              return data;
            });
            cb(hashes);
          });
        });
      });
    });
  });
};

// Test suite exported in the setUp / tearDown / named-test shape.
// Each test runs against a freshly spawned statsd process configured to
// flush to a local TCP acceptor using the graphite pickle protocol.
module.exports = {
// Prepare one test: write a temporary statsd config, open the TCP acceptor
// and UDP client socket, spawn the statsd server, and call `callback` once
// the server prints "server is up" on stdout.
setUp: function (callback) {
this.testport = 31337;
this.myflush = 200;
// Config text handed to the spawned server; the pickle port points at the
// acceptor created below and the flush interval is this.myflush ms.
var configfile = "{graphService: \"graphite\"\n\
, batch: 200 \n\
, flushInterval: " + this.myflush + " \n\
, percentThreshold: 90\n\
, histogram: [ { metric: \"a_test_value\", bins: [1000] } ]\n\
, port: 8125\n\
, dumpMessages: false \n\
, debug: false\n\
, graphite: { legacyNamespace: false }\n\
, graphitePicklePort: " + this.testport + "\n\
, graphiteHost: \"127.0.0.1\"\n\
, graphiteProtocol: \"pickle\"}";

// TCP server standing in for the graphite pickle receiver, plus the UDP
// socket used to send stats to the server under test.
this.acceptor = net.createServer();
this.acceptor.listen(this.testport);
this.sock = dgram.createSocket('udp4');

// Bookkeeping used by the exit handler and by tearDown.
this.server_up = true;
this.ok_to_die = false;
this.exit_callback_callback = process.exit;

writeconfig(configfile,function(path,cb,obj){
obj.path = path;
obj.server = spawn('node',['stats.js', path]);
// An exit before tearDown has set ok_to_die is treated as a failure and
// aborts the whole test run; an expected exit calls whatever callback
// tearDown installed (process.exit by default).
obj.exit_callback = function (code) {
obj.server_up = false;
if(!obj.ok_to_die){
console.log('node server unexpectedly quit with code: ' + code);
process.exit(1);
}
else {
obj.exit_callback_callback();
}
};
obj.server.on('exit', obj.exit_callback);
// Echo the child's stderr so server-side errors show up in test output.
obj.server.stderr.on('data', function (data) {
console.log('stderr: ' + data.toString().replace(/\n$/,''));
});
/*
obj.server.stdout.on('data', function (data) {
console.log('stdout: ' + data.toString().replace(/\n$/,''));
});
*/
obj.server.stdout.on('data', function (data) {
// wait until server is up before we finish setUp
if (data.toString().match(/server is up/)) {
cb();
}
});

},callback,this);
},

// Release the sockets and stop the spawned server. When the server is still
// running, `callback` is deferred until its 'exit' event fires; otherwise it
// is called immediately.
tearDown: function (callback) {
this.sock.close();
this.acceptor.close();
this.ok_to_die = true;
if(this.server_up){
this.exit_callback_callback = callback;
this.server.kill();
} else {
callback();
}
},

// Send a single timer sample and check the metrics that arrive at the
// acceptor after being unpickled.
timers_are_valid: function (test) {
test.expect(6);

var testvalue = 100;
var me = this;
// Wait for the server's first connection to the acceptor before sending the
// sample, then collect for two flush intervals.
this.acceptor.once('connection',function(c){
statsd_send('a_test_value:' + testvalue + '|ms',me.sock,'127.0.0.1',8125,function(){
collect_for(me.acceptor,me.myflush*2,function(payload){
test.ok(payload.length > 0,'should receive some data');
unpickle(payload, function(hashes) {
// Each entry of `hashes` is a single-key object {metricName: value}.
var numstat_test = function(post){
var mykey = 'stats.statsd.numStats';
return _.include(_.keys(post),mykey) && (post[mykey] == 3);
};
test.ok(_.any(hashes,numstat_test), 'stats.statsd.numStats should be 3');

var testtimervalue_test = function(post){
var mykey = 'stats.timers.a_test_value.mean_90';
return _.include(_.keys(post),mykey) && (post[mykey] == testvalue);
};
var testtimerhistogramvalue_test = function(post){
var mykey = 'stats.timers.a_test_value.histogram.bin_1000';
return _.include(_.keys(post),mykey) && (post[mykey] == 1);
};
test.ok(_.any(hashes,testtimerhistogramvalue_test), 'stats.timers.a_test_value.histogram.bin_1000 should be ' + 1);
test.ok(_.any(hashes,testtimervalue_test), 'stats.timers.a_test_value.mean_90 should be ' + testvalue);

// Returns the first truthy value reported for the given timer sub-metric
// across all entries (undefined when none is present).
var count_test = function(post, metric){
var mykey = 'stats.timers.a_test_value.' + metric;
return _.first(_.filter(_.pluck(post, mykey), function (e) { return e; }));
};
// NOTE(review): 5 presumably reflects 1 sample per 200 ms flush interval
// expressed per second -- confirm against the backend's count_ps definition.
test.equals(count_test(hashes, 'count_ps'), 5, 'count_ps should be 5');
test.equals(count_test(hashes, 'count'), 1, 'count should be 1');
test.done();
});
});
});
});
}
};