Create guide for Machine Learning Engine operators #8207 #8968

Closed
Changes from 1 commit
155 commits
a28c66f
[AIRFLOW-4734] Upsert functionality for PostgresHook.insert_rows() (#…
oxymor0n Apr 30, 2020
4a1d71d
Fix the process of requirements generations (#8648)
potiuk Apr 30, 2020
b185b36
Reduce response payload size of /dag_stats and /task_stats (#8633)
XD-DENG Apr 30, 2020
4421f01
Improve template capabilities of EMR job and step operators (#8572)
oripwk May 1, 2020
6560f29
Enhanced documentation around Cluster Policy (#8661)
vardancse May 1, 2020
511d98e
[AIRFLOW-4363] Fix JSON encoding error (#8287)
retornam May 1, 2020
ce50538
Add check for pre-2.0 style hostname_callable config value (#8637)
dimberman May 1, 2020
0a7b500
Fix displaying Executor Class Name in "Base Job" table (#8679)
kaxil May 2, 2020
d92e848
Persist start/end date and duration for DummyOperator Task Instance (…
XD-DENG May 2, 2020
0954140
Ensure "started"/"ended" in tooltips are not shown if job not started…
XD-DENG May 2, 2020
19ac45a
Add support for fetching logs from running pods (#8626)
msumit May 3, 2020
1100cea
Remove _get_pretty_exception_message in PrestoHook
fusuiyi123 May 3, 2020
62796b9
Improve tutorial - Include all imports statements (#8670)
Lyalpha May 3, 2020
dd6a7bc
Group Google services in one section (#8623)
mik-laj May 3, 2020
ac59735
Refactor test_variable_command.py (#8535)
May 3, 2020
bc45fa6
Add system test and docs for Facebook Ads operators (#8503)
randr97 May 3, 2020
0b598a2
Fix connection add/edit for spark (#8685)
XD-DENG May 3, 2020
ffbbbfc
Sort connection type list in add/edit page alphabetically (#8692)
XD-DENG May 3, 2020
d8cb0b5
Support k8s auth method in Vault Secrets provider (#8640)
May 3, 2020
67caae0
Add system test for gcs_to_bigquery (#8556)
joppevos May 4, 2020
aec768b
[AIRFLOW-7008] Add perf kit with common used decorators/contexts (#7650)
mik-laj May 4, 2020
c3a46b9
Invalid output in test_variable assertion (#8698)
mik-laj May 4, 2020
5ddc458
Change provider:GCP to provider:Google for Labeler Bot (#8697)
mik-laj May 4, 2020
caa60b1
Remove config side effects from tests (#8607)
turbaszek May 4, 2020
923f423
Check consistency between the reference list and howto directory (#8690)
mik-laj May 4, 2020
b31ad51
Prevent clickable sorting on non sortable columns in TI view (#8681)
Acehaidrey May 4, 2020
6600e47
Import Connection directly from multiprocessing.connection. (#8711)
jhtimmins May 4, 2020
2c92a29
Fix typo in Google Display & Video 360 guide
michalslowikowski00 May 5, 2020
41b4c27
Carefully parse warning messages when building documentation (#8693)
mik-laj May 5, 2020
8d6f1aa
Support num_retries field in env var for GCP connection (#8700)
mik-laj May 5, 2020
c717d12
Add __repr__ for DagTag so tags display properly in /dagmodel/show (#…
XD-DENG May 5, 2020
487b5cc
Add guide for Apache Spark operators (#8305)
May 5, 2020
520aeed
Fix pickling failure when spawning processes (#8671)
jhtimmins May 6, 2020
25ee421
Support all RuntimeEnvironment parameters in DataflowTemplatedJobStar…
mik-laj May 6, 2020
d923b5b
Add jinja template test for AirflowVersion (#8505)
mik-laj May 6, 2020
e673413
Avoid loading executors in jobs (#7888)
mik-laj May 6, 2020
3437bea
Optimize count query on /home (#8729)
mik-laj May 6, 2020
336aa27
Correctly deserialize dagrun_timeout field on DAGs (#8735)
ashb May 6, 2020
2e9ef45
Stop Stalebot on Github issues (#8738)
kaxil May 6, 2020
fd6e057
Make loading plugins from entrypoint fault-tolerant (#8732)
kaxil May 6, 2020
bd29ee3
Ensure test_logging_config.test_reload_module works in spawn mode. (#…
jhtimmins May 6, 2020
d15839d
Latest debian-buster release broke image build (#8758)
potiuk May 7, 2020
ff5b701
Add google_api_to_s3_transfer example dags and system tests (#8581)
feluelle May 7, 2020
7c04604
Add google_api_to_s3_transfer docs howto link (#8761)
feluelle May 7, 2020
723c52c
Add documentation for SpannerDeployInstanceOperator (#8750)
ephraimbuddy May 7, 2020
6e4f5fa
[AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise …
lokeshlal May 7, 2020
b7566e1
Add SQL query tracking for pytest (#8754)
mik-laj May 8, 2020
58aefb2
Added SDFtoGCSOperator (#8740)
michalslowikowski00 May 8, 2020
b37ce29
Patch Pool.DEFAULT_POOL_NAME in BaseOperator (#8587)
vshshjn7 May 8, 2020
2bd3e76
Support same short flags for `create user` as 1.10 did for `user_crea…
ashb May 8, 2020
09770e4
Add WorldRemit as Airflow user (#8786)
May 8, 2020
a091c1f
fix typing errors reported by dmypy (#8773)
May 8, 2020
42c5975
Update example SingularityOperator DAG (#8790)
ashb May 8, 2020
791d1a7
Backport packages are renamed to include backport in their name (#8767)
potiuk May 9, 2020
100f530
Fixed test-target command (#8795)
potiuk May 9, 2020
db1b51d
Make celery worker_prefetch_multiplier configurable (#8695)
nadflinn May 9, 2020
bc19778
[AIP-31] Implement XComArg to pass output from one operator to the ne…
jonathanshir May 9, 2020
7506c73
Add default `conf` parameter to Spark JDBC Hook (#8787)
May 9, 2020
5e1c33a
Fix docs on creating CustomOperator (#8678)
JonnyWaffles May 10, 2020
21cc7d7
Document default timeout value for SSHOperator (#8744)
abhilash1in May 10, 2020
cd635dd
[AIRFLOW-5906] Add authenticator parameter to snowflake_hook (#8642)
koszti May 10, 2020
c7788a6
Add imap_attachment_to_s3 example dag and system test (#8669)
feluelle May 10, 2020
a715aa6
Correctly store non-default Nones in serialized tasks/dags (#8772)
ashb May 10, 2020
280f1f0
Correctly restore upstream_task_ids when deserializing Operators (#8775)
ashb May 10, 2020
cbebed2
Allow passing backend_kwargs to AWS SSM client (#8802)
kaxil May 10, 2020
79ef8be
Added Upload Multiple Entity Read Files to specified big query datase…
michalslowikowski00 May 10, 2020
e1cc17e
Remove old airflow logger causing side effects in tests (#8746)
kaxil May 10, 2020
9bb91ef
Add comments to breeze scripts (#8797)
potiuk May 10, 2020
493b685
Add separate example DAGs and system tests for google cloud speech (#…
ephraimbuddy May 10, 2020
bed1995
Avoid color info in response of /dag_stats & /task_stats (#8742)
XD-DENG May 11, 2020
b59adab
Support cron presets in date_range function (#7777)
Rcharriol May 11, 2020
5f3774a
[AIRFLOW-6921] Fetch celery states in bulk (#7542)
mik-laj May 11, 2020
d5c4001
Useful help information in test-target and docker-compose commands (#…
potiuk May 11, 2020
a6434a5
Fix bash command in performance test dag (#8812)
ashb May 11, 2020
0c3db84
[AIRFLOW-7068] Create EC2 Hook, Operator and Sensor (#7731)
mustafagok May 11, 2020
5ae76d8
Option to set end_date for performance testing dag. (#8817)
ashb May 11, 2020
2ec0130
[AIRFLOW-4549] Allow skipped tasks to satisfy wait_for_downstream (#7…
teddyhartanto May 11, 2020
1fb9f07
Synchronize extras between airflow and providers (#8819)
potiuk May 11, 2020
d590e5e
Add option to propagate tags in ECSOperator (#8811)
JPonte May 11, 2020
f410d64
Use fork when test relies on mock.patch in parent process. (#8794)
jhtimmins May 11, 2020
3ad4f96
[AIRFLOW-1156] BugFix: Unpausing a DAG with catchup=False creates an …
kaxil May 11, 2020
4375607
Fix typo. 'zobmies' => 'zombies'. (#8832)
jhtimmins May 12, 2020
78a48db
Add support for non-default orientation in `dag show` command (#8834)
klsnreddy May 12, 2020
7533378
Access function to be pickled as attribute, not method, to avoid erro…
jhtimmins May 12, 2020
1d12c34
Refactor BigQuery check operators (#8813)
turbaszek May 12, 2020
4b06fde
Fix Flake8 errors (#8841)
kaxil May 12, 2020
6911dfe
Fix template fields in Google operators (#8840)
turbaszek May 12, 2020
01db738
Azure storage 0.37.0 is not installable any more (#8833)
potiuk May 12, 2020
578fc51
[AIRFLOW-4543] Update slack operator to support slackclient v2 (#5519)
serkef May 12, 2020
7236862
[AIRFLOW-2310] Enable AWS Glue Job Integration (#6007)
abdulbasitds May 12, 2020
8b54919
Refactor BigQuery hook methods to use python library (#8631)
turbaszek May 12, 2020
7d69987
Remove duplicate code from perf_kit (#8843)
kaxil May 12, 2020
e1e833b
Update GoogleBaseHook to not follow 308 and use 60s timeout (#8816)
waiyan1612 May 13, 2020
8a94d18
Fix Environment Variable in perf/scheduler_dag_execution_timing.py (#…
kaxil May 13, 2020
ed3f513
Correctly pass sleep time from AWSAthenaOperator down to the hook. (#…
ashb May 13, 2020
f1dc2e0
The librabbitmq library stopped installing for python3.7 (#8853)
potiuk May 13, 2020
c3af681
Convert tests/jobs/test_base_job.py to pytest (#8856)
ashb May 13, 2020
81fb9d6
Add metric for monitoring email notification failures (#8771)
May 13, 2020
2878f17
Relax Flask-Appbuilder version to ~=2.3.4 (#8857)
feluelle May 13, 2020
e61b9bb
Add AWS EMR System tests (#8618)
xinbinhuang May 13, 2020
fc862a3
Do not create a separate process for one task in CeleryExecutor (#8855)
mik-laj May 14, 2020
961c710
Make Custom XCom backend a subsection of XCom docs (#8869)
turbaszek May 14, 2020
fe42191
Don't use ProcessorAgent to test ProcessorManager (#8871)
ashb May 14, 2020
4813b94
Create log file w/abs path so tests pass on MacOS (#8820)
jhtimmins May 14, 2020
35c523f
Fix list formatting of plugins doc. (#8873)
ashb May 15, 2020
85bbab2
Add EMR operators howto docs (#8863)
xinbinhuang May 15, 2020
f82ad45
Fix KubernetesPodOperator pod name length validation (#8829)
dsaiztc May 15, 2020
92585ca
Added automated release notes generation for backport operators (#8807)
potiuk May 15, 2020
82de6f7
Spend less time waiting for DagFileProcessor processes to complete (#…
ashb May 15, 2020
a3a4bac
JIRA and Github issues explanation (#8539)
mschickensoup May 16, 2020
f4edd90
Speed up TestAwsLambdaHook by not actually running a function (#8882)
ashb May 16, 2020
15273f0
Check for same task instead of Equality to detect Duplicate Tasks (#8…
kaxil May 16, 2020
a3a3411
Fix master failing on generating requirements (#8885)
potiuk May 16, 2020
f3521fb
Regenerate readme files for backport package release (#8886)
potiuk May 16, 2020
f6d5917
Updated docs for experimental API /dags/<DAG_ID>/dag_runs (#8800)
randr97 May 16, 2020
707bb0c
[AIRFLOW-6535] Add AirflowFailException to fail without any retry (#7…
jstern May 16, 2020
a546a10
Add Snowflake system test (#8422)
dhuang May 16, 2020
8985df0
Monitor pods by labels instead of names (#6377)
dimberman May 16, 2020
ff342fc
Added SalesforceHook missing method to return only dataframe (#8565) …
pranjalmittal May 17, 2020
12c5e5d
Prepare release candidate for backport packages (#8891)
potiuk May 17, 2020
2121f49
Avoid failure on transient requirements in CI image (#8892)
potiuk May 17, 2020
841d816
Allow setting the pooling time in DLPHook (#8824)
xuan616 May 19, 2020
dd57ec9
Fix task and dag stats on home page (#8865)
May 19, 2020
375d1ca
Release candidate 2 for backport packages 2020.05.20 (#8898)
potiuk May 19, 2020
bae5cc2
Fix race in Celery tests by pre-creating result tables (#8909)
ashb May 19, 2020
499493c
[AIRFLOW-6586] Improvements to gcs sensor (#7197)
May 19, 2020
ce7fdea
UX Fix: Prevent undesired text selection with DAG title selection in …
ryanahamilton May 19, 2020
fef00e5
Use Debian's provided JRE from Buster (#8919)
ashb May 20, 2020
5360045
Fix incorrect Env Var to stop Scheduler from creating DagRuns (#8920)
kaxil May 20, 2020
51d9557
Re-run all tests when Dockerfile or Github worflow change (#8924)
ashb May 20, 2020
c6224e2
Remove unused self.max_threads argument in SchedulerJob (#8935)
kaxil May 21, 2020
12c22e0
Added Greytip to Airflow Users list (#8887)
Sarankrishna May 21, 2020
8476c1e
Hive/Hadoop minicluster needs JDK8 and JAVA_HOME to work (#8938)
ashb May 21, 2020
f17b4bb
Fix DagRun Prefix for Performance script (#8934)
kaxil May 21, 2020
a9dfd7d
Remove side-effect of session in FAB (#8940)
mik-laj May 21, 2020
f3f74c7
Add TaskInstance state to TI Tooltip to be colour-blind friendlier (#…
harrisjoseph May 21, 2020
8d3acd7
Fix docstring in DagFileProcessor._schedule_task_instances (#8948)
kaxil May 21, 2020
47413d9
Remove singularity from CI images (#8945)
ashb May 21, 2020
16206cd
Update example webserver_config.py to show correct CSRF config (#8944)
ashb May 21, 2020
97b6cc7
Add note in Updating.md about the removel of DagRun.ID_PREFIX (#8949)
kaxil May 21, 2020
41481bb
Python base images are stored in cache (#8943)
potiuk May 21, 2020
b26b3ca
Don't hard-code constants in scheduler_dag_execution_timing (#8950)
ashb May 21, 2020
113982b
Make scheduler_dag_execution_timing grok dynamic start date of elasti…
ashb May 21, 2020
90a07d8
Cache 1 10 ci images (#8955)
dimberman May 21, 2020
dd72040
Pin Version of Azure Cosmos to <4 (#8956)
kaxil May 21, 2020
94a7673
Pin google-cloud-datacatalog to <0.8 (#8957)
kaxil May 21, 2020
9a4a2d1
[AIRFLOW-5262] Update timeout exception to include dag (#8466)
curiousjazz77 May 22, 2020
b055151
Add context to execution_date_fn in ExternalTaskSensor (#8702)
Acehaidrey May 22, 2020
f107338
Add support for spark python and submit tasks in Databricks operator(…
siddartha-ravichandran May 22, 2020
e742ef7
Fix typo in test_project_structure (#8978)
mik-laj May 23, 2020
4d67704
Remove duplicate line from CONTRIBUTING.rst (#8981)
kaxil May 23, 2020
db70da2
Flush pending Sentry exceptions before exiting (#7232)
mikeclarke May 23, 2020
cf5cf45
Support YAML input for CloudBuildCreateOperator (#8808)
joppevos May 23, 2020
bdb8369
Add secrets to test_deprecated_packages (#8979)
mik-laj May 23, 2020
f3456b1
Fix formatting code block in TESTING.rst (#8985)
ad-m May 23, 2020
[AIRFLOW-6586] Improvements to gcs sensor (#7197)
* [AIRFLOW-6586] Improvements to gcs sensor

refactors GoogleCloudStorageUploadSessionCompleteSensor to use set instead of number of objects

add poke mode only decorator

assert that poke_mode_only applied to child of BaseSensorOperator

refactor tests

remove assert


fix static checks

add back inadvertently removed requirements

pre-commit

fix typo

* fix gcs sensor unit test

* move poke_mode_only to base_sensor_operator module

* add sensor / poke_mode_only docs

* fix ci check add sensor how-to docs

* Update airflow/providers/google/cloud/sensors/gcs.py

Co-authored-by: Tomek Urbaszek <[email protected]>

* Update airflow/sensors/base_sensor_operator.py

Co-authored-by: Tomek Urbaszek <[email protected]>

* Update airflow/sensors/base_sensor_operator.py

Co-authored-by: Kamil Breguła <[email protected]>

* simplify class decorator

* remove type hint

* add note to UPDATING.md

* remove unnecessary declaration of class member

* Fix to kwargs in UPDATING.md

Co-authored-by: Tomek Urbaszek <[email protected]>
Co-authored-by: Kamil Breguła <[email protected]>
3 people authored May 19, 2020
commit 499493c5c5cf324ab8452ead80a10b71ce0c3b14
23 changes: 23 additions & 0 deletions UPDATING.md
@@ -172,6 +172,29 @@ plugins =
- Added optional project_id argument to DataflowCreatePythonJobOperator
constructor.

### GCSUploadSessionCompleteSensor signature change

To provide more precise control over how changes to objects in the
underlying GCS bucket are handled, the constructor of this sensor has changed.

- Old Behavior: This constructor used to optionally take ``previous_num_objects: int``.
- New replacement constructor kwarg: ``previous_objects: Optional[Set[str]]``.

Most users would not specify this argument because the bucket begins empty
and the user wants to treat any files as new.

Example of updating usage of this sensor. Users who used to call:

``GCSUploadSessionCompleteSensor(bucket='my_bucket', prefix='my_prefix', previous_num_objects=1)``

Will now call:

``GCSUploadSessionCompleteSensor(bucket='my_bucket', prefix='my_prefix', previous_objects={'.keep'})``

Where '.keep' is a single file at your prefix that the sensor should not consider new.
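As a minimal illustration of the new set-based bookkeeping, the comparison the sensor now performs between pokes can be sketched in plain Python. This is an illustrative helper, not part of the sensor's API; the object names are made up:

```python
from typing import Set, Tuple

def detect_changes(previous_objects: Set[str],
                   current_objects: Set[str]) -> Tuple[Set[str], Set[str]]:
    """Return (new, deleted) object ids observed between two pokes."""
    new = current_objects - previous_objects
    deleted = previous_objects - current_objects
    return new, deleted

# Seeding previous_objects={'.keep'} means only genuinely new uploads count:
new, deleted = detect_changes({'.keep'}, {'.keep', 'data.csv'})
# new == {'data.csv'}, deleted == set()
```

With the old integer count, a simultaneous delete-and-upload was invisible; tracking the set makes both directions of change observable.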


### Rename pool statsd metrics

Used slot has been renamed to running slot to make the name self-explanatory
52 changes: 32 additions & 20 deletions airflow/providers/google/cloud/sensors/gcs.py
@@ -21,11 +21,11 @@

import os
from datetime import datetime
from typing import Callable, List, Optional
from typing import Callable, List, Optional, Set

from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator, poke_mode_only
from airflow.utils.decorators import apply_defaults


@@ -189,12 +189,14 @@ def get_time():
return datetime.now()


@poke_mode_only
class GCSUploadSessionCompleteSensor(BaseSensorOperator):
"""
Checks for changes in the number of objects at prefix in Google Cloud Storage
bucket and returns True if the inactivity period has passed with no
increase in the number of objects. Note, it is recommended to use reschedule
mode if you expect this sensor to run for hours.
increase in the number of objects. Note, this sensor will not behave correctly
in reschedule mode, as the state of the listed objects in the GCS bucket will
be lost between rescheduled invocations.

:param bucket: The Google Cloud Storage bucket where the objects are expected.
@@ -209,8 +211,8 @@ class GCSUploadSessionCompleteSensor(BaseSensorOperator):
:param min_objects: The minimum number of objects needed for upload session
to be considered valid.
:type min_objects: int
:param previous_num_objects: The number of objects found during the last poke.
:type previous_num_objects: int
:param previous_objects: The set of object ids found during the last poke.
:type previous_objects: set[str]
:param allow_delete: Should this sensor consider objects being deleted
between pokes valid behavior. If true a warning message will be logged
when this happens. If false an error will be raised.
@@ -233,7 +235,7 @@ def __init__(self,
prefix: str,
inactivity_period: float = 60 * 60,
min_objects: int = 1,
previous_num_objects: int = 0,
previous_objects: Optional[Set[str]] = None,
allow_delete: bool = True,
google_cloud_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
@@ -243,45 +245,56 @@ def __init__(self,

self.bucket = bucket
self.prefix = prefix
if inactivity_period < 0:
raise ValueError("inactivity_period must be non-negative")
self.inactivity_period = inactivity_period
self.min_objects = min_objects
self.previous_num_objects = previous_num_objects
self.previous_objects = previous_objects if previous_objects else set()
self.inactivity_seconds = 0
self.allow_delete = allow_delete
self.google_cloud_conn_id = google_cloud_conn_id
self.delegate_to = delegate_to
self.last_activity_time = None
self.hook = None

def is_bucket_updated(self, current_num_objects: int) -> bool:
def _get_gcs_hook(self):
if not self.hook:
self.hook = GCSHook()
return self.hook

def is_bucket_updated(self, current_objects: Set[str]) -> bool:
"""
Checks whether new objects have been uploaded and the inactivity_period
has passed and updates the state of the sensor accordingly.

:param current_num_objects: number of objects in bucket during last poke.
:type current_num_objects: int
:param current_objects: set of object ids in bucket during last poke.
:type current_objects: set[str]
"""

if current_num_objects > self.previous_num_objects:
current_num_objects = len(current_objects)
if current_objects > self.previous_objects:
# When new objects arrived, reset the inactivity_seconds
# previous_num_objects for the next poke.
# and update previous_objects for the next poke.
self.log.info("New objects found at %s resetting last_activity_time.",
os.path.join(self.bucket, self.prefix))
self.log.debug("New objects: %s",
"\n".join(current_objects - self.previous_objects))
self.last_activity_time = get_time()
self.inactivity_seconds = 0
self.previous_num_objects = current_num_objects
self.previous_objects = current_objects
return False

if current_num_objects < self.previous_num_objects:
if self.previous_objects - current_objects:
# During the last poke interval objects were deleted.
if self.allow_delete:
self.previous_num_objects = current_num_objects
self.previous_objects = current_objects
self.last_activity_time = get_time()
self.log.warning(
"""
Objects were deleted during the last
poke interval. Updating the file counter and
resetting last_activity_time.
"""
%s
""", self.previous_objects - current_objects
)
return False

@@ -314,5 +327,4 @@ def is_bucket_updated(self, current_num_objects: int) -> bool:
return False

def poke(self, context):
hook = GCSHook()
return self.is_bucket_updated(len(hook.list(self.bucket, prefix=self.prefix)))
return self.is_bucket_updated(set(self._get_gcs_hook().list(self.bucket, prefix=self.prefix)))
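The inactivity tracking that ``is_bucket_updated`` implements can be modeled in isolation. The following is a simplified, Airflow-free sketch of that state machine; ``UploadSessionTracker`` and its parameters are illustrative stand-ins for the sensor, not its real API, and ``now`` is passed in explicitly where the real sensor calls ``get_time()``:

```python
from typing import Optional, Set

class UploadSessionTracker:
    """Simplified model of GCSUploadSessionCompleteSensor's poke logic."""

    def __init__(self, inactivity_period: float = 60 * 60, min_objects: int = 1,
                 previous_objects: Optional[Set[str]] = None,
                 allow_delete: bool = True):
        self.inactivity_period = inactivity_period
        self.min_objects = min_objects
        self.previous_objects = previous_objects or set()
        self.allow_delete = allow_delete
        self.inactivity_seconds = 0.0
        self.last_activity_time: Optional[float] = None

    def poke(self, current_objects: Set[str], now: float) -> bool:
        if current_objects - self.previous_objects:
            # New objects arrived: remember them and reset the inactivity clock.
            self.previous_objects = current_objects
            self.last_activity_time = now
            self.inactivity_seconds = 0.0
            return False
        if self.previous_objects - current_objects:
            # Objects disappeared between pokes.
            if not self.allow_delete:
                raise RuntimeError("Objects were deleted during the last poke interval")
            self.previous_objects = current_objects
            self.last_activity_time = now
            self.inactivity_seconds = 0.0
            return False
        if self.last_activity_time is None:
            # Nothing seen yet; start the clock on the first quiet poke.
            self.last_activity_time = now
            return False
        # No change: accumulate inactivity and check the success criteria.
        self.inactivity_seconds = now - self.last_activity_time
        return (self.inactivity_seconds >= self.inactivity_period
                and len(current_objects) >= self.min_objects)

# With a 20-second inactivity period, the session completes only after
# the bucket listing has been stable for that long:
tracker = UploadSessionTracker(inactivity_period=20)
tracker.poke({'data.csv'}, now=0)    # upload seen, clock reset
tracker.poke({'data.csv'}, now=10)   # quiet, but not long enough yet
```

This also shows why the sensor cannot run in reschedule mode: ``previous_objects`` and ``last_activity_time`` live on the instance and would be lost between rescheduled invocations.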
39 changes: 39 additions & 0 deletions airflow/sensors/base_sensor_operator.py
@@ -17,6 +17,7 @@
# under the License.

import hashlib
import os
from datetime import timedelta
from time import sleep
from typing import Any, Dict, Iterable
@@ -176,3 +177,41 @@ def deps(self):
if self.reschedule:
return BaseOperator.deps.fget(self) | {ReadyToRescheduleDep()}
return BaseOperator.deps.fget(self)


def poke_mode_only(cls):
"""
Class Decorator for child classes of BaseSensorOperator to indicate
that instances of this class are only safe to use poke mode.

Will decorate all methods in the class to assert they did not change
the mode from 'poke'.

:param cls: BaseSensor class to enforce methods only use 'poke' mode.
:type cls: type
"""
def decorate(cls_type):
def mode_getter(_):
return 'poke'

def mode_setter(_, value):
if value != 'poke':
raise ValueError(
f"cannot set mode to {value!r}; only 'poke' is allowed.")

if not issubclass(cls_type, BaseSensorOperator):
raise ValueError(f"poke_mode_only decorator should only be "
f"applied to subclasses of BaseSensorOperator,"
f" got:{cls_type}.")

cls_type.mode = property(mode_getter, mode_setter)

return cls_type

return decorate(cls)


if 'BUILDING_AIRFLOW_DOCS' in os.environ:
# flake8: noqa: F811
# Monkey patch hook to get good function headers while building docs
apply_defaults = lambda x: x
24 changes: 24 additions & 0 deletions docs/howto/custom-operator.rst
@@ -206,3 +206,27 @@ Define an operator extra link
For your operator, you can :doc:`Define an extra link <define_extra_link>` that can
redirect users to external systems. For example, you can add a link that redirects
the user to the operator's manual.

Sensors
^^^^^^^^
Airflow provides a primitive for a special kind of operator, whose purpose is to
poll some state (e.g. presence of a file) at a regular interval until a
success criterion is met.

You can create any sensor you want by extending :class:`airflow.sensors.base_sensor_operator.BaseSensorOperator`
and defining a ``poke`` method that polls your external state and evaluates the success criterion.

Sensors have a powerful feature called ``'reschedule'`` mode which allows the sensor
task to be rescheduled, rather than blocking a worker slot between pokes.
This is useful when you can tolerate a longer poll interval and expect to be
polling for a long time.

Reschedule mode comes with a caveat that your sensor cannot maintain internal state
between rescheduled executions. In this case you should decorate your sensor with
:meth:`airflow.sensors.base_sensor_operator.poke_mode_only`. This will let users know
that your sensor is not suitable for use with reschedule mode.

An example of a sensor that keeps internal state and cannot be used with reschedule mode
is :class:`airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor`.
It polls the set of objects at a prefix (this set is the internal state of the sensor)
and succeeds when a certain amount of time has passed without the set of objects changing.
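The ``poke_mode_only`` decorator described above can be sketched without Airflow. In this standalone model, ``BaseSensor`` is a stand-in for ``BaseSensorOperator`` and ``StatefulSensor`` is a made-up subclass; the decorator pins ``mode`` to ``'poke'`` with a property so any attempt to switch to reschedule mode fails fast:

```python
class BaseSensor:
    """Stand-in for Airflow's BaseSensorOperator (illustrative only)."""
    def __init__(self, mode: str = 'poke'):
        self.mode = mode  # routed through the property once the class is decorated

def poke_mode_only(cls_type):
    """Class decorator that pins `mode` to 'poke' via a property."""
    if not issubclass(cls_type, BaseSensor):
        raise ValueError(f"poke_mode_only should only be applied to "
                         f"subclasses of BaseSensor, got: {cls_type}.")

    def mode_getter(_):
        return 'poke'

    def mode_setter(_, value):
        if value != 'poke':
            raise ValueError(f"cannot set mode to {value!r}; only 'poke' is allowed.")

    cls_type.mode = property(mode_getter, mode_setter)
    return cls_type

@poke_mode_only
class StatefulSensor(BaseSensor):
    """A sensor that keeps state between pokes, so reschedule mode is unsafe."""
    def poke(self, context):
        return True

sensor = StatefulSensor()  # the default mode='poke' passes the setter unchanged
```

Because ``property`` is a data descriptor on the class, even ``self.mode = mode`` inside ``__init__`` goes through the guarded setter, so the constraint cannot be bypassed by constructor arguments.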
44 changes: 22 additions & 22 deletions tests/providers/google/cloud/sensors/test_gcs.py
@@ -42,7 +42,7 @@
DEFAULT_DATE = datetime(2015, 1, 1)

MOCK_DATE_ARRAY = [datetime(2019, 2, 24, 12, 0, 0) - i * timedelta(seconds=10)
for i in range(20)]
for i in range(25)]


def next_time_side_effect():
@@ -212,17 +212,16 @@ def setUp(self):
poke_interval=10,
min_objects=1,
allow_delete=False,
previous_num_objects=0,
dag=self.dag
)

self.last_mocked_date = datetime(2019, 4, 24, 0, 0, 0)

@mock.patch('airflow.providers.google.cloud.sensors.gcs.get_time', mock_time)
def test_files_deleted_between_pokes_throw_error(self):
self.sensor.is_bucket_updated(2)
self.sensor.is_bucket_updated({'a', 'b'})
with self.assertRaises(AirflowException):
self.sensor.is_bucket_updated(1)
self.sensor.is_bucket_updated({'a'})

@mock.patch('airflow.providers.google.cloud.sensors.gcs.get_time', mock_time)
def test_files_deleted_between_pokes_allow_delete(self):
@@ -234,48 +233,49 @@ def test_files_deleted_between_pokes_allow_delete(self):
poke_interval=10,
min_objects=1,
allow_delete=True,
previous_num_objects=0,
dag=self.dag
)
self.sensor.is_bucket_updated(2)
self.sensor.is_bucket_updated({'a', 'b'})
self.assertEqual(self.sensor.inactivity_seconds, 0)
self.sensor.is_bucket_updated(1)
self.assertEqual(self.sensor.previous_num_objects, 1)
self.sensor.is_bucket_updated({'a'})
self.assertEqual(len(self.sensor.previous_objects), 1)
self.assertEqual(self.sensor.inactivity_seconds, 0)
self.sensor.is_bucket_updated(2)
self.sensor.is_bucket_updated({'a', 'c'})
self.assertEqual(self.sensor.inactivity_seconds, 0)
self.sensor.is_bucket_updated(2)
self.sensor.is_bucket_updated({'a', 'd'})
self.assertEqual(self.sensor.inactivity_seconds, 0)
self.sensor.is_bucket_updated({'a', 'd'})
self.assertEqual(self.sensor.inactivity_seconds, 10)
self.assertTrue(self.sensor.is_bucket_updated(2))
self.assertTrue(self.sensor.is_bucket_updated({'a', 'd'}))

@mock.patch('airflow.providers.google.cloud.sensors.gcs.get_time', mock_time)
def test_incoming_data(self):
self.sensor.is_bucket_updated(2)
self.sensor.is_bucket_updated({'a'})
self.assertEqual(self.sensor.inactivity_seconds, 0)
self.sensor.is_bucket_updated(3)
self.sensor.is_bucket_updated({'a', 'b'})
self.assertEqual(self.sensor.inactivity_seconds, 0)
self.sensor.is_bucket_updated(4)
self.sensor.is_bucket_updated({'a', 'b', 'c'})
self.assertEqual(self.sensor.inactivity_seconds, 0)

@mock.patch('airflow.providers.google.cloud.sensors.gcs.get_time', mock_time)
def test_no_new_data(self):
self.sensor.is_bucket_updated(2)
self.sensor.is_bucket_updated({'a'})
self.assertEqual(self.sensor.inactivity_seconds, 0)
self.sensor.is_bucket_updated(2)
self.sensor.is_bucket_updated({'a'})
self.assertEqual(self.sensor.inactivity_seconds, 10)

@mock.patch('airflow.providers.google.cloud.sensors.gcs.get_time', mock_time)
def test_no_new_data_success_criteria(self):
self.sensor.is_bucket_updated(2)
self.sensor.is_bucket_updated({'a'})
self.assertEqual(self.sensor.inactivity_seconds, 0)
self.sensor.is_bucket_updated(2)
self.sensor.is_bucket_updated({'a'})
self.assertEqual(self.sensor.inactivity_seconds, 10)
self.assertTrue(self.sensor.is_bucket_updated(2))
self.assertTrue(self.sensor.is_bucket_updated({'a'}))

@mock.patch('airflow.providers.google.cloud.sensors.gcs.get_time', mock_time)
def test_not_enough_objects(self):
self.sensor.is_bucket_updated(0)
self.sensor.is_bucket_updated(set())
self.assertEqual(self.sensor.inactivity_seconds, 0)
self.sensor.is_bucket_updated(0)
self.sensor.is_bucket_updated(set())
self.assertEqual(self.sensor.inactivity_seconds, 10)
self.assertFalse(self.sensor.is_bucket_updated(0))
self.assertFalse(self.sensor.is_bucket_updated(set()))