Airflow backfill failing even though the task works fine when tested

I am trying to execute a DAG with only one PythonOperator. When I test the task it works fine, and it also ran fine when I used Airflow without the CeleryExecutor.

But when I try to backfill it on an Airflow installation that uses the CeleryExecutor, it fails with a non-descriptive error:

airflow@ip:/home/admin$ airflow backfill REDSHIFT3 -s 2017-05-10
[2017-05-22 14:41:14,373] {__init__.py:57} INFO - Using executor CeleryExecutor
[2017-05-22 14:41:14,432] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2017-05-22 14:41:14,452] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2017-05-22 14:41:14,616] {models.py:167} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2017-05-22 14:41:14,994] {models.py:1126} INFO - Dependencies all met for <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [scheduled]>
[2017-05-22 14:41:15,000] {base_executor.py:50} INFO - Adding to queue: airflow run REDSHIFT3 get_data_redshift 2017-05-10T00:00:00 --pickle 81 --local
[2017-05-22 14:41:19,893] {celery_executor.py:78} INFO - [celery] queuing (u'REDSHIFT3', u'get_data_redshift', datetime.datetime(2017, 5, 10, 0, 0)) through celery, queue=default
[2017-05-22 14:41:20,598] {models.py:4024} INFO - Updating state for <DagRun REDSHIFT3 @ 2017-05-10 00:00:00: backfill_2017-05-10T00:00:00, externally triggered: False> considering 1 task(s)
[2017-05-22 14:41:20,607] {jobs.py:1978} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 0 | succeeded: 0 | kicked_off: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2017-05-22 14:41:24,954] {jobs.py:1725} ERROR - Executor reports task instance <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?
[2017-05-22 14:41:24,954] {models.py:1417} ERROR - Executor reports task instance <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?
None
[2017-05-22 14:41:24,954] {models.py:1441} INFO - Marking task as FAILED.
[2017-05-22 14:41:25,037] {models.py:1462} ERROR - Executor reports task instance <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?
[2017-05-22 14:41:25,042] {jobs.py:1690} ERROR - Task instance <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [failed]> failed
[2017-05-22 14:41:25,044] {models.py:4024} INFO - Updating state for <DagRun REDSHIFT3 @ 2017-05-10 00:00:00: backfill_2017-05-10T00:00:00, externally triggered: False> considering 1 task(s)
[2017-05-22 14:41:25,047] {models.py:4064} INFO - Marking run <DagRun REDSHIFT3 @ 2017-05-10 00:00:00: backfill_2017-05-10T00:00:00, externally triggered: False> failed
[2017-05-22 14:41:25,087] {jobs.py:1978} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 0 | kicked_off: 0 | failed: 1 | skipped: 0 | deadlocked: 0 | not ready: 0
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 167, in backfill
    pool=args.pool)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 3330, in run
    job.run()
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 200, in run
    self._execute()
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 2021, in _execute
    raise AirflowException(err)
airflow.exceptions.AirflowException: ---------------------------------------------------
Some task instances failed:
set([(u'REDSHIFT3', u'get_data_redshift', datetime.datetime(2017, 5, 10, 0, 0))])


Here is the DAG I'm trying to run:

from __future__ import print_function

import airflow
from pprint import pprint

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='REDSHIFT3',
    default_args=args,
    schedule_interval=None)

def get_data(ds, **kwargs):
    # For now the task only prints its context; no Redshift access yet.
    pprint(kwargs)

run_this = PythonOperator(
    task_id='get_data_redshift',
    provide_context=True,
    python_callable=get_data,
    dag=dag)
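
By "testing" above I mean running the task in isolation through the CLI, with something like:

    airflow test REDSHIFT3 get_data_redshift 2017-05-10

That run completes without errors.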

1 answer


Hey, I had a related issue: the same error, but not during a backfill. When my cluster was under sustained high load (more than 50 workers, hundreds of tasks running concurrently), my database hit maximum CPU utilization.

For me it was caused by my RDS (t2) instance running out of CPU credits and being throttled. Provisioning a larger instance solved the problem.



Even if you are not using AWS, I would double-check that your database is not hitting a resource limit such as CPU or I/O. My guess is that this causes a race condition: the scheduler sets the TaskInstance state to QUEUED and sends the task message to the message queue before the database has actually committed the state change. Hope this helps someone out there.
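
To make the suspected ordering concrete, here is a toy simulation (not Airflow's actual code; every name and timing in it is illustrative) of a task message reaching the worker before the scheduler's state change has been committed to a slow database:

    import threading
    import time
    try:
        import queue           # Python 3
    except ImportError:
        import Queue as queue  # Python 2, as in the logs above

    db = {'state': 'scheduled'}   # stands in for the task_instance row
    db_lock = threading.Lock()
    broker = queue.Queue()        # stands in for the Celery broker

    def scheduler():
        broker.put('run REDSHIFT3.get_data_redshift')  # message goes out first...
        time.sleep(0.5)           # ...while the COMMIT is delayed by a throttled DB
        with db_lock:
            db['state'] = 'queued'

    def worker():
        broker.get()              # the worker sees the message immediately
        with db_lock:
            state = db['state']
        if state != 'queued':
            # The worker reads the stale, pre-commit state and refuses to run.
            # Celery then reports the task finished (failed) while the metadata
            # DB still says QUEUED -- exactly the mismatch in the error above.
            print("refusing to run: state is %r, not 'queued'" % state)

    t1 = threading.Thread(target=scheduler)
    t2 = threading.Thread(target=worker)
    t1.start(); t2.start()
    t1.join(); t2.join()

On a healthy database the commit lands before the worker dequeues the message, so the check passes; under CPU or I/O starvation the window widens and failures like the one above become likely.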
