Add example DAG for demonstrating usage of GCS sensors #22808

Merged · 10 commits · Apr 8, 2022
47 changes: 39 additions & 8 deletions airflow/providers/google/cloud/example_dags/example_gcs.py
@@ -37,35 +37,44 @@
 from airflow.providers.google.cloud.sensors.gcs import (
     GCSObjectExistenceSensor,
     GCSObjectsWithPrefixExistenceSensor,
+    GCSObjectUpdateSensor,
+    GCSUploadSessionCompleteSensor,
 )
 from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
 from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator

 START_DATE = datetime(2021, 1, 1)

-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
-BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
-GCS_ACL_ENTITY = os.environ.get("GCS_ACL_ENTITY", "allUsers")
+PROJECT_ID = os.getenv("GCP_PROJECT_ID", "example-id")
+BUCKET_1 = os.getenv("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
+GCS_ACL_ENTITY = os.getenv("GCS_ACL_ENTITY", "allUsers")
 GCS_ACL_BUCKET_ROLE = "OWNER"
 GCS_ACL_OBJECT_ROLE = "OWNER"

-BUCKET_2 = os.environ.get("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
+BUCKET_2 = os.getenv("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")

 temp_dir_path = gettempdir()
-PATH_TO_TRANSFORM_SCRIPT = os.environ.get(
+PATH_TO_TRANSFORM_SCRIPT = os.getenv(
     "GCP_GCS_PATH_TO_TRANSFORM_SCRIPT", os.path.join(temp_dir_path, "transform_script.py")
 )
-PATH_TO_UPLOAD_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE = os.getenv(
     "GCP_GCS_PATH_TO_UPLOAD_FILE", os.path.join(temp_dir_path, "test-gcs-example-upload.txt")
 )
-PATH_TO_UPLOAD_FILE_PREFIX = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
-PATH_TO_SAVED_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE_PREFIX = os.getenv("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
+PATH_TO_SAVED_FILE = os.getenv(
     "GCP_GCS_PATH_TO_SAVED_FILE", os.path.join(temp_dir_path, "test-gcs-example-download.txt")
 )

 BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]

+# Upload 'test-gcs-manual-example-upload.txt' manually in the <BUCKET_1> after triggering the DAG.
Contributor:
Tests shouldn't depend on manual steps. If the file is required, we should:

  1. store it in resources,
  2. upload it using an operator, for example LocalFilesystemToGCSOperator (see the sketch below),
  3. after the tests, remove any resources created during the tests (usually it's enough to remove the bucket).

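A minimal sketch of that pattern, reusing the variables already defined in this example DAG (the exact wiring is an assumption, not taken from the diff):

    from airflow.providers.google.cloud.operators.gcs import GCSDeleteBucketOperator
    from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator

    # Setup: upload a file kept in the test resources instead of asking
    # for a manual upload.
    upload_file = LocalFilesystemToGCSOperator(
        task_id="upload_file",
        src=PATH_TO_UPLOAD_FILE,
        dst=BUCKET_FILE_LOCATION,
        bucket=BUCKET_1,
    )

    # Teardown: deleting the bucket removes every object created by the test.
    delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_1)
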
Member Author @pankajkoti (Apr 12, 2022):
Hi @bhirsz, I wanted to upload the file programmatically, as in the other chain. However, the sensor task GCSUploadSessionCompleteSensor waits for a change in the number of files in the bucket. If we place the upload task before the GCSUploadSessionCompleteSensor task, the sensor won't detect any changes. On the other hand, if we add the upload task after the GCSUploadSessionCompleteSensor task, it would be blocked until the sensor task completes, which doesn't solve the problem. I am unsure how to add a dependency between such tasks. Any suggestions?

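In chain terms, the two dead ends described above look like this (a sketch using the task names from this PR):

    # Ordering 1: the upload finishes before the sensor starts poking, so the
    # session sensor never observes a change in the number of objects.
    create_bucket >> upload_file >> gcs_upload_session_complete

    # Ordering 2: the sensor blocks the chain, so the upload that would
    # satisfy it never gets to run.
    create_bucket >> gcs_upload_session_complete >> upload_file
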
Contributor:
It's tricky in this case, indeed. How about starting the sensors in parallel with upload_file?

Example:

    chain(
        # TEST SETUP
        create_bucket,
        upload_file,
        # TEST BODY
        [gcs_object_exists, gcs_object_with_prefix_exists],
        # TEST TEARDOWN
        delete_bucket,
    )
    chain(
        create_bucket,
        # TEST BODY
        [gcs_upload_session_complete, gcs_update_object_exists],
        delete_bucket
    )


We're starting the sensors, and in the meantime we're uploading the file - and the sensors detect it (screenshots omitted).

I'm not sure, though, whether it will be flaky in some cases - running it in CI will show. I will update my PR with this change.

Member Author @pankajkoti (Apr 12, 2022):
Right. Since we're starting in parallel, it may happen that the upload task is picked up before the sensor task begins, and then the sensor may not detect the change as expected. Hence I added the comment to upload the file manually. :)

Also, the gcs_object_update_sensor_task needs to run after gcs_upload_session_complete_task (and not in parallel with it), as the object is expected to be detected by the session sensor first; the object-update task then confirms that the manual upload happened before it.

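In DAG terms, that strict ordering is the second chain the example ends up with:

    # The update sensor must fire only after the session sensor has confirmed
    # the upload, so the two run in sequence rather than in parallel.
    create_bucket >> gcs_upload_session_complete >> gcs_update_object_exists >> delete_bucket
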
Contributor:
We can alleviate it a bit with sleep (yes, the good old ugly sleep ;)). Run the sensors separately, and in the meantime trigger a sleep task (5s will suffice) and then the upload task:

chain(
    # TEST SETUP
    create_bucket,
    sleep,
    upload_file,
    # TEST BODY
    [gcs_object_exists, gcs_object_with_prefix_exists],
    # TEST TEARDOWN
    delete_bucket,
)
chain(
    create_bucket,
    # TEST BODY
    gcs_upload_session_complete,
    gcs_update_object_exists,
    delete_bucket
)

And of course put some explanation in the comments about why we're doing it.

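The sleep task itself isn't shown above; a minimal sketch, assuming the stock BashOperator (the 5-second value is the suggestion from this comment, not a requirement):

    from airflow.operators.bash import BashOperator

    # Hypothetical helper task: give the sensors a head start before the upload.
    sleep = BashOperator(
        task_id="sleep",
        bash_command="sleep 5",
    )
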
Contributor:
Manual steps are unfortunately a big no for us, since the plan is to run the system tests in the community CI. We can't expect people to perform manual tasks while running them, so I'm looking for an automatic solution.

Member Author @pankajkoti (Apr 12, 2022):
Okay, I was not aware that we're migrating these example DAGs to tests. I was under the impression that the example DAGs serve as references for the community on how to implement DAGs for certain operators. I understand and totally agree with your fair point that tests should not need manual intervention.
Okay, so we can try introducing sleep if we feel right about it. Also, I am new to contributing to Airflow :)

cc: @kaxil

Contributor:
Yeah, we had two goals with the migration: to make writing system tests easier, and to actually ensure that our example DAGs are runnable (what would be the point of an example that doesn't work? - and that's actually often the case ;)). In the old design, system tests "wrapped" and ran the examples, but not all example DAGs were used by system tests. In the new design we're also using the examples as system tests, but without separating them into different files - the example DAG now is the system test itself.

Any change is highly welcome, and it's good to see PRs like yours - congrats! We only missed the notification on the PR (sadly, GitHub doesn't allow for an advanced notification system), so we were not able to have this discussion before merging it.

+PATH_TO_MANUAL_UPLOAD_FILE = os.getenv(
+    "GCP_GCS_PATH_TO_MANUAL_UPLOAD_FILE", os.path.join(temp_dir_path, "test-gcs-manual-example-upload.txt")
+)
+BUCKET_MANUAL_UPLOAD_FILE_LOCATION = PATH_TO_MANUAL_UPLOAD_FILE.rpartition("/")[-1]
+PATH_TO_MANUAL_UPLOAD_FILE_PREFIX = os.getenv("GCP_GCS_PATH_TO_MANUAL_UPLOAD_FILE_PREFIX", "test-gcs-manual-")

 with models.DAG(
     "example_gcs",
     start_date=START_DATE,
@@ -208,9 +217,31 @@
         task_id="gcs_object_with_prefix_exists_task",
     )
     # [END howto_sensor_object_with_prefix_exists_task]

+    # [START howto_sensor_gcs_upload_session_complete_task]
+    gcs_upload_session_complete = GCSUploadSessionCompleteSensor(
+        bucket=BUCKET_1,
+        prefix=PATH_TO_MANUAL_UPLOAD_FILE_PREFIX,
+        inactivity_period=60,
+        min_objects=1,
+        allow_delete=True,
+        previous_objects=set(),
+        task_id="gcs_upload_session_complete_task",
+    )
+    # [END howto_sensor_gcs_upload_session_complete_task]
+
+    # [START howto_sensor_object_update_exists_task]
+    gcs_update_object_exists = GCSObjectUpdateSensor(
+        bucket=BUCKET_1,
+        object=BUCKET_MANUAL_UPLOAD_FILE_LOCATION,
+        task_id="gcs_object_update_sensor_task",
+    )
+    # [END howto_sensor_object_update_exists_task]

     delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_1)

     create_bucket >> upload_file >> [gcs_object_exists, gcs_object_with_prefix_exists] >> delete_bucket
+    create_bucket >> gcs_upload_session_complete >> gcs_update_object_exists >> delete_bucket


 if __name__ == '__main__':
@@ -41,7 +41,7 @@


 class IDTokenCredentialsAdapter(google_auth_credentials.Credentials):
-    """Convert Credentials with "openid" scope to IDTokenCredentials."""
+    """Convert Credentials with ``openid`` scope to IDTokenCredentials."""

     def __init__(self, credentials: oauth2_credentials.Credentials):
         super().__init__()
26 changes: 26 additions & 0 deletions docs/apache-airflow-providers-google/operators/cloud/gcs.rst
@@ -193,6 +193,32 @@ Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectsWithPrefix
     :start-after: [START howto_sensor_object_with_prefix_exists_task]
     :end-before: [END howto_sensor_object_with_prefix_exists_task]

+.. _howto/sensor:GCSUploadSessionCompleteSensor:
+
+GCSUploadSessionCompleteSensor
+------------------------------
+
+Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor` to check for a change in the number of files with a specified prefix in Google Cloud Storage.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_gcs_upload_session_complete_task]
+    :end-before: [END howto_sensor_gcs_upload_session_complete_task]
+
+.. _howto/sensor:GCSObjectUpdateSensor:
+
+GCSObjectUpdateSensor
+---------------------
+
+Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectUpdateSensor` to check if an object is updated in Google Cloud Storage.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_object_update_exists_task]
+    :end-before: [END howto_sensor_object_update_exists_task]
+
 More information
 """"""""""""""""
