ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Airflow 2.3 ~ 3.1 신 기능 정리
    공부/데이터 2026. 4. 26. 15:39

    버전별 신기능 요약

    버전 출시일 핵심 기능 주요 키워드
    2.3 2022.04 Dynamic Task Mapping expand(), partial()
    2.4 2022.09 Datasets (Data-Aware Scheduling) Dataset, outlets, Consumer DAG
    2.5 2022.12 Task/DAG 노트, 테스트 개선 note, airflow dags test
    2.6 2023.04 Notifiers, ContinuousTimetable, EventsTimetable BaseNotifier, @continuous
    2.7 2023.08 Setup/Teardown, Params UI 폼 @setup, @teardown, Param
    2.8 2024.01 ObjectStorage, XCom Tab ObjectStoragePath
    2.9 2024.04 Dataset AND/OR 조건부 스케줄링 &,
    2.10 2024.08 Hybrid Executor, OpenTelemetry Multi-Executor, OTEL
    3.0 2025.04 Task SDK, DAG Versioning, Asset, Edge Executor airflow.sdk, AIP-63/69/74
    3.1 2025.09 Human-in-the-Loop, Deadline Alerts HITLOperator, DeadlineAlert

    Airflow 2.3 — Dynamic Task Mapping

    Dynamic Task Mapping

    런타임에 동적으로 태스크 수를 결정하는 기능입니다. 기존에는 DAG 작성 시점에 태스크 수가 고정되어야 했지만, expand()partial() 메서드로 업스트림 태스크 출력값에 따라 병렬 태스크 인스턴스를 자동 생성합니다.

    • 코드 예시 — 기본 사용법
    • from airflow.decorators import dag, task from datetime import datetime @dag(start_date=datetime(2023, 1, 1), schedule="@daily") def dynamic_mapping_example(): @task def get_files() -> list[str]: # 런타임에 처리할 파일 목록 결정 return ["file_a.csv", "file_b.csv", "file_c.csv"] @task def process_file(filename: str, batch_size: int): print(f"Processing {filename} with batch_size={batch_size}") files = get_files() # partial()로 고정값 지정, expand()로 동적 매핑 → 3개 태스크 병렬 실행 process_file.partial(batch_size=1000).expand(filename=files) dag = dynamic_mapping_example()
    • 코드 예시 — expand_kwargs로 복수 파라미터 매핑
    • @task def process(config: dict): print(f"env={config['env']}, size={config['size']}") configs = [ {"env": "prod", "size": 1000}, {"env": "dev", "size": 100}, ] process.expand_kwargs(configs) # 2개의 태스크 인스턴스 생성

    새 Grid View

    기존 Tree View를 대체하는 React 기반 Grid View가 도입되었습니다. DAG Run 실행 시간 표시, Task Group 일급 지원, 태스크 호버 시 의존성 라인 강조 등 운영 가시성이 크게 개선되었습니다.


    Airflow 2.4 — Datasets (Data-Aware Scheduling)

    Datasets / Data-Aware Scheduling

    시간 기반이 아닌 데이터 변경 기반 스케줄링을 가능하게 하는 기능입니다. Dataset 클래스로 논리적 데이터 단위를 정의하고, 특정 데이터셋이 업데이트될 때 의존하는 DAG가 자동으로 트리거됩니다.

    • 코드 예시 — Producer DAG
    • from airflow.decorators import dag, task from airflow import Dataset from datetime import datetime raw_data = Dataset("s3://my-bucket/raw/sales_data.csv") @dag(dag_id="producer_dag", start_date=datetime(2023, 1, 1), schedule="@daily") def producer(): @task(outlets=[raw_data]) # 완료 시 raw_data 업데이트됨을 선언 def extract_and_save(): print("데이터 추출 및 저장 완료") extract_and_save() dag = producer()
    • 코드 예시 — Consumer DAG (복수 데이터셋 AND 조건)
    • from airflow.decorators import dag, task from airflow import Dataset from datetime import datetime raw_data = Dataset("s3://my-bucket/raw/sales_data.csv") ref_data = Dataset("s3://my-bucket/ref/product_master.csv") @dag( dag_id="consumer_dag", start_date=datetime(2023, 1, 1), schedule=[raw_data, ref_data], # 둘 다 업데이트 시 자동 실행 (AND 조건) ) def consumer(): @task def process_data(): print("두 데이터셋 모두 업데이트 감지 → 처리 시작") process_data() dag = consumer()

    schedule 파라미터 통합

    기존 schedule_intervaltimetable 파라미터가 deprecated되고 schedule 단일 파라미터로 통합되었습니다. 하나의 파라미터로 시간 기반 스케줄과 데이터셋 기반 스케줄을 모두 표현합니다.

    DAG(dag_id="time_based",  schedule="@daily")              # 시간 기반
    DAG(dag_id="data_based",  schedule=[Dataset("s3://...")])  # 데이터셋 기반

    CronTriggerTimetable 도입

    데이터 인터벌 개념 없이 트리거 시각을 그대로 logical_date로 사용하는 직관적인 Timetable입니다. Airflow 3.0에서 기본 Timetable로 변경됩니다.

    from airflow.timetables.trigger import CronTriggerTimetable
    import pendulum
    
    dag = DAG(
        "trigger_style_dag",
        schedule=CronTriggerTimetable("0 9 * * *", timezone="Asia/Seoul"),
        start_date=pendulum.datetime(2024, 1, 1, tz="Asia/Seoul"),
    )

    Airflow 2.5 — 안정성 및 DX 개선

    Task Instance / DAG Run 노트

    운영 팀이 태스크 인스턴스와 DAG 실행에 주석(노트)을 추가할 수 있게 되었습니다. 수동 재실행 이유, 이슈 대응 기록 등 운영 히스토리를 UI와 REST API에서 직접 관리합니다.

    # REST API: DAG Run에 노트 추가
    # PATCH /api/v1/dags/{dag_id}/dagRuns/{run_id}
    # { "note": "외부 API 장애로 인한 수동 재실행" }

    개선된 DAG 테스트

    태스크 로그가 콘솔에 직접 출력되며, 단일 프로세스 실행으로 IDE 브레이크포인트 디버깅이 가능합니다. 기존 대비 약 10배 빠른 실행 속도를 제공합니다.

    airflow dags test my_dag 2023-01-01

    Dynamic Task Mapping 개선

    매핑된 태스크의 출력을 다시 다른 태스크에 매핑하는 체인 매핑이 안정화되었습니다.

    ids   = get_ids()
    data  = fetch_data.expand(id=ids)
    process.expand(item=data)  # 매핑된 결과를 다시 매핑 (2.5에서 안정화)

    Listener API (DagRun 이벤트 추가)

    Airflow 2.3에서 실험적으로 도입된 Listener 플러그인이 2.5에서 DagRun 상태 변경 이벤트가 추가되며 완성되었습니다. pluggy 훅 시스템을 통해 전체 Airflow 태스크/DAG 생명주기 이벤트에 코드를 주입할 수 있습니다.

    기존 on_failure_callback은 각 태스크마다 개별 설정이 필요했지만, Listener는 플러그인 한 곳에서 모든 태스크/DAG 이벤트를 일괄 처리할 수 있습니다.

    • 코드 예시 — Listener 플러그인 구현
    • # plugins/my_listener_plugin.py from airflow.plugins_manager import AirflowPlugin from airflow.listeners import hookimpl from airflow.models.taskinstance import TaskInstance from airflow.models.dagrun import DagRun import logging log = logging.getLogger(__name__) class MyEventListener: """태스크/DAG 실행 이벤트를 외부 모니터링 시스템으로 전송""" # TaskInstance 이벤트 @hookimpl def on_task_instance_running(self, previous_state, task_instance, session): log.info("태스크 시작: dag=%s, task=%s", task_instance.dag_id, task_instance.task_id) @hookimpl def on_task_instance_success(self, previous_state, task_instance, session): duration = (task_instance.end_date - task_instance.start_date).total_seconds() log.info("태스크 성공: task=%s, 소요시간=%.2fs", task_instance.task_id, duration) @hookimpl def on_task_instance_failed(self, previous_state, task_instance, error, session): log.error("태스크 실패: dag=%s, task=%s, 오류=%s", task_instance.dag_id, task_instance.task_id, str(error)) # DagRun 이벤트 (2.5에서 추가) @hookimpl def on_dag_run_running(self, dag_run: DagRun, msg: str): log.info("DAG 실행 시작: dag=%s, run_id=%s", dag_run.dag_id, dag_run.run_id) @hookimpl def on_dag_run_success(self, dag_run: DagRun, msg: str): duration = (dag_run.end_date - dag_run.start_date).total_seconds() log.info("DAG 완료: dag=%s, 소요시간=%.2fs", dag_run.dag_id, duration) @hookimpl def on_dag_run_failed(self, dag_run: DagRun, msg: str): log.error("DAG 실패: dag=%s, 메시지=%s", dag_run.dag_id, msg) class MyListenerPlugin(AirflowPlugin): name = "my_listener_plugin" listeners = [MyEventListener()] # 플러그인 1회 등록으로 전체 적용

    Airflow 2.6 — 알림 프레임워크 & 연속 실행

    알림 프레임워크 (BaseNotifier)

    BaseNotifier를 상속해 커스텀 알림 객체를 만들 수 있는 확장 가능한 알림 계층이 추가되었습니다. Slack은 즉시 사용 가능한 첫 번째 공식 Notifier로 제공됩니다.

    • 코드 예시 — SlackNotifier
    • from airflow.providers.slack.notifications.slack import SlackNotifier from airflow.decorators import dag, task from datetime import datetime SLACK_NOTIFIER = SlackNotifier( slack_conn_id="slack_api_default", text="DAG {{ dag.dag_id }} failed on {{ ds }}", channel="#airflow-alerts", ) @dag( start_date=datetime(2023, 1, 1), on_failure_callback=[SLACK_NOTIFIER], # Notifier 객체를 직접 콜백으로 전달 catchup=False, ) def my_pipeline(): @task def process(): return "done" process()

    ContinuousTimetable (@continuous)

    이전 DAG 실행이 완료되는 즉시 다음 실행이 시작되는 연속 실행 스케줄입니다. 불규칙한 외부 이벤트를 폴링할 때 적합합니다.

    @dag(
        schedule="@continuous",
        start_date=datetime(2023, 1, 1),
        max_active_runs=1,  # 반드시 1로 설정
        catchup=False,
    )
    def continuous_pipeline():
        @task
        def poll_api():
            return requests.get("https://api.example.com/events").json()
        poll_api()

    EventsTimetable

    미리 정의된 특정 날짜 목록에만 DAG를 실행합니다. 스포츠 경기, 분기 결산 등 불규칙하지만 예측 가능한 이벤트 스케줄에 적합합니다.

    from airflow.timetables.events import EventsTimetable
    import pendulum
    
    quarterly_events = EventsTimetable(
        event_dates=[
            pendulum.datetime(2024, 3, 31, tz="UTC"),
            pendulum.datetime(2024, 6, 30, tz="UTC"),
            pendulum.datetime(2024, 9, 30, tz="UTC"),
            pendulum.datetime(2024, 12, 31, tz="UTC"),
        ],
        presorted=True,
        description="분기 마감일에 실행",
    )
    
    dag = DAG("quarterly_report", schedule=quarterly_events, ...)

    Airflow 2.7 — Setup/Teardown & Params UI

    Setup/Teardown Tasks (AIP-52)

    리소스 생성 → 작업 수행 → 리소스 삭제 패턴을 일급 객체로 지원합니다. Teardown 태스크는 업스트림 태스크 상태와 무관하게 항상 실행됩니다.

    • 코드 예시 — ML 훈련 파이프라인
    • from airflow.decorators import dag, task, setup, teardown from datetime import datetime @dag(start_date=datetime(2023, 1, 1), catchup=False) def ml_training_pipeline(): @setup def create_gpu_cluster(): cluster_id = "gpu-cluster-001" print(f"클러스터 생성: {cluster_id}") return cluster_id @teardown def delete_gpu_cluster(cluster_id: str): print(f"클러스터 삭제: {cluster_id}") # 항상 실행됨 @task def train_model(cluster_id: str): print(f"{cluster_id}에서 모델 훈련 중...") return "model_v1" @task def evaluate_model(model_path: str): print(f"모델 평가: {model_path}") cluster = create_gpu_cluster() model = train_model(cluster) evaluate_model(model) delete_gpu_cluster(cluster).as_teardown(setups=create_gpu_cluster) ml_training_pipeline()

    Params UI 폼 (AIP-50)

    DAG 파라미터를 UI 폼으로 렌더링하는 기능이 강화되었습니다. Param 객체로 드롭다운, 날짜 피커, 체크박스, 멀티라인 텍스트 등 다양한 입력 필드를 자동 생성합니다.

    from airflow.models.param import Param
    
    @dag(
        params={
            "environment": Param("production", type="string",
                                 enum=["development", "staging", "production"]),
            "run_date":    Param("2024-01-01", type="string", format="date"),
            "debug_notes": Param("", type="string", format="multiline"),
            "max_rows":    Param(1000, type="integer", minimum=1, maximum=100000),
        },
        ...
    )
    def parameterized_etl(): ...

    default_deferrable 설정

    설정 한 줄로 모든 지원 연산자를 Deferrable 모드로 전환합니다. Worker 스레드 점유 없이 대기하므로 대규모 클러스터에서 리소스를 절감합니다.

    [operators]
    default_deferrable = True

    Airflow 2.8 — ObjectStorage & 로깅 개선

    ObjectStorage (AIP-58)

    S3, GCS, Azure Blob Storage, 로컬 파일시스템을 동일한 pathlib.Path 인터페이스로 다루는 추상화 계층입니다.

    • 코드 예시
    • from airflow.io.path import ObjectStoragePath from airflow.decorators import dag, task from datetime import datetime BASE_S3 = ObjectStoragePath("s3://my-bucket/data/", conn_id="aws_default") BASE_GCS = ObjectStoragePath("gs://my-gcs-bucket/data/", conn_id="gcp_default") @dag(start_date=datetime(2024, 1, 1), catchup=False) def object_storage_pipeline(): @task def upload_to_s3(): path = BASE_S3 / "input" / "{{ ds }}" / "data.parquet" path.parent.mkdir(parents=True, exist_ok=True) with path.open("wb") as f: f.write(b"parquet data") return str(path) @task def copy_to_gcs(s3_path: str): src = ObjectStoragePath(s3_path, conn_id="aws_default") dst = BASE_GCS / "processed" / "data.parquet" src.copy(dst) # 클라우드 간 복사도 동일 코드 copy_to_gcs(upload_to_s3())

    TaskContextLogger

    Scheduler, Executor 등 다른 Airflow 컴포넌트의 로그가 태스크 로그 뷰에 통합되어 표시됩니다. KubernetesExecutor에서 Pod 생성 실패 메시지가 태스크 로그에 직접 포함되는 등 디버깅이 크게 개선됩니다.

    Dataset Listener Hook

    데이터셋 생성 및 변경 이벤트를 구독하는 플러그인 훅입니다. 데이터 카탈로그 연동, 품질 검사 트리거, 외부 시스템 알림 등에 활용합니다.

    XCom Tab in Grid View

    Grid View에 XCom 탭이 추가되어 태스크 인스턴스의 XCom 키-값을 별도 화면 이동 없이 직접 확인할 수 있습니다.


    Airflow 2.9 — Dataset 조건부 스케줄링

    Dataset AND/OR 조건부 스케줄링

    2.4에서 AND 조건만 지원하던 Dataset 스케줄링이 OR 및 AND/OR 혼합 조건을 지원합니다. Python 연산자(&, |)로 직관적인 표현이 가능합니다.

    from airflow import Dataset
    
    customers  = Dataset("s3://data-lake/customers/")
    orders     = Dataset("s3://data-lake/orders/")
    products   = Dataset("s3://data-lake/products/")
    inventory  = Dataset("s3://data-lake/inventory/")
    
    # OR 조건: 하나만 업데이트되어도 실행
    @dag(schedule=(customers | orders), ...)
    def or_scheduled(): ...
    
    # AND/OR 복합 조건
    @dag(schedule=((customers & orders) | (products & inventory)), ...)
    def complex_scheduled(): ...

    DatasetOrTimeSchedule

    Dataset 이벤트 기반 스케줄과 cron 기반 스케줄을 결합하는 Timetable입니다. 데이터가 오면 즉시 실행하고, 오지 않으면 정해진 시간에 실행하는 SLA 보장 패턴을 구현합니다.

    from airflow.timetables.datasets import DatasetOrTimeSchedule
    from airflow.timetables.trigger import CronTriggerTimetable
    
    source = Dataset("s3://data-lake/raw/events/")
    
    @dag(
        schedule=DatasetOrTimeSchedule(
            timetable=CronTriggerTimetable("0 6 * * *", timezone="Asia/Seoul"),
            datasets=[source],
        ),
        ...
    )
    def hybrid_scheduled(): ...

    Airflow 2.10 — Hybrid Executor & Observability

    Hybrid Executor (멀티 Executor, AIP-61)

    단일 Airflow 환경에서 여러 Executor를 동시에 사용합니다. DAG 또는 태스크 레벨에서 실행할 Executor를 지정합니다.

    # airflow.cfg
    [core]
    executor = LocalExecutor,KubernetesExecutor
    @task(executor="LocalExecutor")
    def lightweight_task():
        return "quick work"
    
    @task(executor="KubernetesExecutor")
    def heavy_ml_task():
        return "ML training"

    OpenTelemetry 시스템 트레이싱

    Scheduler, Triggerer, Executor, Processor 컴포넌트가 OpenTelemetry 형식의 trace를 내보냅니다. Jaeger, Grafana Tempo 등 OTLP 호환 엔드포인트로 전송해 DAG 실행 병목 구간을 시각적으로 파악합니다.

    [traces]
    otel_on   = True
    otel_host = localhost
    otel_port = 4317

    Triggerer Direct Execution

    Deferrable Operator가 resume 시 Worker로 돌아가지 않고 Triggerer에서 직접 태스크를 완료합니다. Worker 슬롯을 사용하지 않아 대기 중 리소스가 절약됩니다.

    DatasetAlias — 동적 데이터셋 (AIP-74)

    런타임에 실제 Dataset URI를 동적으로 결정하는 기능입니다. 태스크가 어떤 파일을 생성할지 실행 전에 알 수 없는 경우(날짜별 동적 파일명 등)에 유용합니다.

    • 코드 예시 — DatasetAlias 기본 사용
    • from airflow.datasets import DatasetAlias, Dataset from airflow.decorators import dag, task from datetime import datetime # --- Producer DAG --- @dag(dag_id="alias_producer", schedule="@daily", start_date=datetime(2024, 1, 1)) def alias_producer_dag(): @task(outlets=[DatasetAlias("daily-report-output")]) def generate_report(*, outlet_events, ds): # 런타임에 실제 URI 결정 (날짜별 동적 경로) actual_uri = f"s3://reports/daily/{ds}/report.parquet" outlet_events[DatasetAlias("daily-report-output")].add( Dataset(actual_uri), extra={"row_count": 50000, "generated_at": ds}, ) generate_report() alias_producer_dag() # --- Consumer DAG (Alias를 구독) --- @dag( dag_id="alias_consumer", schedule=[DatasetAlias("daily-report-output")], # 어떤 URI든 트리거 start_date=datetime(2024, 1, 1), ) def alias_consumer_dag(): @task(inlets=[DatasetAlias("daily-report-output")]) def process_report(*, inlet_events): events = inlet_events[DatasetAlias("daily-report-output")] print(f"마지막 이벤트: {events[-1].extra}") process_report() alias_consumer_dag()

    Task Instance 실행 이력 추적

    태스크가 retry되거나 clear될 때 이전 실행 기록이 보존됩니다. Grid View에서 모든 시도(attempt)의 이력을 조회할 수 있어 간헐적 실패 패턴 분석이 가능합니다.


    Airflow 3.0 — 역대 최대 메이저 릴리즈

    Task SDK 분리 (apache-airflow-task-sdk)

    DAG 작성에 필요한 인터페이스가 독립 패키지 apache-airflow-task-sdk로 분리되었습니다. 모든 DAG은 이제 airflow.sdk에서 임포트해야 합니다.

    # Airflow 2.x (deprecated)
    from airflow import DAG
    from airflow.decorators import task
    
    # Airflow 3.0+ (권장)
    from airflow.sdk import DAG, task, dag, task_group

    DAG Versioning (AIP-63)

    DAG 구조 변경(태스크 추가/삭제, 의존성 변경 등)이 메타데이터 DB에 자동으로 추적됩니다.

    • 구조 변경 시마다 새 버전이 자동 생성됩니다
    • 각 DAG Run은 실행 당시의 버전에 고정됩니다
    • 실행 중인 DAG Run은 새 버전의 영향을 받지 않아 일관성이 보장됩니다

    시너지: 과거 특정 버전의 DAG 로직으로 Backfill 실행이 가능합니다.

    Asset (Dataset → Asset 이름 변경 및 개선, AIP-74)

    DatasetAsset으로 이름이 변경되었으며 AssetWatcher를 통한 이벤트 드리븐 스케줄링이 도입되었습니다.

    from airflow.sdk import Asset
    from airflow.timetables.assets import AssetOrTimeSchedule
    
    asset_a = Asset("s3://bucket/dataset-a")
    asset_b = Asset("s3://bucket/dataset-b")
    
    @dag(schedule=(asset_a & asset_b), ...)  # 둘 다 업데이트 시 실행
    def pipeline_and(): ...
    
    @dag(schedule=(asset_a | asset_b), ...)  # 하나만 업데이트 시 실행
    def pipeline_or(): ...
    
    # Asset + 시간 복합 스케줄
    @dag(schedule=AssetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 6 * * *", timezone="UTC"),
        assets=(asset_a,)
    ), ...)
    def pipeline_hybrid(): ...

    Scheduler 관리 Backfill API (AIP-78)

    Backfill이 이제 Scheduler가 직접 관리하며 REST API와 UI를 통해 제어합니다.

    # CLI
    airflow backfill create \
      --dag-id data_pipeline \
      --start-date 2025-01-01 \
      --end-date 2025-03-31 \
      --max-active-runs 5
    
    # REST API
    curl -X POST "http://airflow:8080/api/v2/backfills" \
      -d '{"dag_id":"data_pipeline","from_date":"2025-01-01T00:00:00Z","to_date":"2025-03-31T00:00:00Z"}'

    Edge Executor (AIP-69)

    태스크를 중앙 Airflow 클러스터 외부의 분산/원격 환경에서 실행하는 Executor입니다. IoT/엣지 컴퓨팅 환경, 네트워크 격리된 내부망에서도 태스크 실행이 가능합니다.

    @task(executor="EdgeExecutor", queue="gpu_cluster")
    def train_model():
        # 원격 GPU 서버에서 실행
        pass

    React + FastAPI UI (AIP-84)

    Flask AppBuilder 기반의 기존 UI가 완전히 제거되고 React + FastAPI로 재구축되었습니다. REST API도 /api/v1/api/v2로 교체되었습니다.

    2.x → 3.0 주요 Breaking Changes

    이전 버전에서 정상 작동하던 코드가 3.0에서 동작하지 않을 수 있습니다.

    1. execution_date 컨텍스트 키 제거

    # BEFORE (Airflow 2.x)
    def my_task(**context):
        execution_date = context['execution_date']  # 타당 datetime 객체
    
    # AFTER (Airflow 3.x) — execution_date 컨텍스트 키 제거
    def my_task(**context):
        logical_date = context['logical_date']       # 의미 변경: run_after 기준
        data_interval_start = context['data_interval_start']
        data_interval_end   = context['data_interval_end']

    주의: 2.x에서 logical_date == data_interval_start였지만, 3.x에서는 logical_date == run_after(실행 예약 시점)로 의미가 변경됩니다.

    2. Cron 기본 Timetable 변경

    Airflow 2.x에서는 CronDataIntervalTimetable이 기본값이었지만, 3.0에서는 CronTriggerTimetable이 기본값이 되었습니다. 동일한 cron 표현식이라도 DAG 실행 시점과 data_interval 범위가 달라질 수 있습니다.

    # 2.x 동작을 유지하려면 명시적으로 지정
    from airflow.timetables.interval import CronDataIntervalTimetable
    
    @dag(timetable=CronDataIntervalTimetable("0 0 * * *", timezone="UTC"))
    def my_dag():
        pass

    3. 주요 제거/변경 항목

    항목 변경 내용
    SubDagOperator 완전 제거TaskGroup 사용
    SLA Miss 제거 → Deadline Alerts(3.1)로 대체
    XCom Pickle 보안상 기본 비활성화 → JSON 직렬화 사용
    airflow.models.DAG import airflow.sdk.DAG 권장
    FAB Auth Manager 코어에서 분리 → pip install apache-airflow-providers-fab
    Worker DB 직접 접근 완전 제거 → Execution API만 사용
    REST API 경로 /api/v1/api/v2
    # 마이그레이션 체크 도구
    pip install ruff
    ruff check --select AIR301,AIR302,AIR311,AIR312 ./dags/
    # AIR301/AIR302: Breaking change (import 경로, 제거된 기능)
    # AIR311/AIR312: 비-breaking 권고로 수정 사항

    Airflow 3.1 — Human-Centered Workflows

    Human-in-the-Loop (AIP-90)

    DAG 실행 중 인간의 승인/결정/입력을 받아 워크플로우를 계속 진행하는 기능입니다. 4가지 Operator (HITLOperator, ApprovalOperator, HITLBranchOperator, HITLEntryOperator) 모두 Deferrable로 구현되어 Worker 슬롯을 점유하지 않습니다.

    • 코드 예시 — 보고서 배포 승인 워크플로우
    • from airflow.providers.standard.operators.hitl import HITLOperator from airflow.sdk import dag, task from datetime import datetime @dag(dag_id="content_approval", schedule="@daily", start_date=datetime(2025, 9, 1)) def content_pipeline(): @task def generate_report() -> dict: return {"content": "월간 매출 보고서", "amount": 1_500_000} # 인간 승인 대기 (Worker 슬롯 미점유) approval = HITLOperator( task_id="manager_approval", subject="보고서 배포 승인 요청", message="아래 보고서를 외부에 배포해도 됩니까?", params={ "decision": { "type": "select", "options": ["승인", "반려", "수정 후 재검토"], "default": "승인", }, "comment": {"type": "text", "label": "의견 (선택사항)"}, }, timeout=86400, # 24시간 타임아웃 ) @task def publish(approval_result: dict): if approval_result["decision"] == "승인": print("보고서 배포 완료") else: print(f"반려: {approval_result.get('comment')}") report = generate_report() publish(approval) content_pipeline()

    Deadline Alerts (AIP-86)

    DAG Run이 지정된 시간을 초과하면 자동으로 알림을 보내는 기능입니다. 기존 SLA를 대체하며 더 유연한 기준점을 지원합니다.

    from airflow.deadline import DeadlineAlert, AsyncCallback
    from datetime import timedelta
    
    @dag(
        deadline_alerts=[
            DeadlineAlert(
                reference="logical_date",   # logical_date / queued_at / fixed_datetime
                interval=timedelta(hours=2),  # 2시간 이내 완료 안 되면 알림
                callback=AsyncCallback(my_alert_callback),
            )
        ],
        ...
    )
    def critical_etl(): ...

    17개 언어 i18n 지원

    Airflow UI가 한국어를 포함한 17개 언어를 지원합니다. 브라우저 설정을 자동 감지하며, 아랍어·히브리어의 RTL 레이아웃도 지원합니다.


    주요 기능 심층 분석

    Dataset / Asset

    Dataset(2.4)은 시간 기반 스케줄링의 한계를 극복하는 데이터 의존성 기반 스케줄링 패러다임입니다.

    핵심 패턴: Producer 태스크에 outlets=[my_asset]을 선언하면 해당 태스크 완료 시 Asset이 업데이트됩니다. Consumer DAG는 schedule=my_asset으로 Asset 업데이트를 구독합니다.

    버전 주요 변경사항
    2.4 Dataset 도입, Producer/Consumer 패턴, AND 조건만 지원
    2.8 Dataset Listener Hook 추가 (이벤트 구독)
    2.9 AND/OR 조건부 스케줄링, DatasetOrTimeSchedule 도입
    3.0 Dataset → Asset 이름 변경, AssetWatcher 추가 (이벤트 드리븐)
    3.1 Asset Partitioning 기초 도입

    Dynamic Task Mapping

    2.3에서 도입된 Dynamic Task Mapping은 이후 버전에서 지속적으로 개선되었습니다.

    • 2.3: expand(), partial() 도입. 기본 사용법 지원
    • 2.5: 매핑된 태스크의 출력을 다시 매핑하는 체인 매핑 안정화
    • 2.6+: expand_kwargs() 지원, 다양한 엣지 케이스 수정

    max_map_length 기본값은 1024이며 설정에서 조정할 수 있습니다. 결과값은 lazy sequence로 처리되어 메모리 효율적입니다.

    Timetable

    Timetable은 Airflow 2.2(AIP-39)에서 도입된 스케줄링 추상화 계층으로, cron 표현식의 한계를 Python 코드로 극복합니다.

    클래스 도입 버전 용도
    CronDataIntervalTimetable 2.2 데이터 인터벌 기반 cron (레거시, DAG 활성화 시 이전 구간 run 생성)
    CronTriggerTimetable 2.4 트리거 방식 cron (3.0 기본값), data_interval_start == data_interval_end
    EventsTimetable 2.4 특정 날짜 목록에만 실행
    ContinuousTimetable 2.6 이전 run 완료 즉시 다음 실행, max_active_runs=1 필수
    DeltaTriggerTimetable 2.11 트리거 방식 timedelta (3.0에서 timedelta 기본값)
    MultipleCronTriggerTimetable 3.0 복수 cron 표현식 (하루에 여러 다른 시각)
    AssetOrTimeSchedule 3.0 시간+Asset 복합 스케줄

    커스텀 Timetable 구현 시 Timetable 클래스를 상속하고 next_dagrun_info(), serialize(), deserialize()를 구현한 후 AirflowPlugin으로 등록합니다. 영업일 스케줄, 음력 기반 스케줄 등 임의의 복잡한 스케줄을 표현할 수 있습니다.

    Timezone

    Airflow는 내부적으로 모든 datetime을 UTC로 저장하며, timezone 처리에 pendulum 라이브러리를 표준으로 채택합니다.

    import pendulum
    from airflow.timetables.trigger import CronTriggerTimetable
    
    with DAG(
        "kst_dag",
        schedule=CronTriggerTimetable("0 9 * * 1-5", timezone="Asia/Seoul"),
        start_date=pendulum.datetime(2024, 1, 1, tz="Asia/Seoul"),
        catchup=False,
    ) as dag:
        pass

    주요 주의사항:

    • UTC 사용 권장: 가능하면 UTC로 작성하는 것이 가장 안전합니다
    • Naive datetime 금지: datetime(2024, 1, 1) 대신 pendulum.datetime(2024, 1, 1, tz="UTC") 사용
    • DST 자동 처리: pendulum이 일광절약시간 전환을 자동으로 관리합니다
    • 템플릿 변수는 UTC 기준이며 자동 변환되지 않습니다
    • Airflow 3.0에서 logical_date의 의미가 변경되었습니다 (data_interval_startrun_after 기준)

    기능 시너지

    기능 조합 시너지 효과
    Dynamic Task Mapping + Dataset/Asset 동적으로 생성된 각 태스크가 개별 Asset을 업데이트해 하위 DAG를 트리거합니다
    Setup/Teardown + Notifiers 클러스터 삭제 실패 시 즉각 알림을 발송해 비용 누수를 방지합니다
    ContinuousTimetable + Deferrable Operator Worker 슬롯 점유 없이 이벤트 대기형 파이프라인을 구현합니다
    Dataset OR + DatasetOrTimeSchedule 데이터가 오면 즉시 실행, 오지 않으면 정해진 시간에 실행하는 SLA 보장 패턴을 구현합니다
    Task SDK + Edge Executor 원격 환경에 Task SDK만 설치하면 됩니다. 컨테이너 이미지를 최소화합니다
    HITL + Deadline Alerts 승인 기한 초과 시 자동 알림으로 비즈니스 SLA를 강제 적용합니다
    DAG Versioning + Backfill API 과거 특정 버전의 DAG 로직으로 안전하게 재처리할 수 있습니다
    Hybrid Executor + OpenTelemetry 어느 Executor에서 지연이 발생하는지 trace로 즉시 식별합니다
    Params UI + Setup/Teardown 트리거 시 클러스터 크기, 리전 등을 UI 폼에서 동적 입력합니다

    결론

    댓글