-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Still getting some errors on sending the prefect task to the Dask cluster but getting close. Tweak the docs
- Loading branch information
1 parent
096ec80
commit 58c110a
Showing
37 changed files
with
3,517 additions
and
384 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
FROM python:3.11-slim as requirements | ||
|
||
WORKDIR /tmp | ||
RUN mkdir -p requirements | ||
RUN pip install --upgrade pip poetry | ||
RUN poetry self add poetry-plugin-export | ||
COPY ./pyproject.toml pyproject.toml | ||
COPY ./poetry.lock poetry.lock | ||
COPY ./scripts/export_poetry_to_req_txt.sh export_poetry_to_req_txt.sh | ||
RUN chmod +x export_poetry_to_req_txt.sh | ||
RUN ./export_poetry_to_req_txt.sh | ||
|
||
FROM ghcr.io/dask/dask:2024.2.1-py3.11 as dask | ||
|
||
USER root | ||
# Mitigate the issue when running a flow with DaskTaskRunner | ||
# RuntimeError: Unable to find any timezone configuration | ||
# https://github.com/PrefectHQ/prefect/issues/3061 | ||
ENV DEBIAN_FRONTEND=noninteractive | ||
RUN apt-get update -y && apt-get install -y tzdata | ||
# Cleanup the apt cache | ||
RUN apt-get purge -y --auto-remove | ||
RUN rm -rf /var/lib/apt/lists/* | ||
USER $NB_USER | ||
|
||
COPY --from=requirements /tmp/requirements/requirements.txt requirements.txt | ||
RUN pip install -r requirements.txt | ||
|
||
# Add Avengercon code base | ||
COPY ./README.md README.md | ||
COPY ./pyproject.toml pyproject.toml | ||
COPY ./avengercon avengercon | ||
RUN pip install ./ | ||
|
||
ENV TZ="America/New_York" | ||
#RUN cp /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone | ||
|
||
FROM ghcr.io/dask/dask-notebook:2024.2.1-py3.11 as notebook | ||
|
||
USER root | ||
ENV DEBIAN_FRONTEND=noninteractive | ||
RUN apt-get update -y && apt-get install -y tzdata | ||
# Cleanup the apt cache | ||
RUN apt-get purge -y --auto-remove | ||
RUN rm -rf /var/lib/apt/lists/* | ||
USER $NB_USER | ||
|
||
COPY --from=requirements /tmp/requirements/requirements.txt requirements.txt | ||
RUN pip install -r requirements.txt | ||
|
||
# Add Avengercon code base | ||
COPY ./README.md README.md | ||
COPY ./pyproject.toml pyproject.toml | ||
COPY ./avengercon avengercon | ||
RUN pip install ./ | ||
|
||
ENV TZ="America/New_York" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
FROM python:3.11-slim as requirements | ||
|
||
WORKDIR /tmp | ||
RUN mkdir -p requirements | ||
RUN pip install --upgrade pip poetry | ||
RUN poetry self add poetry-plugin-export | ||
COPY ./pyproject.toml pyproject.toml | ||
COPY ./poetry.lock poetry.lock | ||
COPY ./scripts/export_poetry_to_req_txt.sh export_poetry_to_req_txt.sh | ||
RUN chmod +x export_poetry_to_req_txt.sh | ||
RUN ./export_poetry_to_req_txt.sh | ||
|
||
FROM prefecthq/prefect:2-python3.11 as development | ||
|
||
# Mitigate the issue when running a flow with DaskTaskRunner | ||
# RuntimeError: Unable to find any timezone configuration | ||
# https://github.com/PrefectHQ/prefect/issues/3061 | ||
ENV DEBIAN_FRONTEND=noninteractive | ||
RUN apt-get update -y && apt-get install -y tzdata | ||
# Cleanup the apt cache | ||
RUN apt-get purge -y --auto-remove | ||
RUN rm -rf /var/lib/apt/lists/* | ||
|
||
COPY --from=requirements /tmp/requirements/requirements.txt requirements.txt | ||
RUN pip install -r requirements.txt | ||
|
||
# Add Avengercon code base | ||
COPY ./README.md README.md | ||
COPY ./pyproject.toml pyproject.toml | ||
COPY ./avengercon avengercon | ||
RUN pip install ./ | ||
|
||
ENV TZ="America/New_York" | ||
#RUN cp /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone | ||
EXPOSE 4200 | ||
|
||
FROM prefect as production | ||
|
||
# Remove pip's cache to reduce final image size | ||
# https://pip.pypa.io/en/stable/topics/caching/#avoiding-caching | ||
RUN pip cache purge |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
""" | ||
Dask client and related functions | ||
https://docs.dask.org/en/latest/futures.html#distributed.Client | ||
""" | ||
|
||
from dask.distributed import Client | ||
from avengercon.dask.config import dask_config | ||
from prefect_dask import get_dask_client as prefect_get_dask_client | ||
|
||
# Note: This will implicitly get configured via the DASK_SCHEDULER_ADDRESS environment | ||
# variable | ||
dask_client = Client( | ||
address=dask_config.scheduler_address, | ||
asynchronous=False, | ||
name="DaskClientSync", | ||
) | ||
|
||
|
||
def get_dask_client() -> Client: | ||
""" | ||
Wrapper for the prefect_dask get_dask_client() function that pre-populates the Dask | ||
cluster information. | ||
Returns: A Dask client | ||
""" | ||
with prefect_get_dask_client( | ||
address=dask_config.scheduler_address, | ||
asynchronous=False, | ||
name="DaskClientSync", | ||
) as l_client: | ||
yield l_client |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
""" | ||
Dask client configuration | ||
""" | ||
|
||
from pydantic import Field | ||
from pydantic import ValidationError | ||
from pydantic_settings import BaseSettings | ||
from sys import exit | ||
|
||
from avengercon.logger import logger | ||
|
||
|
||
class DaskConfig(BaseSettings): | ||
""" | ||
Pydantic parser for Dask configuration environment variables | ||
""" | ||
|
||
scheduler_address: str = Field(alias="DASK_SCHEDULER_ADDRESS") | ||
|
||
|
||
try: | ||
# Note: As of 17FEB24 there's a conflict between Pydantic's dataclass behavior and | ||
# mypy/pyright. Ignore for now: | ||
# https://github.com/pydantic/pydantic-settings/issues/201#issuecomment-1950266275 | ||
# TODO: Remove the type ignore once a fix is in place | ||
dask_config = DaskConfig() # type: ignore | ||
logger.debug("Successfully found Dask configuration") | ||
except ValidationError as e: | ||
logger.error(f"Dask configuration error: {e}") | ||
exit(1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
""" | ||
Prefect client/API integrations | ||
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
""" | ||
Prefect client configuration | ||
""" | ||
|
||
from pydantic import Field | ||
from pydantic import ValidationError | ||
from pydantic_settings import BaseSettings | ||
from avengercon.logger import logger | ||
from sys import exit | ||
|
||
|
||
class PrefectConfig(BaseSettings): | ||
""" | ||
Pydantic parser for Prefect client configuration environment variables. | ||
https://docs.prefect.io/latest/api-ref/prefect/settings/ | ||
""" | ||
|
||
# bespoke configurations unrelated to the Prefect Python SDK official configurations | ||
prefect_flows_bucket: str = Field(alias="PREFECT_MINIO_FLOWS_BUCKET_NAME") | ||
prefect_artifacts_bucket: str = Field(alias="PREFECT_MINIO_ARTIFACTS_BUCKET_NAME") | ||
|
||
|
||
try: | ||
# Note: As of 17FEB24 there's a conflict between Pydantic's dataclass behavior and | ||
# mypy/pyright. Ignore for now: | ||
# https://github.com/pydantic/pydantic-settings/issues/201#issuecomment-1950266275 | ||
# TODO: Remove the type ignore once a fix is in place | ||
prefect_config = PrefectConfig() # type: ignore | ||
logger.debug("Successfully found Prefect client configuration") | ||
except ValidationError as e: | ||
logger.error(f"Prefect configuration error: {e}") | ||
exit(1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
""" | ||
Example Prefect flows | ||
""" | ||
|
||
from prefect import flow | ||
from avengercon.prefect import tasks | ||
from avengercon.logger import logger | ||
from prefect_dask import DaskTaskRunner | ||
from avengercon.dask.config import dask_config | ||
|
||
_dask_task_runner = DaskTaskRunner(address=dask_config.scheduler_address) | ||
|
||
|
||
@flow | ||
def hello_prefect_flow() -> str: | ||
""" | ||
A minimal example of a Prefect flow | ||
Returns: A greeting string | ||
""" | ||
l_greeting = tasks.hello_prefect() | ||
logger.info(f"Hello, Prefect Flows!") | ||
return l_greeting | ||
|
||
|
||
@flow(task_runner=_dask_task_runner) | ||
def hello_prefect_dask_flow(): | ||
""" | ||
A minimal example of a Prefect flow using a Dask cluster | ||
Returns: An example Dask dataframe | ||
""" | ||
prefect_future = tasks.hello_prefect_dask.submit() | ||
return prefect_future.result() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
""" | ||
Prefect storage management | ||
""" | ||
|
||
from typing import List, Dict, Any | ||
|
||
from avengercon.minio import create_buckets | ||
from avengercon.prefect.config import prefect_config | ||
from avengercon.minio.config import minio_config | ||
from avengercon.minio.schemas import BucketCreationResult | ||
from avengercon.logger import logger | ||
from prefect.filesystems import RemoteFileSystem | ||
from prefect.exceptions import PrefectException | ||
|
||
_default_buckets: List[str] = [] | ||
try: | ||
_default_buckets.append(prefect_config.prefect_flows_bucket) | ||
_default_buckets.append(prefect_config.prefect_artifacts_bucket) | ||
except (AttributeError, TypeError, ValueError) as e: | ||
logger.error(f"Failed to configure default Prefect S3 buckets: {e}") | ||
|
||
|
||
def create_default_prefect_buckets() -> bool: | ||
""" | ||
Attempt to create default buckets used by Prefect for flows and other artifacts | ||
Returns: True if successful; False otherwise | ||
""" | ||
l_result: BucketCreationResult = create_buckets(_default_buckets) | ||
if bool(l_result.failure): | ||
logger.warning(f"Failed to create default Prefect buckets: {l_result.failure}") | ||
return False | ||
return True | ||
|
||
|
||
def create_default_prefect_blocks() -> BucketCreationResult: | ||
""" | ||
Attempt to create default Prefect storage blocks using the default S3 buckets | ||
Returns: True if successful; False otherwise | ||
""" | ||
l_result = BucketCreationResult() | ||
l_settings: Dict[str, Any] = { | ||
"use_ssl": minio_config.secure, | ||
"key": minio_config.access_key, | ||
"secret": minio_config.secret_key.get_secret_value(), | ||
"client_kwargs": { | ||
"endpoint_url": f"{minio_config.protocol}://{minio_config.endpoint}", | ||
}, | ||
} | ||
for l_bucket_name in _default_buckets: | ||
try: | ||
block_storage = RemoteFileSystem( | ||
basepath=f"s3://{l_bucket_name}", | ||
key_type="hash", | ||
settings=dict( | ||
use_ssl=minio_config.secure, | ||
key=minio_config.access_key, | ||
secret=minio_config.secret_key.get_secret_value(), | ||
client_kwargs=l_settings, | ||
), | ||
) | ||
block_storage.save(f"{l_bucket_name}", overwrite=True) | ||
l_result.success.append(l_bucket_name) | ||
except (ValueError, PrefectException) as e: | ||
logger.warning(f"Failed to create Prefect block {l_bucket_name}: {e}") | ||
l_result.failure.append(l_bucket_name) | ||
return l_result |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
""" | ||
Prefect Tasks | ||
""" | ||
|
||
from prefect import task | ||
from avengercon.logger import logger | ||
from dask import datasets | ||
from avengercon.dask import get_dask_client | ||
|
||
|
||
@task | ||
def hello_prefect() -> str: | ||
""" | ||
A minimal example of a Prefect task | ||
Returns: A greeting str | ||
""" | ||
l_response: str = "Hello, Prefect Tasks!" | ||
logger.info(l_response) | ||
return l_response | ||
|
||
|
||
@task | ||
def hello_prefect_dask(): | ||
""" | ||
A minimal example of a Prefect task sent to a distributed Dask cluster | ||
Returns: A Dask dataframe | ||
""" | ||
with get_dask_client() as client: | ||
df = datasets.timeseries("2000", "2001", partition_freq="4w") | ||
summary_df = client.compute(df.describe()).result() | ||
return summary_df | ||
# l_response: str = "Hello, Prefect <3's Dask" | ||
# logger.info(l_response) | ||
# return l_response |
Oops, something went wrong.