sample data bucket and better testing
Jan Škoda committed Oct 9, 2022
1 parent 1b02e9e commit 26e83ea
Showing 5 changed files with 40 additions and 14 deletions.
2 changes: 1 addition & 1 deletion lakeapi/__init__.py
@@ -4,4 +4,4 @@
 __email__ = "[email protected]"
 __version__ = "0.1.0"

-from .main import load_data # noqa
+from .main import load_data, set_default_bucket, use_sample_data # noqa
23 changes: 17 additions & 6 deletions lakeapi/main.py
@@ -9,22 +9,33 @@
 import lakeapi._read_parquet

 cache = FSLRUCache(ttl=8 * 60 * 60, path="cache/boto", maxsize=1000)
+default_bucket = 'qnt.data/market-data/cryptofeed'


+def set_default_bucket(bucket: str) -> None:
+    global default_bucket
+    default_bucket = bucket
+
+def use_sample_data() -> None:
+    set_default_bucket('sample.crypto.lake')
+
 def load_data(
-    table: Literal["book", "trades"],
-    start: Optional[datetime.datetime],
-    end: Optional[datetime.datetime],
-    symbols: Optional[List[str]],
-    exchanges: Optional[List[str]],
+    table: Literal["book", "trades", "candles"],
+    start: Optional[datetime.datetime] = None,
+    end: Optional[datetime.datetime] = None,
+    symbols: Optional[List[str]] = None,
+    exchanges: Optional[List[str]] = None,
     *,
+    bucket: Optional[str] = None,
     use_threads: bool = True,
     columns: Optional[List[str]] = None,
     row_slice: Optional[slice] = None,
     drop_partition_cols: bool = False,
 ) -> pd.DataFrame:
     if end is None:
         end = datetime.datetime.now()
+    if bucket is None:
+        bucket = default_bucket

     def partition_filter(partition: Dict[str, str]) -> bool:
         return (
@@ -55,7 +66,7 @@ def partition_filter(partition: Dict[str, str]) -> bool:
     s3_session = boto3.Session(region_name="eu-west-1")
     # TODO: log & skip corrupted files
     df = lakeapi._read_parquet.read_parquet(
-        path=f"s3://qnt.data/market-data/cryptofeed/{table}",
+        path=f"s3://{bucket}/{table}",
         partition_filter=partition_filter,
         categories=["side"] if table == "trades" else None,
         dataset=True, # also adds partition columns
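For orientation, here is a minimal usage sketch of the new sample-data entry points. It mirrors the test fixture added below; the symbol, exchange and date range are illustrative, not part of this diff.

    import datetime

    import lakeapi

    # Point the client at the public sample bucket instead of the default
    # 'qnt.data/market-data/cryptofeed' bucket; equivalent to
    # lakeapi.set_default_bucket('sample.crypto.lake').
    lakeapi.use_sample_data()

    # All filter arguments now default to None, so only `table` is required.
    candles = lakeapi.load_data(
        table='candles',
        symbols=['BTC-USDT'],
        exchanges=['BINANCE'],
        start=datetime.datetime(2022, 8, 28),
        end=datetime.datetime(2022, 8, 30),
    )
    print(candles.shape)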
2 changes: 1 addition & 1 deletion requirements_dev.txt
@@ -6,7 +6,7 @@ coverage==4.5.4
 Sphinx==5.2.3
 twine==1.14.0
 Click==7.1.2
-pytest==6.2.4
+pytest==7.1.3
 jinja2==3.1.2

 pandas==1.2.0; python_version < '3.8'
3 changes: 2 additions & 1 deletion setup.cfg
@@ -19,4 +19,5 @@ exclude = docs
 max-line-length = 120

 [tool:pytest]
-collect_ignore = ['setup.py']
+filterwarnings =
+    ignore:.*distutils package is deprecated.*:DeprecationWarning
24 changes: 19 additions & 5 deletions tests/test_lakeapi.py
@@ -1,11 +1,25 @@
 #!/usr/bin/env python

 """Tests for `lakeapi` package."""
+import datetime

 import pytest # noqa

 import lakeapi # noqa


-def test_empty():
-    pass
+@pytest.fixture
+def candles():
+    lakeapi.use_sample_data()
+    return lakeapi.load_data(
+        table = 'candles',
+        symbols = ['BTC-USDT'],
+        exchanges = ['BINANCE'],
+        start = datetime.datetime(2022, 8, 28),
+        end = datetime.datetime(2022, 8, 30),
+    )
+
+def test_load_data_loads_something(candles):
+    assert candles.shape[0] == 2 * 24 * 60
+
+def test_load_data_dtypes(candles):
+    print(candles.dtypes)
+    assert 'str' not in set(candles.dtypes)
+    assert str(candles.symbol.dtype) == 'category'

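Note: the expected row count of 2 * 24 * 60 (= 2880) corresponds to two days of one-minute candles for the 2022-08-28 to 2022-08-30 window, assuming the sample bucket serves candles at one-minute resolution.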