-
[Airflow] TriggerDagRunOperator execution_delta, execution_date_fn 사용하기공부/데이터 2022. 2. 20. 01:14
TriggerDagRunOperator는 여기에서 설명이 되어 있습니다.
TriggerDagRunOperator를 사용하기 위해선 참조할 dag의 schedule interval과 동일해야 합니다. 만약 참조할 dag와 다른 schedule interval을 사용할 경우
execution_delta
와execution_date_fn
둘 중 1개를 사용해야 합니다.execution_delta
옵션은 datetime 모듈의 timedelta 클래스를 사용하는 옵션이고execution_date_fn
은 함수를 입력하여 함수식을 작성하는 옵션입니다.예시
참조할 dag의 속성이 아래와 같이 설정되어 있다고 가정합니다.
DAG( schedule_interval="0 * * * *", dag_id="parent_dag", )
해당 dag는 매 시 정각마다 실행이 됩니다. 해당 dag를 참조하는 부분에서는 정각이 아닌 다른 schedule interval을 사용한다면 아래와 같이 사용할 수 있습니다.
execution_date_fn
매 시 30분마다 실행을 한다 가정하면 아래와 같이 설정할 수 있습니다.
from datetime import timedelta dag=DAG( schedule_interval="30 * * * *", dag_id="execution_date_fn_test", ) ExternalTaskSensor( timeout=60, mode="reschedule", check_existence=True, external_dag_id="parent_dag", failed_states=[State.FAILED], execution_date_fn=lambda x: x - timedelta(minutes=30), dag=dag )
execution_date_fn에는 함수가 들어가면 되므로
def func(x): return x - timedelta(minutes=30)
와 같이 정의한 후,func
를 입력하거나 위의 예제처럼 람다식을 사용하여 바로 입력하면 됩니다.만약
2022-01-01 00:30:00
에 실행이 된다면 parent_dag의2022-01-01 00:00:00
가 존재 하는지, 성공 했는지를 확인합니다.- 여기선 예시가 쉽게 run id, execution date 개념은 제외
execution_delta
매 16시 30분마다 실행을 한다 가정하면 아래와 같이 설정할 수 있습니다.
from datetime import timedelta dag=DAG( schedule_interval="30 16 * * *", dag_id="execution_delta_test", ) ExternalTaskSensor( timeout=60, mode="reschedule", check_existence=True, external_dag_id="parent_dag", failed_states=[State.FAILED], execution_delta=timedelta(hours=23, minutes=30) dag=dag )
execution_delta의 경우, datetime 모듈의 timedelta 클래스를 사용합니다. 해당 옵션은 사용하기 편하지만 단점으로 시간을 뺄 수 없습니다.
매일 16시 30분 마다 실행이 되므로
2022-01-02 16:30:00
에 실행이 됐다면 run id는2022-01-01 16:30:00
이 됩니다. parent_dag의2022-01-02 16:00:00
이 실행이 됐는지 비교를 하고 싶으므로23시간 30분을 더해줘야 합니다.
요약
참조할 dag와 다른 schedule interval을 지정할 수 있는 옵션은
execution_delta
와execution_date_fn
이지만execution_delta
는 음수가 적용되지 않으므로execution_date_fn
만 사용하는 것이 좋다.