diff --git a/docs/api/sources.rst b/docs/api/sources.rst
index f21015a..b9bca56 100644
--- a/docs/api/sources.rst
+++ b/docs/api/sources.rst
@@ -8,6 +8,13 @@ tensor.sources.network
     :members:
     :show-inheritance:
 
+tensor.sources.nginx
+====================
+
+.. automodule:: tensor.sources.nginx
+    :members:
+    :show-inheritance:
+
 tensor.sources.munin
 ====================
 
diff --git a/docs/index.rst b/docs/index.rst
index 4c8b255..e323178 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -27,6 +27,7 @@ API Documentation:
 
    api/tensor.rst
    api/protocol.rst
+   api/logs.rst
    api/sources.rst
    api/outputs.rst
 
diff --git a/docs/sources.rst b/docs/sources.rst
index 2b737e9..34c38cd 100644
--- a/docs/sources.rst
+++ b/docs/sources.rst
@@ -73,6 +73,10 @@ Extending the above example to create a simple flip-flop metric event::
 You could then place this in a Python module like `hello.py` and as long as
 it's in the Python path for Tensor it can be used as a source with
 `hello.HelloWorld`
 
+A list of events can also be returned, but be careful not to overwhelm the
+output buffer. If you need to produce a large number of metrics it may be
+worthwhile to return nothing from `get` and call `self.queueBack` as needed.
+
 Handling asynchronous tasks
 ===========================
 
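A sketch of both approaches described in that paragraph, using only the `Source` API defined in `tensor/objects.py` further down; the class names, services and metric values are hypothetical::

    from tensor.objects import Source

    class HelloWorld(Source):
        def get(self):
            # Small batches: simply return a list of events
            return [
                self.createEvent('ok', 'Hello 1', 1.0, prefix='one'),
                self.createEvent('ok', 'Hello 2', 2.0, prefix='two'),
            ]

    class HelloStream(Source):
        def get(self):
            # Large volumes: return nothing and queue events back in chunks
            for i in range(100):
                self.queueBack([
                    self.createEvent('ok', 'Hello %d' % i, float(i))
                ])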
diff --git a/docs/start.rst b/docs/start.rst
index 9ed7e86..83bad8d 100644
--- a/docs/start.rst
+++ b/docs/start.rst
@@ -10,11 +10,11 @@ Tensor can be installed from PyPi with pip ::
 
 This will also install Twisted, protobuf and PyYAML
 
-Or you can use the .deb package ::
+Or you can use the .deb package. Get the latest release from https://github.com/calston/tensor/releases/latest ::
 
     $ aptitude install python-twisted python-protobuf python-yaml
-    $ wget https://github.com/calston/tensor/releases/download/0.0.7/tensor_0.0.7_amd64.deb
-    $ dpkg -i tensor_0.0.7_amd64.deb
+    $ wget https://github.com/calston/tensor/releases/download/0.2.0/tensor_0.2.0_amd64.deb
+    $ dpkg -i tensor_0.2.0_amd64.deb
 
 This also gives you an init script and default config in /etc/tensor/
 
diff --git a/scripts/post-install.sh b/scripts/post-install.sh
index 4912a5c..8e3aeb2 100755
--- a/scripts/post-install.sh
+++ b/scripts/post-install.sh
@@ -34,6 +34,12 @@ sources:
 EOL
 fi
 
+if [ ! -d /var/lib/tensor ];
+then
+    mkdir -p /var/lib/tensor
+    cp /tmp/*.lf /var/lib/tensor/
+fi
+
 update-rc.d tensor defaults
 
 service tensor status >/dev/null 2>&1
diff --git a/scripts/tensor b/scripts/tensor
index 883b3ea..2167662 100755
--- a/scripts/tensor
+++ b/scripts/tensor
@@ -14,6 +14,7 @@ PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin
 DAEMON=`which twistd`
 NAME=tensor
 DESC=tensor
+FDHACK=4096
 
 test -x $DAEMON || exit 0
 
@@ -24,6 +25,8 @@ DAEMON_OPTS="--pidfile=${PIDFILE} --logfile=${LOGDIR}/tensor.log tensor -c /etc/
 
 set -e
 
+ulimit -n $FDHACK
+
 running_pid()
 {
     # Check if a given process pid's cmdline matches a given name
diff --git a/setup.py b/setup.py
index 92283b7..cc93eec 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@
 setup(
     name="tensor",
-    version='0.1.16',
+    version='0.2.0',
     url='http://github.com/calston/tensor',
     license='MIT',
     description="A Twisted based monitoring agent for Riemann",
@@ -21,6 +21,7 @@
         'PyYaml',
         'protobuf',
         'construct',
+        'pysnmp',
     ],
     classifiers=[
         'Development Status :: 4 - Beta',
diff --git a/tensor/logs/__init__.py b/tensor/logs/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tensor/logs/follower.py b/tensor/logs/follower.py
new file mode 100644
index 0000000..ffb973f
--- /dev/null
+++ b/tensor/logs/follower.py
@@ -0,0 +1,87 @@
+import os
+
+class LogFollower(object):
+    """Provides a class for following log files between runs
+
+    :param logfile: Full path to logfile
+    :type logfile: str
+    :param parser: Optional parser method for log lines
+    :type parser: callable
+    """
+
+    def __init__(self, logfile, parser=None, tmp_path="/var/lib/tensor/"):
+        self.logfile = logfile
+        self.tmp = os.path.join(tmp_path,
+            '%s.lf' % self.logfile.lstrip('/').replace('/','-'))
+
+        self.readLast()
+
+        self.parser = parser
+
+    def cleanStore(self):
+        os.unlink(self.tmp)
+
+    def storeLast(self):
+        fi = open(self.tmp, 'wt')
+        fi.write('%s:%s' % (self.lastSize, self.lastInode))
+        fi.close()
+
+    def readLast(self):
+        if os.path.exists(self.tmp):
+            fi = open(self.tmp, 'rt')
+            ls, li = fi.read().split(':')
+            self.lastSize = int(ls)
+            self.lastInode = int(li)
+        else:
+            self.lastSize = 0
+            self.lastInode = 0
+
+    def get_fn(self, fn, max_lines=None):
+        """Passes each parsed log line to `fn`
+        This is a better idea than storing a giant log file in memory
+        """
+        stat = os.stat(self.logfile)
+
+        if (stat.st_ino == self.lastInode) and (stat.st_size == self.lastSize):
+            # Nothing new
+            return []
+
+        # Handle rollover and rotations vaguely
+        if (stat.st_ino != self.lastInode) or (stat.st_size < self.lastSize):
+            self.lastSize = 0
+
+        fi = open(self.logfile, 'rt')
+        fi.seek(self.lastSize)
+
+        self.lastInode = stat.st_ino
+
+        lines = 0
+
+        for i in fi:
+            lines += 1
+            if max_lines and (lines > max_lines):
+                self.storeLast()
+                fi.close()
+                return
+
+            if '\n' in i:
+                self.lastSize += len(i)
+                if self.parser:
+                    line = self.parser(i.strip('\n'))
+                else:
+                    line = i.strip('\n')
+
+                fn(line)
+
+        self.storeLast()
+
+        fi.close()
+
+    def get(self, max_lines=None):
+        """Returns a big list of all log lines since the last run
+        """
+        rows = []
+
+        self.get_fn(lambda row: rows.append(row), max_lines=max_lines)
+
+        return rows
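A short usage sketch for `LogFollower`; the log path is illustrative. State is kept in a `.lf` file derived from the log path, so `/var/log/nginx/access.log` is tracked in `/var/lib/tensor/var-log-nginx-access.log.lf`::

    from tensor.logs import follower

    f = follower.LogFollower('/var/log/nginx/access.log')

    # The first call returns all lines, later calls only new ones;
    # rotation is detected by an inode change or a shrinking file.
    for line in f.get(max_lines=2000):
        print line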
diff --git a/tensor/logs/parsers.py b/tensor/logs/parsers.py
new file mode 100644
index 0000000..58f28fe
--- /dev/null
+++ b/tensor/logs/parsers.py
@@ -0,0 +1,158 @@
+import re
+from datetime import datetime
+
+class ApacheLogParserError(Exception):
+    pass
+
+class ApacheLogParser:
+    """Parses Apache log format
+
+    Adapted from http://code.google.com/p/apachelog
+
+    :param format: Apache log format definition eg
+        r'%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\"'
+        or one of 'common', 'vhcommon' or 'combined'
+    :type format: str
+    """
+    def __init__(self, format):
+        formats = {
+            # Common Log Format (CLF)
+            'common': r'%h %l %u %t \"%r\" %>s %b',
+
+            # Common Log Format with Virtual Host
+            'vhcommon': r'%v %h %l %u %t \"%r\" %>s %b',
+
+            # NCSA extended/combined log format
+            'combined': r'%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-agent}i\"',
+        }
+
+        self._names = []
+        self._types = []
+        self._regex = None
+        self._pattern = ''
+
+        self.types = {
+            '%h': ('client', str),
+            '%a': ('client-ip', str),
+            '%b': ('bytes', int),
+            '%B': ('bytes', int),
+            '%D': ('request-time', int),
+            '%f': ('filename', str),
+            '%l': ('logname', str),
+            '%u': ('user', str),
+            '%t': ('time', self._parse_date),
+            '%r': ('request', str),
+            '%>s': ('status', int),
+            '%v': ('vhost', str),
+        }
+
+        if format in formats:
+            self._parse_format(formats[format])
+        else:
+            self._parse_format(format)
+
+    def _parse_date(self, date):
+        date = date.split()[0][1:]
+        return datetime.strptime(date, "%d/%b/%Y:%H:%M:%S")
+
+    def alias(self, field):
+        if field in self.types:
+            return self.types[field][0]
+        else:
+            return field
+
+    def _parse_format(self, format):
+        """
+        Converts the input format to a regular
+        expression, as well as extracting fields
+
+        Raises an exception if it couldn't compile
+        the generated regex.
+        """
+        format = format.strip()
+        format = re.sub('[ \t]+',' ',format)
+
+        subpatterns = []
+
+        findquotes = re.compile(r'^\\"')
+        findreferreragent = re.compile('Referer|User-Agent')
+        findpercent = re.compile('^%.*t$')
+        lstripquotes = re.compile(r'^\\"')
+        rstripquotes = re.compile(r'\\"$')
+        header = re.compile(r'.*%\{([^\}]+)\}i')
+
+        for element in format.split(' '):
+
+            hasquotes = 0
+            if findquotes.search(element): hasquotes = 1
+
+            if hasquotes:
+                element = lstripquotes.sub('', element)
+                element = rstripquotes.sub('', element)
+
+            head = header.match(element)
+            if head:
+                self._names.append(head.groups()[0].lower())
+                self._types.append(str)
+            else:
+                self._names.append(self.alias(element))
+                self._types.append(self.types.get(element, [None, str])[1])
+
+            subpattern = '(\S*)'
+
+            if hasquotes:
+                if element == '%r' or findreferreragent.search(element):
+                    subpattern = r'\"([^"\\]*(?:\\.[^"\\]*)*)\"'
+                else:
+                    subpattern = r'\"([^\"]*)\"'
+
+            elif findpercent.search(element):
+                subpattern = r'(\[[^\]]+\])'
+
+            elif element == '%U':
+                subpattern = '(.+?)'
+
+            subpatterns.append(subpattern)
+
+        self._pattern = '^' + ' '.join(subpatterns) + '$'
+        try:
+            self._regex = re.compile(self._pattern)
+        except Exception, e:
+            raise ApacheLogParserError(e)
+
+    def parse(self, line):
+        """
+        Parses a single line from the log file and returns
+        a dictionary of its contents.
+
+        Raises an exception if it couldn't parse the line
+        """
+        line = line.strip()
+        match = self._regex.match(line)
+
+        if match:
+            data = {}
+            for i, e in enumerate(match.groups()):
+                if e == "-":
+                    k, v = self._names[i], None
+                else:
+                    k, v = self._names[i], self._types[i](e)
+                data[k] = v
+            return data
+
+        raise ApacheLogParserError("Unable to parse: %s" % line)
+
+    def pattern(self):
+        """
+        Returns the compound regular expression the parser extracted
+        from the input format (a string)
+        """
+        return self._pattern
+
+    def names(self):
+        """
+        Returns the field names the parser extracted from the
+        input format (a list)
+        """
+        return self._names
+
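Usage mirrors the test case added in `tensor/tests/test_logs.py` further down::

    from tensor.logs import parsers

    p = parsers.ApacheLogParser('combined')

    row = p.parse('192.168.0.102 - - [16/Jan/2015:11:11:45 +0200] '
                  '"GET / HTTP/1.1" 200 709 "-" "My browser"')

    print row['client'], row['status'], row['bytes']
    # 192.168.0.102 200 709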
diff --git a/tensor/objects.py b/tensor/objects.py
index d1e06b0..e8ef17a 100644
--- a/tensor/objects.py
+++ b/tensor/objects.py
@@ -13,20 +13,17 @@ class Event(object):
     All sources pass these to the queue, which form a proxy object
     to create protobuf Event objects
 
-    **Arguments:**
-
-    :state: Some sort of string < 255 chars describing the state
-    :service: The service name for this event
-    :description: A description for the event, ie. "My house is on fire!"
-    :metric: int or float metric for this event
-
-    **Keyword arguments:**
-
-    :tags: List of tag strings
-    :hostname: Hostname for the event (defaults to system fqdn)
+    :param state: Some sort of string < 255 chars describing the state
+    :param service: The service name for this event
+    :param description: A description for the event, ie. "My house is on fire!"
+    :param metric: int or float metric for this event
+    :param tags: List of tag strings
+    :param hostname: Hostname for the event (defaults to system fqdn)
+    :param aggregation: Aggregation function
+    :param evtime: Event timestamp override
     """
     def __init__(self, state, service, description, metric, ttl, tags=[],
-                 hostname=None, aggregation=None):
+                 hostname=None, aggregation=None, evtime=None):
         self.state = state
         self.service = service
         self.description = description
@@ -34,8 +31,11 @@ def __init__(self, state, service, description, metric, ttl, tags=[],
         self.ttl = ttl
         self.tags = tags
         self.aggregation = aggregation
-
-        self.time = time.time()
+
+        if evtime:
+            self.time = evtime
+        else:
+            self.time = time.time()
         if hostname:
             self.hostname = hostname
         else:
@@ -70,19 +70,11 @@ class Output(object):
     Outputs can inherit this object which provides a construct
     for a working output
 
-    **Arguments:**
-
-    :config: Dictionary config for this queue (usually read from the
-             yaml configuration)
-    :tensor: A TensorService object for interacting with the queue manager
+    :param config: Dictionary config for this queue (usually read from the
+                   yaml configuration)
+    :param tensor: A TensorService object for interacting with the queue manager
     """
     def __init__(self, config, tensor):
-        """Consturct a Output object
-
-        Arguments:
-        config -- Dictionary config for this output
-        tensor -- A TensorService object for interacting with the queue manager
-        """
         self.config = config
         self.tensor = tensor
 
@@ -110,23 +102,15 @@ class Source(object):
     Sources can inherit this object which provides a number of
     utility methods.
 
-    **Arguments:**
-
-    :config: Dictionary config for this queue (usually read from the
-             yaml configuration)
-    :queueBack: A callback method to recieve a list of Event objects
-    :tensor: A TensorService object for interacting with the queue manager
+    :param config: Dictionary config for this queue (usually read from the
+                   yaml configuration)
+    :param queueBack: A callback method to receive a list of Event objects
+    :param tensor: A TensorService object for interacting with the queue manager
     """
-    def __init__(self, config, queueBack, tensor):
-        """Consturct a Source object
+    sync = False
 
-        Arguments:
-        config -- Dictionary config for this source
-        queueBack -- Callback method for events originating from this source
-                     called on config['interval']
-        tensor -- A TensorService object for interacting with the queue manager
-        """
+    def __init__(self, config, queueBack, tensor):
         self.config = config
         self.t = task.LoopingCall(self.tick)
 
@@ -142,6 +126,8 @@ def __init__(self, config, queueBack, tensor):
 
         self.queueBack = queueBack
 
+        self.running = False
+
     def startTimer(self):
         """Starts the timer for this source"""
         self.t.start(self.inter)
@@ -153,6 +139,12 @@ def tick(self):
 
         Returns a deferred"""
 
+        if self.sync:
+            if self.running:
+                defer.returnValue(None)
+
+            self.running = True
+
         try:
             event = yield defer.maybeDeferred(self.get)
             if self.config.get('debug', False):
@@ -163,8 +155,10 @@ def tick(self):
         except Exception, e:
             log.msg("[%s] Unhandled error: %s" % (self.service, e))
 
+        self.running = False
+
     def createEvent(self, state, description, metric, prefix=None,
-                    hostname=None, aggregation=None):
+                    hostname=None, aggregation=None, evtime=None):
         """Creates an Event object from the Source configuration"""
         if prefix:
             service_name = self.service + "." + prefix
@@ -172,7 +166,8 @@ def createEvent(self, state, description, metric, prefix=None,
             service_name = self.service
 
         return Event(state, service_name, description, metric, self.ttl,
-            hostname = hostname or self.hostname, aggregation=aggregation
+            hostname=hostname or self.hostname, aggregation=aggregation,
+            evtime=evtime
        )
 
     def get(self):
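A sketch of how the new `sync` flag and `evtime` argument combine, modelled on the NginxLogMetrics source later in this change; the class name and timestamp are hypothetical::

    from tensor.objects import Source

    class BatchSource(Source):
        # tick() now skips this source while a previous get() is still running
        sync = True

        def get(self):
            # Backdate the event to when the data actually occurred
            return self.createEvent('ok', 'Batched metric', 1.0,
                evtime=1421424680.0)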
diff --git a/tensor/outputs/riemann.py b/tensor/outputs/riemann.py
index f46c690..d56f1f4 100644
--- a/tensor/outputs/riemann.py
+++ b/tensor/outputs/riemann.py
@@ -7,6 +7,21 @@
 
 class RiemannTCP(Output):
+    """Riemann TCP output
+
+    **Configuration arguments:**
+
+    :param server: Riemann server hostname (default: localhost)
+    :type server: str.
+    :param port: Riemann server port (default: 5555)
+    :type port: int.
+    :param maxrate: Maximum de-queue rate (0 is no limit)
+    :type maxrate: int.
+    :param interval: De-queue interval in seconds (default: 1.0)
+    :type interval: float.
+    :param pressure: Maximum backpressure (-1 is no limit)
+    :type pressure: int.
+    """
     def __init__(self, *a):
         Output.__init__(self, *a)
         self.events = []
@@ -90,6 +105,16 @@ def eventsReceived(self, events):
         self.events.extend(events)
 
 class RiemannUDP(Output):
+    """Riemann UDP output (spray-and-pray mode)
+
+    **Configuration arguments:**
+
+    :param server: Riemann server IP address (default: 127.0.0.1)
+    :type server: str.
+    :param port: Riemann server port (default: 5555)
+    :type port: int.
+    """
+
     def __init__(self, *a):
         Output.__init__(self, *a)
         self.protocol = None
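For reference, a sketch of constructing the TCP output directly with the documented arguments; Tensor normally builds outputs from the YAML configuration, and `tensor_service` stands in for the real TensorService instance::

    from tensor.outputs.riemann import RiemannTCP

    output = RiemannTCP({
        'server': 'riemann.example.com',  # default: localhost
        'port': 5555,
        'maxrate': 100,    # maximum de-queue rate, 0 = no limit
        'interval': 1.0,
        'pressure': -1,    # no backpressure limit
    }, tensor_service)     # tensor_service: placeholder TensorService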
+ """ + def __init__(self, *a): Output.__init__(self, *a) self.protocol = None diff --git a/tensor/protocol/sflow/server.py b/tensor/protocol/sflow/server.py index babbdcc..83f6bb3 100644 --- a/tensor/protocol/sflow/server.py +++ b/tensor/protocol/sflow/server.py @@ -9,6 +9,8 @@ from tensor.protocol.sflow.protocol import flows, counters class DatagramReceiver(DatagramProtocol): + """DatagramReceiver for sFlow packets + """ def datagramReceived(self, data, (host, port)): sflow = protocol.Sflow(data, host) diff --git a/tensor/sources/linux/ipsec.py b/tensor/sources/linux/ipsec.py new file mode 100644 index 0000000..751f6e9 --- /dev/null +++ b/tensor/sources/linux/ipsec.py @@ -0,0 +1,66 @@ +from zope.interface import implements + +from twisted.internet import defer + +from tensor.interfaces import ITensorSource +from tensor.objects import Source +from tensor.utils import fork + +class StrongSwan(Source): + """Returns the status of strongSwan IPSec tunnels + + **Metrics:** + + :(service name).(peer name): Tunnel status + """ + implements(ITensorSource) + + @defer.inlineCallbacks + def get(self): + out, err, code = yield fork('/usr/bin/sudo', args=( + 'ipsec', 'statusall')) + + connections = {} + + s = 0 + + for l in out.strip('\n').split('\n'): + if l == "Connections:": + s = 1 + continue + elif l == "Routed Connections:": + s = 2 + elif "Security Associations" in l: + s = 3 + elif l[0] == ' ' and ':' in l: + if s == 1: + con, detail = l.strip().split(': ', 1) + detail = detail.strip() + + if con not in connections: + connections[con] = { + 'source': detail.split('...')[0], + 'destination': detail.split('...')[1].split()[0], + 'up': False + } + elif s == 3: + con, detail = l.strip().split(': ', 1) + detail = detail.strip() + if '[' in con: + con = con.split('[')[0] + else: + con = con.split('{')[0] + + if 'ESTABLISHED' in detail: + connections[con]['up'] = True + + events = [] + for k, v in connections.items(): + if v['up']: + events.append(self.createEvent('ok', 'IPSec tunnel %s up' % k, + 1, prefix=k)) + else: + events.append(self.createEvent('critical', + 'IPSec tunnel %s down' % k, 0, prefix=k)) + + defer.returnValue(events) diff --git a/tensor/sources/media/libav.py b/tensor/sources/media/libav.py index 135588c..f48182b 100644 --- a/tensor/sources/media/libav.py +++ b/tensor/sources/media/libav.py @@ -15,7 +15,7 @@ class DarwinRTSP(Source): **Configuration arguments:** - :destination: Host name or IP address to check + :param destination: Host name or IP address to check :type method: str. 
diff --git a/tensor/sources/media/libav.py b/tensor/sources/media/libav.py
index 135588c..f48182b 100644
--- a/tensor/sources/media/libav.py
+++ b/tensor/sources/media/libav.py
@@ -15,7 +15,7 @@ class DarwinRTSP(Source):
 
     **Configuration arguments:**
 
-    :destination: Host name or IP address to check
+    :param destination: Host name or IP address to check
     :type method: str.
 
     **Metrics:**
@@ -32,8 +32,12 @@ def get(self):
         host = self.config.get('destination', self.hostname)
 
         t0 = time.time()
-        out, err, code = yield fork('/usr/bin/avprobe',
-            args=('rtsp://%s/sample_100kbit.mp4' % host, ), timeout=30.0)
+        try:
+            out, err, code = yield fork('/usr/bin/avprobe',
+                args=('rtsp://%s/sample_100kbit.mp4' % host, ), timeout=30.0)
+        except:
+            code = 1
+            err = None
 
         t_delta = (time.time() - t0) * 1000
 
@@ -41,7 +45,14 @@ def get(self):
             e = self.createEvent('ok',
                 'RTSP Request time to %s' % host, t_delta)
         else:
-            error = err.strip('\n').split('\n')[-2]
+            if err:
+                try:
+                    error = err.strip('\n').split('\n')[-2]
+                except:
+                    error = err.replace('\n', '-')
+            else:
+                error = "Execution error"
+
             e = self.createEvent('critical',
                 'Unable to stream %s:%s' % (host, error), t_delta)
 
diff --git a/tensor/sources/munin.py b/tensor/sources/munin.py
index 60901f5..683018d 100644
--- a/tensor/sources/munin.py
+++ b/tensor/sources/munin.py
@@ -61,15 +61,15 @@ class MuninNode(Source):
 
     **Configuration arguments:**
 
-    :host: munin-node hostname (probably localhost)
+    :param host: munin-node hostname (probably localhost)
     :type host: str.
-    :port: munin-node port (probably 4949)
+    :param port: munin-node port (probably 4949)
     :type port: int.
 
     **Metrics:**
 
-    :(service name).(plugin name).(keys...): A dot separated tree of munin
-        plugin keys
+    :(service name).(plugin name).(keys...): A dot separated tree of
+        munin plugin keys
     """
 
     implements(ITensorSource)
diff --git a/tensor/sources/network.py b/tensor/sources/network.py
index 62a6f61..7194bcb 100644
--- a/tensor/sources/network.py
+++ b/tensor/sources/network.py
@@ -24,11 +24,11 @@ class HTTP(Source):
 
     **Configuration arguments:**
 
-    :method: HTTP request method to use
+    :param method: HTTP request method to use
     :type method: str.
-    :match: A text string to match in the document when it is correct
+    :param match: A text string to match in the document when it is correct
     :type match: str.
-    :useragent: User-Agent header to use
+    :param useragent: User-Agent header to use
     :type useragent: str.
 
     **Metrics:**
@@ -80,10 +80,12 @@ def get(self):
 class Ping(Source):
     """Performs an Ping checks against a destination
 
+    This is a horrible implementation which forks to `ping`
+
     **Configuration arguments:**
 
-    :destination: Host name or IP address to ping
-    :type method: str.
+    :param destination: Host name or IP address to ping
+    :type destination: str.
 
     **Metrics:**
 
@@ -100,8 +102,12 @@ class Ping(Source):
     def get(self):
         host = self.config.get('destination', self.hostname)
 
-        out, err, code = yield fork('/bin/ping',
-            args=('-q', '-n', '-c', '5', '-i', '0.2', host), timeout=30.0)
+
+        try:
+            out, err, code = yield fork('/bin/ping',
+                args=('-q', '-n', '-c', '5', '-i', '0.2', host), timeout=30.0)
+        except:
+            code = 1
 
         if code == 0:
             # Successful ping
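As a usage sketch, the Ping source can be instantiated directly with its documented `destination` argument, following the same pattern the tests below use for other sources; `queue_callback` and the service name are placeholders::

    from tensor.sources import network

    source = network.Ping({
        'interval': 60.0,
        'service': 'ping.example',
        'ttl': 120,
        'hostname': 'localhost',
        'destination': 'example.com',
    }, queue_callback, None)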
diff --git a/tensor/sources/nginx.py b/tensor/sources/nginx.py
new file mode 100644
index 0000000..98e9304
--- /dev/null
+++ b/tensor/sources/nginx.py
@@ -0,0 +1,217 @@
+"""
+.. module:: nginx
+   :platform: Unix
+   :synopsis: A source module for nginx stats
+
+.. moduleauthor:: Colin Alston
+"""
+
+import time
+
+from twisted.internet import defer, reactor
+from twisted.web.client import Agent
+from twisted.web.http_headers import Headers
+
+from zope.interface import implements
+
+from tensor.interfaces import ITensorSource
+from tensor.objects import Source
+
+from tensor.utils import BodyReceiver, fork
+from tensor.aggregators import Counter64
+from tensor.logs import parsers, follower
+
+class Nginx(Source):
+    """Reads Nginx stub_status
+
+    **Configuration arguments:**
+
+    :param stats_url: URL to fetch stub_status from
+    :type stats_url: str.
+
+    **Metrics:**
+
+    :(service name).active: Active connections at this time
+    :(service name).accepts: Accepted connections
+    :(service name).handled: Handled connections
+    :(service name).requests: Total client requests
+    :(service name).reading: Reading requests
+    :(service name).writing: Writing responses
+    :(service name).waiting: Waiting connections
+    """
+
+    implements(ITensorSource)
+
+    def _parse_nginx_stats(self, stats):
+        stats = stats.split('\n')
+        active = stats[0].split(': ')[-1]
+
+        accepts, handled, requests = stats[2].split()
+
+        _, reading, _, writing, _, waiting = stats[3].split()
+
+        metrics = {
+            'active': (float(active), None),
+            'accepts': (float(accepts), Counter64),
+            'requests': (float(requests), Counter64),
+            'handled': (float(handled), Counter64),
+            'reading': (float(reading), None),
+            'writing': (float(writing), None),
+            'waiting': (float(waiting), None),
+        }
+
+        return metrics
+
+    @defer.inlineCallbacks
+    def get(self):
+        agent = Agent(reactor)
+
+        url = self.config.get('url', self.config.get('stats_url'))
+
+        t0 = time.time()
+
+        request = yield agent.request('GET', url,
+            Headers({'User-Agent': ['Tensor']}),
+        )
+
+        events = []
+
+        if request.length:
+            d = defer.Deferred()
+            request.deliverBody(BodyReceiver(d))
+            b = yield d
+            body = b.read()
+
+            metrics = self._parse_nginx_stats(body)
+
+            for k,v in metrics.items():
+                metric, aggr = v
+                events.append(
+                    self.createEvent('ok', 'Nginx %s' % (k), metric, prefix=k,
+                        aggregation=aggr)
+                )
+
+        defer.returnValue(events)
+
+class NginxLogMetrics(Source):
+    """Tails Nginx log files, parses them and returns metrics for data usage
+    and requests against other fields.
+
+    **Configuration arguments:**
+
+    :param log_format: Log format passed to parser, same as the config
+                       definition
+    :type log_format: str.
+    :param file: Log file
+    :type file: str.
+    :param max_lines: Maximum number of log lines to read per interval to
+                      prevent overwhelming Tensor when reading large logs
+                      (default 2000)
+    :type max_lines: int.
+    :param resolution: Aggregate bucket resolution in seconds (default 10)
+    :type resolution: int.
+
+    **Metrics:**
+
+    :(service name).total_bytes: Bytes total for all requests
+    :(service name).total_requests: Total request count
+    :(service name).status.(code).(requests|bytes): Metrics by status code
+    :(service name).user-agent.(agent).(requests|bytes): Metrics by user agent
+    :(service name).client.(ip).(requests|bytes): Metrics by client IP
+    :(service name).request.(request path).(requests|bytes): Metrics by request path
+    """
+
+    implements(ITensorSource)
+
+    # Don't allow overlapping runs
+    sync = True
+
+    def __init__(self, *a):
+        Source.__init__(self, *a)
+
+        parser = parsers.ApacheLogParser(self.config.get('log_format', 'combined'))
+
+        self.log = follower.LogFollower(self.config['file'], parser=parser.parse)
+
+        self.max_lines = int(self.config.get('max_lines', 2000))
+        self.bucket_res = int(self.config.get('resolution', 10))
+
+        self.bucket = 0
+
+    def _aggregate_fields(self, row, b, field, fil=None):
+        f = row.get(field, None)
+
+        if f:
+            if fil:
+                f = fil(f)
+            if not (field in self.st):
+                self.st[field] = {}
+
+            if not (f in self.st[field]):
+                self.st[field][f] = [b, 1]
+
+            else:
+                self.st[field][f][0] += b
+                self.st[field][f][1] += 1
+
+    def dumpEvents(self, ts):
+        if self.st:
+            events = [
+                self.createEvent('ok', 'Nginx bytes', self.bytes,
+                    prefix='total_bytes', evtime=ts),
+                self.createEvent('ok', 'Nginx requests', self.requests,
+                    prefix='total_requests', evtime=ts)
+            ]
+
+            for field, block in self.st.items():
+                for key, vals in block.items():
+                    bytes, requests = vals
+                    events.extend([
+                        self.createEvent('ok', 'Nginx %s %s bytes' % (field, key),
+                            bytes, prefix='%s.%s.bytes' % (field, key), evtime=ts),
+                        self.createEvent('ok', 'Nginx %s %s requests' % (field, key),
+                            requests, prefix='%s.%s.requests' % (field, key), evtime=ts)
+                    ])
+
+            self.st = {}
+            self.bytes = 0
+            self.requests = 0
+
+            self.queueBack(events)
+
+    def got_line(self, line):
+        b = line.get('bytes', 0)
+        if b:
+            self.bytes += b
+
+        self.requests += 1
+
+        t = time.mktime(line['time'].timetuple())
+
+        # Calculate the time bucket for this line
+        bucket = (int(t)/self.bucket_res)*self.bucket_res
+
+        if self.bucket:
+            if (bucket != self.bucket):
+                self.dumpEvents(float(self.bucket))
+                self.bucket = bucket
+        else:
+            self.bucket = bucket
+
+        self._aggregate_fields(line, b, 'status')
+        self._aggregate_fields(line, b, 'client')
+        self._aggregate_fields(line, b, 'user-agent',
+            fil=lambda l: l.replace('.',',')
+        )
+        self._aggregate_fields(line, b, 'request',
+            fil=lambda l: l.split()[1].split('?')[0].replace('.',',')
+        )
+
+    def get(self):
+        self.bytes = 0
+        self.requests = 0
+        self.st = {}
+
+        self.log.get_fn(self.got_line, max_lines=self.max_lines)
+
+        self.dumpEvents(float(self.bucket))
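Two details of the nginx module above may be worth illustrating. First, the stub_status payload `_parse_nginx_stats` consumes is the four-line format nginx produces, as used in the tests below::

    Active connections: 3
    server accepts handled requests
     20649 20649 686969
    Reading: 0 Writing: 1 Waiting: 2

Second, `got_line` assigns each hit to a time bucket with plain Python 2 integer division, so at the default 10 second resolution a request logged at epoch second 1421424689 lands in bucket 1421424680 and its events are emitted with `evtime=1421424680.0`::

    bucket = (int(t) / bucket_res) * bucket_res
    # e.g. (1421424689 / 10) * 10 == 1421424680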
diff --git a/tensor/sources/rabbitmq.py b/tensor/sources/rabbitmq.py
index 68eeaa5..e6d6018 100644
--- a/tensor/sources/rabbitmq.py
+++ b/tensor/sources/rabbitmq.py
@@ -14,7 +14,7 @@ class Queues(Source):
 
     **Configuration arguments:**
 
-    :vhost: Vhost name
+    :param vhost: Vhost name
     :type vhost: str.
 
     **Metrics:**
 
diff --git a/tensor/sources/riak.py b/tensor/sources/riak.py
index 47e7d66..ab788fd 100644
--- a/tensor/sources/riak.py
+++ b/tensor/sources/riak.py
@@ -25,9 +25,9 @@ class RiakStats(Source):
 
     **Configuration arguments:**
 
-    :url: Riak stats URL
+    :param url: Riak stats URL
     :type url: str.
-    :useragent: User-Agent header to use
+    :param useragent: User-Agent header to use
     :type useragent: str.
 
     **Metrics:**
 
diff --git a/tensor/sources/sflow.py b/tensor/sources/sflow.py
index 53a1bef..b5ddab9 100644
--- a/tensor/sources/sflow.py
+++ b/tensor/sources/sflow.py
@@ -175,9 +175,9 @@ class sFlow(Source):
 
     **Configuration arguments:**
 
-    :port: UDP port to listen on
+    :param port: UDP port to listen on
     :type port: int.
-    :dnslookup: Enable reverse DNS lookup for device IPs (default: True)
+    :param dnslookup: Enable reverse DNS lookup for device IPs (default: True)
     :type dnslookup: bool.
 
     **Metrics:**
 
diff --git a/tensor/sources/snmp.py b/tensor/sources/snmp.py
index b2035d7..943dd05 100644
--- a/tensor/sources/snmp.py
+++ b/tensor/sources/snmp.py
@@ -1,3 +1,11 @@
+"""
+.. module:: snmp
+   :platform: Unix
+   :synopsis: A source module for polling SNMP. Requires PySNMP4
+
+.. moduleauthor:: Colin Alston
+"""
+
 import time
 
 from twisted.internet import reactor, defer
@@ -17,6 +25,15 @@
 
 class SNMPConnection(object):
     """A wrapper class for PySNMP functions
+
+    :param host: SNMP agent host
+    :type host: str.
+    :param port: SNMP port
+    :type port: int.
+    :param community: SNMP read community
+    :type community: str.
+
+    (This is not a source and you shouldn't try to use it as one)
     """
     def __init__(self, host, port, community):
 
@@ -83,11 +100,11 @@ class SNMP(Source):
 
     **Configuration arguments:**
 
-    :ip: SNMP agent host (default: 127.0.0.1)
+    :param ip: SNMP agent host (default: 127.0.0.1)
     :type ip: str.
-    :port: SNMP port (default: 161)
+    :param port: SNMP port (default: 161)
     :type port: int.
-    :community: SNMP read community
+    :param community: SNMP read community
     :type community: str.
     """
 
@@ -165,11 +182,11 @@ class SNMPCisco837(SNMP):
 
     **Configuration arguments:**
 
-    :ip: SNMP agent host (default: 127.0.0.1)
+    :param ip: SNMP agent host (default: 127.0.0.1)
     :type ip: str.
-    :port: SNMP port (default: 161)
+    :param port: SNMP port (default: 161)
     :type port: int.
-    :community: SNMP read community
+    :param community: SNMP read community
     :type community: str.
     """
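A construction sketch for the SNMP source using its documented arguments, following the same instantiation pattern as the tests below; `queue_callback` and the community string are placeholders::

    from tensor.sources import snmp

    source = snmp.SNMP({
        'interval': 60.0,
        'service': 'router',
        'ttl': 120,
        'hostname': 'localhost',
        'ip': '192.168.0.1',
        'port': 161,
        'community': 'public',  # placeholder read community
    }, queue_callback, None)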
""" diff --git a/tensor/tests/test_logs.py b/tensor/tests/test_logs.py new file mode 100644 index 0000000..808ff39 --- /dev/null +++ b/tensor/tests/test_logs.py @@ -0,0 +1,88 @@ +from twisted.trial import unittest + +from tensor.logs import follower, parsers + +import datetime +import os + + +class TestLogs(unittest.TestCase): + + def test_logfollow(self): + try: + os.unlink('test.log.lf') + os.unlink('test.log') + except: + pass + + log = open('test.log', 'wt') + log.write('foo\nbar\n') + log.flush() + + f = follower.LogFollower('test.log', tmp_path=".") + + r = f.get() + + log.write('test') + log.flush() + + r2 = f.get() + + log.write('ing\n') + log.flush() + + r3 = f.get() + + self.assertEqual(r[0], 'foo') + self.assertEqual(r[1], 'bar') + + self.assertEqual(r2, []) + self.assertEqual(r3[0], 'testing') + + log.close() + + # Move inode + os.rename('test.log', 'testold.log') + + log = open('test.log', 'wt') + log.write('foo2\nbar2\n') + log.close() + + r = f.get() + + self.assertEqual(r[0], 'foo2') + self.assertEqual(r[1], 'bar2') + + # Go backwards + log = open('test.log', 'wt') + log.write('foo3\n') + log.close() + + r = f.get() + + self.assertEqual(r[0], 'foo3') + + os.unlink('test.log') + os.unlink('testold.log') + + def test_apache_parser(self): + log = parsers.ApacheLogParser('combined') + + line = '192.168.0.102 - - [16/Jan/2015:11:11:45 +0200] "GET / HTTP/1.1" 200 709 "-" "My browser"' + + want = { + 'status': 200, + 'request': 'GET / HTTP/1.1', + 'bytes': 709, + 'user-agent': 'My browser', + 'client': '192.168.0.102', + 'time': datetime.datetime(2015, 1, 16, 11, 11, 45), + 'referer': None, + 'logname': None, + 'user': None, + } + + p = log.parse(line) + + for k,v in want.items(): + self.assertEquals(p[k], v) diff --git a/tensor/tests/test_sources.py b/tensor/tests/test_sources.py index 6ae00d7..9a0d82b 100644 --- a/tensor/tests/test_sources.py +++ b/tensor/tests/test_sources.py @@ -7,7 +7,7 @@ from tensor.sources.linux import basic, process from tensor.sources import riak - +from tensor.sources import nginx class TestLinuxSources(unittest.TestCase): def skip_if_no_hostname(self): @@ -134,6 +134,81 @@ def test_network_stats(self): self.assertEquals(ev[4].metric, 1154168) self.assertEquals(ev[5].metric, 0) + def test_nginx_parse(self): + src = nginx.Nginx({ + 'interval': 1.0, + 'service': 'nginx', + 'ttl': 60, + 'hostname': 'localhost', + 'stats_url': 'http://localhost/nginx_stats' + }, self._qb, None) + + ngstats = """Active connections: 3 +server accepts handled requests + 20649 20649 686969 +Reading: 0 Writing: 1 Waiting: 2\n""" + + metrics = src._parse_nginx_stats(ngstats) + + self.assertEquals(metrics['handled'][0], 20649) + + def test_nginx_log(self): + try: + os.unlink('foo.log.lf') + os.unlink('foo.log') + except: + pass + + events = [] + + def qb(ev): + events.append(ev) + + f = open('foo.log', 'wt') + f.write('192.168.0.1 - - [16/Jan/2015:16:31:29 +0200] "GET /foo HTTP/1.1" 200 210 "-" "My Browser"\n') + f.write('192.168.0.1 - - [16/Jan/2015:16:51:29 +0200] "GET /foo HTTP/1.1" 200 410 "-" "My Browser"\n') + f.flush() + + src = nginx.NginxLogMetrics({ + 'interval': 1.0, + 'service': 'nginx', + 'ttl': 60, + 'hostname': 'localhost', + 'log_format': 'combined', + 'file': 'foo.log' + }, qb, None) + + src.log.tmp = 'foo.log.lf' + + src.get() + + ev1 = events[0] + ev2 = events[1] + + for i in ev1: + if i.service=='nginx.client.192.168.0.1.bytes': + self.assertEquals(i.metric, 210) + + for i in ev2: + if i.service=='nginx.client.192.168.0.1.bytes': + self.assertEquals(i.metric, 
+
+        events = []
+
+        f.write('192.168.0.1 - - [16/Jan/2015:17:10:31 +0200] "GET /foo HTTP/1.1" 200 410 "-" "My Browser"\n')
+        f.write('192.168.0.1 - - [16/Jan/2015:17:10:34 +0200] "GET /bar HTTP/1.1" 200 410 "-" "My Browser"\n')
+        f.close()
+
+        src.get()
+
+        for i in events[0]:
+            if i.service=='nginx.client.192.168.0.1.requests':
+                self.assertEquals(i.metric, 2)
+
+            if i.service=='nginx.user-agent.My Browser.bytes':
+                self.assertEquals(i.metric, 820)
+
+            if i.service=='nginx.request./foo.bytes':
+                self.assertEquals(i.metric, 410)
 
 class TestRiakSources(unittest.TestCase):
     def _qb(self, result):
diff --git a/tensor/tests/test_tensor.py b/tensor/tests/test_tensor.py
index cdbee31..9ab8fcb 100644
--- a/tensor/tests/test_tensor.py
+++ b/tensor/tests/test_tensor.py
@@ -1,6 +1,6 @@
 from twisted.trial import unittest
 
-from twisted.internet import defer, reactor
+from twisted.internet import defer, reactor, error
 from twisted.internet.endpoints import TCP4ClientEndpoint, connectProtocol
 
 from tensor.protocol import riemann
 
@@ -60,7 +60,7 @@ def test_utils_fork_timeout(self):
         died = False
         try:
             o, e, c = yield fork('sleep', args=('2',), timeout=0.1)
-        except:
+        except error.ProcessTerminated:
             died = True
 
         self.assertTrue(died)
diff --git a/tensor/utils.py b/tensor/utils.py
index 4c273fa..7b7ea1e 100644
--- a/tensor/utils.py
+++ b/tensor/utils.py
@@ -8,6 +8,9 @@
 
 class Resolver(object):
+    """Helper class for DNS resolution
+    """
+
     def __init__(self):
         self.recs = {}
 
@@ -81,8 +84,11 @@ def processEnded(self, reason):
     def connectionMade(self):
         @defer.inlineCallbacks
         def killIfAlive():
-            log.msg('Killing source proccess: Timeout %s exceeded' % self.timeout)
-            yield self.transport.signalProcess('KILL')
+            try:
+                yield self.transport.signalProcess('KILL')
+                log.msg('Killed source process: Timeout %s exceeded' % self.timeout)
+            except error.ProcessExitedAlready:
+                pass
 
         self.timer = reactor.callLater(self.timeout, killIfAlive)
 
@@ -90,18 +96,13 @@ def fork(executable, args=(), env={}, path=None, timeout=3600):
     """fork
     Provides a deferred wrapper function with a timeout function
 
-    **Arguments:**
-
-    :executable: Executable
+    :param executable: Executable
     :type executable: str.
-
-    **Keyword arguments:**
-
-    :args: Tupple of arguments
+    :param args: Tuple of arguments
     :type args: tupple.
-    :env: Environment dictionary
+    :param env: Environment dictionary
     :type env: dict.
-    :timeout: Kill the child process if timeout is exceeded
+    :param timeout: Kill the child process if timeout is exceeded
     :type timeout: int.
     """
     d = defer.Deferred()
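Finally, a brief usage sketch for `fork`, mirroring the timeout test above; the deferred fires with an `(out, err, code)` tuple, and a child that exceeds `timeout` is killed and surfaces as `error.ProcessTerminated`::

    from twisted.internet import defer, error
    from tensor.utils import fork

    @defer.inlineCallbacks
    def disk_usage():
        try:
            out, err, code = yield fork('/bin/df', args=('-k',), timeout=5.0)
        except error.ProcessTerminated:
            # the child was killed by killIfAlive after the timeout
            out, code = None, 1
        defer.returnValue((out, code))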