Airflow: pass {{ds}} as parameter for PostgresOperator

I would like to use the execution date as a parameter in my SQL file.

I tried:

dt = '{{ ds }}'

s3_to_redshift = PostgresOperator(
    task_id='s3_to_redshift',
    postgres_conn_id='redshift',
    sql='s3_to_redshift.sql',
    params={'file': dt},
    dag=dag
)

      

but that won't work.



2 answers


dt = '{{ ds }}' doesn't work because Jinja (the templating engine used in Airflow) doesn't process the whole DAG definition file.

For each operator there are specific fields that Jinja will render, and they are declared as part of the operator's own definition.

In this case, you can make the params field (which is actually called parameters, so be sure to change that) templated by extending PostgresOperator like this:



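# Import path used by Airflow 1.x; newer releases ship the operator in the postgres provider package.
from airflow.operators.postgres_operator import PostgresOperator
class MyPostgresOperator(PostgresOperator):
    template_fields = ('sql', 'parameters')  # fields Jinja renders before execution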

      

Now you should be able to do this:

s3_to_redshift = MyPostgresOperator(
    task_id='s3_to_redshift',
    postgres_conn_id='redshift',
    sql='s3_to_redshift.sql',
    parameters={'file': '{{ ds }}'},
    dag=dag
)
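
To illustrate why the subclass makes a difference, here is a toy model of template-field rendering. It is only a sketch: ToyOperator, ToyTemplatedOperator and render are made-up names, and the regex substitution merely stands in for Jinja. It is not Airflow's actual implementation, but it shows the idea that only attributes listed in template_fields get rendered.

```python
import re

# Toy stand-in for Jinja: replaces {{ name }} with context[name].
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(value, context):
    """Render placeholders in a string, or in each value of a dict."""
    if isinstance(value, dict):
        return {k: render(v, context) for k, v in value.items()}
    if isinstance(value, str):
        return PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
    return value


class ToyOperator:
    # Only attributes named here are rendered before execution.
    template_fields = ("sql",)

    def __init__(self, sql, parameters=None):
        self.sql = sql
        self.parameters = parameters or {}

    def render_templates(self, context):
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))


class ToyTemplatedOperator(ToyOperator):
    # Same idea as MyPostgresOperator: add 'parameters' to the rendered fields.
    template_fields = ("sql", "parameters")


context = {"ds": "2018-01-01"}  # hypothetical execution date

plain = ToyOperator("select '{{ ds }}'", {"file": "{{ ds }}"})
plain.render_templates(context)

templated = ToyTemplatedOperator("select '{{ ds }}'", {"file": "{{ ds }}"})
templated.render_templates(context)

print(plain.parameters)      # the placeholder is left untouched
print(templated.parameters)  # the placeholder is replaced by the date
```

In the plain operator the value of parameters stays the literal string '{{ ds }}', which matches the behaviour described in the question.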

      



PostgresOperator and JdbcOperator both inherit from BaseOperator.
One of the input parameters of BaseOperator is params:

self.params = params or {}  # Available in templates!

This way you should be able to use it without creating a new class (even though params is not listed in template_fields):

t1 = JdbcOperator(
    task_id='copy',
    sql='copy.sql',
    jdbc_conn_id='connection_name',
    params={'schema_name': 'public'},
    dag=dag
)

The SQL statement (copy.sql) might look like this:

copy {{ params.schema_name }}.table_name
from 's3://.../table_name.csv'
iam_role 'arn:aws:iam::<acc_num>:role/<role_name>'
csv
IGNOREHEADER 1



Notes:

copy.sql is in the same location as the DAG.
OR
you can set "template_searchpath" on the DAG (it is an argument of the DAG constructor, not an entry in "default_args")
to the absolute path of the folder where the template file is located.
For example: template_searchpath='/home/user/airflow/templates/'
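
As a rough illustration of what the rendered SQL could look like, the sketch below fills a template with both params and ds. The render and lookup helpers are hypothetical stand-ins for Jinja, and the bucket path and date are invented for the example. Since the sql field is itself templated, a placeholder such as {{ ds }} can probably be written directly in the .sql file, next to {{ params.schema_name }}.

```python
import re
from datetime import date


def lookup(path, context):
    """Resolve a dotted name such as 'params.schema_name' in nested dicts."""
    value = context
    for key in path.split("."):
        value = value[key]
    return value


def render(template, context):
    """Toy substitute for Jinja: fill each {{ dotted.name }} placeholder."""
    return re.sub(
        r"\{\{\s*([\w.]+)\s*\}\}",
        lambda m: str(lookup(m.group(1), context)),
        template,
    )


# Hypothetical template; the bucket name and layout are assumptions.
sql_template = (
    "copy {{ params.schema_name }}.table_name "
    "from 's3://example-bucket/{{ ds }}/table_name.csv'"
)

context = {
    "params": {"schema_name": "public"},  # what params={...} would supply
    "ds": date(2018, 1, 1).isoformat(),   # execution date as YYYY-MM-DD
}

rendered = render(sql_template, context)
print(rendered)
```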
