Copy of [AIRFLOW-5071] JIRA: Thousands of Executor reports task instance X finished (success) although the task says its queued. Was the task killed externally? #10790

Closed
dmariassy opened this issue Sep 8, 2020 · 60 comments
Assignees: turbaszek
Labels: area:Scheduler (Scheduler or dag parsing issues), kind:bug (this is clearly a bug), pinned (protect from Stalebot auto closing), Stale Bug Report

Comments

@dmariassy

Apache Airflow version: 1.10.9

Kubernetes version (if you are using kubernetes) (use kubectl version): Server: v1.10.13, Client: v1.17.0

Environment:

  • Cloud provider or hardware configuration: AWS
  • OS (e.g. from /etc/os-release): Debian GNU/Linux 9 (stretch)
  • Kernel (e.g. uname -a): Linux airflow-web-54fc4fb694-ftkp5 4.19.123-coreos #1 SMP Fri May 22 19:21:11 -00 2020 x86_64 GNU/Linux
  • Others: Redis, CeleryExecutor

What happened:

In line with the guidelines laid out in AIRFLOW-7120, I'm copying over a JIRA for a bug that has significant negative impact on our pipeline SLAs. The original ticket is AIRFLOW-5071 which has a lot of details from various users who use ExternalTaskSensors in reschedule mode and see their tasks going through the following unexpected state transitions:

running -> up_for_reschedule -> scheduled -> queued -> up_for_retry

In our case, this issue seems to affect roughly 2,000 tasks per day.

[Screenshot: 2020-09-08 at 09 01 03]

What you expected to happen:

I would expect that tasks would go through the following state transitions instead: running -> up_for_reschedule -> scheduled -> queued -> running

How to reproduce it:

Unfortunately, I don't have configuration available that could be used to easily reproduce the issue at the moment. However, based on the thread in AIRFLOW-5071, the problem seems to arise in deployments that use a large number of sensors in reschedule mode.
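
For illustration, a minimal sketch of the sensor pattern involved (the DAG and task names here are hypothetical, not our real configuration):

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.utils.dates import days_ago

with DAG(
    dag_id="example_child_dag",
    start_date=days_ago(1),
    schedule_interval="@hourly",
) as dag:
    wait_for_upstream = ExternalTaskSensor(
        task_id="wait_for_upstream",
        external_dag_id="example_parent_dag",  # hypothetical upstream DAG
        external_task_id="done",
        mode="reschedule",  # frees the worker slot between pokes
        poke_interval=60,
    )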

@dmariassy added the kind:bug label on Sep 8, 2020
@boring-cyborg

boring-cyborg bot commented Sep 8, 2020

Thanks for opening your first issue here! Be sure to follow the issue template!

@turbaszek
Member

turbaszek commented Sep 8, 2020

Thanks @dmariassy for bringing this issue to GitHub! I think this one is quite important to fix, but as long as we don't know how to replicate it we are flying blind.

I spent some time trying to reproduce it on 2.0 and 1.10.9, but to no avail :<

@turbaszek added the area:Scheduler and pinned labels on Sep 8, 2020
@dmariassy
Author

Thanks for your reply @turbaszek. What did your reproduction set-up look like? If I have the time, I would like to have a go at reproducing it myself in the coming weeks.

@turbaszek
Member

turbaszek commented Sep 8, 2020

As reported in the original issue and comments, this behavior should be reproducible with fast sensors in reschedule mode. That's why I was trying to use many DAGs like this:

from random import choice
from airflow.utils.dates import days_ago
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.operators.bash_operator import BashOperator
from airflow import DAG
import time


class TestSensor(BaseSensorOperator):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.mode = "reschedule"  # must be lowercase "reschedule" for reschedule mode to take effect

    def poke(self, context):
        time.sleep(5)
        return choice([True, False, False])


args = {"owner": "airflow", "start_date": days_ago(1)}


with DAG(
    dag_id="%s",
    is_paused_upon_creation=False,
    max_active_runs=100,
    default_args=args,
    schedule_interval="0 * * * *",
) as dag:
    start = BashOperator(task_id="start", bash_command="echo 42")
    end = BashOperator(task_id="end", bash_command="echo 42")
    for i in range(3):
        next = TestSensor(task_id=f"next_{i}")
        start >> next >> end

And I was also playing with the Airflow config settings as described in the comments. Although I saw failing tasks, there was no issue like this one... or eventually the log was missing?

I also did some tests with the external task sensor, but with no results either.

@yuqian90
Contributor

Hi @turbaszek, any findings on this? We have a CeleryExecutor Redis setup with three workers (apache-airflow 1.10.12). The airflow-scheduler log has a lot of lines like this. I remember this was already a problem when we were using older versions such as 1.10.10; it's just that we never paid much attention to it.

{taskinstance.py:1150} ERROR - Executor reports task instance <TaskInstance: ... [queued]> finished (success) although the task says its queued. Was the task killed externally?

Same as others in this thread, we have a lot of sensors in "reschedule" mode with poke_interval set to 60s. These are the ones that most often hit this error. So far our workaround has been to add retries=3 to these sensors; that way, when this error happens, they retry and we don't get any spam. This is definitely not a great long-term solution though. Such sensors go into the up_for_retry state when this happens.
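
For illustration, a minimal sketch of that workaround (names and the callable are hypothetical; assumes an existing dag object):

from airflow.contrib.sensors.python_sensor import PythonSensor

wait_for_data = PythonSensor(
    task_id="wait_for_data",
    python_callable=lambda: False,  # replace with the real check
    mode="reschedule",
    poke_interval=60,
    retries=3,  # absorbs the spurious "killed externally" failures as retries
    dag=dag,
)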

I also tried to tweak these parameters. They don't seem to matter much as far as this error is concerned:

parallelism = 1024
dag_concurrency = 128
max_threads = 8

The way to reproduce this issue seems to be to create a DAG with a bunch of parallel reschedule sensors and make the DAG slow to import, for example like the one below. If we add a time.sleep(30) at the end to simulate the experience of slow-to-import DAGs, this error happens a lot for such sensors. You may also need to tweak dagbag_import_timeout and dag_file_processor_timeout if adding the sleep causes DAGs to fail to import altogether.

When the scheduler starts to process this DAG, we then start to see the above error happening to these sensors. And they go into up_for_retry.

import datetime
import pendulum
import time

from airflow.models.dag import DAG
from airflow.contrib.sensors.python_sensor import PythonSensor

with DAG(
    dag_id="test_dag_slow",
    start_date=datetime.datetime(2020, 9, 8),
    schedule_interval="@daily",
) as dag:
    sensors = [
        PythonSensor(
            task_id=f"sensor_{i}",
            python_callable=lambda: False,
            mode="reschedule",
            retries=2,
        ) for i in range(20)
    ]
    time.sleep(30)  # simulate a slow-to-import DAG file

@turbaszek
Member

@yuqian90 thank you so much for pointing to the DAG! I will check it and let you know. Once we can replicate the problem it will be much easier to solve 👍

@sgrzemski-ias

sgrzemski-ias commented Sep 10, 2020

@yuqian90

I also tried to tweak these parameters. They don't seem to matter much as far as this error is concerned:

parallelism = 1024
dag_concurrency = 128
max_threads = 8

The way to reproduce this issue seems to be to create a DAG with a bunch of parallel reschedule sensors and make the DAG slow to import, for example like the one below. If we add a time.sleep(30) at the end to simulate the experience of slow-to-import DAGs, this error happens a lot for such sensors. You may also need to tweak dagbag_import_timeout and dag_file_processor_timeout if adding the sleep causes DAGs to fail to import altogether.

Those parameters won't help you much. I was struggling to somehow work around this issue and I believe I've found the right solution now. In my case the biggest hint while debugging was not the scheduler/worker logs but the Celery Flower web UI. We have a setup of 3 Celery workers, 4 CPUs each. It often happened that Celery was running 8 or more Python reschedule sensors on one worker but 0 on the others, and that was exactly when the sensors started to fail. There are two Celery settings responsible for this behavior: worker_concurrency with a default value of 16 and worker_autoscale with a default value of "16,12" (it basically means that the minimum number of Celery processes on the worker is 12 and it can be scaled up to 16). With those left at their default values, Celery was configured to load up to 16 tasks (mainly reschedule sensors) onto one node. After setting worker_concurrency to match the CPU count and worker_autoscale to "4,2", the problem is literally gone. Maybe that might be another clue for @turbaszek.
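
For reference, this is roughly the airflow.cfg change described above (the values are examples for a 4-CPU worker, not general recommendations):

[celery]
# cap the number of task processes per worker at the CPU count
worker_concurrency = 4
# max,min autoscale bounds; the "16,12" defaults packed too many sensors onto one node
worker_autoscale = 4,2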

I've been trying a lot to set up a local docker-compose file with scheduler, webserver, flower, postgres and RabbitMQ as the Celery broker, but I was not able to replicate the issue either. I tried to start a worker container with limited CPU to somehow imitate this situation, but I failed. There are in fact tasks killed and shown as failed in Celery Flower, but not with the "killed externally" reason.

@turbaszek
Member

@sgrzemski-ias I will set up an environment to first observe the behavior, and if it occurs I will check your suggestion! Hope we will be able to understand what's going on here 🚀

@turbaszek self-assigned this on Sep 10, 2020
@turbaszek
Member

Ok @yuqian90 @sgrzemski-ias, what is your setting for core.dagbag_import_timeout?

As I'm hitting:

Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/dagbag.py", line 237, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/opt/python3.6/lib/python3.6/imp.py", line 172, in load_source
    module = _load(spec)
  File "", line 684, in _load
  File "", line 665, in _load_unlocked
  File "", line 678, in exec_module
  File "", line 219, in _call_with_frames_removed
  File "/home/airflow/gcs/dags/test_dag_1.py", line 24, in
    time.sleep(30)
  File "/usr/local/lib/airflow/airflow/utils/timeout.py", line 43, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 6217

@yuqian90
Contributor

yuqian90 commented Sep 10, 2020

Ok @yuqian90 @sgrzemski-ias, what is your setting for core.dagbag_import_timeout?

As I'm hitting:

Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/dagbag.py", line 237, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/opt/python3.6/lib/python3.6/imp.py", line 172, in load_source
    module = _load(spec)
  File "", line 684, in _load
  File "", line 665, in _load_unlocked
  File "", line 678, in exec_module
  File "", line 219, in _call_with_frames_removed
  File "/home/airflow/gcs/dags/test_dag_1.py", line 24, in
    time.sleep(30)
  File "/usr/local/lib/airflow/airflow/utils/timeout.py", line 43, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 6217

Hi @turbaszek, in my case I have dagbag_import_timeout = 100 and dag_file_processor_timeout = 300. Most of the time the DAG import takes about 10s; DAG file processing can take 60s, which is why it's set to a large number.

After digging further, I think the slowness that causes the error for our case is in this function: SchedulerJob._process_dags(). If this function takes around 60s, those reschedule sensors will hit the ERROR - Executor reports task instance ... killed externally? error. My previous comment about adding the time.sleep(30) is just one way to replicate this issue. Anything that causes _process_dags() to slow down should be able to replicate this error.

@dmariassy
Author

Here's another potential hint: we increased the poke_interval for a subset of our sensors yesterday to 5 minutes (from the default 1 minute), and the issue seems to have disappeared for the affected sensors.
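
As a sketch, the change amounts to something like this (sensor names are hypothetical; assumes an existing dag object):

from airflow.sensors.external_task_sensor import ExternalTaskSensor

wait_for_parent = ExternalTaskSensor(
    task_id="wait_for_parent",
    external_dag_id="parent_dag",
    external_task_id="done",
    mode="reschedule",
    poke_interval=5 * 60,  # raised from the default 60s to 5 minutes
    dag=dag,
)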

@kaxil
Member

kaxil commented Sep 11, 2020

I can confirm that one of our customers also faced a similar issue with mode='reschedule', and increasing poke_interval fixed the issue for them.

It feels like some sort of race condition.

@zbigniev

zbigniev commented Sep 18, 2020

We are on Airflow 1.10.10

Besides the DAGs which have sensor tasks in them, we are even encountering this in DAGs which have no sensors at all, for example a DAG which only has a PythonOperator and a HiveOperator in it.
Also, weirdly, not all DAGs are affected, even ones with similar signatures (in terms of the operators being used), but the ones that are affected are severely affected and keep getting:
ERROR - Executor reports task instance <TaskInstance: task_id 2020-09-11 00:00:00 00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?

@yuqian90
Contributor

After digging further, I think the slowness that causes the error for our case is in this function: SchedulerJob._process_dags(). If this function takes around 60s, those reschedule sensors will hit the ERROR - Executor reports task instance ... killed externally? error. My previous comment about adding the time.sleep(30) is just one way to replicate this issue. Anything that causes _process_dags() to slow down should be able to replicate this error.

Some further investigation shows that the slowdown that caused this issue in our case (Airflow 1.10.12) was in SchedulerJob._process_task_instances. This is periodically called in the DagFileProcessor process spawned by the airflow scheduler. Anything that causes this function to take more than 60s seems to cause these ERROR - Executor reports task instance ... killed externally? errors for sensors in reschedule mode with a poke_interval of 60s. I'm trying to address one of the causes of the SchedulerJob._process_task_instances slowdown for our own case here: #11010, but that's not a fix for the other causes of this same error.

@rafalkozik

We have just introduced ExternalTaskSensor into our pipeline and faced the same issue. When initially tested on our dev instance (~200 DAGs) it worked fine; after running it on our prod environment (~400 DAGs) it was always failing after a reschedule.

After digging into the code, it looks like this is simply a race condition in the scheduler.

We have a child_dag.parent_dag_completed task that waits for a business process to complete calculations in parent_dag. Task execution logs:

[2020-10-01 11:48:03,038] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00 00:00 [queued]>
[2020-10-01 11:48:03,065] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00 00:00 [queued]>
[2020-10-01 11:48:03,066] {taskinstance.py:879} INFO - 
--------------------------------------------------------------------------------
[2020-10-01 11:48:03,066] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-10-01 11:48:03,066] {taskinstance.py:881} INFO - 
--------------------------------------------------------------------------------
[2020-10-01 11:48:03,095] {taskinstance.py:900} INFO - Executing <Task(ExternalTaskSensor): parent_dag_completed> on 2020-09-30T11:45:00 00:00
[2020-10-01 11:48:03,100] {standard_task_runner.py:53} INFO - Started process 26131 to run task
[2020-10-01 11:48:03,235] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00 00:00 [running]> ip-10-200-100-113.eu-west-1.compute.internal
[2020-10-01 11:48:03,318] {external_task_sensor.py:117} INFO - Poking for parent_dag on 2020-09-30T11:45:00 00:00 ... 
[2020-10-01 11:48:03,397] {taskinstance.py:1136} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2020-10-01 11:48:12,994] {logging_mixin.py:112} INFO - [2020-10-01 11:48:12,993] {local_task_job.py:103} INFO - Task exited with return code 0
[2020-10-01 11:50:53,744] {taskinstance.py:663} INFO - Dependencies not met for <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00 00:00 [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is not a valid state for execution. The task must be cleared in order to be run.
[2020-10-01 11:50:53,747] {logging_mixin.py:112} INFO - [2020-10-01 11:50:53,747] {local_task_job.py:91} INFO - Task is not able to be run

Scheduler logs:

<TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00 00:00 [scheduled]>
[2020-10-01 11:47:59,428] {scheduler_job.py:1010} INFO - DAG child_dag has 0/16 running and queued tasks
<TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00 00:00 [scheduled]>
<TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00 00:00 [queued]>
[2020-10-01 11:47:59,565] {scheduler_job.py:1170} INFO - Sending ('child_dag', 'parent_dag_completed', datetime.datetime(2020, 9, 30, 11, 45, tzinfo=<Timezone [UTC]>), 1) to executor with priority 3 and queue default
[2020-10-01 11:47:59,565] {base_executor.py:58} INFO - Adding to queue: ['airflow', 'run', 'child_dag', 'parent_dag_completed', '2020-09-30T11:45:00 00:00', '--local', '--pool', 'default_pool', '-sd', '/usr/local/airflow/dags/291a327d-5d46-4cf5-87cf-4bad036f56fa_1.py']
<TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00 00:00 [scheduled]>
[2020-10-01 11:50:50,118] {scheduler_job.py:1010} INFO - DAG child_dag has 0/16 running and queued tasks
<TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00 00:00 [scheduled]>
<TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00 00:00 [queued]>
[2020-10-01 11:50:50,148] {scheduler_job.py:1170} INFO - Sending ('child_dag', 'parent_dag_completed', datetime.datetime(2020, 9, 30, 11, 45, tzinfo=<Timezone [UTC]>), 1) to executor with priority 3 and queue default
[2020-10-01 11:50:50,148] {base_executor.py:58} INFO - Adding to queue: ['airflow', 'run', 'child_dag', 'parent_dag_completed', '2020-09-30T11:45:00 00:00', '--local', '--pool', 'default_pool', '-sd', '/usr/local/airflow/dags/291a327d-5d46-4cf5-87cf-4bad036f56fa_1.py']
[2020-10-01 11:50:50,595] {scheduler_job.py:1313} INFO - Executor reports execution of child_dag.parent_dag_completed execution_date=2020-09-30 11:45:00 00:00 exited with status success for try_number 1
[2020-10-01 11:50:50,599] {scheduler_job.py:1330} ERROR - Executor reports task instance <TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00 00:00 [queued]> finished (success) although the task says its queued. Was the task killed externally?
[2020-10-01 11:50:50,803] {taskinstance.py:1145} ERROR - Executor reports task instance <TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00 00:00 [queued]> finished (success) although the task says its queued. Was the task killed externally?
[2020-10-01 11:50:50,804] {taskinstance.py:1202} INFO - Marking task as FAILED.dag_id=child_dag, task_id=parent_dag_completed, execution_date=20200930T114500, start_date=20201001T114803, end_date=20201001T115050

From the scheduler log it's visible that the event from the executor is processed after the task has already been queued for the second time.

Logic related to those logs is here:

    def _validate_and_run_task_instances(self, simple_dag_bag):
        if len(simple_dag_bag.simple_dags) > 0:
            try:
                self._process_and_execute_tasks(simple_dag_bag) # <-- task state is changed to queued here
            except Exception as e:
                self.log.error("Error queuing tasks")
                self.log.exception(e)
                return False

        # Call heartbeats
        self.log.debug("Heartbeating the executor")
        self.executor.heartbeat()

        self._change_state_for_tasks_failed_to_execute()

        # Process events from the executor
        self._process_executor_events(simple_dag_bag) # <-- notification of previous execution is processed and there is state mismatch 
        return True

This is the place where the task state is changed:

    def _process_executor_events(self, simple_dag_bag, session=None):
       
       # ...
       
                if ti.try_number == try_number and ti.state == State.QUEUED:
                    msg = ("Executor reports task instance {} finished ({}) "
                           "although the task says its {}. Was the task "
                           "killed externally?".format(ti, state, ti.state))
                    Stats.incr('scheduler.tasks.killed_externally')
                    self.log.error(msg)
                    try:
                        simple_dag = simple_dag_bag.get_dag(dag_id)
                        dagbag = models.DagBag(simple_dag.full_filepath)
                        dag = dagbag.get_dag(dag_id)
                        ti.task = dag.get_task(task_id)
                        ti.handle_failure(msg)
                    except Exception:
                        self.log.error("Cannot load the dag bag to handle failure for %s"
                                       ". Setting task to FAILED without callbacks or "
                                       "retries. Do you have enough resources?", ti)
                        ti.state = State.FAILED
                        session.merge(ti)
                        session.commit()

Unfortunately I think that moving _process_executor_events before _process_and_execute_tasks would not solve the issue, as the event might arrive from the executor while _process_and_execute_tasks is executing. Increasing poke_interval reduces the chance of this race condition happening when the scheduler is under heavy load.

I'm not too familiar with the Airflow code base, but it seems that the root cause is the way reschedule works and the fact that try_number does not change. Because of that, the scheduler thinks that the event for the past execution belongs to the ongoing one.
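
To make the race concrete, an illustrative timeline (a simplified commentary on the logs above, not actual scheduler code):

# t0: scheduler queues the sensor for poke N          (state=queued,  try_number=1)
# t1: worker runs poke N and raises the reschedule    (state=up_for_reschedule)
# t2: scheduler re-queues the sensor for poke N+1     (state=queued,  try_number=1)
# t3: scheduler processes the executor event from t1: success for try_number=1
#     -> ti.try_number == try_number and ti.state == State.QUEUED is true
#     -> the sensor is failed with "Was the task killed externally?"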

@freedom1989

The cause is clear, as @rafalkozik mentioned. The scheduler schedules the task a second time (puts it in the queue) and then starts processing the executor events of the task's first try. It occurs when the scheduling loop time is greater than the sensor task's reschedule interval.
Either reducing the scheduler loop time (DAG processing time, etc.) or increasing the sensor task's reschedule interval will work.

The bug can also be fixed if the rescheduled task instance uses a different try number, but this will cause a lot of log files.

    def _process_executor_events(self, simple_dag_bag, session=None):
       
       # ...
       
                if ti.try_number == try_number and ti.state == State.QUEUED:  # <-- try number for a sensor task is always the same
                    msg = ("Executor reports task instance {} finished ({}) "
                           "although the task says its {}. Was the task "
                           "killed externally?".format(ti, state, ti.state))
                    Stats.incr('scheduler.tasks.killed_externally')
                    self.log.error(msg)
                    try:
                        simple_dag = simple_dag_bag.get_dag(dag_id)
                        dagbag = models.DagBag(simple_dag.full_filepath)
                        dag = dagbag.get_dag(dag_id)
                        ti.task = dag.get_task(task_id)
                        ti.handle_failure(msg)
                    except Exception:
                        self.log.error("Cannot load the dag bag to handle failure for %s"
                                       ". Setting task to FAILED without callbacks or "
                                       "retries. Do you have enough resources?", ti)
                        ti.state = State.FAILED
                        session.merge(ti)
                        session.commit()

@turbaszek
Member

The bug can also be fixed if the rescheduled task instance uses a different try number, but this will cause a lot of log files.

I saw customers doing this (custom fork). I'm curious if this error will occur in Airflow 2.0

@yuqian90
Contributor

The bug can also be fixed if the rescheduled task instance uses a different try number, but this will cause a lot of log files.

I saw customers doing this (custom fork). I'm curious if this error will occur in Airflow 2.0

Hi @turbaszek, I did not test this in Airflow 2.0, so I may be wrong. I don't see any attempts to address this in Airflow 2.0, so this is likely to happen in 2.0 too. That said, since the scheduler loop is faster in Airflow 2.0, the chance of running into this ERROR - Executor reports task instance ... killed externally issue should be smaller.

@nathadfield
Collaborator

nathadfield commented Dec 2, 2020

@turbaszek I am currently testing Airflow v2.0.0b3 against the same DAGs we currently run in production on 1.10.12, and I can confirm that this is still an issue.

Combined with #12552, it makes the problem even worse.

@nathadfield
Collaborator

To add some further context, I can consistently replicate this error on 2.0.0b3 in a very simple environment running two Docker containers - webserver and postgres - on a Python 3.7 image, using LocalExecutor and with a poke_interval of 60 * 5.

[2020-12-03 11:52:04,649] {scheduler_job.py:946} INFO - 1 tasks up for execution:
	<TaskInstance: target_dag.wait-task 2020-12-02 00:00:00 00:00 [scheduled]>
[2020-12-03 11:52:04,655] {scheduler_job.py:980} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 1 task instances ready to be queued
[2020-12-03 11:52:04,656] {scheduler_job.py:1007} INFO - DAG target_dag has 0/16 running and queued tasks
[2020-12-03 11:52:04,657] {scheduler_job.py:1068} INFO - Setting the following tasks to queued state:
	<TaskInstance: target_dag.wait-task 2020-12-02 00:00:00 00:00 [scheduled]>
[2020-12-03 11:52:04,661] {scheduler_job.py:1110} INFO - Sending TaskInstanceKey(dag_id='target_dag', task_id='wait-task', execution_date=datetime.datetime(2020, 12, 2, 0, 0, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 1 and queue default
[2020-12-03 11:52:04,663] {base_executor.py:79} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'target_dag', 'wait-task', '2020-12-02T00:00:00 00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/airflow/dags/target_dag.py']
[2020-12-03 11:52:04,675] {local_executor.py:80} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'target_dag', 'wait-task', '2020-12-02T00:00:00 00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/airflow/dags/target_dag.py']
[2020-12-03 11:52:04,710] {dagbag.py:440} INFO - Filling up the DagBag from /usr/local/airflow/dags/target_dag.py
Running <TaskInstance: target_dag.wait-task 2020-12-02T00:00:00 00:00 [queued]> on host 5acdea444946
[2020-12-03 11:52:05  0000] [568] [INFO] Handling signal: ttin
[2020-12-03 11:52:05  0000] [11260] [INFO] Booting worker with pid: 11260
[2020-12-03 11:52:05,776] {scheduler_job.py:946} INFO - 1 tasks up for execution:
	<TaskInstance: target_dag.wait-task 2020-12-02 00:00:00 00:00 [scheduled]>
[2020-12-03 11:52:05,783] {scheduler_job.py:980} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 1 task instances ready to be queued
[2020-12-03 11:52:05,783] {scheduler_job.py:1007} INFO - DAG target_dag has 0/16 running and queued tasks
[2020-12-03 11:52:05,783] {scheduler_job.py:1068} INFO - Setting the following tasks to queued state:
	<TaskInstance: target_dag.wait-task 2020-12-02 00:00:00 00:00 [scheduled]>
[2020-12-03 11:52:05,791] {scheduler_job.py:1110} INFO - Sending TaskInstanceKey(dag_id='target_dag', task_id='wait-task', execution_date=datetime.datetime(2020, 12, 2, 0, 0, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 1 and queue default
[2020-12-03 11:52:05,791] {base_executor.py:82} ERROR - could not queue task TaskInstanceKey(dag_id='target_dag', task_id='wait-task', execution_date=datetime.datetime(2020, 12, 2, 0, 0, tzinfo=Timezone('UTC')), try_number=1)
[2020-12-03 11:52:05,797] {scheduler_job.py:1208} INFO - Executor reports execution of target_dag.wait-task execution_date=2020-12-02 00:00:00 00:00 exited with status success for try_number 1
[2020-12-03 11:52:05,808] {scheduler_job.py:1237} ERROR - Executor reports task instance <TaskInstance: target_dag.wait-task 2020-12-02 00:00:00 00:00 [queued]> finished (success) although the task says its queued. (Info: None) Was the task killed externally?
from airflow import models
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 10, 31),
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag_name = 'target_dag'

with models.DAG(dag_name,
                default_args=default_args,
                schedule_interval='0 0 * * *',
                catchup=False,
                max_active_runs=5
                ) as dag:

    wait = ExternalTaskSensor(
        task_id='wait-task',
        external_dag_id='master_dag',
        external_task_id='start',
        poke_interval=60 * 5,
        mode='reschedule'
    )
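
For completeness, a hypothetical companion DAG for the repro above (the original comment does not include master_dag; any DAG with a start task and an aligned schedule would do):

from datetime import datetime

from airflow import models
from airflow.operators.dummy import DummyOperator

with models.DAG('master_dag',
                start_date=datetime(2018, 10, 31),
                schedule_interval='0 0 * * *',
                catchup=False
                ) as dag:

    start = DummyOperator(task_id='start')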

@nathadfield
Collaborator

nathadfield commented Dec 3, 2020

Not sure if this is relevant but, when the task was rescheduled five minutes later, I saw this.

[2020-12-03 11:57:07,266] {scheduler_job.py:1237} ERROR - Executor reports task instance <TaskInstance: target_dag.wait-task 2020-12-02 00:00:00 00:00 [queued]> finished (success) although the task says its queued. (Info: None) Was the task killed externally?
Process ForkProcess-34:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/dag_processing.py", line 365, in _run_processor_manager
    processor_manager.start()
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/dag_processing.py", line 596, in start
    return self._run_parsing_loop()
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/dag_processing.py", line 659, in _run_parsing_loop
    self._processors.pop(processor.file_path)
KeyError: '/usr/local/airflow/dags/target_dag.py'
[2020-12-03 11:57:09,101] {dag_processing.py:399} WARNING - DagFileProcessorManager (PID=157) exited with exit code 1 - re-launching
[2020-12-03 11:57:09,105] {dag_processing.py:250} INFO - Launched DagFileProcessorManager with pid: 33432

@turbaszek
Member

Not sure if this is relevant but, when the task was rescheduled five minutes later, I saw this.

I also saw this from time to time, but not always, so it's probably not related.

@turbaszek
Member

@nathadfield @yuqian90 and others, have you been able to test 2.0? Have you observed this issue?

@nguyenmphu

nguyenmphu commented Oct 22, 2021

I found this check in the code of airflow/jobs/scheduler_job.py: https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job.py#L535

           if ti.try_number == buffer_key.try_number and ti.state == State.QUEUED:
                Stats.incr('scheduler.tasks.killed_externally')
                msg = (
                    "Executor reports task instance %s finished (%s) although the "
                    "task says its %s. (Info: %s) Was the task killed externally?"
                )
                self.log.error(msg, ti, state, ti.state, info)

The scheduler checks the state of the task instance. When a task instance is rescheduled (e.g. an external sensor), its state transitions through up_for_reschedule -> scheduled -> queued -> running. If its state is queued and it has not yet moved to the running state, the scheduler will raise an error.
So I think the code needs to be changed:

           if ti.try_number == buffer_key.try_number and (
                ti.state == State.QUEUED and not TaskReschedule.find_for_task_instance(ti, session=session)
            ):
                Stats.incr('scheduler.tasks.killed_externally')
                msg = (
                    "Executor reports task instance %s finished (%s) although the "
                    "task says its %s. (Info: %s) Was the task killed externally?"
                )
                self.log.error(msg, ti, state, ti.state, info)

Here is my PR: #19123

@ghostbody

ghostbody commented Jan 13, 2022

We reviewed the code and found that in local_task_job.py the parent process has a heartbeat_callback which checks the state and the child-process return code of the task instance.

However, these lines may hide a bug:

[screenshots of the relevant heartbeat_callback code in local_task_job.py]

The raw task command writing back the task instance's state (like success) doesn't mean the child process has finished (returned), right?

So, in this heartbeat callback, there may be a race condition where the task state has already been written back while the child process has not yet returned.

In this scenario, the local task will kill the child process by mistake. The scheduler will then detect this and report "task instance X finished (success) although the task says its queued. Was the task killed externally?"

This is a simple schematic diagram:

[schematic diagram]
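
In rough pseudo-code, the suspected race looks like this (my reading of the logic, not the real Airflow source):

def heartbeat_callback(ti_state: str, child_return_code):
    # The child (raw task) process has already written SUCCESS back to the DB,
    # but has not actually exited yet, so its return code is still None.
    if ti_state != "running" and child_return_code is None:
        # The parent concludes the state was set externally and terminates the
        # child; the scheduler later reports "Was the task killed externally?".
        return "terminate child"
    return "keep waiting"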

@val2k

val2k commented Jan 14, 2022

We face the same issue with tasks that stay indefinitely in a queued status, except that we don't see tasks as up_for_retry. It happens randomly within our DAGs. The task will stay in a queued status forever until we manually make it fail. We don't use any sensors at all. We are on an AWS MWAA instance (Airflow 2.0.2).

Example logs:
Scheduler:

[2022-01-14 08:03:32,868] {{scheduler_job.py:1239}} ERROR - Executor reports task instance <TaskInstance: task0 2022-01-13 07:00:00 00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2022-01-14 08:03:32,845] {{scheduler_job.py:1210}} INFO - Executor reports execution of task0 execution_date=2022-01-13 07:00:00 00:00 exited with status failed for try_number 1
<TaskInstance: task0 2022-01-13 07:00:00 00:00 [queued]> in state FAILURE

Worker:

[2021-04-20 20:54:29,109: ERROR/ForkPoolWorker-15] Failed to execute task dag_id could not be found: task0. Either the dag did not exist or it failed to parse.
This is not seen in the worker logs for every occurrence in the scheduler logs.

Because of the MWAA autoscaling mechanism, worker_concurrency is not configurable.
worker_autoscale: 10, 10.
dagbag_import_timeout: 120s
dag_file_processor_timeout: 50s
parallelism = 48
dag_concurrency = 10000
max_threads = 8

We currently have 2 (minWorkers) to 10 (maxWorkers) mw1.medium (2 vCPU) workers.

@pbotros

pbotros commented Feb 16, 2022

We also run into this fairly often, despite not using any sensors. We only seemed to start getting this error once we moved our Airflow database to the cloud (AWS RDS); our Airflow webserver and scheduler run on desktop workstations on-premises. As others have suggested in this thread, this is a very annoying problem that requires manual intervention.

@ghostbody any progress on determining if that's the correct root cause?

@ghostbody

@pbotros No, we have not solved this problem yet. 😢

@omoumniabdou

The problem for us was that we had one DAG that reached 32 parallel runnable tasks (32 leaf tasks), which was the value of the parallelism parameter. After this, the scheduler was not able to run (or queue) any task.
Increasing this parameter solved the problem for us.

@ghostbody

After STRUGGLING, we found a method to reproduce this issue 100% of the time!

tl;dr

session.commit()

Add a raise right after this session.commit() to simulate a DB error, which will likely happen when the DB is under great pressure.

Then you will get this Was the task killed externally issue every time.

Conditions:

  • Airflow 2.2
  • Celery Executor

It's because the worker uses a local task job, which spawns a child process to execute the task. The parent process sets the task from the queued to the running state. However, when the preparation work in the parent process fails, it leads to this error directly.

related code is here: https://github.com/apache/airflow/blob/2.2.2/airflow/jobs/local_task_job.py#L89
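
A sketch of the fault injection (for a throwaway test copy of Airflow only; the exact placement is an assumption based on the link above):

# Right after the session.commit() in the worker-side preparation path, add
#
#     raise RuntimeError("simulated DB error under heavy load")
#
# and every affected task then fails with "Was the task killed externally?".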

@turbaszek
Member

@ghostbody do you have an idea of how this can be addressed?

@ghostbody

@turbaszek Let me make a PR later~ We are doing pressure tests these days and this problem has appeared often.

@aakashanand92

aakashanand92 commented Mar 14, 2022

We face the same issue with tasks that stay indefinitely in a queued status, except that we don't see tasks as up_for_retry. It happens randomly within our DAGs. The task will stay in a queued status forever until we manually make it fail. We don't use any sensors at all. We are on an AWS MWAA instance (Airflow 2.0.2).

Example logs: Scheduler:

[2022-01-14 08:03:32,868] {{scheduler_job.py:1239}} ERROR - Executor reports task instance <TaskInstance: task0 2022-01-13 07:00:00 00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2022-01-14 08:03:32,845] {{scheduler_job.py:1210}} INFO - Executor reports execution of task0 execution_date=2022-01-13 07:00:00 00:00 exited with status failed for try_number 1
<TaskInstance: task0 2022-01-13 07:00:00 00:00 [queued]> in state FAILURE

Worker:

[2021-04-20 20:54:29,109: ERROR/ForkPoolWorker-15] Failed to execute task dag_id could not be found: task0. Either the dag did not exist or it failed to parse.
This is not seen in the worker logs for every occurrence in the scheduler logs.

Because of the MWAA autoscaling mechanism, worker_concurrency is not configurable. worker_autoscale: 10, 10. dagbag_import_timeout: 120s dag_file_processor_timeout: 50s parallelism = 48 dag_concurrency = 10000 max_threads = 8

We currently have 2 (minWorkers) to 10 (maxWorkers) mw1.medium (2 vCPU) workers.

@val2k Did you find a solution for this? I am also using an MWAA environment and facing the same issue.

The tasks get stuck in queued state and when I look at the scheduler logs I can see the same error.

"Executor reports task instance %s finished (%s) although the task says its %s. (Info: %s) Was the task killed externally?"

I tried everything I could find in this thread, but nothing seems to be working.

@kenny813x201

We also got the same error message. In our case, it turned out that we were using the same variable name for different DAGs.
Changing the different DAGs from as dag to, for example, as dag1 and as dag2 solved the issue for us.

with DAG(
    "dag_name",
) as dag:
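
The fix, as I understand it, looks like this (file layout and names are hypothetical): give each DAG in the file its own variable so one assignment does not shadow the other.

from airflow import DAG
from airflow.utils.dates import days_ago

with DAG("dag_name_1", start_date=days_ago(1), schedule_interval=None) as dag1:
    ...

with DAG("dag_name_2", start_date=days_ago(1), schedule_interval=None) as dag2:
    ...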

@woodywuuu

woodywuuu commented Apr 11, 2022

Airflow 2.2.2 with MySQL 8, HA scheduler, Celery executor (Redis backend)

From the logs, it shows that the TIs that reported this killed externally (status: success) error had been rescheduled!

  1. the scheduler finds a TI to schedule (TI goes from None to scheduled)
  2. the scheduler queues the TI (TI goes from scheduled to queued)
  3. the scheduler sends the TI to Celery
  4. a worker picks up the TI
  5. the worker finds the TI's state in MySQL is scheduled https://github.com/apache/airflow/blob/2.2.2/airflow/models/taskinstance.py#L1224
  6. the worker sets this TI back to None
  7. the scheduler reschedules this TI (TI goes from None to scheduled)
  8. the scheduler queues this TI again (TI goes from scheduled to queued), but it cannot queue the TI to Celery again; it finds the TI succeeded (in Celery), so it sets it to failed

From MySQL we see that all the failed tasks have no external_executor_id!

We use 5,000 DAGs, each with 50 dummy tasks, and found that if the following two conditions are met, the probability of triggering this problem increases sharply:

  1. no external_executor_id is set on the queued TI in Celery https://github.com/apache/airflow/blob/2.2.2/airflow/jobs/scheduler_job.py#L537
    • The SQL above uses skip_locked, so some TIs queued in Celery may miss this external_executor_id.
  2. a scheduler loop takes very long (more than 60s), so adopt_or_reset_orphaned_tasks judges that the SchedulerJob failed and tries to adopt the orphaned TIs
    def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:

We ran these tests:

  1. patch SchedulerJob._process_executor_events so that it does not set external_executor_id on those queued TIs
    • ~300 DAGs failed with killed externally (status: success); normally it is fewer than 10
  2. patch adopt_or_reset_orphaned_tasks so that it does not adopt orphaned TIs
    • no DAG failed!

I read the notes below, but still don't understand this problem:

  1. why should we handle a queued TI in Celery and set this external id?

@vanducng
Contributor

@turbaszek Let me make a PR later~ We are doing pressure tests these days and this problem has appeared often.

Hey turbaszek, any chance to have the PR submitted? We are experiencing this in 2.3.0 as well.

@potiuk
Member

potiuk commented May 22, 2022

@turbaszek Let me make a PR later~ We are doing pressure tests these days and this problem has appeared often.

Hey turbaszek, any chance to have the PR submitted? We are experiencing this in 2.3.0 as well.

I think you meant to ping @ghostbody, who wanted to submit the fix, @vanducng.

@V0lantis
Contributor

V0lantis commented Jul 7, 2022

Hello here 👋,
I would very much like to help, as we are experiencing the exact same issue with sensors in our Kubernetes cluster with the Celery executor. Just to add our own experience:

We are running the architecture with a shared NFS hosting our DAGs and logs (among other things). The speed and allowed throughput of the shared file system is a huge bottleneck for the scheduler (since it needs to parse the DAGs quite often). We noticed that the issue with the sensors and the log message INFO - Task is not able to run appeared when we used up all the credits AWS allowed us. For those wondering, we therefore switched to a faster file system (FSx), and the issue almost disappeared, though it remains for now (which is why I would very much like to help).

[EDIT] I found the reason, see comment below.
But we are also experiencing a similar issue, though I am not exactly sure it is the same one.
For example, sometimes a task will be picked up by a Celery worker but will not output any log file. Here is what I found:

  1. A task is in a running state for a while without anything happening.
  2. When we take a look at the log, we see the following:
*** Reading remote log from s3://my.bucket/airflow/prod/my_dag/one_of_dag_task/2022-07-03T12:00:00 00:00/1.log.
[2022-07-04, 20:58:08 UTC] {taskinstance.py:1037} INFO - Dependencies not met for <TaskInstance: my_dag.one_of_dag_task scheduled__2022-07-03T12:00:00 00:00 [running]>, dependency 'Task Instance Not Running' FAILED: Task is in the running state
[2022-07-04, 20:58:08 UTC] {taskinstance.py:1037} INFO - Dependencies not met for <TaskInstance: my_dag.one_of_dag_task scheduled__2022-07-03T12:00:00 00:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state.
[2022-07-04, 20:58:08 UTC] {local_task_job.py:99} INFO - Task is not able to be run

[2022-07-04, 20:58:08 UTC] {taskinstance.py:1037} INFO - Dependencies not met for <TaskInstance: my_dag.one_of_dag_task scheduled__2022-07-03T12:00:00 00:00 [running]>, dependency 'Task Instance Not Running' FAILED: Task is in the running state
[2022-07-04, 20:58:08 UTC] {taskinstance.py:1037} INFO - Dependencies not met for <TaskInstance: my_dag.one_of_dag_task scheduled__2022-07-03T12:00:00 00:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state.
[2022-07-04, 20:58:08 UTC] {local_task_job.py:99} INFO - Task is not able to be run
[2022-07-04, 23:21:07 UTC] {spark.py:201} INFO - Driver application_1655985151726_32839 has finished
[2022-07-04, 23:21:07 UTC] {taskinstance.py:1288} INFO - Marking task as SUCCESS. dag_id=my_dag, task_id=one_of_dag_task, execution_date=20220703T120000, start_date=20220704T145813, end_date=20220704T232107
[2022-07-04, 23:21:07 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-07-04, 23:21:08 UTC] {local_task_job.py:264} INFO - 1 downstream tasks scheduled from follow-on schedule check

One could argue: why am I making so much trouble if the task ran successfully? Because sometimes the task simply never finishes and stays in the running state indefinitely, which is hard to spot (until we get an alert because the DAG has a lot of delay).

  3. By digging through the logs, I was able to find the worker which picked up the task at 2022-07-04 14:58:05,058:
prod worker [2022-07-04 14:58:05,058: INFO/MainProcess] Task airflow.executors.celery_executor.execute_command[661f0a53-129b-4856-8db0-5632ff466833] received
prod worker [2022-07-04 14:58:05,069: INFO/ForkPoolWorker-828] Executing command in Celery: ['airflow', 'tasks', 'run', 'my_dag', 'one_of_dag_task', 'scheduled__2022-07-03T12:00:00 00:00', '--local', '--subdir', 'DAGS_FOLDER/my_dag/one_of_dag_task.py']
prod worker [2022-07-04 14:58:05,929: WARNING/ForkPoolWorker-828] Running <TaskInstance: my_dag.one_of_dag_task scheduled__2022-07-03T12:00:00 00:00 [queued]> on host airflow-worker-cfd5b7655-7hlcz

And then no further log is given by the worker saying, for example, that the task went from the queued to the running state.

  4. But then, later:
prod worker [2022-07-04 20:58:05,273: INFO/MainProcess] Task airflow.executors.celery_executor.execute_command[661f0a53-129b-4856-8db0-5632ff466833] received
prod worker [2022-07-04 20:58:05,282: INFO/ForkPoolWorker-2634] Executing command in Celery: ['airflow', 'tasks', 'run', 'my_dag', 'one_of_dag_task', 'scheduled__2022-07-03T12:00:00 00:00', '--local', '--subdir', 'DAGS_FOLDER/my_dag/one_of_dag_task.py']
prod worker [2022-07-04 20:58:05,282: INFO/ForkPoolWorker-2634] Celery task ID: 661f0a53-129b-4856-8db0-5632ff466833
prod worker [2022-07-04 20:58:05,306: INFO/ForkPoolWorker-2634] Filling up the DagBag from /opt/airflow/dags/my_dag/one_of_dag_task.py
prod worker [2022-07-04 20:58:06,023: WARNING/ForkPoolWorker-2634] Running <TaskInstance: my_dag.one_of_dag_task scheduled__2022-07-03T12:00:00 00:00 [running]> on host data-platform-airflow-worker-cfd5b7655-68n2 
  5. And finally, in the first worker, we see:
data-eks-prod worker [2022-07-04 23:21:09,542: INFO/ForkPoolWorker-828] Task airflow.executors.celery_executor.execute_command[661f0a53-129b-4856-8db0-5632ff466833] succeeded in 30184.482223104016s: None

(This is a very long task, basically waiting for a Spark job to terminate.)

Here, it ended well, but sometimes there is just nothing happening. How is it possible that no logs from the first worker were generated? How is it possible that the scheduler scheduled the task a second time if it was still in the running state 🤔

I am still trying to understand the complexity of Airflow well enough to understand this issue and maybe propose a PR, but I wanted to share what I have been able to find so far. Some more information about our architecture:

Environment:

Cloud provider or hardware configuration: AWS
OS (e.g. from /etc/os-release): centos rhel fedora
Others: Redis, CeleryExecutor, Airflow 2.2.5

@V0lantis
Contributor

V0lantis commented Jul 7, 2022

Sorry for the bother 👆, I found the reason.
If you take a look at the log, you see that there are exactly 6 hours separating the two executions of the task. I just need to set a visibility_timeout in celery_broker_transport_options. Maybe the worker could check whether there is already a log file present in the logs? Otherwise it is going to overwrite the old and good one 😔
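
For reference, the kind of setting meant here (the value is an example; it should exceed your longest expected task runtime, 24h in this sketch):

[celery_broker_transport_options]
visibility_timeout = 86400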

@V0lantis
Contributor

V0lantis commented Jul 7, 2022

In the case where a task becomes visible again in the Celery broker because visibility_timeout has been reached, wouldn't it be a good idea for the new worker to first check whether the task is in a running state and whether the first worker has sent a heartbeat recently?

I am just wondering whether this is intended behavior in Airflow.

Isn't the worker supposed to send heartbeats to the DB to tell it that the task is still running? Why would we want a second worker to pick up the task again after visibility_timeout has been reached? 🤔

@karthik-raparthi

Hello,

We are facing a similar issue, but to me it looks like a combination of EFS (provisioned throughput, 25 MiB/s), the worker not sending back an exception to the scheduler that it was unable to read the DAG file, and the task being stuck in the queued state forever.

Scheduler Log
[2022-09-16 22:38:31,848] {scheduler_job.py:572} ERROR - Executor reports task instance <TaskInstance: dagName.TaskName scheduled__2022-09-16T21:30:00 00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?

Worker Log

File "/mnt/airflow/airflow2/lib64/python3.8/site-packages/slack_sdk/__init__.py", line 44, in <module>\n from .web import WebClient # noqa\n File "/mnt/airflow/airflow2/lib64/python3.8/site-packages/slack_sdk/web/__init__.py", line 3, in <module>\n from .client import WebClient # noqa\n File "/mnt/airflow/airflow2/lib64/python3.8/site-packages/slack_sdk/web/client.py", line 8, in <module>\n from slack_sdk.models.views import View\n File "<frozen importlib._bootstrap>", line 991, in _find_and_load\n File "<frozen importlib._bootstrap>", line 971, in _find_and_load_unlocked\n File "<frozen importlib._bootstrap>", line 914, in _find_spec\n File "<frozen importlib._bootstrap_external>", line 1342, in find_spec\n File "<frozen importlib._bootstrap_external>", line 1314, in _get_spec\n File "<frozen importlib._bootstrap_external>", line 1458, in find_spec\n File "<frozen importlib._bootstrap_external>", line 101, in _path_isfile\n File "<frozen importlib._bootstrap_external>", line 93, in _path_is_mode_type\n File "<frozen importlib._bootstrap_external>", line 87, in _path_stat\n File "/mnt/airflow/airflow2/lib64/python3.8/site-packages/airflow/utils/timeout.py", line 37, in handle_timeout\n raise AirflowTaskTimeout(self.error_message)\nairflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /mnt/airflow/config2/dags/DagName.py after 30.0s.\nPlease take a look at these docs to improve your DAG import time:\n* https://airflow.apache.org/docs/apache-airflow/2.2.2/best-practices.html#top-level-python-code\n* https://airflow.apache.org/docs/apache-airflow/2.2.2/best-practices.html#reducing-dag-complexity, PID: 13809\nTraceback (most recent call last):\n File "/mnt/airflow/airflow2/bin/airflow", line 8, in <module>\n sys.exit(main())\n File "/mnt/airflow/airflow2/lib64/python3.8/site-packages/airflow/__main__.py", line 48, in main\n args.func(args)\n File "/mnt/airflow/airflow2/lib64/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command\n return func(*args, **kwargs)\n File "/mnt/airflow/airflow2/lib64/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper\n return f(*args, **kwargs)\n File "/mnt/airflow/airflow2/lib64/python3.8/site-packages/airflow/cli/commands/task_command.py", line 276, in task_run\n dag = get_dag(args.subdir, args.dag_id)\n File "/mnt/airflow/airflow2/lib64/python3.8/site-packages/airflow/utils/cli.py", line 192, in get_dag\n raise AirflowException(\nairflow.exceptions.AirflowException: Dag \'DagName\' could not be found; either it does not exist or it failed to parse.\n' [2022-09-16 22:38:12,536: ERROR/ForkPoolWorker-16] execute_command encountered a CalledProcessError

@V0lantis
Contributor

V0lantis commented Sep 22, 2022

Hello @karthik-raparthi, we also experienced similar issues with EFS. EFS is definitely not suited for a big Airflow deployment, and we stopped having most of these issues when we moved to the FSx file system. I therefore encourage you to move to this better solution :)

(We had EFS with 100 MB/s provisioned throughput and were still experiencing this.)

@potiuk
Member

potiuk commented Sep 22, 2022

Hello @karthik-raparthi, we also experienced similar issues with EFS. EFS is definitely not suited for a big Airflow deployment, and we stopped having most of these issues when we moved to the FSx file system. I therefore encourage you to move to this better solution :)

(We had EFS with 100 MB/s provisioned throughput and were still experiencing this.)

Quite agree. There have been multiple people reporting problems in huge Airflow installations where EFS was used. I can also recommend (as usual) switching to Git Sync. I wrote an article about it: https://medium.com/apache-airflow/shared-volumes-in-airflow-the-good-the-bad-and-the-ugly-22e9f681afca - especially when you are already using Git to store your DAGs, a shared volume is completely unnecessary and using Git Sync directly is a far better solution.

@karthik-raparthi

karthik-raparthi commented Sep 22, 2022

Thanks, @V0lantis & @potiuk, for the inputs. Yes, we are in the process of moving away from EFS, but I am trying to see if there are any workarounds to detect the issue once a task is stuck in the queue, by using some alerts.

I did some research, and it looks like we can rely on the task_instance table in the Airflow metadata DB to alert as soon as a task has been stuck in the queue for more than 30 minutes (this time might vary based on EFS):

select *
from task_instance ti
where start_date is null
  and end_date is null
  and pid is null
  and state = 'queued'
  -- and, assuming a queued_dttm column is available, older than the alert threshold:
  -- and queued_dttm < now() - interval '30 minutes'
;

@potiuk
Member

potiuk commented Sep 22, 2022

The simplest workaround is to pay EVEN MORE for an OUTRAGEOUS amount of EFS IOPS. This seemed to work for many of our customers, and it might be cheaper than the engineering time you spend trying to solve it (unless you are already maxing it out).


github-actions bot commented Nov 4, 2023

This issue has been automatically marked as stale because it has been open for 365 days without any activity. There have been several Airflow releases since the last activity on this issue. Kindly asking to recheck the report against the latest Airflow version and let us know if the issue is reproducible. The issue will be closed in the next 30 days if no further activity occurs from the issue author.


This issue has been closed because it has not received response from the issue author.

@github-actions bot closed this as not planned (won't fix, can't repro, duplicate, stale) on Dec 11, 2023