
Commit

add source, fix flake8, add github actions
Jan Škoda committed Oct 8, 2022
1 parent 41b777c commit 2ec48ab
Showing 8 changed files with 209 additions and 51 deletions.
44 changes: 44 additions & 0 deletions .github/workflows/dev.yml
@@ -0,0 +1,44 @@
# This is a basic workflow to help you get started with Actions

name: dev CI workflow

# Controls when the action will run.
on:
  # Triggers the workflow on push or pull request events but only for the master branch
  push:
    branches: [ master ]
  pull_request:
    branches: [ master ]

  # Allows you to run this workflow manually from the Actions tab
  workflow_dispatch:

# A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs:
  # This workflow contains a single job called "test"
  test:
    # The type of runner that the job will run on
    strategy:
      matrix:
        python-versions: [3.6, 3.7, 3.8, 3.9]
        os: [ubuntu-20.04, windows-latest, macos-latest]
    runs-on: ${{ matrix.os }}

    # Steps represent a sequence of tasks that will be executed as part of the job
    steps:
      # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.python-versions }}

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install tox tox-gh-actions poetry
      - name: test with tox
        run: tox

      - name: list files
        run: ls -l .
49 changes: 49 additions & 0 deletions .github/workflows/publish.yml
@@ -0,0 +1,49 @@
# Publish package on release branch if it's tagged with 'v*'

name: release & publish

# Controls when the action will run.
on:
  # Triggers the workflow on pushes of tags starting with 'v'
  push:
    tags:
      - 'v*'

  # Allows you to run this workflow manually from the Actions tab
  workflow_dispatch:

# A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs:
  # This workflow contains a single job called "release"
  release:
    name: Publish documentation
    runs-on: ubuntu-latest

    strategy:
      matrix:
        python-versions: [3.8]

    # Steps represent a sequence of tasks that will be executed as part of the job
    steps:
      # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
      - uses: actions/checkout@v2

      - uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.python-versions }}

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install poetry
      - name: build documentation
        run: |
          poetry install -E dev
          poetry run mkdocs build
      - name: publish documentation
        uses: peaceiris/actions-gh-pages@v3
        with:
          personal_token: ${{ secrets.PERSONAL_TOKEN }}
          publish_dir: ./site
6 changes: 4 additions & 2 deletions lakeapi/__init__.py
@@ -1,5 +1,7 @@
"""Top-level package for Lake API."""

__author__ = """Jan Skoda"""
__email__ = '[email protected]'
__version__ = '0.1.0'
__email__ = "[email protected]"
__version__ = "0.1.0"

from .lakeapi import load_data # noqa
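
Since load_data is now re-exported at the package root, callers can import it straight from lakeapi. A minimal usage sketch, not part of this commit; it assumes the package is installed with S3 read access configured, and the symbol/exchange values are only illustrative (they mirror the commented test calls in lakeapi.py below):

import datetime

import lakeapi

# Last day of trades for one market; symbols and exchanges must be upper-case.
trades = lakeapi.load_data(
    table="trades",
    start=datetime.datetime.now() - datetime.timedelta(days=1),
    end=None,  # None means "until now" inside load_data
    symbols=["BTC-USDT"],
    exchanges=["BINANCE"],
)
print(trades.head())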
16 changes: 0 additions & 16 deletions lakeapi/cli.py

This file was deleted.

106 changes: 105 additions & 1 deletion lakeapi/lakeapi.py
@@ -1 +1,105 @@
"""Main module."""
from typing import List, Dict, Optional, Literal
import datetime

import boto3
import pandas as pd
import awswrangler as wr
from cachetools_ext.fs import FSLRUCache
from botocache.botocache import botocache_context

cache = FSLRUCache(ttl=8 * 60 * 60, path="cache/boto", maxsize=1000)


def load_data(
table: Literal["book", "trades"],
start: Optional[datetime.datetime],
end: Optional[datetime.datetime],
symbols: Optional[List[str]],
exchanges: Optional[List[str]],
*,
use_threads: bool = True,
columns: Optional[List[str]] = None,
row_slice: Optional[slice] = None,
drop_partition_cols: bool = False,
) -> pd.DataFrame:
if end is None:
end = datetime.datetime.now()

def partition_filter(partition: Dict[str, str]) -> bool:
return (
(
start is None
or start.date() <= datetime.date.fromisoformat(partition["dt"])
)
and (
end is None or end.date() > datetime.date.fromisoformat(partition["dt"])
)
and (symbols is None or partition["symbol"] in symbols)
and (exchanges is None or partition["exchange"] in exchanges)
)

if symbols:
assert symbols[0].upper() == symbols[0]
if exchanges:
assert exchanges[0].upper() == exchanges[0]

with botocache_context(
cache=cache,
action_regex_to_cache=["List.*"],
# This helps in logging all calls made to AWS. Useful while debugging. Default value is False.
call_log=True,
# This supresses warning messages encountered while caching. Default value is False.
supress_warning_message=False,
):
s3_session = boto3.Session(region_name="eu-west-1")
# TODO: log & skip corrupted files
df = wr.s3.read_parquet(
path=f"s3://qnt.data/market-data/cryptofeed/{table}",
partition_filter=partition_filter,
categories=["side"] if table == "trades" else None,
dataset=True, # also adds partition columns
boto3_session=s3_session,
columns=columns,
use_threads=use_threads,
ignore_index=True,
)
if drop_partition_cols:
# useful when loading just one symbol and exchange
df.drop(columns=["symbol", "exchange", "dt"], inplace=True)
else:
# dt is contained in time columns
df.drop(columns=["dt"], inplace=True)
if row_slice:
df = df.iloc[row_slice]

# For compatibility
if "amount" in df.columns:
df.rename(columns={"amount": "quantity"}, inplace=True)
if "receipt_timestamp" in df.columns:
df.rename(columns={"receipt_timestamp": "received_time"}, inplace=True)
df["received_time"] = pd.to_datetime(df["received_time"], unit="ns", cache=True)
if "timestamp" in df.columns:
df.rename(columns={"timestamp": "origin_time"}, inplace=True)
df["origin_time"] = pd.to_datetime(df["origin_time"], unit="ns", cache=True)
if table == "trades":
df.rename(columns={"id": "trade_id"}, inplace=True)
return df


if __name__ == "__main__":
# Test
# df = load_data(table = 'trades', start = datetime.datetime.now() - datetime.timedelta(days = 3), end = None, symbols = ['BTC-USDT'], exchanges = ['BINANCE']) # noqa
# df = load_data(table = 'trades', start = datetime.datetime.now() - datetime.timedelta(days = 2), end = None, symbols = None, exchanges = ['BINANCE']) # noqa
df = load_data(
table="book",
start=datetime.datetime.now() - datetime.timedelta(days=2),
end=None,
symbols=["FRONT-BUSD"],
exchanges=None,
)
pd.set_option("display.width", 1000)
pd.set_option("display.max_columns", 30)
print(df)
# print(df.sample(20))
print(df.dtypes)
print(df.memory_usage().sum() / 1e6, "MB")
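
The partition_filter closure above is what limits the S3 scan: awswrangler calls it with each Hive partition's key/value strings (dt, symbol, exchange) and only reads the partitions for which it returns True, so the end date is effectively exclusive. A small standalone illustration of the same predicate, using hypothetical dates and symbols:

import datetime

start = datetime.datetime(2022, 10, 1)
end = datetime.datetime(2022, 10, 5)
symbols = ["BTC-USDT"]
exchanges = None  # None disables filtering on that key


def partition_filter(partition):
    return (
        (start is None or start.date() <= datetime.date.fromisoformat(partition["dt"]))
        and (end is None or end.date() > datetime.date.fromisoformat(partition["dt"]))
        and (symbols is None or partition["symbol"] in symbols)
        and (exchanges is None or partition["exchange"] in exchanges)
    )


print(partition_filter({"dt": "2022-10-03", "symbol": "BTC-USDT", "exchange": "BINANCE"}))  # True
print(partition_filter({"dt": "2022-10-05", "symbol": "BTC-USDT", "exchange": "BINANCE"}))  # False, end date is exclusive
print(partition_filter({"dt": "2022-10-03", "symbol": "ETH-USDT", "exchange": "BINANCE"}))  # False, symbol not requested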
2 changes: 2 additions & 0 deletions setup.cfg
@@ -16,5 +16,7 @@ universal = 1

[flake8]
exclude = docs
max-line-length = 120

[tool:pytest]
collect_ignore = ['setup.py']
3 changes: 1 addition & 2 deletions setup.py
@@ -12,8 +12,7 @@

 requirements = [
     'Click>=7.0', 'pandas>=1.0.5', 'boto3>=1.24', 'cachetools_ext>=0.0.8,<0.1.0', 'botocache>=0.0.4,<0.1.0',
-    'awswrangler==2.16.1@git+ssh://[email protected]/leftys/aws-sdk-pandas#egg=awswrangler',
-
+    'awswrangler@git+ssh://[email protected]/leftys/aws-sdk-pandas#egg=awswrangler',
 ]
 
 test_requirements = ['pytest>=3', ]
34 changes: 4 additions & 30 deletions tests/test_lakeapi.py
@@ -2,36 +2,10 @@

"""Tests for `lakeapi` package."""

import pytest
import pytest # noqa

from click.testing import CliRunner
from lakeapi import lakeapi # noqa

from lakeapi import lakeapi
from lakeapi import cli


@pytest.fixture
def response():
"""Sample pytest fixture.
See more at: http://doc.pytest.org/en/latest/fixture.html
"""
# import requests
# return requests.get('https://github.com/audreyr/cookiecutter-pypackage')


def test_content(response):
"""Sample pytest test function with the pytest fixture as an argument."""
# from bs4 import BeautifulSoup
# assert 'GitHub' in BeautifulSoup(response.content).title.string


def test_command_line_interface():
"""Test the CLI."""
runner = CliRunner()
result = runner.invoke(cli.main)
assert result.exit_code == 0
assert 'lakeapi.cli.main' in result.output
help_result = runner.invoke(cli.main, ['--help'])
assert help_result.exit_code == 0
assert '--help Show this message and exit.' in help_result.output
def test_empty():
pass
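
With the Click-based CLI and its tests removed, the suite only has a placeholder left. A possible follow-up test of load_data's column handling that avoids any AWS access — a sketch only, not part of this commit; it assumes pytest's monkeypatch fixture, stubs awswrangler's read_parquet through the lakeapi module imported above, and uses made-up sample row values:

import pandas as pd


def test_load_data_renames_columns(monkeypatch):
    def fake_read_parquet(**kwargs):
        # Shape of a raw trades partition as read_parquet would return it.
        return pd.DataFrame(
            {
                "symbol": ["BTC-USDT"],
                "exchange": ["BINANCE"],
                "dt": ["2022-10-07"],
                "timestamp": [1_665_100_000_000_000_000],          # ns since epoch
                "receipt_timestamp": [1_665_100_000_100_000_000],  # ns since epoch
                "side": ["buy"],
                "amount": [0.5],
                "price": [20_000.0],
                "id": [1],
            }
        )

    # Replace the S3 read so the test needs no credentials or network.
    monkeypatch.setattr(lakeapi.wr.s3, "read_parquet", fake_read_parquet)

    df = lakeapi.load_data(
        table="trades", start=None, end=None, symbols=["BTC-USDT"], exchanges=None
    )

    assert "quantity" in df.columns and "trade_id" in df.columns
    assert "dt" not in df.columns
    assert str(df["origin_time"].dtype).startswith("datetime64")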
