ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow] TriggerDagRunOperator execution_delta, execution_date_fn 사용하기
    공부/데이터 2022. 2. 20. 01:14

    TriggerDagRunOperator는 여기에서 설명이 되어 있습니다.

    TriggerDagRunOperator를 사용하기 위해선 참조할 dag의 schedule interval과 동일해야 합니다. 만약 참조할 dag와 다른 schedule interval을 사용할 경우 execution_deltaexecution_date_fn 둘 중 1개를 사용해야 합니다.

    execution_delta 옵션은 datetime 모듈의 timedelta 클래스를 사용하는 옵션이고 execution_date_fn 은 함수를 입력하여 함수식을 작성하는 옵션입니다.

    예시

    참조할 dag의 속성이 아래와 같이 설정되어 있다고 가정합니다.

    DAG(
        schedule_interval="0 * * * *",
        dag_id="parent_dag",
    )

    해당 dag는 매 시 정각마다 실행이 됩니다. 해당 dag를 참조하는 부분에서는 정각이 아닌 다른 schedule interval을 사용한다면 아래와 같이 사용할 수 있습니다.

    execution_date_fn

    매 시 30분마다 실행을 한다 가정하면 아래와 같이 설정할 수 있습니다.

    from datetime import timedelta
    
    dag=DAG(
        schedule_interval="30 * * * *",
        dag_id="execution_date_fn_test",
    )
    
    ExternalTaskSensor(
        timeout=60,
        mode="reschedule",
        check_existence=True,
        external_dag_id="parent_dag",
        failed_states=[State.FAILED],
            execution_date_fn=lambda x: x - timedelta(minutes=30),
            dag=dag
    )

    execution_date_fn에는 함수가 들어가면 되므로 def func(x): return x - timedelta(minutes=30) 와 같이 정의한 후, func 를 입력하거나 위의 예제처럼 람다식을 사용하여 바로 입력하면 됩니다.

    만약 2022-01-01 00:30:00 에 실행이 된다면 parent_dag의 2022-01-01 00:00:00 가 존재 하는지, 성공 했는지를 확인합니다.

    • 여기선 예시가 쉽게 run id, execution date 개념은 제외

    execution_delta

    매 16시 30분마다 실행을 한다 가정하면 아래와 같이 설정할 수 있습니다.

    from datetime import timedelta
    
    dag=DAG(
        schedule_interval="30 16 * * *",
        dag_id="execution_delta_test",
    )
    
    ExternalTaskSensor(
        timeout=60,
        mode="reschedule",
        check_existence=True,
        external_dag_id="parent_dag",
        failed_states=[State.FAILED],
            execution_delta=timedelta(hours=23, minutes=30)
            dag=dag
    )

    execution_delta의 경우, datetime 모듈의 timedelta 클래스를 사용합니다. 해당 옵션은 사용하기 편하지만 단점으로 시간을 뺄 수 없습니다.

    매일 16시 30분 마다 실행이 되므로 2022-01-02 16:30:00 에 실행이 됐다면 run id는 2022-01-01 16:30:00 이 됩니다. parent_dag의 2022-01-02 16:00:00 이 실행이 됐는지 비교를 하고 싶으므로

    23시간 30분을 더해줘야 합니다.

    요약

    참조할 dag와 다른 schedule interval을 지정할 수 있는 옵션은 execution_deltaexecution_date_fn 이지만 execution_delta 는 음수가 적용되지 않으므로 execution_date_fn 만 사용하는 것이 좋다.

    댓글