ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow] DAGs
    공부/데이터 2021. 10. 31. 19:08

    DAG 선언 방법

    context manager

    with DAG("my_dag_name") as dag:
        op = DummyOperator(task_id="task")

    class

    my_dag = DAG("my_dag_name")
    op = DummyOperator(task_id="task", dag=my_dag)

    decorator

    2.0 이후 버전

    @dag(start_date=days_ago(2))
    def generate_dag():
        op = DummyOperator(task_id="task")
    
    dag = generate_dag()

    @task 데코레이터와 함께 사용하면 코드가 깔끔해집니다.

    DAGS는 task없이 실행할 수 없습니다.

    task 의존성 표현 방법

    task는 일반적으로 다른 task들과 의존성을 갖고 있고 의존성을 표기하는 방법은 보통 <<, >> 연산자를 사용합니다.

    <<, >>와 set_upstream, set_downstream 함수를 사용하여 표현

     

    <<, >>

    d1 >> [d2, d3]
    d4 >> d3

    set_upstream, set_downstream

    d1.set_downstream([d2, d3])
    d3.set_upstream(d4)

    cross_downstream

    만약 task가 서로 각각 의존해야 된다면 cross_downstream을 사용하여 쉽게 표현할 수 있습니다.

    from airflow.models.baseoperator import cross_downstream
    
    # [op1, op2] >> op3
    # [op1, op2] >> op4
    cross_downstream(from_tasks=[d1, d2], to_tasks=[d3, d4])

    chain

    chain 함수를 사용하여 의존성을 쉽게 표현할 수도 있습니다.

    from airflow.models.baseoperator import chain
    
    # op1 >> op2 >> op3 >> op4
    chain(op1, op2, op3, op4)

    아래와 같이 동적으로도 사용할 수 있습니다.

    chain([DummyOperator(task_id='op' + i) for i in range(1, 6)])

    또한 동일한 사이즈의 리스트에 대해 쌍(pairwise)으로 의존성을 표현할 수 있습니다.

    # Replaces
    # op1 >> op2 >> op4 >> op6
    # op1 >> op3 >> op5 >> op6
    chain(op1, [op2, op3], [op4, op5], op6)

    cross_downstream과 chain 차이

    cross_downstream은 from_tasks * to_tasks 로 표현이 된다면 chain은 리스트의 동일한 인덱스끼리만 방향성을 표현합니다.

    cross_downstream(from_tasks=[d1, d2], to_tasks=[d3, d4]) 와 chain([d1, d2], [d3, d4])

    DAGs 로딩

    airflow에서는 dags 폴더 하위의 각 파일을 가져와 실행한 다음, 해당 파일에서 DAG 객체들을 로드합니다.

    파이썬 파일 당 여러 DAG를 선언하거나 복잡한 DAG 1개를 여러 파일로 쪼개서 관리할 수 있습니다. 여기서 중요한 점은 airflow가 파이썬 파일에서 DAG를 로드할 때, 최상위 수준의 객체(global())만 가져오는 것입니다.

    아래와 같은 예시에서는 함수 내의 dag는 airflow가 로드하지 못합니다.

    dag_1 = DAG('this_dag_will_be_discovered')
    
    def my_function():
        dag_2 = DAG('but_this_dag_will_not')
    
    my_function()

    .airflowignore 파일을 제공해서 무시할 파일을 설정할 수도 있습니다.

    DAGs 실행

    DAG의 실행은 API 또는 수동으로 실행시키거나 정의된 스케쥴에 의해 실행할 수 있습니다. DAG에서는 스케쥴 선언이 필수는 아니지만 일반적으로는 선언을 합니다.

    with DAG("my_daily_dag", schedule_interval="@daily"):
    
    또는
    
    with DAG("my_daily_dag", schedule_interval="0 * * * *"):

    DAG를 실행할 때마다 airflow가 DAG run 이라 부르는 새로운 인스턴스가 생성됩니다. DAG run은 동일한 DAG를 병렬로 실행할 수 있고 논리적으로 날짜와 시간을 구분하는 execution_date 값에 따라서 실행됩니다.

    • 실행이 시작되는 실제 시간이 아님

    default arguments

    DAG 내 operator에 같은 default arguments(start_date 등)을 선언하기 위해 DAG에 미리 선언을 할 수 있습니다.

    default_args = {
        'start_date': datetime(2016, 1, 1),
        'owner': 'airflow'
    }
    
    with DAG('my_dag', default_args=default_args) as dag:
        op = DummyOperator(task_id='dummy')
        print(op.owner)  # "airflow"

    DAG 데코레이터

    2.0 버전 이후에만 사용 가능

    DAG를 선언하는 새로운 방법으로 기존 기능은 그대로 사용하면서 깔끔하게 코드를 만들 수 있습니다. 주의할 점은 데코레이터가 붙은 함수를 global 영역에서 1번 선언하여 호출이 되도록 해야 합니다.

    https://airflow.apache.org/docs/apache-airflow/2.1.2/_modules/airflow/example_dags/example_dag_decorator.html

    흐름 제어

    기본적으로 DAG는 성공에 의존하여 모든 task를 실행시킵니다. 이러한 기능을 변경하는 방법은 다음과 같습니다.

    Branching

    조건에 따라 task를 선택할 수 있는 기능으로 if else 조건문과 동일한 기능을 보여줍니다.

    BranchPythonOperator 클래스를 사용하여 구현하는데 이 클래스는 python_callable 인자에서 호출 이후 task_id를 반환한다는 점을 제외하고 PythonOperator 클래스와 매우 유사(해당 클래스를 상속받았기 때문)합니다. 반환된 task_id에 해당하는 task를 실행하고 다른 경로들은 전부 건너뛰게 됩니다.

    이러한 흐름을 위해 python_callable 에서 반환된 task_id는 직접 >>나 set_downstream으로 참조해줘야 합니다.

    def branching():
        return 'success'
    
    dag = DAG(
        dag_id='task_dependencies',
        start_date=days_ago(2),
        schedule_interval="@once",
        tags=['task연습']
    )
    
    run_this_first = DummyOperator(
        task_id='run_this_first',
        dag=dag,
    )
    
    check_situation = BranchPythonOperator(
        task_id='branching',
        python_callable=branching,
        dag=dag,
    )
    
    success = DummyOperator(
        task_id='success',
        dag=dag,
    )
    
    failure = DummyOperator(
        task_id='failure',
        dag=dag,
    )
    
    send_error = DummyOperator(
        task_id='send_error',
        dag=dag,
    )
    
    ## one_success로 해야 skip된 task를 무시함
    finish = DummyOperator(
        task_id='finish',
        trigger_rule='one_success',
        dag=dag,
    )
    
    run_this_first >> check_situation
    check_situation >> success >> finish
    check_situation >> failure >> send_error >> finish

    위 예시는 success만 나오도록 했으므로 실행하면 failure와 send_error는 건너뛰게 됩니다.

    만약 실패 시, 에러 발송은 별도의 로직으로 분리할 때, 다음과 같이 표현할 수 있습니다.

    run_this_first >> check_situation >> [success, failure] >> finish
    failure >> send_error

    또한 동적으로 upstream task에 따라서 Xcoms를 활용할 수 있습니다.

    def branch_func(ti):
        xcom_value = int(ti.xcom_pull(task_ids='start_task'))
        if xcom_value >= 5:
            return 'continue_task'
        else:
            return 'stop_task'
    
    start_op = BashOperator(
        task_id='start_task',
        bash_command="echo 5",
        do_xcom_push=True,
        dag=dag,
    )
    
    branch_op = BranchPythonOperator(
        task_id='branch_task',
        python_callable=branch_func,
        dag=dag,
    )
    
    continue_op = DummyOperator(task_id='continue_task', dag=dag)
    stop_op = DummyOperator(task_id='stop_task', dag=dag)
    
    start_op >> branch_op >> [continue_op, stop_op]

    만약 커스텀하게 분기처리 operator를 구현할 경우, BaseBranchOperator 클래스를 상속받으면 됩니다. 상속 한 다음 choose_branch 메소드를 구현해야 하며 task id를 반환해야 합니다.

    class MyBranchOperator(BaseBranchOperator):
        def choose_branch(self, context):
            if context['execution_date'].day == 1:
                return ['daily_task_id', 'monthly_task_id']
            else:
                return 'daily_task_id'

    Latest Only

    airflow의 dag run은 종종 현재 시점과 다른 날짜에 실행이 될 수 있습니다. 예를 들어, 과거 데이터를 채우기 위해 backfill을 실행 할 수 있습니다.

    위와 같은 예시에서 특정 과거 날짜 또는 전체 DAG가 run하는 것을 원하지 않을 수 있는데 이럴 때, LatestOnlyOperator를 사용합니다.

    이 특별한 operator는 최근 실행한 DAG가 아닐 경우, 모든 downstream의 작업을 건너뜁니다.

    dag= DAG(
        dag_id='latest_only_with_trigger',
        schedule_interval=dt.timedelta(hours=4),
        start_date=days_ago(2),
        tags=['example3'],
    )
    
    latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
    task1 = DummyOperator(task_id='task1', dag=dag)
    task2 = DummyOperator(task_id='task2', dag=dag)
    task3 = DummyOperator(task_id='task3', dag=dag)
    task4 = DummyOperator(task_id='task4', trigger_rule=TriggerRule.ALL_DONE, dag=dag)
    
    latest_only >> task1 >> [task3, task4]
    task2 >> [task3, task4]
    • task1latest_only 의 직접적인 downstream으로 최신 실행이 아닐 경우 전부 건너뜁니다.
    • task2latest_only 과 의존성이 없으므로 과거 실행이 됩니다.
    • task3task1task2 의 downstream입니다. task의 기본 trigger rule은 all_success 이므로 task2은 성공하여 task3이 실행되지만 task1은 최신 실행이 아닐 시, 건너 뛰므로 task3 또한 task1의 영향으로 건너뛰게 됩니다.
    • task4task1task2 downstream입니다. task3과 다르게 trigger rule이 all_done으로 설정했으므로 실행이 됩니다. all_done은 작업이 완료만 되면 되므로 성공, 실패, 스킵과 같은 결과에 영향을 받지 않습니다.

    Deponds On Past

    이전 DAG run에서 task가 성공하거나 건너뛴 경우에만 다음 DAG의 task에서 실행이 되도록 할 수도 있습니다. (이전에 실행한 task의 결과 여부에 따라 현재 실행 여부가 나뉨)

    처음 시작하거나 이전에 실행된 DAG가 없을 경우엔 무조건 실행이 됩니다.

    operator에 depends_on_past=True 옵션을 추가하면 됩니다.

    start_op = DummyOperator(
        task_id='start_task',
        depends_on_past=True,
        dag=dag,
    )

    Trigger Rules

    기본적으로 task는 upstream의 task가 성공할 때만 실행이 됩니다. trigger_rule 옵션을 추가하여 변경할 수 있습니다.

    • all_success (default): 모든 upstream의 task가 성공해야함
    • all_failed: 모든 upstream의 task가 failed 또는 upstream_failed 상태여야함
    • all_done: 모든 upstream의 task의 작업이 완료되어야 함 (성공, 실패 등의 결과는 상관없음)
    • one_failed: upstream의 task에서 1개 이상 실패가 발생하면 모든 upstream의 task가 완료될 때 까지 기다리지 않음
    • one_success: one_failed 와 반대로 upstream의 task에서 1개 이상 성공이 발생하면 모든 upstream의 task가 완료될 때 까지 기다리지 않음
    • none_failed: 모든 upstream의 task가 failed 또는 upstream_failed 가 아닌 상태여야 함. 즉, 모든 upstream의 task는 성공 또는 건너뜀 상태
    • none_failed_or_skipped: 모든 upstream의 task가 failed 또는 upstream_failed 가 아니며 1개 이상의 upstream의 task가 성공해야함
    • none_skipped: upstream의 task에서 skipped 이 없는 상태여야함. 즉, 모든 upstream의 task가 success, failed, upstream_failed 상태임
    • dummy: upstream task에 의존성이 없으며 항상 실행됨

    이러한 trigger rule은 위에서 설명한 Depends On Past와 결합해 사용할 수도 있습니다.

    trigger rule은 branching operation을 사용할 때, 조심해야 하는데 분기처리 이후 하나로 합쳐진 join task에서 all_success나 all_failed trigger rule을 선언한다면 건너뛴 task와 동일하게 건너뛰게 됩니다.

    이를 피하기 위해 join task의 trigger_rule을 none_failed_or_skipped 을 선언해야 합니다.

    Dynamic DAGs

    DAG는 파이썬 코드로 정의되므로 반복문, 함수 등 자유롭게 정의할 수 있습니다.

    with DAG("loop_example") as dag:
    
        first = DummyOperator(task_id="first")
        last = DummyOperator( task_id="last")
    
        options = ["branch_a", "branch_b", "branch_c", "branch_d"]
        for option in options:
            t = DummyOperator(task_id=option)
            first >> t >> last

    이러한 동적 DAG는 코드 가독성이 떨어지므로 실제 운영에서는 지양하는 것이 좋습니다.

    DAG Visualization

    DAG를 시각적 표현 방법은 두 가지 입니다.

    • airflow UI의 Graph View
    • $airflow dags show 명령어로 이미지 파일 렌더링

    DAG 내 모든 작업 인스턴스의 상태도 표시할 수 있는 airflow UI의 Graph View를 사용하는 것이 좋습니다.

    TaskGroups

    TaskGroup을 사용하여 Graph View에서 작업을 계층적으로 구성할 수 있습니다.

    subDAG와 다른 점으로 TaskGroup은 UI 상에서만 그룹화 개념을 보여줍니다. 해당 task들은 단일 DAG 안에서 실행되므로 DAG pool이나 DAG 설정이 단일 DAG로 적용됩니다.

    task_group = TaskGroup("group1", dag=dag)
    
    task1 = DummyOperator(task_id="task1", task_group=task_group, dag=dag)
    task2 = DummyOperator(task_id="task2", task_group=task_group, dag=dag)
    
    task3 = DummyOperator(task_id="task3", dag=dag)
    
    task_group >> task3

    TaskGroup으로 묶인 task들의 id는 group_id가 접두사로 붙게 됩니다. 이는 group_id, task_id의 고유성을 보장하는 기능입니다.

    만약 접두사를 비활성하려면 prefix_group_id=False 를 TaskGroup 옵션으로 추가하면 됩니다. 추가 후, task와 group의 id들이 고유한지 신경을 써야 합니다.

    Edge Labels

    task를 잇는 간선에 설명을 추가해 가독성을 향상할 수 있습니다.

    from airflow.utils.edgemodifier import Label
    
    task1 >> Label("중간") >> task2 >> Label("종료") >> task3
    
    또는 
    
    task1.set_downstream(task2, Label("start1"))
    task2.set_downstream(task3, Label("end2"))

    이렇게 label을 잘 사용하면 분기 처리하는 task를 쉽게 설명할 수 있습니다.

    DAG & Task 문서화

    웹 인터페이스에서 볼 수 있는 DAG 또는 task 객체에 문서를 추가할 수 있습니다.

    • airflow UI에서 DAG의 경우엔 Graph View, Tree View, task의 경우엔 Task Instance Details 에서 볼 수 있음

     

    DAG의 경우엔 doc_md 만 사용이 가능합니다. 이러한 문서화 기능은 config 파일로부터 동적으로 구성되는 task를 설명할 때 유용하게 쓸 수 있습니다.

    dag.doc_md = """\
    #dag 문서 예시"
    [네이버 링크](www.naver.com)
    """
    
    task1 = DummyOperator(task_id="task1", dag=dag)
    task2 = DummyOperator(task_id="task2", dag=dag)
    task3 = DummyOperator(task_id="task3", dag=dag)
    
    task1.doc_md = """\
    #task1 문서 예시"
    [구글 링크](www.google.com)
    """
    
    task1.set_downstream(task2, Label("start1"))
    task2.set_downstream(task3, Label("end2"))

    SubDAGs

    많은 작업을 하나의 논리적 단위로 그룹화하여 병렬로 처리할 때 유용한 기능입니다.SubDAG 기능을 사용하면 airflow UI 상에서 TaskGroup처럼 논리적인 단위로 묶어서 볼 수 있을 뿐만 아니라 별도의 DAG로 취급 받아 병렬 처리가 가능합니다.

    # subdag_operator.py
    
    from airflow import DAG
    from airflow.example_dags.subdags.subdag import subdag
    from airflow.operators.dummy import DummyOperator
    from airflow.operators.subdag import SubDagOperator
    from airflow.utils.dates import days_ago
    
    DAG_NAME = 'subdag_operator'
    
    args = {
        'owner': 'airflow',
    }
    
    dag = DAG(
        dag_id=DAG_NAME, default_args=args, start_date=days_ago(2),
        schedule_interval="@once", tags=['subdag']
    )
    
    start = DummyOperator(
        task_id='start',
        dag=dag,
    )
    
    section_1 = SubDagOperator(
        task_id='section-1',
        subdag=subdag(DAG_NAME, 'section-1', args),
        dag=dag,
    )
    
    some_other_task = DummyOperator(
        task_id='some-other-task',
        dag=dag,
    )
    
    section_2 = SubDagOperator(
        task_id='section-2',
        subdag=subdag(DAG_NAME, 'section-2', args),
        dag=dag,
    )
    
    end = DummyOperator(
        task_id='end',
        dag=dag,
    )
    
    start >> section_1 >> some_other_task >> section_2 >> end

    코드를 작성할 때, subdag는 함수 내부에 정의하여 global로 노출되지 않도록 주의해야 합니다. (global로 노출되면 airflow가 메인 dag로 인식해버림)

    # subdag.py
    
    from airflow import DAG
    from airflow.operators.dummy import DummyOperator
    from airflow.utils.dates import days_ago
    
    def subdag(parent_dag_name, child_dag_name, args):
        dag_subdag = DAG(
            dag_id=f'{parent_dag_name}.{child_dag_name}',
            default_args=args,
            start_date=days_ago(2),
            schedule_interval="@daily",
        )
    
        for i in range(5):
            DummyOperator(
                task_id='{}-task-{}'.format(child_dag_name, i + 1),
                default_args=args,
                dag=dag_subdag,
            )
    
        return dag_subdag

    아래는 subDAG를 작성할 때 유용한 정보입니다.

    • subDAG의 id는 상위 {DAG id}.{subDAG id} 로 짓는 것이 규칙임
    • subDAG에 arguments를 전달해서 상위 DAG의 arguments를 subDAG에 공유하여 사용해야 합니다.
    • SubDAG에는 schedule이 설정 및 활성화가 되어 있어야 합니다. 만약 None 이나 @once 일 경우, 실행되지 않고 성공으로 처리 합니다.
    • [SubDagOperator](https://airflow.apache.org/docs/apache-airflow/2.1.2/_api/airflow/operators/subdag/index.html#airflow.operators.subdag.SubDagOperator) 지우면 해당 subDAG 안의 task의 상태도 지워지게 됩니다.
    • [SubDagOperator](https://airflow.apache.org/docs/apache-airflow/2.1.2/_api/airflow/operators/subdag/index.html#airflow.operators.subdag.SubDagOperator) 에서 success로 표시해도 그 안의 작업 상태에는 영향을 미치지 않습니다.
    • subDAG 안에서 Depends On Past 기능을 킬 경우, 헷갈릴 수 있으므로 지양합니다.
    • subDAG에 executor를 지정할 수 있습니다.

    Packaging DAGs

    간단한 DAG의 경우, 1개의 파일로 제공할 수 있지만 복잡한 로직은 여러 파일로 분산해서 관리를 진행합니다. 표준 파일 시스템 레이아웃을 사용하여 DAG_FOLDER 내부에서 이 모든 작업을 수행하거나 DAG 및 모든 Python 파일을 단일 zip 파일로 패키징할 수 있습니다.

    예를 들어 아래 zip 파일로 필요한 의존성 파일과 함께 두 개의 dag를 제공할 수 있습니다.

    my_dag1.py
    my_dag2.py
    package1/__init__.py
    package1/functions.py

    패키징 DAG에는 아래와 같은 주의사항이 있습니다.

    • 직렬화를 사용할 수 없습니다.
    • 순수 파이썬만 가능합니다 (컴파일된 라이브러리 X)
    • 이 패키징 파이썬의 sys.path에 등록되고 aiflow의 다른 코드에서도 접근할 수 있으므로 이미 airflow에서 사용하고 있는 패키지들과 이름이 충돌하는지 주의해야 합니다.

    DAG Dependency

    DAG의 task 간 의존성은 upstream, downstream 관계를 통해 명시적이지만 DAG 끼리의 의존 관계는 복잡합니다. 한 DAG가 다른 DAG에 종속될 수 있는 방법은 두 가지 입니다.

    • triggering - TriggerDagRunOperator
    • waiting - ExternalTaskSensor

    추가적으로 한 DAG는 execution_date이 다른 DAG를 계속 기다리거나 여러번 실행을 시키는 트리거가 될 수 있어서 이를 조절하기가 어렵습니다.

    이러한 DAG 간 의존관계는 airflow UI에서 확인할 수 있는데 menuBrowseDAG Dependencies 로 확인할 수 있습니다.

    댓글