Airflow backfill failing even though the task works fine when tested
I am trying to execute a DAG with only one PythonOperator. When I test the task it works fine, and it also works fine when I run Airflow without the CeleryExecutor.
But when I run a backfill on the Airflow instance that uses the CeleryExecutor, it fails with a non-descriptive error:
airflow@ip:/home/admin$ airflow backfill REDSHIFT3 -s 2017-05-10
[2017-05-22 14:41:14,373] {__init__.py:57} INFO - Using executor CeleryExecutor
[2017-05-22 14:41:14,432] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2017-05-22 14:41:14,452] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2017-05-22 14:41:14,616] {models.py:167} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2017-05-22 14:41:14,994] {models.py:1126} INFO - Dependencies all met for <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [scheduled]>
[2017-05-22 14:41:15,000] {base_executor.py:50} INFO - Adding to queue: airflow run REDSHIFT3 get_data_redshift 2017-05-10T00:00:00 --pickle 81 --local
[2017-05-22 14:41:19,893] {celery_executor.py:78} INFO - [celery] queuing (u'REDSHIFT3', u'get_data_redshift', datetime.datetime(2017, 5, 10, 0, 0)) through celery, queue=default
[2017-05-22 14:41:20,598] {models.py:4024} INFO - Updating state for <DagRun REDSHIFT3 @ 2017-05-10 00:00:00: backfill_2017-05-10T00:00:00, externally triggered: False> considering 1 task(s)
[2017-05-22 14:41:20,607] {jobs.py:1978} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 0 | succeeded: 0 | kicked_off: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2017-05-22 14:41:24,954] {jobs.py:1725} ERROR - Executor reports task instance <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?
[2017-05-22 14:41:24,954] {models.py:1417} ERROR - Executor reports task instance <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?
None
[2017-05-22 14:41:24,954] {models.py:1441} INFO - Marking task as FAILED.
[2017-05-22 14:41:25,037] {models.py:1462} ERROR - Executor reports task instance <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?
[2017-05-22 14:41:25,042] {jobs.py:1690} ERROR - Task instance <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [failed]> failed
[2017-05-22 14:41:25,044] {models.py:4024} INFO - Updating state for <DagRun REDSHIFT3 @ 2017-05-10 00:00:00: backfill_2017-05-10T00:00:00, externally triggered: False> considering 1 task(s)
[2017-05-22 14:41:25,047] {models.py:4064} INFO - Marking run <DagRun REDSHIFT3 @ 2017-05-10 00:00:00: backfill_2017-05-10T00:00:00, externally triggered: False> failed
[2017-05-22 14:41:25,087] {jobs.py:1978} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 0 | kicked_off: 0 | failed: 1 | skipped: 0 | deadlocked: 0 | not ready: 0
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 167, in backfill
    pool=args.pool)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 3330, in run
    job.run()
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 200, in run
    self._execute()
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 2021, in _execute
    raise AirflowException(err)
airflow.exceptions.AirflowException: ---------------------------------------------------
Some task instances failed:
set([(u'REDSHIFT3', u'get_data_redshift', datetime.datetime(2017, 5, 10, 0, 0))])
Here is the DAG I'm trying to run:
from __future__ import print_function

from builtins import range
import time
from pprint import pprint

import airflow
from airflow.models import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='REDSHIFT3',
    default_args=args,
    schedule_interval=None)


def get_data(ds, **kwargs):
    # Just print the context Airflow passes in (provide_context=True).
    pprint(kwargs)


run_this = PythonOperator(
    task_id='get_data_redshift',
    provide_context=True,
    python_callable=get_data,
    dag=dag)
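Since the whole point is that the task itself works fine outside Celery, here is a minimal way I can smoke-test the callable by hand from the same file; the date and the extra keyword arguments below are just placeholders, not the context Airflow would actually inject:

# Sketch (not part of the original DAG): call the callable directly, bypassing
# the scheduler and Celery, to confirm the task logic itself runs.
if __name__ == '__main__':
    # With provide_context=True Airflow passes 'ds' plus a full context dict;
    # here we hand in placeholder values purely as a local smoke test.
    get_data('2017-05-10', dag=dag, task=run_this)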
Hey, I had a related issue - the same error, but not during a backfill. When my cluster was under constant high load (> 50 workers, hundreds of tasks running concurrently), my metadata database hit its maximum CPU utilization.
For me it came down to my RDS (t2) instance running out of CPU credits and being throttled. Provisioning a larger instance solved the problem.
Even if you are not on AWS, I would double-check that your database is not hitting a resource limit such as CPU or I/O. My guess is that this causes a race condition: the scheduler sets the TaskInstance state to QUEUED and sends the task message to the message queue before the database has actually committed the state change, so the executor reports a failure for a task that still looks queued. Hope this helps someone out there.
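If you want to see whether the same race is happening in your setup, one thing you can do is poll the task instance's state in the metadata database while the backfill runs and compare it with what the executor reports. This is only a rough sketch against the 1.8-era ORM (the DAG and task names are taken from the question above), not something from my original debugging:

# Rough sketch: watch the TaskInstance state in the metadata DB during a backfill,
# to see if it is still 'queued' when the executor already reports a failure.
import time
from datetime import datetime

from airflow import settings
from airflow.models import TaskInstance

session = settings.Session()
execution_date = datetime(2017, 5, 10)

for _ in range(30):
    ti = (session.query(TaskInstance)
          .filter(TaskInstance.dag_id == 'REDSHIFT3',
                  TaskInstance.task_id == 'get_data_redshift',
                  TaskInstance.execution_date == execution_date)
          .first())
    print(ti.state if ti else 'no task instance row yet')
    session.expire_all()  # force the next query to re-read from the database
    time.sleep(1)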