Macros in the Airflow Python Statement

Can I use macros with PythonOperator? I tried the following steps, but I was unable to get the macros:

dag = DAG(
    'temp',
    default_args=default_args,
    description='temp dag',
    schedule_interval=timedelta(days=1))

def temp_def(a, b, **kwargs):
    print '{{ds}}'
    print '{{execution_date}}'
    print 'a=%s, b=%s, kwargs=%s' % (str(a), str(b), str(kwargs))

ds = '{{ ds }}'
mm = '{{ execution_date }}'

t1 = PythonOperator(
    task_id='temp_task',
    python_callable=temp_def,
    op_args=[mm , ds],
    provide_context=False,
    dag=dag)

      

+13


source to share


2 answers


Macros are processed only for template fields. To force Jinja to handle this field, extend PythonOperator

your.

class MyPythonOperator(PythonOperator):
    template_fields = ('templates_dict','op_args')

      

I added 'templates_dict'

to template_fields

because it myself PythonOperator

has this templated field: PythonOperator



You should now be able to use a macro on this field:

ds = '{{ ds }}'
mm = '{{ execution_date }}'

t1 = MyPythonOperator(
    task_id='temp_task',
    python_callable=temp_def,
    op_args=[mm , ds],
    provide_context=False,
    dag=dag)

      

+21


source


In my opinion, a more Airflow-appropriate way would be to use the included PythonOperator and use the parameter provide_context=True

as such.

t1 = MyPythonOperator(
    task_id='temp_task',
    python_callable=temp_def,
    provide_context=True,
    dag=dag)

      

You now have access to all macros, airflow metadata and task parameters in kwargs

your callable



def temp_def(**kwargs):
    print 'ds={}, execution_date={}'.format((str(kwargs['ds']), str(kwargs['execution_date']))

      

If you have a specific user params

associated with a task, you can also access them viakwargs['params']

+9


source







All Articles