Create guide for Machine Learning Engine operators #8207 #8968

Closed
Changes from 1 commit
Commits (155)
a28c66f
[AIRFLOW-4734] Upsert functionality for PostgresHook.insert_rows() (#…
oxymor0n Apr 30, 2020
4a1d71d
Fix the process of requirements generations (#8648)
potiuk Apr 30, 2020
b185b36
Reduce response payload size of /dag_stats and /task_stats (#8633)
XD-DENG Apr 30, 2020
4421f01
Improve template capabilities of EMR job and step operators (#8572)
oripwk May 1, 2020
6560f29
Enhanced documentation around Cluster Policy (#8661)
vardancse May 1, 2020
511d98e
[AIRFLOW-4363] Fix JSON encoding error (#8287)
retornam May 1, 2020
ce50538
Add check for pre-2.0 style hostname_callable config value (#8637)
dimberman May 1, 2020
0a7b500
Fix displaying Executor Class Name in "Base Job" table (#8679)
kaxil May 2, 2020
d92e848
Persist start/end date and duration for DummyOperator Task Instance (…
XD-DENG May 2, 2020
0954140
Ensure "started"/"ended" in tooltips are not shown if job not started…
XD-DENG May 2, 2020
19ac45a
Add support for fetching logs from running pods (#8626)
msumit May 3, 2020
1100cea
Remove _get_pretty_exception_message in PrestoHook
fusuiyi123 May 3, 2020
62796b9
Improve tutorial - Include all imports statements (#8670)
Lyalpha May 3, 2020
dd6a7bc
Group Google services in one section (#8623)
mik-laj May 3, 2020
ac59735
Refactor test_variable_command.py (#8535)
May 3, 2020
bc45fa6
Add system test and docs for Facebook Ads operators (#8503)
randr97 May 3, 2020
0b598a2
Fix connection add/edit for spark (#8685)
XD-DENG May 3, 2020
ffbbbfc
Sort connection type list in add/edit page alphabetically (#8692)
XD-DENG May 3, 2020
d8cb0b5
Support k8s auth method in Vault Secrets provider (#8640)
May 3, 2020
67caae0
Add system test for gcs_to_bigquery (#8556)
joppevos May 4, 2020
aec768b
[AIRFLOW-7008] Add perf kit with common used decorators/contexts (#7650)
mik-laj May 4, 2020
c3a46b9
Invalid output in test_variable assertion (#8698)
mik-laj May 4, 2020
5ddc458
Change provider:GCP to provider:Google for Labeler Bot (#8697)
mik-laj May 4, 2020
caa60b1
Remove config side effects from tests (#8607)
turbaszek May 4, 2020
923f423
Check consistency between the reference list and howto directory (#8690)
mik-laj May 4, 2020
b31ad51
Prevent clickable sorting on non sortable columns in TI view (#8681)
Acehaidrey May 4, 2020
6600e47
Import Connection directly from multiprocessing.connection. (#8711)
jhtimmins May 4, 2020
2c92a29
Fix typo in Google Display & Video 360 guide
michalslowikowski00 May 5, 2020
41b4c27
Carefully parse warning messages when building documentation (#8693)
mik-laj May 5, 2020
8d6f1aa
Support num_retries field in env var for GCP connection (#8700)
mik-laj May 5, 2020
c717d12
Add __repr__ for DagTag so tags display properly in /dagmodel/show (#…
XD-DENG May 5, 2020
487b5cc
Add guide for Apache Spark operators (#8305)
May 5, 2020
520aeed
Fix pickling failure when spawning processes (#8671)
jhtimmins May 6, 2020
25ee421
Support all RuntimeEnvironment parameters in DataflowTemplatedJobStar…
mik-laj May 6, 2020
d923b5b
Add jinja template test for AirflowVersion (#8505)
mik-laj May 6, 2020
e673413
Avoid loading executors in jobs (#7888)
mik-laj May 6, 2020
3437bea
Optimize count query on /home (#8729)
mik-laj May 6, 2020
336aa27
Correctly deserialize dagrun_timeout field on DAGs (#8735)
ashb May 6, 2020
2e9ef45
Stop Stalebot on Github issues (#8738)
kaxil May 6, 2020
fd6e057
Make loading plugins from entrypoint fault-tolerant (#8732)
kaxil May 6, 2020
bd29ee3
Ensure test_logging_config.test_reload_module works in spawn mode. (#…
jhtimmins May 6, 2020
d15839d
Latest debian-buster release broke image build (#8758)
potiuk May 7, 2020
ff5b701
Add google_api_to_s3_transfer example dags and system tests (#8581)
feluelle May 7, 2020
7c04604
Add google_api_to_s3_transfer docs howto link (#8761)
feluelle May 7, 2020
723c52c
Add documentation for SpannerDeployInstanceOperator (#8750)
ephraimbuddy May 7, 2020
6e4f5fa
[AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise …
lokeshlal May 7, 2020
b7566e1
Add SQL query tracking for pytest (#8754)
mik-laj May 8, 2020
58aefb2
Added SDFtoGCSOperator (#8740)
michalslowikowski00 May 8, 2020
b37ce29
Patch Pool.DEFAULT_POOL_NAME in BaseOperator (#8587)
vshshjn7 May 8, 2020
2bd3e76
Support same short flags for `create user` as 1.10 did for `user_crea…
ashb May 8, 2020
09770e4
Add WorldRemit as Airflow user (#8786)
May 8, 2020
a091c1f
fix typing errors reported by dmypy (#8773)
May 8, 2020
42c5975
Update example SingularityOperator DAG (#8790)
ashb May 8, 2020
791d1a7
Backport packages are renamed to include backport in their name (#8767)
potiuk May 9, 2020
100f530
Fixed test-target command (#8795)
potiuk May 9, 2020
db1b51d
Make celery worker_prefetch_multiplier configurable (#8695)
nadflinn May 9, 2020
bc19778
[AIP-31] Implement XComArg to pass output from one operator to the ne…
jonathanshir May 9, 2020
7506c73
Add default `conf` parameter to Spark JDBC Hook (#8787)
May 9, 2020
5e1c33a
Fix docs on creating CustomOperator (#8678)
JonnyWaffles May 10, 2020
21cc7d7
Document default timeout value for SSHOperator (#8744)
abhilash1in May 10, 2020
cd635dd
[AIRFLOW-5906] Add authenticator parameter to snowflake_hook (#8642)
koszti May 10, 2020
c7788a6
Add imap_attachment_to_s3 example dag and system test (#8669)
feluelle May 10, 2020
a715aa6
Correctly store non-default Nones in serialized tasks/dags (#8772)
ashb May 10, 2020
280f1f0
Correctly restore upstream_task_ids when deserializing Operators (#8775)
ashb May 10, 2020
cbebed2
Allow passing backend_kwargs to AWS SSM client (#8802)
kaxil May 10, 2020
79ef8be
Added Upload Multiple Entity Read Files to specified big query datase…
michalslowikowski00 May 10, 2020
e1cc17e
Remove old airflow logger causing side effects in tests (#8746)
kaxil May 10, 2020
9bb91ef
Add comments to breeze scripts (#8797)
potiuk May 10, 2020
493b685
Add separate example DAGs and system tests for google cloud speech (#…
ephraimbuddy May 10, 2020
bed1995
Avoid color info in response of /dag_stats & /task_stats (#8742)
XD-DENG May 11, 2020
b59adab
Support cron presets in date_range function (#7777)
Rcharriol May 11, 2020
5f3774a
[AIRFLOW-6921] Fetch celery states in bulk (#7542)
mik-laj May 11, 2020
d5c4001
Useful help information in test-target and docker-compose commands (#…
potiuk May 11, 2020
a6434a5
Fix bash command in performance test dag (#8812)
ashb May 11, 2020
0c3db84
[AIRFLOW-7068] Create EC2 Hook, Operator and Sensor (#7731)
mustafagok May 11, 2020
5ae76d8
Option to set end_date for performance testing dag. (#8817)
ashb May 11, 2020
2ec0130
[AIRFLOW-4549] Allow skipped tasks to satisfy wait_for_downstream (#7…
teddyhartanto May 11, 2020
1fb9f07
Synchronize extras between airflow and providers (#8819)
potiuk May 11, 2020
d590e5e
Add option to propagate tags in ECSOperator (#8811)
JPonte May 11, 2020
f410d64
Use fork when test relies on mock.patch in parent process. (#8794)
jhtimmins May 11, 2020
3ad4f96
[AIRFLOW-1156] BugFix: Unpausing a DAG with catchup=False creates an …
kaxil May 11, 2020
4375607
Fix typo. 'zobmies' => 'zombies'. (#8832)
jhtimmins May 12, 2020
78a48db
Add support for non-default orientation in `dag show` command (#8834)
klsnreddy May 12, 2020
7533378
Access function to be pickled as attribute, not method, to avoid erro…
jhtimmins May 12, 2020
1d12c34
Refactor BigQuery check operators (#8813)
turbaszek May 12, 2020
4b06fde
Fix Flake8 errors (#8841)
kaxil May 12, 2020
6911dfe
Fix template fields in Google operators (#8840)
turbaszek May 12, 2020
01db738
Azure storage 0.37.0 is not installable any more (#8833)
potiuk May 12, 2020
578fc51
[AIRFLOW-4543] Update slack operator to support slackclient v2 (#5519)
serkef May 12, 2020
7236862
[AIRFLOW-2310] Enable AWS Glue Job Integration (#6007)
abdulbasitds May 12, 2020
8b54919
Refactor BigQuery hook methods to use python library (#8631)
turbaszek May 12, 2020
7d69987
Remove duplicate code from perf_kit (#8843)
kaxil May 12, 2020
e1e833b
Update GoogleBaseHook to not follow 308 and use 60s timeout (#8816)
waiyan1612 May 13, 2020
8a94d18
Fix Environment Variable in perf/scheduler_dag_execution_timing.py (#…
kaxil May 13, 2020
ed3f513
Correctly pass sleep time from AWSAthenaOperator down to the hook. (#…
ashb May 13, 2020
f1dc2e0
The librabbitmq library stopped installing for python3.7 (#8853)
potiuk May 13, 2020
c3af681
Convert tests/jobs/test_base_job.py to pytest (#8856)
ashb May 13, 2020
81fb9d6
Add metric for monitoring email notification failures (#8771)
May 13, 2020
2878f17
Relax Flask-Appbuilder version to ~=2.3.4 (#8857)
feluelle May 13, 2020
e61b9bb
Add AWS EMR System tests (#8618)
xinbinhuang May 13, 2020
fc862a3
Do not create a separate process for one task in CeleryExecutor (#8855)
mik-laj May 14, 2020
961c710
Make Custom XCom backend a subsection of XCom docs (#8869)
turbaszek May 14, 2020
fe42191
Don't use ProcessorAgent to test ProcessorManager (#8871)
ashb May 14, 2020
4813b94
Create log file w/abs path so tests pass on MacOS (#8820)
jhtimmins May 14, 2020
35c523f
Fix list formatting of plugins doc. (#8873)
ashb May 15, 2020
85bbab2
Add EMR operators howto docs (#8863)
xinbinhuang May 15, 2020
f82ad45
Fix KubernetesPodOperator pod name length validation (#8829)
dsaiztc May 15, 2020
92585ca
Added automated release notes generation for backport operators (#8807)
potiuk May 15, 2020
82de6f7
Spend less time waiting for DagFileProcessor processes to complete (#…
ashb May 15, 2020
a3a4bac
JIRA and Github issues explanation (#8539)
mschickensoup May 16, 2020
f4edd90
Speed up TestAwsLambdaHook by not actually running a function (#8882)
ashb May 16, 2020
15273f0
Check for same task instead of Equality to detect Duplicate Tasks (#8…
kaxil May 16, 2020
a3a3411
Fix master failing on generating requirements (#8885)
potiuk May 16, 2020
f3521fb
Regenerate readme files for backport package release (#8886)
potiuk May 16, 2020
f6d5917
Updated docs for experimental API /dags/<DAG_ID>/dag_runs (#8800)
randr97 May 16, 2020
707bb0c
[AIRFLOW-6535] Add AirflowFailException to fail without any retry (#7…
jstern May 16, 2020
a546a10
Add Snowflake system test (#8422)
dhuang May 16, 2020
8985df0
Monitor pods by labels instead of names (#6377)
dimberman May 16, 2020
ff342fc
Added SalesforceHook missing method to return only dataframe (#8565) …
pranjalmittal May 17, 2020
12c5e5d
Prepare release candidate for backport packages (#8891)
potiuk May 17, 2020
2121f49
Avoid failure on transient requirements in CI image (#8892)
potiuk May 17, 2020
841d816
Allow setting the pooling time in DLPHook (#8824)
xuan616 May 19, 2020
dd57ec9
Fix task and dag stats on home page (#8865)
May 19, 2020
375d1ca
Release candidate 2 for backport packages 2020.05.20 (#8898)
potiuk May 19, 2020
bae5cc2
Fix race in Celery tests by pre-creating result tables (#8909)
ashb May 19, 2020
499493c
[AIRFLOW-6586] Improvements to gcs sensor (#7197)
May 19, 2020
ce7fdea
UX Fix: Prevent undesired text selection with DAG title selection in …
ryanahamilton May 19, 2020
fef00e5
Use Debian's provided JRE from Buster (#8919)
ashb May 20, 2020
5360045
Fix incorrect Env Var to stop Scheduler from creating DagRuns (#8920)
kaxil May 20, 2020
51d9557
Re-run all tests when Dockerfile or Github worflow change (#8924)
ashb May 20, 2020
c6224e2
Remove unused self.max_threads argument in SchedulerJob (#8935)
kaxil May 21, 2020
12c22e0
Added Greytip to Airflow Users list (#8887)
Sarankrishna May 21, 2020
8476c1e
Hive/Hadoop minicluster needs JDK8 and JAVA_HOME to work (#8938)
ashb May 21, 2020
f17b4bb
Fix DagRun Prefix for Performance script (#8934)
kaxil May 21, 2020
a9dfd7d
Remove side-effect of session in FAB (#8940)
mik-laj May 21, 2020
f3f74c7
Add TaskInstance state to TI Tooltip to be colour-blind friendlier (#…
harrisjoseph May 21, 2020
8d3acd7
Fix docstring in DagFileProcessor._schedule_task_instances (#8948)
kaxil May 21, 2020
47413d9
Remove singularity from CI images (#8945)
ashb May 21, 2020
16206cd
Update example webserver_config.py to show correct CSRF config (#8944)
ashb May 21, 2020
97b6cc7
Add note in Updating.md about the removel of DagRun.ID_PREFIX (#8949)
kaxil May 21, 2020
41481bb
Python base images are stored in cache (#8943)
potiuk May 21, 2020
b26b3ca
Don't hard-code constants in scheduler_dag_execution_timing (#8950)
ashb May 21, 2020
113982b
Make scheduler_dag_execution_timing grok dynamic start date of elasti…
ashb May 21, 2020
90a07d8
Cache 1 10 ci images (#8955)
dimberman May 21, 2020
dd72040
Pin Version of Azure Cosmos to <4 (#8956)
kaxil May 21, 2020
94a7673
Pin google-cloud-datacatalog to <0.8 (#8957)
kaxil May 21, 2020
9a4a2d1
[AIRFLOW-5262] Update timeout exception to include dag (#8466)
curiousjazz77 May 22, 2020
b055151
Add context to execution_date_fn in ExternalTaskSensor (#8702)
Acehaidrey May 22, 2020
f107338
Add support for spark python and submit tasks in Databricks operator(…
siddartha-ravichandran May 22, 2020
e742ef7
Fix typo in test_project_structure (#8978)
mik-laj May 23, 2020
4d67704
Remove duplicate line from CONTRIBUTING.rst (#8981)
kaxil May 23, 2020
db70da2
Flush pending Sentry exceptions before exiting (#7232)
mikeclarke May 23, 2020
cf5cf45
Support YAML input for CloudBuildCreateOperator (#8808)
joppevos May 23, 2020
bdb8369
Add secrets to test_deprecated_packages (#8979)
mik-laj May 23, 2020
f3456b1
Fix formatting code block in TESTING.rst (#8985)
ad-m May 23, 2020
Check for same task instead of Equality to detect Duplicate Tasks (#8828)
kaxil authored May 16, 2020
commit 15273f0ea05ec579c631ce26b5d620233ebdc4d2
2 changes: 1 addition & 1 deletion airflow/example_dags/example_complex.py
@@ -81,7 +81,7 @@
 )

 create_tag_template_field_result2 = BashOperator(
-    task_id="create_tag_template_field_result", bash_command="echo create_tag_template_field_result"
+    task_id="create_tag_template_field_result2", bash_command="echo create_tag_template_field_result"
 )

 # Delete
7 changes: 3 additions & 4 deletions airflow/models/baseoperator.py
@@ -36,7 +36,7 @@
 from sqlalchemy.orm import Session

 from airflow.configuration import conf
-from airflow.exceptions import AirflowException, DuplicateTaskIdFound
+from airflow.exceptions import AirflowException
 from airflow.lineage import apply_lineage, prepare_lineage
 from airflow.models.base import Operator
 from airflow.models.pool import Pool
@@ -600,9 +600,8 @@ def dag(self, dag: Any):
                 "The DAG assigned to {} can not be changed.".format(self))
         elif self.task_id not in dag.task_dict:
             dag.add_task(self)
-        elif self.task_id in dag.task_dict and dag.task_dict[self.task_id] != self:
-            raise DuplicateTaskIdFound(
-                "Task id '{}' has already been added to the DAG".format(self.task_id))
+        elif self.task_id in dag.task_dict and dag.task_dict[self.task_id] is not self:
+            dag.add_task(self)

         self._dag = dag  # pylint: disable=attribute-defined-outside-init
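The heart of this change is swapping a value-equality check (`!=`) for an identity check (`is not`): two operator objects can compare equal (same task_id and attributes) while still being distinct tasks, so equality cannot distinguish "the same object registered again" from "a genuinely different task with a clashing id". A minimal sketch of the distinction, using a hypothetical `Task` class with value-based `__eq__` (not the real BaseOperator):

```python
class Task:
    """Hypothetical stand-in for an operator: equality is value-based."""

    def __init__(self, task_id):
        self.task_id = task_id

    def __eq__(self, other):
        return isinstance(other, Task) and self.task_id == other.task_id

    def __hash__(self):
        return hash(self.task_id)


a = Task("t1")
b = Task("t1")  # a distinct object that happens to compare equal to `a`

# Equality cannot tell re-registration of the same object apart from a
# genuinely different task that compares equal:
print(a == b)   # True  -> a `!=` check would treat `b` as the same task as `a`
print(a is b)   # False -> an identity check correctly flags `b` as a duplicate
print(a is a)   # True  -> re-adding the very same object stays harmless
```

With the identity check, only re-adding the exact same object is tolerated; a second, equal-but-distinct object is routed to `dag.add_task`, which raises `DuplicateTaskIdFound`.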
2 changes: 1 addition & 1 deletion airflow/models/dag.py
@@ -1337,7 +1337,7 @@ def add_task(self, task):
         elif task.end_date and self.end_date:
             task.end_date = min(task.end_date, self.end_date)

-        if task.task_id in self.task_dict and self.task_dict[task.task_id] != task:
+        if task.task_id in self.task_dict and self.task_dict[task.task_id] is not task:
             raise DuplicateTaskIdFound(
                 "Task id '{}' has already been added to the DAG".format(task.task_id))
         else:
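The resulting `add_task` semantics can be sketched as follows (a simplified model under stated assumptions: the real method also adjusts start/end dates and back-references the DAG on the task):

```python
class DuplicateTaskIdFound(Exception):
    pass


class MiniDag:
    """Simplified model of DAG.add_task after this change."""

    def __init__(self):
        self.task_dict = {}

    def add_task(self, task):
        existing = self.task_dict.get(task.task_id)
        # Identity check: only a *different* object with the same id is rejected.
        if existing is not None and existing is not task:
            raise DuplicateTaskIdFound(
                "Task id '{}' has already been added to the DAG".format(task.task_id))
        self.task_dict[task.task_id] = task


class FakeTask:
    def __init__(self, task_id):
        self.task_id = task_id


dag = MiniDag()
t1 = FakeTask("t1")
dag.add_task(t1)
dag.add_task(t1)  # same object re-added: no error
try:
    dag.add_task(FakeTask("t1"))  # distinct object, same id: rejected
except DuplicateTaskIdFound as e:
    print(e)  # Task id 't1' has already been added to the DAG
```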
@@ -181,7 +181,7 @@

 # [START howto_operator_gcp_datacatalog_create_tag_template_field_result2]
 create_tag_template_field_result2 = BashOperator(
-    task_id="create_tag_template_field_result",
+    task_id="create_tag_template_field_result2",
     bash_command="echo \"{{ task_instance.xcom_pull('create_tag_template_field') }}\"",
 )
 # [END howto_operator_gcp_datacatalog_create_tag_template_field_result2]
@@ -100,7 +100,7 @@

 # [START howto_operator_gcs_to_gcs_delimiter]
 copy_files_with_delimiter = GCSToGCSOperator(
-    task_id="copy_files_with_wildcard",
+    task_id="copy_files_with_delimiter",
     source_bucket=BUCKET_1_SRC,
     source_object="data/",
     destination_bucket=BUCKET_1_DST,
18 changes: 1 addition & 17 deletions tests/models/test_dag.py
@@ -979,34 +979,18 @@ def test_duplicate_task_ids_not_allowed_with_dag_context_manager(self):

         self.assertEqual(dag.task_dict, {op1.task_id: op1})

-        # Also verify that DAGs with duplicate task_ids don't raise errors
-        with DAG("test_dag_1", start_date=DEFAULT_DATE) as dag1:
-            op3 = DummyOperator(task_id="t3")
-            op4 = BashOperator(task_id="t4", bash_command="sleep 1")
-            op3 >> op4
-
-        self.assertEqual(dag1.task_dict, {op3.task_id: op3, op4.task_id: op4})
-
     def test_duplicate_task_ids_not_allowed_without_dag_context_manager(self):
         """Verify tasks with Duplicate task_id raises error"""
         with self.assertRaisesRegex(
             DuplicateTaskIdFound, "Task id 't1' has already been added to the DAG"
         ):
             dag = DAG("test_dag", start_date=DEFAULT_DATE)
             op1 = DummyOperator(task_id="t1", dag=dag)
-            op2 = BashOperator(task_id="t1", bash_command="sleep 1", dag=dag)
+            op2 = DummyOperator(task_id="t1", dag=dag)
             op1 >> op2

         self.assertEqual(dag.task_dict, {op1.task_id: op1})

-        # Also verify that DAGs with duplicate task_ids don't raise errors
-        dag1 = DAG("test_dag_1", start_date=DEFAULT_DATE)
-        op3 = DummyOperator(task_id="t3", dag=dag1)
-        op4 = DummyOperator(task_id="t4", dag=dag1)
-        op3 >> op4
-
-        self.assertEqual(dag1.task_dict, {op3.task_id: op3, op4.task_id: op4})
-
     def test_duplicate_task_ids_for_same_task_is_allowed(self):
         """Verify that same tasks with Duplicate task_id do not raise error"""
         with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
16 changes: 8 additions & 8 deletions tests/models/test_taskinstance.py
@@ -373,20 +373,20 @@ def test_ti_updates_with_task(self, session=None):
         """
         test that updating the executor_config propogates to the TaskInstance DB
         """
-        dag = models.DAG(dag_id='test_run_pooling_task')
-        task = DummyOperator(task_id='test_run_pooling_task_op', dag=dag, owner='airflow',
-                             executor_config={'foo': 'bar'},
-                             start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
+        with models.DAG(dag_id='test_run_pooling_task') as dag:
+            task = DummyOperator(task_id='test_run_pooling_task_op', owner='airflow',
+                                 executor_config={'foo': 'bar'},
+                                 start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
         ti = TI(
             task=task, execution_date=timezone.utcnow())

         ti.run(session=session)
         tis = dag.get_task_instances()
         self.assertEqual({'foo': 'bar'}, tis[0].executor_config)

-        task2 = DummyOperator(task_id='test_run_pooling_task_op', dag=dag, owner='airflow',
-                              executor_config={'bar': 'baz'},
-                              start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
+        with models.DAG(dag_id='test_run_pooling_task') as dag:
+            task2 = DummyOperator(task_id='test_run_pooling_task_op', owner='airflow',
+                                  executor_config={'bar': 'baz'},
+                                  start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))

         ti = TI(
             task=task2, execution_date=timezone.utcnow())
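The context-manager form used in the rewritten test relies on `DAG.__enter__` pushing the DAG onto a stack of "current" DAGs, so operators created inside the block attach themselves without an explicit `dag=` argument. A rough sketch of that mechanism (illustrative class names, not Airflow's real implementation):

```python
class MiniDag:
    _stack = []  # stack of currently-active DAG contexts

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.tasks = []

    def __enter__(self):
        MiniDag._stack.append(self)
        return self

    def __exit__(self, *exc):
        MiniDag._stack.pop()


class MiniOperator:
    def __init__(self, task_id):
        self.task_id = task_id
        # Attach to the innermost active DAG, if any.
        self.dag = MiniDag._stack[-1] if MiniDag._stack else None
        if self.dag is not None:
            self.dag.tasks.append(self)


with MiniDag("example") as dag:
    op = MiniOperator("my_task")

print(op.dag is dag)                   # True: picked up from the context
print([t.task_id for t in dag.tasks])  # ['my_task']
```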
2 changes: 1 addition & 1 deletion tests/providers/amazon/aws/operators/test_s3_to_sftp.py
@@ -133,7 +133,7 @@ def test_s3_to_sftp_operation(self):
     def delete_remote_resource(self):
         # check the remote file content
         remove_file_task = SSHOperator(
-            task_id="test_check_file",
+            task_id="test_rm_file",
             ssh_hook=self.hook,
             command="rm {0}".format(self.sftp_path),
             do_xcom_push=True,
42 changes: 24 additions & 18 deletions tests/providers/google/cloud/operators/test_mlengine_utils.py
@@ -131,16 +131,18 @@ def test_successful_run(self):
         self.assertEqual('err=0.9', result)

     def test_failures(self):
-        dag = DAG(
-            'test_dag',
-            default_args={
-                'owner': 'airflow',
-                'start_date': DEFAULT_DATE,
-                'end_date': DEFAULT_DATE,
-                'project_id': 'test-project',
-                'region': 'us-east1',
-            },
-            schedule_interval='@daily')
+        def create_test_dag(dag_id):
+            dag = DAG(
+                dag_id,
+                default_args={
+                    'owner': 'airflow',
+                    'start_date': DEFAULT_DATE,
+                    'end_date': DEFAULT_DATE,
+                    'project_id': 'test-project',
+                    'region': 'us-east1',
+                },
+                schedule_interval='@daily')
+            return dag

         input_with_model = self.INPUT_MISSING_ORIGIN.copy()
         other_params_but_models = {
@@ -151,26 +153,30 @@ def test_failures(self):
             'prediction_path': input_with_model['outputPath'],
             'metric_fn_and_keys': (self.metric_fn, ['err']),
             'validate_fn': (lambda x: 'err=%.1f' % x['err']),
-            'dag': dag,
         }

         with self.assertRaisesRegex(AirflowException, 'Missing model origin'):
-            mlengine_operator_utils.create_evaluate_ops(**other_params_but_models)
+            mlengine_operator_utils.create_evaluate_ops(
+                dag=create_test_dag('test_dag_1'), **other_params_but_models)

         with self.assertRaisesRegex(AirflowException, 'Ambiguous model origin'):
-            mlengine_operator_utils.create_evaluate_ops(model_uri='abc', model_name='cde',
-                                                        **other_params_but_models)
+            mlengine_operator_utils.create_evaluate_ops(
+                dag=create_test_dag('test_dag_2'), model_uri='abc', model_name='cde',
+                **other_params_but_models)

         with self.assertRaisesRegex(AirflowException, 'Ambiguous model origin'):
-            mlengine_operator_utils.create_evaluate_ops(model_uri='abc', version_name='vvv',
-                                                        **other_params_but_models)
+            mlengine_operator_utils.create_evaluate_ops(
+                dag=create_test_dag('test_dag_3'), model_uri='abc', version_name='vvv',
+                **other_params_but_models)

         with self.assertRaisesRegex(AirflowException, '`metric_fn` param must be callable'):
             params = other_params_but_models.copy()
             params['metric_fn_and_keys'] = (None, ['abc'])
-            mlengine_operator_utils.create_evaluate_ops(model_uri='gs://blah', **params)
+            mlengine_operator_utils.create_evaluate_ops(
+                dag=create_test_dag('test_dag_4'), model_uri='gs://blah', **params)

         with self.assertRaisesRegex(AirflowException, '`validate_fn` param must be callable'):
             params = other_params_but_models.copy()
             params['validate_fn'] = None
-            mlengine_operator_utils.create_evaluate_ops(model_uri='gs://blah', **params)
+            mlengine_operator_utils.create_evaluate_ops(
+                dag=create_test_dag('test_dag_5'), model_uri='gs://blah', **params)
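The refactor above replaces one shared DAG with a small factory so that each failure scenario registers its operators on a fresh DAG, which avoids duplicate-task-id collisions between the five `assertRaises` blocks now that duplicates are detected by identity. The same pattern in isolation (hypothetical names, not the Airflow API):

```python
def create_test_dag(dag_id, registry):
    """Hypothetical factory: returns a fresh, independent 'DAG' per call."""
    dag = {"dag_id": dag_id, "task_ids": set()}
    registry.append(dag)
    return dag


def add_task(dag, task_id):
    """Reject a second, distinct registration of the same id on one DAG."""
    if task_id in dag["task_ids"]:
        raise ValueError("duplicate task_id %r" % task_id)
    dag["task_ids"].add(task_id)


registry = []
# Each scenario gets its own DAG, so identical task ids never collide:
for i in range(1, 6):
    dag = create_test_dag("test_dag_%d" % i, registry)
    add_task(dag, "evaluate")  # same task_id in every scenario: fine

print(len(registry))  # 5 independent DAGs, no duplicate-id error raised
```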
4 changes: 2 additions & 2 deletions tests/providers/google/cloud/sensors/test_gcs.py
@@ -205,7 +205,7 @@ def setUp(self):
         self.dag = dag

         self.sensor = GCSUploadSessionCompleteSensor(
-            task_id='sensor',
+            task_id='sensor_1',
             bucket='test-bucket',
             prefix='test-prefix/path',
             inactivity_period=12,
@@ -227,7 +227,7 @@ def test_files_deleted_between_pokes_throw_error(self):
     @mock.patch('airflow.providers.google.cloud.sensors.gcs.get_time', mock_time)
     def test_files_deleted_between_pokes_allow_delete(self):
         self.sensor = GCSUploadSessionCompleteSensor(
-            task_id='sensor',
+            task_id='sensor_2',
             bucket='test-bucket',
             prefix='test-prefix/path',
             inactivity_period=12,
@@ -45,7 +45,7 @@ def setUp(self):

     def test_init(self):
         operator = FileToWasbOperator(
-            task_id='wasb_operator',
+            task_id='wasb_operator_1',
             dag=self.dag,
             **self._config
         )
@@ -58,7 +58,7 @@ def test_init(self):
         self.assertEqual(operator.retries, self._config['retries'])

         operator = FileToWasbOperator(
-            task_id='wasb_operator',
+            task_id='wasb_operator_2',
             dag=self.dag,
             load_options={'timeout': 2},
             **self._config
@@ -42,7 +42,7 @@ def setUp(self):

     def test_init(self):
         operator = WasbDeleteBlobOperator(
-            task_id='wasb_operator',
+            task_id='wasb_operator_1',
             dag=self.dag,
             **self._config
         )
@@ -53,7 +53,7 @@ def test_init(self):
         self.assertEqual(operator.ignore_if_missing, False)

         operator = WasbDeleteBlobOperator(
-            task_id='wasb_operator',
+            task_id='wasb_operator_2',
             dag=self.dag,
             is_prefix=True,
             ignore_if_missing=True,
8 changes: 4 additions & 4 deletions tests/providers/microsoft/azure/sensors/test_wasb.py
@@ -43,7 +43,7 @@ def setUp(self):

     def test_init(self):
         sensor = WasbBlobSensor(
-            task_id='wasb_sensor',
+            task_id='wasb_sensor_1',
             dag=self.dag,
             **self._config
         )
@@ -54,7 +54,7 @@ def test_init(self):
         self.assertEqual(sensor.timeout, self._config['timeout'])

         sensor = WasbBlobSensor(
-            task_id='wasb_sensor',
+            task_id='wasb_sensor_2',
             dag=self.dag,
             check_options={'timeout': 2},
             **self._config
@@ -94,7 +94,7 @@ def setUp(self):

     def test_init(self):
         sensor = WasbPrefixSensor(
-            task_id='wasb_sensor',
+            task_id='wasb_sensor_1',
             dag=self.dag,
             **self._config
         )
@@ -105,7 +105,7 @@ def test_init(self):
         self.assertEqual(sensor.timeout, self._config['timeout'])

         sensor = WasbPrefixSensor(
-            task_id='wasb_sensor',
+            task_id='wasb_sensor_2',
             dag=self.dag,
             check_options={'timeout': 2},
             **self._config
20 changes: 10 additions & 10 deletions tests/providers/sftp/operators/test_sftp.py
@@ -75,7 +75,7 @@ def test_pickle_file_transfer_put(self):

         # put test file to remote
         put_test_task = SFTPOperator(
-            task_id="test_sftp",
+            task_id="put_test_task",
             ssh_hook=self.hook,
             local_filepath=self.test_local_filepath,
             remote_filepath=self.test_remote_filepath,
@@ -89,7 +89,7 @@ def test_pickle_file_transfer_put(self):

         # check the remote file content
         check_file_task = SSHOperator(
-            task_id="test_check_file",
+            task_id="check_file_task",
             ssh_hook=self.hook,
             command="cat {0}".format(self.test_remote_filepath),
             do_xcom_push=True,
@@ -99,7 +99,7 @@ def test_pickle_file_transfer_put(self):
         ti3 = TaskInstance(task=check_file_task, execution_date=timezone.utcnow())
         ti3.run()
         self.assertEqual(
-            ti3.xcom_pull(task_ids='test_check_file', key='return_value').strip(),
+            ti3.xcom_pull(task_ids=check_file_task.task_id, key='return_value').strip(),
             test_local_file_content)

     @conf_vars({('core', 'enable_xcom_pickling'): 'True'})
@@ -178,7 +178,7 @@ def test_json_file_transfer_put(self):

         # put test file to remote
         put_test_task = SFTPOperator(
-            task_id="test_sftp",
+            task_id="put_test_task",
             ssh_hook=self.hook,
             local_filepath=self.test_local_filepath,
             remote_filepath=self.test_remote_filepath,
@@ -191,7 +191,7 @@ def test_json_file_transfer_put(self):

         # check the remote file content
         check_file_task = SSHOperator(
-            task_id="test_check_file",
+            task_id="check_file_task",
             ssh_hook=self.hook,
             command="cat {0}".format(self.test_remote_filepath),
             do_xcom_push=True,
@@ -201,7 +201,7 @@ def test_json_file_transfer_put(self):
         ti3 = TaskInstance(task=check_file_task, execution_date=timezone.utcnow())
         ti3.run()
         self.assertEqual(
-            ti3.xcom_pull(task_ids='test_check_file', key='return_value').strip(),
+            ti3.xcom_pull(task_ids=check_file_task.task_id, key='return_value').strip(),
             b64encode(test_local_file_content).decode('utf-8'))

     @conf_vars({('core', 'enable_xcom_pickling'): 'True'})
@@ -362,7 +362,7 @@ def test_arg_checking(self):
         with self.assertRaisesRegex(AirflowException,
                                     "Cannot operate without ssh_hook or ssh_conn_id."):
             task_0 = SFTPOperator(
-                task_id="test_sftp",
+                task_id="test_sftp_0",
                 local_filepath=self.test_local_filepath,
                 remote_filepath=self.test_remote_filepath,
                 operation=SFTPOperation.PUT,
@@ -372,7 +372,7 @@ def test_arg_checking(self):

         # if ssh_hook is invalid/not provided, use ssh_conn_id to create SSHHook
         task_1 = SFTPOperator(
-            task_id="test_sftp",
+            task_id="test_sftp_1",
             ssh_hook="string_rather_than_SSHHook",  # invalid ssh_hook
             ssh_conn_id=TEST_CONN_ID,
             local_filepath=self.test_local_filepath,
@@ -387,7 +387,7 @@ def test_arg_checking(self):
         self.assertEqual(task_1.ssh_hook.ssh_conn_id, TEST_CONN_ID)

         task_2 = SFTPOperator(
-            task_id="test_sftp",
+            task_id="test_sftp_2",
             ssh_conn_id=TEST_CONN_ID,  # no ssh_hook provided
             local_filepath=self.test_local_filepath,
             remote_filepath=self.test_remote_filepath,
@@ -402,7 +402,7 @@ def test_arg_checking(self):

         # if both valid ssh_hook and ssh_conn_id are provided, ignore ssh_conn_id
         task_3 = SFTPOperator(
-            task_id="test_sftp",
+            task_id="test_sftp_3",
             ssh_hook=self.hook,
             ssh_conn_id=TEST_CONN_ID,
             local_filepath=self.test_local_filepath,