Skip to content

Commit

Permalink
Merge pull request statsd#284 from etsy/cluster_proxy
Browse files Browse the repository at this point in the history
Merging in the Statsd Cluster Proxy
  • Loading branch information
draco2003 committed Oct 3, 2013
2 parents d4ff54b ef77751 commit 1dd5244
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 1 deletion.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 52,7 @@ More Specific Topics
* [Admin TCP Interface][docs_admin_interface]
* [Backend Interface][docs_backend_interface]
* [Metric Namespacing][docs_namespacing]

* [Statsd Cluster Proxy][docs_cluster_proxy]

Debugging
---------
Expand Down Expand Up @@ -121,5 121,6 @@ https://github.com/etsy/statsd/graphs/contributors
[docs_admin_interface]: https://github.com/etsy/statsd/blob/master/docs/admin_interface.md
[docs_backend_interface]: https://github.com/etsy/statsd/blob/master/docs/backend_interface.md
[docs_namespacing]: https://github.com/etsy/statsd/blob/master/docs/namespacing.md
[docs_cluster_proxy]: https://github.com/etsy/statsd/blob/master/docs/cluster_proxy.md
[travis-ci_status_img]: https://travis-ci.org/etsy/statsd.png?branch=backends-as-packages
[travis-ci_statsd]: https://travis-ci.org/etsy/statsd
33 changes: 33 additions & 0 deletions docs/cluster_proxy.md
Original file line number Diff line number Diff line change
@@ -0,0 1,33 @@
Statsd Cluster Proxy
==============

Statsd Cluster Proxy is a udp proxy that sits infront of multiple statsd instances.


Create a proxyConfig.js file
cp exampleProxyConfig.js proxyConfig.js

Once you have a config file run:
node proxy.js proxyConfig.js


It uses a consistent hashring to send the unique metric names to the same statsd instances so that
the aggregation works properly.

It handles a simple health check that dynamically recalculates the hashring if a statsd instance goes offline.

Config Options
------

nodes the array of node objects
host the ip of the statsd instance
port port of the statsd receiver
adminport port of the admin interface for this instance

udp_version the string 'udp4' or udp6'

host the host ip to listen on
port the port the proxy listens on

checkInterval the interval between healthchecks

30 changes: 30 additions & 0 deletions exampleProxyConfig.js
Original file line number Diff line number Diff line change
@@ -0,0 1,30 @@
/*
Required Variables:
port: StatsD Cluster Proxy listening port [default: 8125]
nodes: list of StatsD instances
host: address of an instance of StatsD
port: port that this instance is listening on
adminport: port that this instance is listening on for the admininterface
Optional Variables:
udp_version: defines if the address is an IPv4 or IPv6 address ['udp4' or 'udp6', default: 'udp4']
host: address to listen on over UDP [default: 0.0.0.0]
checkInterval: health status check interval [default: 10000]
cacheSize: size of the cache to store for hashring key lookups [default: 10000]
*/
{
nodes: [
{host: '127.0.0.1', port: 8129, adminport: 8126},
{host: '127.0.0.1', port: 8127, adminport: 8128},
{host: '127.0.0.1', port: 8129, adminport: 8130}
],
udp_version: 'udp4',
host: '0.0.0.0',
port: '8125',
checkInterval: 1000,
cacheSize: 10000
}
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 22,7 @@
},
"optionalDependencies": {
"node-syslog":"1.1.7",
"hashring":"1.0.1",
"winser": "=0.0.11"
},
"engines": {
Expand Down
139 changes: 139 additions & 0 deletions proxy.js
Original file line number Diff line number Diff line change
@@ -0,0 1,139 @@
var dgram = require('dgram')
, net = require('net')
, events = require('events')
, logger = require('./lib/logger')
, hashring = require('hashring')
, configlib = require('./lib/config');

var packet = new events.EventEmitter();
var node_status = [];
var node_ring = {};
var config;
var l; // logger

configlib.configFile(process.argv[2], function (conf, oldConfig) {
config = conf;
var udp_version = config.udp_version
, nodes = config.nodes;
l = new logger.Logger(config.log || {});

//load the node_ring object with the available nodes and a weight of 100
// weight is currently arbitrary but the same for all
nodes.forEach(function(element, index, array) {
node_ring[element.host ':' element.port] = 100;
});

var ring = new hashring(
node_ring, 'md5', {
'max cache size': config.cacheSize || 10000,
//We don't want duplicate keys sent so replicas set to 0
'replicas': 0
});

// Do an initial rount of health checks prior to starting up the server
doHealthChecks();


// Setup the udp listener
var server = dgram.createSocket(udp_version, function (msg, rinfo) {
// Convert the raw packet to a string (defaults to UTF8 encoding)
var packet_data = msg.toString();
// If the packet contains a \n then it contains multiple metrics
var metrics;
if (packet_data.indexOf("\n") > -1) {
metrics = packet_data.split("\n");
} else {
// metrics needs to be an array to fake it for single metric packets
metrics = [ packet_data ] ;
}

// Loop through the metrics and split on : to get mertric name for hashing
for (var midx in metrics) {
var bits = metrics[midx].toString().split(':');
var key = bits.shift();
packet.emit('send', key, msg);
}
});

// Listen for the send message, and process the metric key and msg
packet.on('send', function(key, msg) {
// retreives the destination for this key
var statsd_host = ring.get(key);

// break the retreived host to pass to the send function
if (statsd_host === undefined) {
l.log('Warning: No backend statsd nodes available!');
} else {
var host_config = statsd_host.split(':');

var client = dgram.createSocket(udp_version);
// Send the mesg to the backend
client.send(msg, 0, msg.length, host_config[1], host_config[0], function(err, bytes) {
client.close();
});
}
});

// Bind the listening udp server to the configured port and host
server.bind(config.port, config.host || undefined);

// Set the interval for healthchecks
setInterval(doHealthChecks, config.checkInterval || 10000);

// Perform health check on all nodes
function doHealthChecks() {
nodes.forEach(function(element, index, array) {
healthcheck(element);
});
}

// Perform health check on node
function healthcheck(node) {
var node_id = node.host ':' node.port;
var client = net.connect({port: node.adminport, host: node.host},
function() {
client.write('health\r\n');
});
client.on('data', function(data) {
var health_status = data.toString();
client.end();
if (health_status.indexOf('up') < 0) {
if (node_status[node_id] === undefined) {
node_status[node_id] = 1;
} else {
node_status[node_id] ;
}
if (node_status[node_id] < 2) {
l.log('Removing node ' node_id ' from the ring.');
ring.remove(node_id);
}
} else {
if (node_status[node_id] !== undefined) {
if (node_status[node_id] > 0) {
var new_server = {};
new_server[node_id] = 100;
l.log('Adding node ' node_id ' to the ring.');
ring.add(new_server);
}
}
node_status[node_id] = 0;
}
});
client.on('error', function(e) {
if (e.code == 'ECONNREFUSED') {
if (node_status[node_id] === undefined) {
node_status[node_id] = 1;
} else {
node_status[node_id] ;
}
if (node_status[node_id] < 2) {
l.log('Removing node ' node_id ' from the ring.');
ring.remove(node_id);
}
} else {
l.log('Error during healthcheck on node ' node_id ' with ' e.code);
}
});
}

});

0 comments on commit 1dd5244

Please sign in to comment.