-
[Airflow] DAG에서 다른 DAG 호출하기 (DAG 종속성)공부/데이터 2021. 11. 20. 20:43
dag를 설계할 때 dag끼리 종속성을 갖지 않는 것이 가장 좋지만 어쩔 수 없이 종속성을 만들어야 하는 경우가 있습니다. 아래와 같은 상황일 때, dag의 종속성을 갖는 것이 유용하게 사용됩니다.
- 두 dag는 종속되지만 일정이 다름
- 두 dag는 종속되지만 서로 다른 팀에서 소유
- task는 다른 task에 종속되지만 execution_date가 다름
여기서는 dag에서 다른 dag를 호출하는 방법을 설명합니다.
SubDAG를 사용하여 DAG 종속성을 처리할 수도 있지만 SubDAG가 성능 문제를 일으킬 수도 있으므로 dag 종속성으로 처리하는 것을 권장합니다.
TriggerDagRunOperator
TriggerDagRunOperator
는 dag의 종속성을 구현하는 쉬운 방법입니다. 해당 operator를 사용하면 동일한 airflow 환경에서 다른 dag를 실행할 수 있습니다.TriggerDagRunOperator
는 task에서 종속되어 있는 DAG를 호출시킬 수 있습니다. 해당 operator는 subDAG를 이상적으로 교체할 수 있습니다.from datetime import datetime from airflow import DAG from airflow.operators.trigger_dagrun import TriggerDagRunOperator args = { 'owner': 'brownbear', } dag = DAG( dag_id='trigger_main', default_args=args, start_date=datetime(2021, 11, 6, 0, 0, 0), schedule_interval="@once", tags=['trigger'], ) t1 = TriggerDagRunOperator( trigger_dag_id='trigger', task_id='trigger', execution_date='{{ execution_date }}', wait_for_completion=True, poke_interval=30, reset_dag_run=True, dag=dag )
from datetime import datetime from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.dummy import DummyOperator args = { 'owner': 'brownbear', } dag = DAG( dag_id='trigger', default_args=args, start_date=datetime(2021, 11, 6, 0, 0, 0), schedule_interval=None, tags=['trigger'], ) start = DummyOperator( task_id='start', dag=dag ) b1 = BashOperator( task_id='bash', bash_command='echo 123', dag=dag ) end = DummyOperator( task_id='end', dag=dag ) start >> b1 >> end
호출 받는 dag에서는 호출이 될 때만 실행이 돼야 한다면 스케쥴을 등록할 필요가 없습니다.
위의 관계는 airflow UI의 browse > dag dependencies 메뉴에서 종속성을 확인할 수 있습니다.
TriggerDagRunOperator
는 아래와 같은 옵션이 존재합니다.trigger_dag_id
: 호출할 외부 dag idconf
: 외부 dag를 호출하기 위한 conifgexecution_date
: 외부 dag에 주입할 실행 날짜reset_dag_run
: 만약 부모 dag를 backfill했을 때, 연결된 외부 dag의 id가 이미 존재할 경우, 이를 지우고 실행할 지 여부값 (기본값 False)wait_for_completion
: 외부 dag가 완료될 때까지 기다릴 지 여부값 (기본값 False)poke_interval
:wait_for_completion
가 True일 때, 외부 dag의 상태값을 몇초마다 체크할 지 (기본값 60)allowed_states
: 허용하는 상태값 (기본값: ['success'])- 성공으로 판단할 state를 입력하는 부분으로 생각
failed_states
: 허용하지 않을 상태값 (기본값: ['failure'])- 실패로 판단할 state를 입력하는 부분으로 생각
ExternalTaskSensor
다음 방법으로
ExternalTaskSensor
를 호출될 dag 내에 추가하는 것입니다. 호출 될 dag는 부모 dag가 완료될 때까지 시작하지 않고 대기하게 됩니다.종속성 구현이 호출될 dag에서 구현되기 때문에 위에서 설명한
TriggerDagRunOperator
만큼 유연하지 않을 수 있지만 여러 dag에 종속되는 dag에서는 유용하게 사용할 수 있습니다.from datetime import datetime from airflow import DAG from airflow.operators.dummy import DummyOperator args = { 'owner': 'brownbear', } dag = DAG( dag_id='external_main', default_args=args, start_date=datetime(2021, 11, 6, 0, 0, 0), schedule_interval="@once", tags=['external'], ) start = DummyOperator( task_id='start', dag=dag ) end = DummyOperator( task_id='end', dag=dag ) start >> end
from datetime import datetime from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.dummy import DummyOperator from airflow.sensors.external_task import ExternalTaskSensor args = { 'owner': 'brownbear', } dag = DAG( dag_id='external', default_args=args, start_date=datetime(2021, 11, 6, 0, 0, 0), schedule_interval='@once', tags=['external'], ) downstream_task1 = ExternalTaskSensor( task_id="downstream_task1", timeout: 600, mode: 'reschedule', external_dag_id='external_main', failed_states=['failed'] ) start = DummyOperator( task_id='start', dag=dag ) b1 = BashOperator( task_id='bash', bash_command='echo 123', dag=dag ) end = DummyOperator( task_id='end', dag=dag ) start >> downstream_task1 >> b1 >> end
여기서 중요한 점은 종속성이 연결되어 있는 dag에 scheculder가 존재해야 합니다. 또한 sensor이기 때문에 무한정 대기를 원하지 않는다면 mode 또는 timeout을 설정해야 합니다.
위의 관계는 airflow UI의 browse > dag dependencies 메뉴에서 종속성을 확인할 수 있습니다.
위에서는 특정 task 대신 dag가 완료될 때까지 기다렸다가 실행하도록 한 예시인데 특정 task가 끝난 다음, 실행하려면
external_task_id
를 주입하면 됩니다.여기서 중요한 점은 업스트림 dag(external_main)과 다운스트림 DAG(external)은 schedule_interval이 같아야 합니다.
ExternalTaskSensor
가 동일한execution_date
에서 지정된 task 나 DAG를 찾기 때문입니다.ExternalTaskSensor
는 아래와 같은 옵션이 존재합니다.external_dag_id
: 확인할 외부 dag idexternal_task_id
: 확인할 외부 task id. task id가 입력되면 외부 task가 종료되어야 실행을 함. 만약 입력되지 않으면 외부 dag가 종료될 때까지 대기 (기본값 None)allowed_states
: 허용하는 상태값 (기본값: ['success'])- 성공으로 판단할 state를 입력하는 부분으로 생각
failed_states
: 허용하지 않을 상태값 (기본값: None)- 실패로 판단할 state를 입력하는 부분으로 생각
check_existence
: 외부 dag id나 task id가 존재하는지 체크.external_task_id
가 None이 아니면 외부 task id를 체크하고 None이면 외부 dag id를 체크 (기본값: False)execution_delta
: datetime.timedelta(days=1) 와 같이 이전 실행과의 시간차이를 입력 (기본값 execution_date)- 다른 날짜에 외부 작업이 완료되었는지 확인할 때 쓰는 것으로 생각
Airflow API
2.0 이후 제공되는 REST API를 사용하여 다른 환경의 dag를 호출할 수도 있습니다.
- 명세서: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#section/Overview/CRUD-Operations
- 호출 가이드 문서: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/post_dag_run
TriggerDagRunOperator 와 ExternalTaskSensor 차이점
TriggerDagRunOperator
은 업스트림의 dag에 호출할 dag를 직접 명시하여 호출하는 방식이고ExternalTaskSensor
은 호출 되어야 하는 다운스트림의 dag에 명시하는 방식입니다.TriggerDagRunOperator
방식의 장점은 호출하고자 하는 dag를 즉시 호출할 수 있습니다. 또한 호출되는 다운스트림의 dag 결과에 따라서 업스트림 dag의 task에 영향을 줄 수 있습니다. 단점은 호출해야 하는 dag가 추가된다면 업스트림의 dag에 task가 추가되므로 잘 동작하는 기존 코드를 수정하는 문제가 있습니다. 즉, SOLID 원칙 중, OCP(개방 폐쇄 원칙)에 위배됩니다.ExternalTaskSensor
방식의 장점은 여러 업스트림의 dag 및 task에 의존성을 걸 수 있으며 호출해야 하는 dag가 추가된다 하더라도 기존의 업스트림 dag를 수정할 필요가 없습니다. 단점으로는 동일한 시작날짜 및 schedule_interval을 추가해야 하며 업스트림의 dag가 평소보다 늦게 완료된다면 불필요하게 dag가 실행되어 대기를 하게 됩니다.TriggerDagRunOperator
를 사용하기 좋은 상황은 다음과 같습니다.- 업스트림 dag와 다운스트림 dag가 1:1 관계
- 다운스트림 dag의 결과가 업스트림 dag의 task의 상태에 영향을 주어야 함
- 개발된 이후, 호출해야 하는 다운스트림 dag가 추가될 가능성이 적음
- 다운스트림 dag는 업스트림 dag가 실행될 때 바로 실행이 되어야 함
ExternalTaskSensor
를 사용하기 좋은 상황은 다음과 같습니다.- 업스트림 dag와 다운스트림 dag의 관계가 N:1 관계
- 다운스트림 dag의 결과가 업스트림 dag의 상태에 영향을 주지 않음
- 개발된 이후, 호출해야 하는 다운스트림 dag가 추가될 가능성이 큼
- 업스트림 dag의 schedule_interval과 다운스트림 dag의 schedule_interval이 동일함
결과적으로
TriggerDagRunOperator
가ExternalTaskSensor
보다 더 강한 종속성 관계를 가지는 것으로 볼 수 있습니다.