Add prefect+dask integration
Still getting some errors when sending the Prefect task to the Dask cluster, but getting close.

Tweak the docs
brent-stone committed Feb 27, 2024
1 parent 096ec80 commit 58c110a
Showing 37 changed files with 3,517 additions and 384 deletions.
2 changes: 1 addition & 1 deletion DockerfileAvengercon
@@ -18,7 +18,7 @@ FROM python:3.11-slim as base
# https://docs.docker.com/develop/develop-images/dockerfile_best-practices/#run
# "Always combine RUN apt-get update with apt-get install in the same RUN statement."
RUN apt-get update && apt-get upgrade -y && apt-get install -y --no-install-recommends \
- curl # \
+ curl
# gcc \
# g++ \
# libre2-dev \
57 changes: 57 additions & 0 deletions DockerfileAvengerconDask
@@ -0,0 +1,57 @@
FROM python:3.11-slim as requirements

WORKDIR /tmp
RUN mkdir -p requirements
RUN pip install --upgrade pip poetry
RUN poetry self add poetry-plugin-export
COPY ./pyproject.toml pyproject.toml
COPY ./poetry.lock poetry.lock
COPY ./scripts/export_poetry_to_req_txt.sh export_poetry_to_req_txt.sh
RUN chmod +x export_poetry_to_req_txt.sh
RUN ./export_poetry_to_req_txt.sh

FROM ghcr.io/dask/dask:2024.2.1-py3.11 as dask

USER root
# Mitigate the issue when running a flow with DaskTaskRunner
# RuntimeError: Unable to find any timezone configuration
# https://github.com/PrefectHQ/prefect/issues/3061
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update -y && apt-get install -y tzdata
# Cleanup the apt cache
RUN apt-get purge -y --auto-remove
RUN rm -rf /var/lib/apt/lists/*
USER $NB_USER

COPY --from=requirements /tmp/requirements/requirements.txt requirements.txt
RUN pip install -r requirements.txt

# Add Avengercon code base
COPY ./README.md README.md
COPY ./pyproject.toml pyproject.toml
COPY ./avengercon avengercon
RUN pip install ./

ENV TZ="America/New_York"
#RUN cp /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

FROM ghcr.io/dask/dask-notebook:2024.2.1-py3.11 as notebook

USER root
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update -y && apt-get install -y tzdata
# Cleanup the apt cache
RUN apt-get purge -y --auto-remove
RUN rm -rf /var/lib/apt/lists/*
USER $NB_USER

COPY --from=requirements /tmp/requirements/requirements.txt requirements.txt
RUN pip install -r requirements.txt

# Add Avengercon code base
COPY ./README.md README.md
COPY ./pyproject.toml pyproject.toml
COPY ./avengercon avengercon
RUN pip install ./

ENV TZ="America/New_York"
41 changes: 41 additions & 0 deletions DockerfileAvengerconPrefect
@@ -0,0 +1,41 @@
FROM python:3.11-slim as requirements

WORKDIR /tmp
RUN mkdir -p requirements
RUN pip install --upgrade pip poetry
RUN poetry self add poetry-plugin-export
COPY ./pyproject.toml pyproject.toml
COPY ./poetry.lock poetry.lock
COPY ./scripts/export_poetry_to_req_txt.sh export_poetry_to_req_txt.sh
RUN chmod +x export_poetry_to_req_txt.sh
RUN ./export_poetry_to_req_txt.sh

FROM prefecthq/prefect:2-python3.11 as development

# Mitigate the issue when running a flow with DaskTaskRunner
# RuntimeError: Unable to find any timezone configuration
# https://github.com/PrefectHQ/prefect/issues/3061
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update -y && apt-get install -y tzdata
# Cleanup the apt cache
RUN apt-get purge -y --auto-remove
RUN rm -rf /var/lib/apt/lists/*

COPY --from=requirements /tmp/requirements/requirements.txt requirements.txt
RUN pip install -r requirements.txt

# Add Avengercon code base
COPY ./README.md README.md
COPY ./pyproject.toml pyproject.toml
COPY ./avengercon avengercon
RUN pip install ./

ENV TZ="America/New_York"
#RUN cp /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
EXPOSE 4200

FROM development as production

# Remove pip's cache to reduce final image size
# https://pip.pypa.io/en/stable/topics/caching/#avoiding-caching
RUN pip cache purge
31 changes: 31 additions & 0 deletions avengercon/dask/__init__.py
@@ -0,0 +1,31 @@
"""
Dask client and related functions
https://docs.dask.org/en/latest/futures.html#distributed.Client
"""

from dask.distributed import Client
from avengercon.dask.config import dask_config
from prefect_dask import get_dask_client as prefect_get_dask_client

# Note: This will implicitly get configured via the DASK_SCHEDULER_ADDRESS environment
# variable
dask_client = Client(
address=dask_config.scheduler_address,
asynchronous=False,
name="DaskClientSync",
)


def get_dask_client() -> Client:
"""
Wrapper for the prefect_dask get_dask_client() function that pre-populates the Dask
cluster information.
Returns: A Dask client
"""
with prefect_get_dask_client(
address=dask_config.scheduler_address,
asynchronous=False,
name="DaskClientSync",
) as l_client:
yield l_client
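As a quick sanity check, the module-level dask_client above can be exercised directly. The sketch below is illustrative only: it assumes the scheduler at DASK_SCHEDULER_ADDRESS is reachable, and the square() helper is made up for the example.

from avengercon.dask import dask_client

def square(x: int) -> int:
    # Trivial function shipped to a Dask worker
    return x * x

futures = dask_client.map(square, [1, 2, 3, 4])
print(dask_client.gather(futures))  # -> [1, 4, 9, 16]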
30 changes: 30 additions & 0 deletions avengercon/dask/config.py
@@ -0,0 +1,30 @@
"""
Dask client configuration
"""

from pydantic import Field
from pydantic import ValidationError
from pydantic_settings import BaseSettings
from sys import exit

from avengercon.logger import logger


class DaskConfig(BaseSettings):
"""
Pydantic parser for Dask configuration environment variables
"""

scheduler_address: str = Field(alias="DASK_SCHEDULER_ADDRESS")


try:
# Note: As of 17FEB24 there's a conflict between Pydantic's dataclass behavior and
# mypy/pyright. Ignore for now:
# https://github.com/pydantic/pydantic-settings/issues/201#issuecomment-1950266275
# TODO: Remove the type ignore once a fix is in place
dask_config = DaskConfig() # type: ignore
logger.debug("Successfully found Dask configuration")
except ValidationError as e:
logger.error(f"Dask configuration error: {e}")
exit(1)
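DaskConfig reads its single value from the DASK_SCHEDULER_ADDRESS environment variable (normally supplied by the compose stack). A minimal sketch of that contract, using a hypothetical scheduler address rather than the repo's real one:

import os

# Hypothetical value for illustration; the real address comes from the .env / compose files
os.environ["DASK_SCHEDULER_ADDRESS"] = "tcp://dask-scheduler:8786"

from avengercon.dask.config import DaskConfig

print(DaskConfig().scheduler_address)  # -> tcp://dask-scheduler:8786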
3 changes: 3 additions & 0 deletions avengercon/prefect/__init__.py
@@ -0,0 +1,3 @@
"""
Prefect client/API integrations
"""
32 changes: 32 additions & 0 deletions avengercon/prefect/config.py
@@ -0,0 +1,32 @@
"""
Prefect client configuration
"""

from pydantic import Field
from pydantic import ValidationError
from pydantic_settings import BaseSettings
from avengercon.logger import logger
from sys import exit


class PrefectConfig(BaseSettings):
"""
Pydantic parser for Prefect client configuration environment variables.
https://docs.prefect.io/latest/api-ref/prefect/settings/
"""

# bespoke configurations unrelated to the Prefect Python SDK official configurations
prefect_flows_bucket: str = Field(alias="PREFECT_MINIO_FLOWS_BUCKET_NAME")
prefect_artifacts_bucket: str = Field(alias="PREFECT_MINIO_ARTIFACTS_BUCKET_NAME")


try:
# Note: As of 17FEB24 there's a conflict between Pydantic's dataclass behavior and
# mypy/pyright. Ignore for now:
# https://github.com/pydantic/pydantic-settings/issues/201#issuecomment-1950266275
# TODO: Remove the type ignore once a fix is in place
prefect_config = PrefectConfig() # type: ignore
logger.debug("Successfully found Prefect client configuration")
except ValidationError as e:
logger.error(f"Prefect configuration error: {e}")
exit(1)
34 changes: 34 additions & 0 deletions avengercon/prefect/flows.py
@@ -0,0 +1,34 @@
"""
Example Prefect flows
"""

from prefect import flow
from avengercon.prefect import tasks
from avengercon.logger import logger
from prefect_dask import DaskTaskRunner
from avengercon.dask.config import dask_config

_dask_task_runner = DaskTaskRunner(address=dask_config.scheduler_address)


@flow
def hello_prefect_flow() -> str:
"""
A minimal example of a Prefect flow
Returns: A greeting string
"""
l_greeting = tasks.hello_prefect()
logger.info(f"Hello, Prefect Flows!")
return l_greeting


@flow(task_runner=_dask_task_runner)
def hello_prefect_dask_flow():
"""
A minimal example of a Prefect flow using a Dask cluster
Returns: An example Dask dataframe
"""
prefect_future = tasks.hello_prefect_dask.submit()
return prefect_future.result()
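For local experimentation, the flows above can be invoked like ordinary functions once the Prefect API and the Dask scheduler from this stack are reachable; a minimal sketch (the __main__ guard is not part of this commit):

if __name__ == "__main__":
    # Runs the pure-Prefect example, then the Dask-backed one
    print(hello_prefect_flow())
    print(hello_prefect_dask_flow())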
68 changes: 68 additions & 0 deletions avengercon/prefect/storage.py
@@ -0,0 +1,68 @@
"""
Prefect storage management
"""

from typing import List, Dict, Any

from avengercon.minio import create_buckets
from avengercon.prefect.config import prefect_config
from avengercon.minio.config import minio_config
from avengercon.minio.schemas import BucketCreationResult
from avengercon.logger import logger
from prefect.filesystems import RemoteFileSystem
from prefect.exceptions import PrefectException

_default_buckets: List[str] = []
try:
_default_buckets.append(prefect_config.prefect_flows_bucket)
_default_buckets.append(prefect_config.prefect_artifacts_bucket)
except (AttributeError, TypeError, ValueError) as e:
logger.error(f"Failed to configure default Prefect S3 buckets: {e}")


def create_default_prefect_buckets() -> bool:
"""
Attempt to create default buckets used by Prefect for flows and other artifacts
Returns: True if successful; False otherwise
"""
l_result: BucketCreationResult = create_buckets(_default_buckets)
if bool(l_result.failure):
logger.warning(f"Failed to create default Prefect buckets: {l_result.failure}")
return False
return True


def create_default_prefect_blocks() -> BucketCreationResult:
"""
Attempt to create default Prefect storage blocks using the default S3 buckets
Returns: True if successful; False otherwise
"""
l_result = BucketCreationResult()
l_settings: Dict[str, Any] = {
"use_ssl": minio_config.secure,
"key": minio_config.access_key,
"secret": minio_config.secret_key.get_secret_value(),
"client_kwargs": {
"endpoint_url": f"{minio_config.protocol}://{minio_config.endpoint}",
},
}
for l_bucket_name in _default_buckets:
try:
block_storage = RemoteFileSystem(
basepath=f"s3://{l_bucket_name}",
key_type="hash",
settings=dict(
use_ssl=minio_config.secure,
key=minio_config.access_key,
secret=minio_config.secret_key.get_secret_value(),
client_kwargs=l_settings,
),
)
block_storage.save(f"{l_bucket_name}", overwrite=True)
l_result.success.append(l_bucket_name)
except (ValueError, PrefectException) as e:
logger.warning(f"Failed to create Prefect block {l_bucket_name}: {e}")
l_result.failure.append(l_bucket_name)
return l_result
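A hypothetical bootstrap step tying the two helpers together (not part of this commit, and assuming BucketCreationResult exposes success/failure lists as used above) might look like:

from avengercon.prefect.storage import (
    create_default_prefect_buckets,
    create_default_prefect_blocks,
)

if create_default_prefect_buckets():
    # Only register the RemoteFileSystem blocks once the MinIO buckets exist
    result = create_default_prefect_blocks()
    print(f"Created blocks: {result.success}; failed: {result.failure}")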
36 changes: 36 additions & 0 deletions avengercon/prefect/tasks.py
@@ -0,0 +1,36 @@
"""
Prefect Tasks
"""

from prefect import task
from avengercon.logger import logger
from dask import datasets
from avengercon.dask import get_dask_client


@task
def hello_prefect() -> str:
"""
A minimal example of a Prefect task
Returns: A greeting str
"""
l_response: str = "Hello, Prefect Tasks!"
logger.info(l_response)
return l_response


@task
def hello_prefect_dask():
"""
A minimal example of a Prefect task sent to a distributed Dask cluster
Returns: A Dask dataframe
"""
with get_dask_client() as client:
df = datasets.timeseries("2000", "2001", partition_freq="4w")
summary_df = client.compute(df.describe()).result()
return summary_df
# l_response: str = "Hello, Prefect <3's Dask"
# logger.info(l_response)
# return l_response