Skip to content

Commit

Permalink
Adding files
Browse files Browse the repository at this point in the history
  • Loading branch information
drzymalanet committed Apr 27, 2017
1 parent 67bd282 commit a181390
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 0 deletions.
5 changes: 5 additions & 0 deletions __init__.py
Original file line number Diff line number Diff line change
@@ -0,0 1,5 @@
# -*- coding: utf-8 -*-

from histograph.histograph import Histograph


68 changes: 68 additions & 0 deletions histograph.py
Original file line number Diff line number Diff line change
@@ -0,0 1,68 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import re
import sys
import math
import datetime


class Histograph():

def __init__(self):
self.db = {}
self.first = None
self.last = None

def add_marker(self, marker):
self.db[marker] = self.db.get(marker, 0) 1
self.first = marker if self.first is None else min(self.first, marker)
self.last = marker if self.last is None else max(self.last, marker)

def get_timespan(self):
return self.first, self.last

def get_histogram(self, resolution=10, first=None, last=None):
if resolution < 1 or type(resolution) is not int:
# In such case cannot create histogram
raise Exception('Invalid resolution')
first = first if first is not None else self.first
last = last if last is not None else self.last
if first is None or last is None:
# No markers, histogram will be empty.
return [0 for x in range(resolution)]
timespan = abs(last - first)
timestep = timespan / resolution
result = []
if timespan == 0:
# If the time range is a sinlge point, redistribute the value
total = self.db[first] if first in self.db.keys() else 0
for index in range(resolution):
chunk = math.ceil(total / (resolution - index))
total -= chunk
result.append(chunk)
return result
else:
# For every point in the graph
for index in range(resolution):
# Get all of the values from the corresponding time range
begin = first (index * timestep)
# This is to overcome the "near zero" problem in floats.
end = last - ((resolution - (index 1)) * timestep)
if index == 0:
# This is the first data point - Include the left-most and right-most marker.
values = [self.db[marker] for marker in self.db.keys() if marker >= begin and marker <= end]
else:
# Do not include the left-most marker. It was already included in the last data point.
values = [self.db[marker] for marker in self.db.keys() if marker > begin and marker <= end]
result.append(sum(values))
return result

def make_marker(self, year, month, day, hour, minute, second):
microsecond = 0
timezone = datetime.timezone(datetime.timedelta())
return int(datetime.datetime(year, month, day, hour, minute, second, microsecond, timezone).timestamp())




3 changes: 3 additions & 0 deletions test.sh
Original file line number Diff line number Diff line change
@@ -0,0 1,3 @@
#!/usr/bin/env sh
python3 -m unittest

3 changes: 3 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 1,3 @@
# -*- coding: utf-8 -*-


200 changes: 200 additions & 0 deletions tests/test_histograph.py
Original file line number Diff line number Diff line change
@@ -0,0 1,200 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import histograph
import unittest


# To debug any specific case, add this the line above:
# import pdb; pdb.set_trace()



class HistographTestCase(unittest.TestCase):

def setUp(self):
self.histograph = histograph.Histograph()
self.hist = self.histograph.get_histogram



class MakeMarkerTest(HistographTestCase):

def test_returns_integer(self):
marker = self.histograph.make_marker(2000,1,2,3,4,5)
self.assertEqual(int, type(marker))

def test_returns_valid_unix_timestamp(self):
unix_time = self.histograph.make_marker
self.assertEqual(unix_time(1970,1,1,0,0,1), 1)
self.assertEqual(unix_time(2020,5,4,3,2,1), 1588561321)



class InvalidParametersTest(HistographTestCase):

def test_raises_error_on_invalid_resolution(self):
self.assertRaises(Exception, self.hist, resolution=5.5)
self.assertRaises(Exception, self.hist, resolution=0)
self.assertRaises(Exception, self.hist, resolution=-1)
self.assertRaises(Exception, self.hist, resolution=-77)



class EmptyHistograph(HistographTestCase):

def test_histogram_is_all_zeros(self):
self.assertEqual(self.hist(1), [0])
self.assertEqual(self.hist(2), [0,0])
self.assertEqual(self.hist(3), [0,0,0])
self.assertEqual(self.hist(4), [0,0,0,0])
self.assertEqual(self.hist(5), [0,0,0,0,0])
self.assertEqual(self.hist(5, first=50), [0,0,0,0,0])
self.assertEqual(self.hist(5, last=100), [0,0,0,0,0])
self.assertEqual(self.hist(5, first=50, last=100), [0,0,0,0,0])
self.assertEqual(self.hist(5, first=100, last=50), [0,0,0,0,0])
self.assertEqual(self.hist(5, first=-50, last=-100), [0,0,0,0,0])
self.assertEqual(self.hist(5, first=-100, last=-50), [0,0,0,0,0])



class SingleValueHistograph(HistographTestCase):

def setUp(self):
super(SingleValueHistograph, self).setUp()
self.histograph.add_marker(0)

def test_histogram_is_correct(self):
self.assertEqual(self.hist(1), [1])
self.assertEqual(self.hist(2), [1,0])
self.assertEqual(self.hist(3), [1,0,0])
self.assertEqual(self.hist(4), [1,0,0,0])
self.assertEqual(self.hist(5), [1,0,0,0,0])

def test_obeys_first_bounding_parameter(self):
self.assertEqual(self.hist(resolution=1, first=-1), [1])
self.assertEqual(self.hist(resolution=2, first=-1), [0,1])
self.assertEqual(self.hist(resolution=3, first=-1), [0,0,1])
self.assertEqual(self.hist(resolution=4, first=-1), [0,0,0,1])
self.assertEqual(self.hist(resolution=5, first=-1), [0,0,0,0,1])

def test_obeys_last_bounding_parameter(self):
self.assertEqual(self.hist(resolution=1, last=1), [1])
self.assertEqual(self.hist(resolution=2, last=1), [1,0])
self.assertEqual(self.hist(resolution=3, last=1), [1,0,0])
self.assertEqual(self.hist(resolution=4, last=1), [1,0,0,0])
self.assertEqual(self.hist(resolution=5, last=1), [1,0,0,0,0])

def test_obeys_both_bounding_parameters(self):
self.assertEqual(self.hist(resolution=1, first=-1, last=1), [1])
self.assertEqual(self.hist(resolution=2, first=-1, last=1), [1,0])
self.assertEqual(self.hist(resolution=3, first=-1, last=1), [0,1,0])
self.assertEqual(self.hist(resolution=4, first=-1, last=1), [0,1,0,0])
self.assertEqual(self.hist(resolution=5, first=-1, last=1), [0,0,1,0,0])
self.assertEqual(self.hist(resolution=9, first=-1, last=1), [0,0,0,0,1,0,0,0,0])
self.assertEqual(self.hist(resolution=10, first=-1, last=1), [0,0,0,0,1,0,0,0,0,0])



class FlatHistogramCheck(HistographTestCase):

def setUp(self):
super(FlatHistogramCheck, self).setUp()
# Add 10 points evenly distributed
self.histograph.add_marker(0)
self.histograph.add_marker(1)
self.histograph.add_marker(2)
self.histograph.add_marker(3)
self.histograph.add_marker(4)
self.histograph.add_marker(5)
self.histograph.add_marker(6)
self.histograph.add_marker(7)
self.histograph.add_marker(8)
self.histograph.add_marker(9)

def test_histogram_is_correct(self):
self.assertEqual(self.hist(1), [10])
self.assertEqual(self.hist(2), [5,5])
self.assertEqual(self.hist(3), [4,3,3])
self.assertEqual(self.hist(4), [3,2,2,3])
self.assertEqual(self.hist(5), [2,2,2,2,2])
self.assertEqual(self.hist(6), [2,2,1,2,1,2])
self.assertEqual(self.hist(7), [2,1,1,2,1,1,2])
self.assertEqual(self.hist(8), [2,1,1,1,1,1,1,2])
self.assertEqual(self.hist(9), [2,1,1,1,1,1,1,1,1])
self.assertEqual(self.hist(10), [1,1,1,1,1,1,1,1,1,1])
self.assertEqual(self.hist(11), [1,1,1,1,1,0,1,1,1,1,1])
self.assertEqual(self.hist(12), [1,1,1,1,0,1,1,1,0,1,1,1])
self.assertEqual(self.hist(13), [1,1,1,0,1,1,0,1,1,0,1,1,1])
self.assertEqual(self.hist(14), [1,1,0,1,1,0,1,1,0,1,1,0,1,1])
self.assertEqual(self.hist(15), [1,1,0,1,1,0,1,0,1,1,0,1,0,1,1])
self.assertEqual(self.hist(16), [1,1,0,1,0,1,0,1,1,0,1,0,1,0,1,1])
self.assertEqual(self.hist(17), [1,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,1])
self.assertEqual(self.hist(18), [1,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1])
self.assertEqual(self.hist(19), [1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1])
self.assertEqual(self.hist(20), [1,0,1,0,1,0,1,0,1,0,0,1,0,1,0,1,0,1,0,1])
self.assertEqual(self.hist(21), [1,0,1,0,1,0,1,0,0,1,0,1,0,1,0,0,1,0,1,0,1])
self.assertEqual(self.hist(22), [1,0,1,0,1,0,0,1,0,1,0,0,1,0,1,0,0,1,0,1,0,1])
self.assertEqual(self.hist(23), [1,0,1,0,0,1,0,1,0,0,1,0,1,0,0,1,0,1,0,0,1,0,1])
self.assertEqual(self.hist(24), [1,0,1,0,0,1,0,1,0,0,1,0,0,1,0,1,0,0,1,0,0,1,0,1])
self.assertEqual(self.hist(25), [1,0,1,0,0,1,0,0,1,0,0,1,0,1,0,0,1,0,0,1,0,0,1,0,1])
self.assertEqual(self.hist(26), [1,0,1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,1,0,1])
self.assertEqual(self.hist(27), [1,0,1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,1])
self.assertEqual(self.hist(28), [1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,1])
self.assertEqual(self.hist(29), [1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,0,1,0,0,1,0,0,1,0,0,1,0,0,1])
self.assertEqual(self.hist(30), [1,0,0,1,0,0,1,0,0,1,0,0,0,1,0,0,1,0,0,1,0,0,0,1,0,0,1,0,0,1])

def test_obeys_first_bounding_parameter(self):
self.assertEqual(self.hist(resolution=1, first=5), [5])
self.assertEqual(self.hist(resolution=2, first=5), [3,2])
self.assertEqual(self.hist(resolution=3, first=5), [2,1,2])
self.assertEqual(self.hist(resolution=4, first=5), [2,1,1,1])
self.assertEqual(self.hist(resolution=5, first=5), [1,1,1,1,1])

def test_obeys_last_bounding_parameter(self):
self.assertEqual(self.hist(resolution=1, last=4), [5])
self.assertEqual(self.hist(resolution=2, last=4), [3,2])
self.assertEqual(self.hist(resolution=3, last=4), [2,1,2])
self.assertEqual(self.hist(resolution=4, last=4), [2,1,1,1])
self.assertEqual(self.hist(resolution=5, last=4), [1,1,1,1,1])

def test_obeys_both_bounding_parameters(self):
self.assertEqual(self.hist(resolution=1, first=3, last=6), [4])
self.assertEqual(self.hist(resolution=2, first=3, last=6), [2,2])
self.assertEqual(self.hist(resolution=3, first=3, last=6), [2,1,1])
self.assertEqual(self.hist(resolution=4, first=3, last=6), [1,1,1,1])
self.assertEqual(self.hist(resolution=5, first=3, last=6), [1,1,0,1,1])



class GiantPeakHistogramCheck(HistographTestCase):

def setUp(self):
super(GiantPeakHistogramCheck, self).setUp()
# Add one giant peak
self.histograph.add_marker(5)
self.histograph.add_marker(5)
self.histograph.add_marker(5)
self.histograph.add_marker(5)
self.histograph.add_marker(5)
self.histograph.add_marker(5)
self.histograph.add_marker(5)
self.histograph.add_marker(5)
self.histograph.add_marker(5)
self.histograph.add_marker(5)

def test_still_flat_if_no_bounds_given(self):
self.assertEqual(self.hist(resolution=1), [10])
self.assertEqual(self.hist(resolution=2), [5,5])
self.assertEqual(self.hist(resolution=3), [4,3,3])
self.assertEqual(self.hist(resolution=4), [3,3,2,2])
self.assertEqual(self.hist(resolution=5), [2,2,2,2,2])

def test_keep_the_peak_if_bounds_given(self):
self.assertEqual(self.hist(resolution=1, first=0, last=10), [10])
self.assertEqual(self.hist(resolution=2, first=0, last=10), [10,0])
self.assertEqual(self.hist(resolution=3, first=0, last=10), [0,10,0])
self.assertEqual(self.hist(resolution=4, first=0, last=10), [0,10,0,0])
self.assertEqual(self.hist(resolution=5, first=0, last=10), [0,0,10,0,0])


0 comments on commit a181390

Please sign in to comment.