How to trigger an Airflow DAG using TriggerDagRunOperator

I found the following link:

https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand

which does explain how to use TriggerDagRunOperator to execute a separate Airflow DAG. The documentation uses Airflow's own example DAGs, but I have a hard time understanding them because they don't use any sensors.

Can someone explain how to start a separate DAG using TriggerDagRunOperator and SqlSensor? I am trying to start a separate DAG when my SQL Server job task has completed. I know how to check the status of a SQL Server job using SqlSensor, but I don't know how to attach the result to TriggerDagRunOperator in order to start the separate DAG.

I don't want to use the Airflow CLI or put both tasks in the same DAG. Basically, I want this DAG to be just a trigger.

Below is my current code, which is missing the key conditionally_trigger function:

# File Name: check-when-db1-sql-task-is-done

from airflow import DAG
from airflow.operators import TriggerDagRunOperator
from airflow.operators import SqlSensor
from datetime import datetime, timedelta


default_args = {
        'owner': 'airflow',
        'retry_delay': timedelta(minutes=5),
}

dag = DAG('check-when-db1-sql-task-is-done',
        description='Check-when-DB1-SQL-task-is-done',
        default_args=default_args,
        schedule_interval='@once',
        start_date=datetime.now(),
        )

# returns-0-or-1-based-on-job-task-status
sqlsensor = SqlSensor (
        task_id='sql-sensor',
        poke_interval=30,
        timeout=3200,
        sql="""select last_run_outcome from msdb.dbo.sysjobsteps where job_id = '249A5A5D-6AFC-4D6B-8CB1-27C16724A450' and step_id = '1' and last_run_date = (select convert(varchar(24),getdate(),112)); """,    
        conn_id='db1',  # SqlSensor takes conn_id, not mssql_conn_id
        dag=dag,
        )

# dag-to-start
trigger = TriggerDagRunOperator (
        task_id='start-ssh-job',
        trigger_dag_id="qa-knime-ssh-task",
        python_callable=conditionally_trigger,
        params={'condition_param': True,
                'message': 'Hello World'},
        dag=dag)

      


1 answer


As I understand it, TriggerDagRunOperator is meant for cases where you want a Python function to decide whether or not the other DAG should be triggered. That function is called conditionally_trigger in your code and in the examples.
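For reference, here is a minimal sketch of what such a function might look like, modeled loosely on the controller example that ships with Airflow 1.x. The StandInDagRunOrder class is a hypothetical stand-in for the run object Airflow passes to the callable; it exists only so the function can be tried outside Airflow. The sketch assumes the Airflow 1.x convention that returning the object triggers the run and returning None skips it.

```python
class StandInDagRunOrder(object):
    """Hypothetical stand-in for the run object Airflow hands to the callable."""

    def __init__(self, run_id=None, payload=None):
        self.run_id = run_id
        self.payload = payload


def conditionally_trigger(context, dag_run_obj):
    """Return dag_run_obj to trigger the target DAG, or None to skip it."""
    params = context['params']
    if params['condition_param']:
        # Attach a payload that the triggered DAG can read from its run conf.
        dag_run_obj.payload = {'message': params['message']}
        return dag_run_obj
    return None


# Try it with the params used in the question.
order = conditionally_trigger(
    {'params': {'condition_param': True, 'message': 'Hello World'}},
    StandInDagRunOrder(run_id='manual-test'),
)
print(order.payload)  # {'message': 'Hello World'}
```

With condition_param set to False the function returns None, and the operator would not start the target DAG.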

In your case, you are using a sensor to control the flow, so you do not need a conditional function. You can use SubDagOperator instead of TriggerDagRunOperator, or pass a simple always-true function as python_callable:



...
python_callable=lambda context, dag_run_obj: dag_run_obj,
...
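Putting the always-true callable together with the sensor from the question might look roughly like the sketch below. It assumes Airflow 1.10-style import paths and a TriggerDagRunOperator that still accepts python_callable (later major versions changed this). The names always_trigger and build_trigger_dag and the fixed start date are illustrative choices, not part of the original code.

```python
from datetime import datetime, timedelta


def always_trigger(context, dag_run_obj):
    """Always-true callable: returning the run object tells the operator to fire."""
    return dag_run_obj


def build_trigger_dag(job_status_sql):
    """Build a DAG that waits on a SQL check and then triggers another DAG."""
    # Airflow 1.10-style import paths (an assumption); kept local so that
    # always_trigger can be imported and tried without Airflow installed.
    from airflow import DAG
    from airflow.operators.dagrun_operator import TriggerDagRunOperator
    from airflow.sensors.sql_sensor import SqlSensor

    dag = DAG(
        'check-when-db1-sql-task-is-done',
        default_args={'owner': 'airflow', 'retry_delay': timedelta(minutes=5)},
        schedule_interval='@once',
        start_date=datetime(2017, 1, 1),  # placeholder fixed date
    )
    sensor = SqlSensor(
        task_id='sql-sensor',
        conn_id='db1',
        sql=job_status_sql,
        poke_interval=30,
        timeout=3200,
        dag=dag,
    )
    trigger = TriggerDagRunOperator(
        task_id='start-ssh-job',
        trigger_dag_id='qa-knime-ssh-task',
        python_callable=always_trigger,
        dag=dag,
    )
    # The trigger task runs only after the sensor has succeeded.
    sensor >> trigger
    return dag
```

In a real DAG file the result would be assigned at module level, for example dag = build_trigger_dag(sql_text), so that the scheduler can discover it. The sensor does the gating, so the callable has nothing left to decide.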

      
