Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Point-in-time Data Operation #343

Merged
merged 34 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift click to select a range
3e92169
add period ops class
bxdd Mar 9, 2021
fead243
black format
bxdd Mar 9, 2021
61720c2
add pit data read
bxdd Mar 10, 2021
a0959a9
fix bug in period ops
bxdd Mar 10, 2021
bd46d14
update ops runnable
bxdd Mar 10, 2021
9f1cc64
update PIT test example
bxdd Mar 10, 2021
63e4895
black format
bxdd Mar 10, 2021
88b7926
update PIT test
bxdd Mar 10, 2021
a2dae5c
update tets_PIT
bxdd Mar 12, 2021
99db80d
update code format
bxdd Mar 12, 2021
c4bbe6b
add check_feature_exist
bxdd Mar 12, 2021
20bcf25
black format
bxdd Mar 12, 2021
6e23ff7
optimize the PIT Algorithm
bxdd Mar 12, 2021
88a0d3d
fix bug
bxdd Mar 12, 2021
f52462a
update example
bxdd Mar 12, 2021
b794e65
update test_PIT name
bxdd Mar 17, 2021
255ed0b
Merge https://github.com/microsoft/qlib
bxdd Apr 7, 2021
9df1fbd
add pit collector
bxdd Apr 7, 2021
71d5640
black format
bxdd Apr 7, 2021
ebe277b
fix bugs
bxdd Apr 8, 2021
655ff51
fix try
bxdd Apr 8, 2021
f6ca4d2
fix bug & add dump_pit.py
bxdd Apr 9, 2021
566a8f9
Successfully run and understand PIT
you-n-g Mar 4, 2022
63b5ed4
Merge remote-tracking branch 'origin/main' into PIT
you-n-g Mar 4, 2022
4997389
Add some docs and remove a bug
you-n-g Mar 4, 2022
561be64
Merge remote-tracking branch 'origin/main' into PIT
you-n-g Mar 8, 2022
6811a07
Merge remote-tracking branch 'origin/main' into PIT
you-n-g Mar 8, 2022
cf77cd0
mv crypto collector
you-n-g Mar 8, 2022
79422a1
black format
you-n-g Mar 8, 2022
48ea2c5
Run succesfully after merging master
you-n-g Mar 8, 2022
9c67303
Pass test and fix code
you-n-g Mar 10, 2022
69cf2ab
remove useless PIT code
you-n-g Mar 10, 2022
de8d6cb
fix PYlint
you-n-g Mar 10, 2022
2671dc2
Rename
you-n-g Mar 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge remote-tracking branch 'origin/main' into PIT
  • Loading branch information
you-n-g committed Mar 4, 2022
commit 63b5ed42c20f56c16421cce28ab69db94369a4d5
19 changes: 14 additions & 5 deletions qlib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 183,15 @@ def set_conf_from_C(self, config_c):
"value": float("NAN"),
"index": 0xFFFFFFFF,
},
# Default config for MongoDB
"mongo": {
"task_url": "mongodb://localhost:27017/",
"task_db_name": "default_task_db",
},
# Shift minute for highfreq minute data, used in backtest
# if min_data_shift == 0, use default market time [9:30, 11:29, 1:00, 2:59]
# if min_data_shift != 0, use shifted market time [9:30, 11:29, 1:00, 2:59] - shift*minute
"min_data_shift": 0,
}

MODE_CONF = {
Expand Down Expand Up @@ -434,11 443,11 @@ def set(self, default_conf: str = "client", **kwargs):
)

def register(self):
from .utils import init_instance_by_config
from .data.base import register_all_ops
from .data.data import register_all_wrappers
from .workflow import R, QlibRecorder
from .workflow.utils import experiment_exit_handler
from .utils import init_instance_by_config # pylint: disable=C0415
from .data.ops import register_all_ops # pylint: disable=C0415
from .data.data import register_all_wrappers # pylint: disable=C0415
from .workflow import R, QlibRecorder # pylint: disable=C0415
from .workflow.utils import experiment_exit_handler # pylint: disable=C0415

register_all_ops(self)
register_all_wrappers(self)
Expand Down
47 changes: 0 additions & 47 deletions qlib/data/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 235,6 @@ class ExpressionOps(Expression):
This kind of feature will use operator for feature
construction on the fly.
"""

pass


Expand Down Expand Up @@ -553,49 552,3 @@ class PExpressionOps(PExpression):
"""

pass


class OpsWrapper:
    """Registry that exposes registered operator classes as attributes.

    Operator classes are stored in ``_ops`` keyed by their class name and are
    looked up through attribute access on the wrapper instance.
    """

    def __init__(self):
        self._ops = {}

    def reset(self):
        """Forget every operator registered so far."""
        self._ops = {}

    def register(self, ops_list):
        """Register operator classes under their class names.

        Parameters
        ----------
        ops_list : iterable of classes
            Each element must be a subclass of ``ExpressionOps`` or ``PExpressionOps``.

        Raises
        ------
        TypeError
            If an element is a class that derives from neither base.
        """
        for operator in ops_list:
            if not issubclass(operator, (ExpressionOps, PExpressionOps)):
                raise TypeError("operator must be subclass of ExpressionOps or PExpressionOps, not {}".format(operator))

            # Re-registering a name is allowed, but the override is reported.
            if operator.__name__ in self._ops:
                get_module_logger(self.__class__.__name__).warning(
                    "The custom operator [{}] will override the qlib default definition".format(operator.__name__)
                )
            self._ops[operator.__name__] = operator

    def __getattr__(self, key):
        """Return the operator class registered as ``key``.

        Raises
        ------
        AttributeError
            If no operator named ``key`` is registered.
        """
        # ``__getattr__`` only runs when normal lookup fails. Reading ``self._ops``
        # here would re-enter this method whenever ``_ops`` is not in the instance
        # dict yet (an instance built with ``__new__``, e.g. by copy or pickle) and
        # recurse without bound. Going through ``__dict__`` avoids that.
        ops = self.__dict__.get("_ops")
        if ops is None or key not in ops:
            raise AttributeError("The operator [{0}] is not registered".format(key))
        return ops[key]


Operators = OpsWrapper()


def register_all_ops(C):
    """Reset the global ``Operators`` registry and fill it again.

    The built-in operator lists ``OpsList`` and ``PeriodOpsList`` are registered
    first. If ``C`` has a ``custom_ops`` attribute that is not ``None``, those
    operators are registered afterwards and a debug message is logged.
    """
    logger = get_module_logger("base")

    Operators.reset()

    # Imported inside the function; presumably to avoid a circular import
    # between this module and the ops modules (TODO confirm).
    from .ops import OpsList
    from .ops_period import PeriodOpsList

    for builtin_ops in (OpsList, PeriodOpsList):
        Operators.register(builtin_ops)

    custom_ops = getattr(C, "custom_ops", None)
    if custom_ops is None:
        return

    Operators.register(custom_ops)
    logger.debug("register custom period operator {}".format(custom_ops))
3 changes: 2 additions & 1 deletion qlib/data/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 32,8 @@
)

from ..log import get_module_logger
from .base import Feature, PFeature, Operators
from .base import Feature, PFeature
from .ops import Operators # pylint: disable=W0611


class QlibCacheException(RuntimeError):
Expand Down
48 changes: 39 additions & 9 deletions qlib/data/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 19,50 @@

from .cache import H
from ..config import C
from .inst_processor import InstProcessor

from ..log import get_module_logger
from .cache import DiskDatasetCache
from .base import Feature, PFeature
from ..utils import (
Wrapper,
init_instance_by_config,
register_wrapper,
get_module_by_module_path,
parse_field,
read_bin,
read_period_data,
get_period_list,
hash_args,
normalize_cache_fields,
code_to_fname,
set_log_with_config,
time_to_slc_point,
read_period_data,
get_period_list,
)
from .base import Feature, PFeature, Operators
from .cache import DiskDatasetCache, DiskExpressionCache
from ..utils import Wrapper, init_instance_by_config, register_wrapper, get_module_by_module_path
from ..utils.paral import ParallelExt
from .ops import Operators # pylint: disable=W0611


class ProviderBackendMixin:
    """
    This helper class tries to make providers that are based on a storage backend more convenient.
    It is not necessary to inherit from this class if the provider doesn't rely on the backend storage.

    The host class is expected to provide a ``backend`` attribute (read in ``backend_obj``).
    """

    def get_default_backend(self):
        """Build the default backend config from the provider's class name.

        The class name is split into capitalised words and the second-to-last
        word is used, e.g. ``LocalCalendarProvider`` -> ``Calendar`` ->
        ``FileCalendarStorage``.

        Returns
        -------
        dict
            A config with the keys ``class`` and ``module_path``.
        """
        provider_name: str = re.findall("[A-Z][^A-Z]*", self.__class__.__name__)[-2]
        return {
            # default storage class
            "class": f"File{provider_name}Storage",
            # default storage module
            "module_path": "qlib.data.storage.file_storage",
        }

    def backend_obj(self, **kwargs):
        """Create the backend object from ``self.backend`` or the default config.

        ``kwargs`` are merged into the config's ``kwargs`` entry, and the result
        of ``init_instance_by_config`` on that config is returned.
        """
        backend = self.backend if self.backend else self.get_default_backend()
        # Work on a copy so the stored config is not mutated by the kwargs update.
        backend = copy.deepcopy(backend)
        backend.setdefault("kwargs", {}).update(**kwargs)
        return init_instance_by_config(backend)


class CalendarProvider(abc.ABC):
Expand Down Expand Up @@ -546,9 577,8 @@ def inst_calculator(inst, start_time, end_time, freq, column_names, spans=None,
_calendar = Cal.calendar(freq=freq)
data.index = _calendar[data.index.values.astype(int)]
data.index.names = ["datetime"]
if spans is None:
return data
else:

if not data.empty and spans is not None:
mask = np.zeros(len(data), dtype=bool)
for begin, end in spans:
mask |= (data.index >= begin) & (data.index <= end)
Expand Down
54 changes: 52 additions & 2 deletions qlib/data/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 10,7 @@

from typing import Union, List, Type
from scipy.stats import percentileofscore

from .base import Expression, PExpression, ExpressionOps
from .base import Expression, ExpressionOps, Feature, PExpression
from ..log import get_module_logger
from ..utils import get_callable_kwargs

Expand Down Expand Up @@ -1592,6 1591,53 @@ def _load_internal(self, instrument, start_index, end_index, freq):
] [TResample]


class OpsWrapper:
    """Registry that exposes registered operator classes as attributes.

    Operator classes are stored in ``_ops`` keyed by their class name and are
    looked up through attribute access on the wrapper instance.
    """

    def __init__(self):
        self._ops = {}

    def reset(self):
        """Forget every operator registered so far."""
        self._ops = {}

    def register(self, ops_list: List[Union[Type[ExpressionOps], dict]]):
        """register operator

        Parameters
        ----------
        ops_list : List[Union[Type[ExpressionOps], dict]]
            - if type(ops_list) is List[Type[ExpressionOps]], each element of ops_list represents the operator class, which should be the subclass of `ExpressionOps`.
            - if type(ops_list) is List[dict], each element of ops_list represents the config of operator, which has the following format:
                {
                    "class": class_name,
                    "module_path": path,
                }
                Note: `class` should be the class name of operator, `module_path` should be a python module or path of file.

        Raises
        ------
        TypeError
            If a resolved operator is a class that does not derive from `Expression`.
        """
        for _operator in ops_list:
            if isinstance(_operator, dict):
                # A config dict is resolved to its class; the returned kwargs are not used.
                _ops_class, _ = get_callable_kwargs(_operator)
            else:
                _ops_class = _operator

            if not issubclass(_ops_class, Expression):
                raise TypeError("operator must be subclass of ExpressionOps, not {}".format(_ops_class))

            # Re-registering a name is allowed, but the override is reported.
            if _ops_class.__name__ in self._ops:
                get_module_logger(self.__class__.__name__).warning(
                    "The custom operator [{}] will override the qlib default definition".format(_ops_class.__name__)
                )
            self._ops[_ops_class.__name__] = _ops_class

    def __getattr__(self, key):
        """Return the operator class registered as ``key``.

        Raises
        ------
        AttributeError
            If no operator named ``key`` is registered.
        """
        # ``__getattr__`` only runs when normal lookup fails. Reading ``self._ops``
        # here would re-enter this method whenever ``_ops`` is not in the instance
        # dict yet (an instance built with ``__new__``, e.g. by copy or pickle) and
        # recurse without bound. Going through ``__dict__`` avoids that.
        ops = self.__dict__.get("_ops")
        if ops is None or key not in ops:
            raise AttributeError("The operator [{0}] is not registered".format(key))
        return ops[key]


Operators = OpsWrapper()


def register_all_ops(C):
"""register all operator"""
logger = get_module_logger("ops")
Expand All @@ -1601,6 1647,10 @@ def register_all_ops(C):
# Operators.reset()
Operators.register(OpsList)

# FIXME: I don't think it is necessary
from .ops_period import PeriodOpsList
Operators.register(PeriodOpsList)

if getattr(C, "custom_ops", None) is not None:
Operators.register(C.custom_ops)
logger.debug("register custom operator {}".format(C.custom_ops))
16 changes: 9 additions & 7 deletions qlib/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 29,10 @@
import numpy as np
import pandas as pd
from pathlib import Path
from typing import List, Union, Tuple, Text, Optional

from typing import List, Dict, Union, Tuple, Any, Text, Optional, Callable
from types import ModuleType
from urllib.parse import urlparse
from .file import get_or_create_path, save_multiple_parts_file, unpack_archive_with_buffer, get_tmp_file_with_buffer
from ..config import C
from ..log import get_module_logger, set_log_with_config

Expand Down Expand Up @@ -260,11 262,11 @@ def parse_field(field):

if not isinstance(field, str):
field = str(field)
return re.sub(
r"\$(\w )",
r'Feature("\1")',
re.sub(r"\$\$(\w )", r'PFeature("\1")', re.sub(r"(\w \s*)\(", r"Operators.\1(", field)),
)
for pattern, new in [(r"\$\$(\w )", r'PFeature("\1")'), # $$ must be before $
(r"\$(\w )", rf'Feature("\1")'),
(r"(\w \s*)\(", r"Operators.\1(")]: # Features # Operators
field = re.sub(pattern, new, field)
return field


def get_module_by_module_path(module_path: Union[str, ModuleType]):
Expand Down
13 changes: 7 additions & 6 deletions scripts/data_collector/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 187,13 @@ def cache_small_data(self, symbol, df):
def _collector(self, instrument_list):

error_symbol = []
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
with tqdm(total=len(instrument_list)) as p_bar:
for _symbol, _result in zip(instrument_list, executor.map(self._simple_collector, instrument_list)):
if _result != self.NORMAL_FLAG:
error_symbol.append(_symbol)
p_bar.update()
res = Parallel(n_jobs=self.max_workers)(
delayed(self._simple_collector)(_inst) for _inst in tqdm(instrument_list)
)
for _symbol, _result in zip(instrument_list, res):
if _result != self.NORMAL_FLAG:
error_symbol.append(_symbol)
print(error_symbol)
logger.info(f"error symbol nums: {len(error_symbol)}")
logger.info(f"current get symbol nums: {len(instrument_list)}")
error_symbol.extend(self.mini_symbol_map.keys())
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.