ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow] DAG에서 다른 DAG 호출하기 (DAG 종속성)
    공부/데이터 2021. 11. 20. 20:43

    dag를 설계할 때 dag끼리 종속성을 갖지 않는 것이 가장 좋지만 어쩔 수 없이 종속성을 만들어야 하는 경우가 있습니다. 아래와 같은 상황일 때, dag의 종속성을 갖는 것이 유용하게 사용됩니다.

    • 두 dag는 종속되지만 일정이 다름
    • 두 dag는 종속되지만 서로 다른 팀에서 소유
    • task는 다른 task에 종속되지만 execution_date가 다름

    여기서는 dag에서 다른 dag를 호출하는 방법을 설명합니다.

    SubDAG를 사용하여 DAG 종속성을 처리할 수도 있지만 SubDAG가 성능 문제를 일으킬 수도 있으므로 dag 종속성으로 처리하는 것을 권장합니다.

    TriggerDagRunOperator

    TriggerDagRunOperator는 dag의 종속성을 구현하는 쉬운 방법입니다. 해당 operator를 사용하면 동일한 airflow 환경에서 다른 dag를 실행할 수 있습니다.

    TriggerDagRunOperator는 task에서 종속되어 있는 DAG를 호출시킬 수 있습니다. 해당 operator는 subDAG를 이상적으로 교체할 수 있습니다.

    from datetime import datetime
    
    from airflow import DAG
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    
    args = {
        'owner': 'brownbear',
    }
    
    dag = DAG(
        dag_id='trigger_main', default_args=args, start_date=datetime(2021, 11, 6, 0, 0, 0),
        schedule_interval="@once", tags=['trigger'],
    )
    
    t1 = TriggerDagRunOperator(
        trigger_dag_id='trigger',
        task_id='trigger',
        execution_date='{{ execution_date }}',
        wait_for_completion=True,
        poke_interval=30,
        reset_dag_run=True,
        dag=dag
    )

    from datetime import datetime
    
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.dummy import DummyOperator
    
    args = {
        'owner': 'brownbear',
    }
    
    dag = DAG(
        dag_id='trigger', default_args=args, start_date=datetime(2021, 11, 6, 0, 0, 0),
        schedule_interval=None, tags=['trigger'],
    )
    
    start = DummyOperator(
        task_id='start',
        dag=dag
    )
    
    b1 = BashOperator(
        task_id='bash',
        bash_command='echo 123',
        dag=dag
    )
    
    end = DummyOperator(
        task_id='end',
        dag=dag
    )
    
    start >> b1 >> end

    호출 받는 dag에서는 호출이 될 때만 실행이 돼야 한다면 스케쥴을 등록할 필요가 없습니다.

    위의 관계는 airflow UI의 browse > dag dependencies 메뉴에서 종속성을 확인할 수 있습니다.

    TriggerDagRunOperator는 아래와 같은 옵션이 존재합니다.

    • trigger_dag_id: 호출할 외부 dag id
    • conf: 외부 dag를 호출하기 위한 conifg
    • execution_date: 외부 dag에 주입할 실행 날짜
    • reset_dag_run: 만약 부모 dag를 backfill했을 때, 연결된 외부 dag의 id가 이미 존재할 경우, 이를 지우고 실행할 지 여부값 (기본값 False)
    • wait_for_completion: 외부 dag가 완료될 때까지 기다릴 지 여부값 (기본값 False)
    • poke_interval: wait_for_completion 가 True일 때, 외부 dag의 상태값을 몇초마다 체크할 지 (기본값 60)
    • allowed_states: 허용하는 상태값 (기본값: ['success'])
      • 성공으로 판단할 state를 입력하는 부분으로 생각
    • failed_states: 허용하지 않을 상태값 (기본값: ['failure'])
      • 실패로 판단할 state를 입력하는 부분으로 생각

    ExternalTaskSensor

    다음 방법으로 ExternalTaskSensor 를 호출될 dag 내에 추가하는 것입니다. 호출 될 dag는 부모 dag가 완료될 때까지 시작하지 않고 대기하게 됩니다.

    종속성 구현이 호출될 dag에서 구현되기 때문에 위에서 설명한 TriggerDagRunOperator 만큼 유연하지 않을 수 있지만 여러 dag에 종속되는 dag에서는 유용하게 사용할 수 있습니다.

    from datetime import datetime
    
    from airflow import DAG
    from airflow.operators.dummy import DummyOperator
    
    args = {
        'owner': 'brownbear',
    }
    
    dag = DAG(
        dag_id='external_main', default_args=args, start_date=datetime(2021, 11, 6, 0, 0, 0),
        schedule_interval="@once", tags=['external'],
    )
    
    start = DummyOperator(
        task_id='start',
        dag=dag
    )
    
    end = DummyOperator(
        task_id='end',
        dag=dag
    )
    
    start >> end

    from datetime import datetime
    
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.dummy import DummyOperator
    from airflow.sensors.external_task import ExternalTaskSensor
    
    args = {
        'owner': 'brownbear',
    }
    
    dag = DAG(
        dag_id='external', default_args=args, start_date=datetime(2021, 11, 6, 0, 0, 0),
        schedule_interval='@once', tags=['external'],
    )
    
    downstream_task1 = ExternalTaskSensor(
        task_id="downstream_task1",
            timeout: 600,
            mode: 'reschedule',
        external_dag_id='external_main',
        failed_states=['failed']
    )
    
    start = DummyOperator(
        task_id='start',
        dag=dag
    )
    
    b1 = BashOperator(
        task_id='bash',
        bash_command='echo 123',
        dag=dag
    )
    
    end = DummyOperator(
        task_id='end',
        dag=dag
    )
    
    start >> downstream_task1 >> b1 >> end

    여기서 중요한 점은 종속성이 연결되어 있는 dag에 scheculder가 존재해야 합니다. 또한 sensor이기 때문에 무한정 대기를 원하지 않는다면 mode 또는 timeout을 설정해야 합니다.

    위의 관계는 airflow UI의 browse > dag dependencies 메뉴에서 종속성을 확인할 수 있습니다.

    위에서는 특정 task 대신 dag가 완료될 때까지 기다렸다가 실행하도록 한 예시인데 특정 task가 끝난 다음, 실행하려면 external_task_id 를 주입하면 됩니다.

    여기서 중요한 점은 업스트림 dag(external_main)과 다운스트림 DAG(external)은 schedule_interval이 같아야 합니다. ExternalTaskSensor가 동일한 execution_date에서 지정된 task 나 DAG를 찾기 때문입니다.

    ExternalTaskSensor 는 아래와 같은 옵션이 존재합니다.

    • external_dag_id: 확인할 외부 dag id
    • external_task_id: 확인할 외부 task id. task id가 입력되면 외부 task가 종료되어야 실행을 함. 만약 입력되지 않으면 외부 dag가 종료될 때까지 대기 (기본값 None)
    • allowed_states: 허용하는 상태값 (기본값: ['success'])
      • 성공으로 판단할 state를 입력하는 부분으로 생각
    • failed_states: 허용하지 않을 상태값 (기본값: None)
      • 실패로 판단할 state를 입력하는 부분으로 생각
    • check_existence: 외부 dag id나 task id가 존재하는지 체크. external_task_id 가 None이 아니면 외부 task id를 체크하고 None이면 외부 dag id를 체크 (기본값: False)
    • execution_delta: datetime.timedelta(days=1) 와 같이 이전 실행과의 시간차이를 입력 (기본값 execution_date)
      • 다른 날짜에 외부 작업이 완료되었는지 확인할 때 쓰는 것으로 생각

    Airflow API

    2.0 이후 제공되는 REST API를 사용하여 다른 환경의 dag를 호출할 수도 있습니다.

    TriggerDagRunOperator 와 ExternalTaskSensor 차이점

    TriggerDagRunOperator 은 업스트림의 dag에 호출할 dag를 직접 명시하여 호출하는 방식이고

    ExternalTaskSensor은 호출 되어야 하는 다운스트림의 dag에 명시하는 방식입니다.

    TriggerDagRunOperator 방식의 장점은 호출하고자 하는 dag를 즉시 호출할 수 있습니다. 또한 호출되는 다운스트림의 dag 결과에 따라서 업스트림 dag의 task에 영향을 줄 수 있습니다. 단점은 호출해야 하는 dag가 추가된다면 업스트림의 dag에 task가 추가되므로 잘 동작하는 기존 코드를 수정하는 문제가 있습니다. 즉, SOLID 원칙 중, OCP(개방 폐쇄 원칙)에 위배됩니다.

    ExternalTaskSensor 방식의 장점은 여러 업스트림의 dag 및 task에 의존성을 걸 수 있으며 호출해야 하는 dag가 추가된다 하더라도 기존의 업스트림 dag를 수정할 필요가 없습니다. 단점으로는 동일한 시작날짜 및 schedule_interval을 추가해야 하며 업스트림의 dag가 평소보다 늦게 완료된다면 불필요하게 dag가 실행되어 대기를 하게 됩니다.

    TriggerDagRunOperator 를 사용하기 좋은 상황은 다음과 같습니다.

    • 업스트림 dag와 다운스트림 dag가 1:1 관계
    • 다운스트림 dag의 결과가 업스트림 dag의 task의 상태에 영향을 주어야 함
    • 개발된 이후, 호출해야 하는 다운스트림 dag가 추가될 가능성이 적음
    • 다운스트림 dag는 업스트림 dag가 실행될 때 바로 실행이 되어야 함

    ExternalTaskSensor 를 사용하기 좋은 상황은 다음과 같습니다.

    • 업스트림 dag와 다운스트림 dag의 관계가 N:1 관계
    • 다운스트림 dag의 결과가 업스트림 dag의 상태에 영향을 주지 않음
    • 개발된 이후, 호출해야 하는 다운스트림 dag가 추가될 가능성이 큼
    • 업스트림 dag의 schedule_interval과 다운스트림 dag의 schedule_interval이 동일함

    결과적으로 TriggerDagRunOperatorExternalTaskSensor 보다 더 강한 종속성 관계를 가지는 것으로 볼 수 있습니다.

    댓글