ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow] Task
    공부/데이터 2021. 10. 31. 19:11

    task는 airflow의 기본 실행 단위입니다. DAG 안에 task들을 나열한 다음, upstream, downstream으로 종속성을 정렬합니다.

    task는 3가지가 존재합니다.

    • Operator: DAG를 구축하는데 자주 사용하는 작업 템플릿 클래스입니다.
    • Sensor: 외부 이벤트가 발생할 때까지 대기하는 오퍼레이터의 하위 클래스입니다.
    • TaskFlow: @task 데코레이터를 사용하여 사용자가 작성한 함수 코드를 task로 인식하게 만드는 기능입니다.

    이 기능들은 전부 BaseOperator 클래스를 상속받아 구현되었으며 task와 operator는 기능이 유사하지만 별도로 구분하는 것이 좋습니다.

    operator와 sensor는 템플릿이고 task는 DAG 파일이 호출될 때 만들어 지는 개념입니다.

    표현식

    https://brownbears.tistory.com/584 의 task 의존성 표현 방법에 설명이 되어 있습니다. 일반적으로는 <<, >> 가 읽기 쉽기 때문에 사용을 권장하고 있습니다.

    task 끼리의 정보 전달을 하지 않는 것이 기본이지만 다른 task에게 정보를 전달하려면 XCom이라는 기능을 사용해야 합니다.

    task instance

    보통 DAG와 동일한 방법으로 DAG가 실행될 때 task instance를 만듭니다. task instance의 상태는 아래와 같습니다.

    • none: task가 대기열에 올라가지 않음 (종속성이 아직 충족되지 않음).
    • scheduled: 스케줄러가 task의 종속성이 충족되고 실행되어야 한다고 결정한 상태
    • queued: task가 Executor에 할당되었으며 실행되기를 기다리고 있는 상태
    • running: task가 실행 중
    • success: task 성공
    • failed: task를 실행하는 동안 오류가 발생하여 실패
    • skipped: 조건식, LatestOnly 등으로 인해 작업을 건너뜀
    • upstream_failed: upstream task가 실패했고 trigger rule이 필요한 상태
    • up_for_retry: task가 실패했지만 재시도 횟수가 남아 있어서 재스케쥴을 할 예정인 상태
    • up_for_reschedule: 해당 task가 reschedule 모드인 sensor라고 나타내는 상태
    • sensing: 해당 task가 Smart Sensor 라고 나타냄
    • removed: DAG가 실행된 이후에 task가 삭제됨

    이상적으로 task의 상태는 nonescheduledqueuedrunningsuccess 순으로 흘러가야 합니다.

    Timeout

    task에 execution_timeout 옵션을 추가하여 timeout을 걸 수 있습니다. 설정한 시간을 초과하여 실행될 경우, 실패로 처리합니다.

    SLA

    Service Level Agreement (SLA)는 작업에 소요되는 최대 시간에 대한 예상입니다. task가 설정한 sla 시간보다 오래 걸리는 경우, UI 상에 SLA Misses 으로 표시하고 SLA Misses 라고 표시된 모든 task들을 이메일로 전송할 수 있습니다. (callback을 전달하는 것이니 email 뿐만 아니라 다른 서비스로 알림을 줄 수 있음)

    Timeout과 다르게 SLA를 초과하여 실행한 task는 실패 처리 후 종료되지 않고 계속 실행됩니다.

    operator나 sensor에 sla 옵션을 주어 datetime.timedelta 타입으로 데이터를 전달하여 설정할 수 있습니다. 또한 sla_miss_callback 옵션에 callback 함수를 커스텀하여 전달할 수 있습니다.

    import datetime
    
    task1 = DummyOperator(task_id="task1", dag=dag, sla=datetime.timedelta(seconds=60 * 10))

    만약 모든 airflow 내에서 SLA 검사를 비활성화 하려면 airflow.cfg 파일 내 check_slas 옵션을 False로 주면 됩니다.

    특별한 예외처리

    task 상태를 custom하여 처리를 하고자 할 때, 아래 두 가지 방법이 있습니다.

    • AirflowSkipException: 현재 작업을 건너뛴 것으로 표시
    • AirflowFailException: 남은 재시도를 무시하고 현재 작업을 실패한 것으로 표시

    Zombie/Undead Tasks

    내외부 요인으로 task 인스턴스는 때때로 죽을 수도 있는데 airflow는 두 가지 종류로 task/process 간 불일치를 감지합니다.

    • Zombie task는 실행 중이였지만 갑자기 종료된 task를 말합니다. (e.g, 프로세스나 시스템이 종료됐을 때) airflow는 주기적으로 이러한 task를 찾아 정리하고 설정에 따라서 task를 재시도 하거나 실패처리합니다.
    • Undead task는 실행되지 않아야 하지만 UI를 통해 task 인스턴스를 수동으로 편집할 때 종종 발생합니다. 마찬가지로 airflow는 주기적으로 이를 찾고 종료처리합니다.

    Executor Configuration

    몇몇 executor는 task에서 따로 설정할 수 있습니다. 아래는 task에서 KubernetesExecutor 를 사용해 docker image를 설정하는 예시입니다.

    MyOperator(...,
        executor_config={
            "KubernetesExecutor":
                {"image": "myCustomDockerImage"}
        }
    )

    댓글