Macros in the Airflow Python Statement
Can I use macros with PythonOperator? I tried the following steps, but I was unable to get the macros:
dag = DAG(
'temp',
default_args=default_args,
description='temp dag',
schedule_interval=timedelta(days=1))
def temp_def(a, b, **kwargs):
print '{{ds}}'
print '{{execution_date}}'
print 'a=%s, b=%s, kwargs=%s' % (str(a), str(b), str(kwargs))
ds = '{{ ds }}'
mm = '{{ execution_date }}'
t1 = PythonOperator(
task_id='temp_task',
python_callable=temp_def,
op_args=[mm , ds],
provide_context=False,
dag=dag)
source to share
Macros are processed only for template fields. To force Jinja to handle this field, extend PythonOperator
your.
class MyPythonOperator(PythonOperator):
template_fields = ('templates_dict','op_args')
I added 'templates_dict'
to template_fields
because it myself PythonOperator
has this templated field:
PythonOperator
You should now be able to use a macro on this field:
ds = '{{ ds }}'
mm = '{{ execution_date }}'
t1 = MyPythonOperator(
task_id='temp_task',
python_callable=temp_def,
op_args=[mm , ds],
provide_context=False,
dag=dag)
source to share
In my opinion, a more Airflow-appropriate way would be to use the included PythonOperator and use the parameter provide_context=True
as such.
t1 = MyPythonOperator(
task_id='temp_task',
python_callable=temp_def,
provide_context=True,
dag=dag)
You now have access to all macros, airflow metadata and task parameters in kwargs
your callable
def temp_def(**kwargs):
print 'ds={}, execution_date={}'.format((str(kwargs['ds']), str(kwargs['execution_date']))
If you have a specific user params
associated with a task, you can also access them viakwargs['params']
source to share