forked from statsd/statsd
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request statsd#284 from etsy/cluster_proxy
Merging in the Statsd Cluster Proxy
- Loading branch information
Showing
5 changed files
with
205 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 1,33 @@ | ||
Statsd Cluster Proxy | ||
============== | ||
|
||
Statsd Cluster Proxy is a udp proxy that sits infront of multiple statsd instances. | ||
|
||
|
||
Create a proxyConfig.js file | ||
cp exampleProxyConfig.js proxyConfig.js | ||
|
||
Once you have a config file run: | ||
node proxy.js proxyConfig.js | ||
|
||
|
||
It uses a consistent hashring to send the unique metric names to the same statsd instances so that | ||
the aggregation works properly. | ||
|
||
It handles a simple health check that dynamically recalculates the hashring if a statsd instance goes offline. | ||
|
||
Config Options | ||
------ | ||
|
||
nodes the array of node objects | ||
host the ip of the statsd instance | ||
port port of the statsd receiver | ||
adminport port of the admin interface for this instance | ||
|
||
udp_version the string 'udp4' or udp6' | ||
|
||
host the host ip to listen on | ||
port the port the proxy listens on | ||
|
||
checkInterval the interval between healthchecks | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 1,30 @@ | ||
/* | ||
Required Variables: | ||
port: StatsD Cluster Proxy listening port [default: 8125] | ||
nodes: list of StatsD instances | ||
host: address of an instance of StatsD | ||
port: port that this instance is listening on | ||
adminport: port that this instance is listening on for the admininterface | ||
Optional Variables: | ||
udp_version: defines if the address is an IPv4 or IPv6 address ['udp4' or 'udp6', default: 'udp4'] | ||
host: address to listen on over UDP [default: 0.0.0.0] | ||
checkInterval: health status check interval [default: 10000] | ||
cacheSize: size of the cache to store for hashring key lookups [default: 10000] | ||
*/ | ||
{ | ||
nodes: [ | ||
{host: '127.0.0.1', port: 8129, adminport: 8126}, | ||
{host: '127.0.0.1', port: 8127, adminport: 8128}, | ||
{host: '127.0.0.1', port: 8129, adminport: 8130} | ||
], | ||
udp_version: 'udp4', | ||
host: '0.0.0.0', | ||
port: '8125', | ||
checkInterval: 1000, | ||
cacheSize: 10000 | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 1,139 @@ | ||
var dgram = require('dgram') | ||
, net = require('net') | ||
, events = require('events') | ||
, logger = require('./lib/logger') | ||
, hashring = require('hashring') | ||
, configlib = require('./lib/config'); | ||
|
||
var packet = new events.EventEmitter(); | ||
var node_status = []; | ||
var node_ring = {}; | ||
var config; | ||
var l; // logger | ||
|
||
configlib.configFile(process.argv[2], function (conf, oldConfig) { | ||
config = conf; | ||
var udp_version = config.udp_version | ||
, nodes = config.nodes; | ||
l = new logger.Logger(config.log || {}); | ||
|
||
//load the node_ring object with the available nodes and a weight of 100 | ||
// weight is currently arbitrary but the same for all | ||
nodes.forEach(function(element, index, array) { | ||
node_ring[element.host ':' element.port] = 100; | ||
}); | ||
|
||
var ring = new hashring( | ||
node_ring, 'md5', { | ||
'max cache size': config.cacheSize || 10000, | ||
//We don't want duplicate keys sent so replicas set to 0 | ||
'replicas': 0 | ||
}); | ||
|
||
// Do an initial rount of health checks prior to starting up the server | ||
doHealthChecks(); | ||
|
||
|
||
// Setup the udp listener | ||
var server = dgram.createSocket(udp_version, function (msg, rinfo) { | ||
// Convert the raw packet to a string (defaults to UTF8 encoding) | ||
var packet_data = msg.toString(); | ||
// If the packet contains a \n then it contains multiple metrics | ||
var metrics; | ||
if (packet_data.indexOf("\n") > -1) { | ||
metrics = packet_data.split("\n"); | ||
} else { | ||
// metrics needs to be an array to fake it for single metric packets | ||
metrics = [ packet_data ] ; | ||
} | ||
|
||
// Loop through the metrics and split on : to get mertric name for hashing | ||
for (var midx in metrics) { | ||
var bits = metrics[midx].toString().split(':'); | ||
var key = bits.shift(); | ||
packet.emit('send', key, msg); | ||
} | ||
}); | ||
|
||
// Listen for the send message, and process the metric key and msg | ||
packet.on('send', function(key, msg) { | ||
// retreives the destination for this key | ||
var statsd_host = ring.get(key); | ||
|
||
// break the retreived host to pass to the send function | ||
if (statsd_host === undefined) { | ||
l.log('Warning: No backend statsd nodes available!'); | ||
} else { | ||
var host_config = statsd_host.split(':'); | ||
|
||
var client = dgram.createSocket(udp_version); | ||
// Send the mesg to the backend | ||
client.send(msg, 0, msg.length, host_config[1], host_config[0], function(err, bytes) { | ||
client.close(); | ||
}); | ||
} | ||
}); | ||
|
||
// Bind the listening udp server to the configured port and host | ||
server.bind(config.port, config.host || undefined); | ||
|
||
// Set the interval for healthchecks | ||
setInterval(doHealthChecks, config.checkInterval || 10000); | ||
|
||
// Perform health check on all nodes | ||
function doHealthChecks() { | ||
nodes.forEach(function(element, index, array) { | ||
healthcheck(element); | ||
}); | ||
} | ||
|
||
// Perform health check on node | ||
function healthcheck(node) { | ||
var node_id = node.host ':' node.port; | ||
var client = net.connect({port: node.adminport, host: node.host}, | ||
function() { | ||
client.write('health\r\n'); | ||
}); | ||
client.on('data', function(data) { | ||
var health_status = data.toString(); | ||
client.end(); | ||
if (health_status.indexOf('up') < 0) { | ||
if (node_status[node_id] === undefined) { | ||
node_status[node_id] = 1; | ||
} else { | ||
node_status[node_id] ; | ||
} | ||
if (node_status[node_id] < 2) { | ||
l.log('Removing node ' node_id ' from the ring.'); | ||
ring.remove(node_id); | ||
} | ||
} else { | ||
if (node_status[node_id] !== undefined) { | ||
if (node_status[node_id] > 0) { | ||
var new_server = {}; | ||
new_server[node_id] = 100; | ||
l.log('Adding node ' node_id ' to the ring.'); | ||
ring.add(new_server); | ||
} | ||
} | ||
node_status[node_id] = 0; | ||
} | ||
}); | ||
client.on('error', function(e) { | ||
if (e.code == 'ECONNREFUSED') { | ||
if (node_status[node_id] === undefined) { | ||
node_status[node_id] = 1; | ||
} else { | ||
node_status[node_id] ; | ||
} | ||
if (node_status[node_id] < 2) { | ||
l.log('Removing node ' node_id ' from the ring.'); | ||
ring.remove(node_id); | ||
} | ||
} else { | ||
l.log('Error during healthcheck on node ' node_id ' with ' e.code); | ||
} | ||
}); | ||
} | ||
|
||
}); |