-
Airflow 2.3 ~ 3.1 신 기능 정리공부/데이터 2026. 4. 26. 15:39
버전별 신기능 요약
버전 출시일 핵심 기능 주요 키워드 2.3 2022.04 Dynamic Task Mapping expand(), partial() 2.4 2022.09 Datasets (Data-Aware Scheduling) Dataset, outlets, Consumer DAG 2.5 2022.12 Task/DAG 노트, 테스트 개선 note, airflow dags test 2.6 2023.04 Notifiers, ContinuousTimetable, EventsTimetable BaseNotifier, @continuous 2.7 2023.08 Setup/Teardown, Params UI 폼 @setup, @teardown, Param 2.8 2024.01 ObjectStorage, XCom Tab ObjectStoragePath 2.9 2024.04 Dataset AND/OR 조건부 스케줄링 &, 2.10 2024.08 Hybrid Executor, OpenTelemetry Multi-Executor, OTEL 3.0 2025.04 Task SDK, DAG Versioning, Asset, Edge Executor airflow.sdk, AIP-63/69/74 3.1 2025.09 Human-in-the-Loop, Deadline Alerts HITLOperator, DeadlineAlert
Airflow 2.3 — Dynamic Task Mapping
Dynamic Task Mapping
런타임에 동적으로 태스크 수를 결정하는 기능입니다. 기존에는 DAG 작성 시점에 태스크 수가 고정되어야 했지만,
expand()와partial()메서드로 업스트림 태스크 출력값에 따라 병렬 태스크 인스턴스를 자동 생성합니다.- 코드 예시 — 기본 사용법
from airflow.decorators import dag, task from datetime import datetime @dag(start_date=datetime(2023, 1, 1), schedule="@daily") def dynamic_mapping_example(): @task def get_files() -> list[str]: # 런타임에 처리할 파일 목록 결정 return ["file_a.csv", "file_b.csv", "file_c.csv"] @task def process_file(filename: str, batch_size: int): print(f"Processing {filename} with batch_size={batch_size}") files = get_files() # partial()로 고정값 지정, expand()로 동적 매핑 → 3개 태스크 병렬 실행 process_file.partial(batch_size=1000).expand(filename=files) dag = dynamic_mapping_example()- 코드 예시 — expand_kwargs로 복수 파라미터 매핑
@task def process(config: dict): print(f"env={config['env']}, size={config['size']}") configs = [ {"env": "prod", "size": 1000}, {"env": "dev", "size": 100}, ] process.expand_kwargs(configs) # 2개의 태스크 인스턴스 생성
새 Grid View
기존 Tree View를 대체하는 React 기반 Grid View가 도입되었습니다. DAG Run 실행 시간 표시, Task Group 일급 지원, 태스크 호버 시 의존성 라인 강조 등 운영 가시성이 크게 개선되었습니다.
Airflow 2.4 — Datasets (Data-Aware Scheduling)
Datasets / Data-Aware Scheduling
시간 기반이 아닌 데이터 변경 기반 스케줄링을 가능하게 하는 기능입니다.
Dataset클래스로 논리적 데이터 단위를 정의하고, 특정 데이터셋이 업데이트될 때 의존하는 DAG가 자동으로 트리거됩니다.- 코드 예시 — Producer DAG
from airflow.decorators import dag, task from airflow import Dataset from datetime import datetime raw_data = Dataset("s3://my-bucket/raw/sales_data.csv") @dag(dag_id="producer_dag", start_date=datetime(2023, 1, 1), schedule="@daily") def producer(): @task(outlets=[raw_data]) # 완료 시 raw_data 업데이트됨을 선언 def extract_and_save(): print("데이터 추출 및 저장 완료") extract_and_save() dag = producer()- 코드 예시 — Consumer DAG (복수 데이터셋 AND 조건)
from airflow.decorators import dag, task from airflow import Dataset from datetime import datetime raw_data = Dataset("s3://my-bucket/raw/sales_data.csv") ref_data = Dataset("s3://my-bucket/ref/product_master.csv") @dag( dag_id="consumer_dag", start_date=datetime(2023, 1, 1), schedule=[raw_data, ref_data], # 둘 다 업데이트 시 자동 실행 (AND 조건) ) def consumer(): @task def process_data(): print("두 데이터셋 모두 업데이트 감지 → 처리 시작") process_data() dag = consumer()
schedule 파라미터 통합
기존
schedule_interval과timetable파라미터가 deprecated되고schedule단일 파라미터로 통합되었습니다. 하나의 파라미터로 시간 기반 스케줄과 데이터셋 기반 스케줄을 모두 표현합니다.DAG(dag_id="time_based", schedule="@daily") # 시간 기반 DAG(dag_id="data_based", schedule=[Dataset("s3://...")]) # 데이터셋 기반CronTriggerTimetable 도입
데이터 인터벌 개념 없이 트리거 시각을 그대로
logical_date로 사용하는 직관적인 Timetable입니다. Airflow 3.0에서 기본 Timetable로 변경됩니다.from airflow.timetables.trigger import CronTriggerTimetable import pendulum dag = DAG( "trigger_style_dag", schedule=CronTriggerTimetable("0 9 * * *", timezone="Asia/Seoul"), start_date=pendulum.datetime(2024, 1, 1, tz="Asia/Seoul"), )
Airflow 2.5 — 안정성 및 DX 개선
Task Instance / DAG Run 노트
운영 팀이 태스크 인스턴스와 DAG 실행에 주석(노트)을 추가할 수 있게 되었습니다. 수동 재실행 이유, 이슈 대응 기록 등 운영 히스토리를 UI와 REST API에서 직접 관리합니다.
# REST API: DAG Run에 노트 추가 # PATCH /api/v1/dags/{dag_id}/dagRuns/{run_id} # { "note": "외부 API 장애로 인한 수동 재실행" }개선된 DAG 테스트
태스크 로그가 콘솔에 직접 출력되며, 단일 프로세스 실행으로 IDE 브레이크포인트 디버깅이 가능합니다. 기존 대비 약 10배 빠른 실행 속도를 제공합니다.
airflow dags test my_dag 2023-01-01Dynamic Task Mapping 개선
매핑된 태스크의 출력을 다시 다른 태스크에 매핑하는 체인 매핑이 안정화되었습니다.
ids = get_ids() data = fetch_data.expand(id=ids) process.expand(item=data) # 매핑된 결과를 다시 매핑 (2.5에서 안정화)Listener API (DagRun 이벤트 추가)
Airflow 2.3에서 실험적으로 도입된 Listener 플러그인이 2.5에서 DagRun 상태 변경 이벤트가 추가되며 완성되었습니다.
pluggy훅 시스템을 통해 전체 Airflow 태스크/DAG 생명주기 이벤트에 코드를 주입할 수 있습니다.기존
on_failure_callback은 각 태스크마다 개별 설정이 필요했지만, Listener는 플러그인 한 곳에서 모든 태스크/DAG 이벤트를 일괄 처리할 수 있습니다.- 코드 예시 — Listener 플러그인 구현
# plugins/my_listener_plugin.py from airflow.plugins_manager import AirflowPlugin from airflow.listeners import hookimpl from airflow.models.taskinstance import TaskInstance from airflow.models.dagrun import DagRun import logging log = logging.getLogger(__name__) class MyEventListener: """태스크/DAG 실행 이벤트를 외부 모니터링 시스템으로 전송""" # TaskInstance 이벤트 @hookimpl def on_task_instance_running(self, previous_state, task_instance, session): log.info("태스크 시작: dag=%s, task=%s", task_instance.dag_id, task_instance.task_id) @hookimpl def on_task_instance_success(self, previous_state, task_instance, session): duration = (task_instance.end_date - task_instance.start_date).total_seconds() log.info("태스크 성공: task=%s, 소요시간=%.2fs", task_instance.task_id, duration) @hookimpl def on_task_instance_failed(self, previous_state, task_instance, error, session): log.error("태스크 실패: dag=%s, task=%s, 오류=%s", task_instance.dag_id, task_instance.task_id, str(error)) # DagRun 이벤트 (2.5에서 추가) @hookimpl def on_dag_run_running(self, dag_run: DagRun, msg: str): log.info("DAG 실행 시작: dag=%s, run_id=%s", dag_run.dag_id, dag_run.run_id) @hookimpl def on_dag_run_success(self, dag_run: DagRun, msg: str): duration = (dag_run.end_date - dag_run.start_date).total_seconds() log.info("DAG 완료: dag=%s, 소요시간=%.2fs", dag_run.dag_id, duration) @hookimpl def on_dag_run_failed(self, dag_run: DagRun, msg: str): log.error("DAG 실패: dag=%s, 메시지=%s", dag_run.dag_id, msg) class MyListenerPlugin(AirflowPlugin): name = "my_listener_plugin" listeners = [MyEventListener()] # 플러그인 1회 등록으로 전체 적용
Airflow 2.6 — 알림 프레임워크 & 연속 실행
알림 프레임워크 (BaseNotifier)
BaseNotifier를 상속해 커스텀 알림 객체를 만들 수 있는 확장 가능한 알림 계층이 추가되었습니다. Slack은 즉시 사용 가능한 첫 번째 공식 Notifier로 제공됩니다.- 코드 예시 — SlackNotifier
from airflow.providers.slack.notifications.slack import SlackNotifier from airflow.decorators import dag, task from datetime import datetime SLACK_NOTIFIER = SlackNotifier( slack_conn_id="slack_api_default", text="DAG {{ dag.dag_id }} failed on {{ ds }}", channel="#airflow-alerts", ) @dag( start_date=datetime(2023, 1, 1), on_failure_callback=[SLACK_NOTIFIER], # Notifier 객체를 직접 콜백으로 전달 catchup=False, ) def my_pipeline(): @task def process(): return "done" process()
ContinuousTimetable (@continuous)
이전 DAG 실행이 완료되는 즉시 다음 실행이 시작되는 연속 실행 스케줄입니다. 불규칙한 외부 이벤트를 폴링할 때 적합합니다.
@dag( schedule="@continuous", start_date=datetime(2023, 1, 1), max_active_runs=1, # 반드시 1로 설정 catchup=False, ) def continuous_pipeline(): @task def poll_api(): return requests.get("https://api.example.com/events").json() poll_api()EventsTimetable
미리 정의된 특정 날짜 목록에만 DAG를 실행합니다. 스포츠 경기, 분기 결산 등 불규칙하지만 예측 가능한 이벤트 스케줄에 적합합니다.
from airflow.timetables.events import EventsTimetable import pendulum quarterly_events = EventsTimetable( event_dates=[ pendulum.datetime(2024, 3, 31, tz="UTC"), pendulum.datetime(2024, 6, 30, tz="UTC"), pendulum.datetime(2024, 9, 30, tz="UTC"), pendulum.datetime(2024, 12, 31, tz="UTC"), ], presorted=True, description="분기 마감일에 실행", ) dag = DAG("quarterly_report", schedule=quarterly_events, ...)
Airflow 2.7 — Setup/Teardown & Params UI
Setup/Teardown Tasks (AIP-52)
리소스 생성 → 작업 수행 → 리소스 삭제 패턴을 일급 객체로 지원합니다. Teardown 태스크는 업스트림 태스크 상태와 무관하게 항상 실행됩니다.
- 코드 예시 — ML 훈련 파이프라인
from airflow.decorators import dag, task, setup, teardown from datetime import datetime @dag(start_date=datetime(2023, 1, 1), catchup=False) def ml_training_pipeline(): @setup def create_gpu_cluster(): cluster_id = "gpu-cluster-001" print(f"클러스터 생성: {cluster_id}") return cluster_id @teardown def delete_gpu_cluster(cluster_id: str): print(f"클러스터 삭제: {cluster_id}") # 항상 실행됨 @task def train_model(cluster_id: str): print(f"{cluster_id}에서 모델 훈련 중...") return "model_v1" @task def evaluate_model(model_path: str): print(f"모델 평가: {model_path}") cluster = create_gpu_cluster() model = train_model(cluster) evaluate_model(model) delete_gpu_cluster(cluster).as_teardown(setups=create_gpu_cluster) ml_training_pipeline()
Params UI 폼 (AIP-50)
DAG 파라미터를 UI 폼으로 렌더링하는 기능이 강화되었습니다.
Param객체로 드롭다운, 날짜 피커, 체크박스, 멀티라인 텍스트 등 다양한 입력 필드를 자동 생성합니다.from airflow.models.param import Param @dag( params={ "environment": Param("production", type="string", enum=["development", "staging", "production"]), "run_date": Param("2024-01-01", type="string", format="date"), "debug_notes": Param("", type="string", format="multiline"), "max_rows": Param(1000, type="integer", minimum=1, maximum=100000), }, ... ) def parameterized_etl(): ...default_deferrable 설정
설정 한 줄로 모든 지원 연산자를 Deferrable 모드로 전환합니다. Worker 스레드 점유 없이 대기하므로 대규모 클러스터에서 리소스를 절감합니다.
[operators] default_deferrable = True
Airflow 2.8 — ObjectStorage & 로깅 개선
ObjectStorage (AIP-58)
S3, GCS, Azure Blob Storage, 로컬 파일시스템을 동일한
pathlib.Path인터페이스로 다루는 추상화 계층입니다.- 코드 예시
from airflow.io.path import ObjectStoragePath from airflow.decorators import dag, task from datetime import datetime BASE_S3 = ObjectStoragePath("s3://my-bucket/data/", conn_id="aws_default") BASE_GCS = ObjectStoragePath("gs://my-gcs-bucket/data/", conn_id="gcp_default") @dag(start_date=datetime(2024, 1, 1), catchup=False) def object_storage_pipeline(): @task def upload_to_s3(): path = BASE_S3 / "input" / "{{ ds }}" / "data.parquet" path.parent.mkdir(parents=True, exist_ok=True) with path.open("wb") as f: f.write(b"parquet data") return str(path) @task def copy_to_gcs(s3_path: str): src = ObjectStoragePath(s3_path, conn_id="aws_default") dst = BASE_GCS / "processed" / "data.parquet" src.copy(dst) # 클라우드 간 복사도 동일 코드 copy_to_gcs(upload_to_s3())
TaskContextLogger
Scheduler, Executor 등 다른 Airflow 컴포넌트의 로그가 태스크 로그 뷰에 통합되어 표시됩니다. KubernetesExecutor에서 Pod 생성 실패 메시지가 태스크 로그에 직접 포함되는 등 디버깅이 크게 개선됩니다.
Dataset Listener Hook
데이터셋 생성 및 변경 이벤트를 구독하는 플러그인 훅입니다. 데이터 카탈로그 연동, 품질 검사 트리거, 외부 시스템 알림 등에 활용합니다.
XCom Tab in Grid View
Grid View에 XCom 탭이 추가되어 태스크 인스턴스의 XCom 키-값을 별도 화면 이동 없이 직접 확인할 수 있습니다.
Airflow 2.9 — Dataset 조건부 스케줄링
Dataset AND/OR 조건부 스케줄링
2.4에서 AND 조건만 지원하던 Dataset 스케줄링이 OR 및 AND/OR 혼합 조건을 지원합니다. Python 연산자(
&,|)로 직관적인 표현이 가능합니다.from airflow import Dataset customers = Dataset("s3://data-lake/customers/") orders = Dataset("s3://data-lake/orders/") products = Dataset("s3://data-lake/products/") inventory = Dataset("s3://data-lake/inventory/") # OR 조건: 하나만 업데이트되어도 실행 @dag(schedule=(customers | orders), ...) def or_scheduled(): ... # AND/OR 복합 조건 @dag(schedule=((customers & orders) | (products & inventory)), ...) def complex_scheduled(): ...DatasetOrTimeSchedule
Dataset 이벤트 기반 스케줄과 cron 기반 스케줄을 결합하는 Timetable입니다. 데이터가 오면 즉시 실행하고, 오지 않으면 정해진 시간에 실행하는 SLA 보장 패턴을 구현합니다.
from airflow.timetables.datasets import DatasetOrTimeSchedule from airflow.timetables.trigger import CronTriggerTimetable source = Dataset("s3://data-lake/raw/events/") @dag( schedule=DatasetOrTimeSchedule( timetable=CronTriggerTimetable("0 6 * * *", timezone="Asia/Seoul"), datasets=[source], ), ... ) def hybrid_scheduled(): ...
Airflow 2.10 — Hybrid Executor & Observability
Hybrid Executor (멀티 Executor, AIP-61)
단일 Airflow 환경에서 여러 Executor를 동시에 사용합니다. DAG 또는 태스크 레벨에서 실행할 Executor를 지정합니다.
# airflow.cfg [core] executor = LocalExecutor,KubernetesExecutor@task(executor="LocalExecutor") def lightweight_task(): return "quick work" @task(executor="KubernetesExecutor") def heavy_ml_task(): return "ML training"OpenTelemetry 시스템 트레이싱
Scheduler, Triggerer, Executor, Processor 컴포넌트가 OpenTelemetry 형식의 trace를 내보냅니다. Jaeger, Grafana Tempo 등 OTLP 호환 엔드포인트로 전송해 DAG 실행 병목 구간을 시각적으로 파악합니다.
[traces] otel_on = True otel_host = localhost otel_port = 4317Triggerer Direct Execution
Deferrable Operator가 resume 시 Worker로 돌아가지 않고 Triggerer에서 직접 태스크를 완료합니다. Worker 슬롯을 사용하지 않아 대기 중 리소스가 절약됩니다.
DatasetAlias — 동적 데이터셋 (AIP-74)
런타임에 실제 Dataset URI를 동적으로 결정하는 기능입니다. 태스크가 어떤 파일을 생성할지 실행 전에 알 수 없는 경우(날짜별 동적 파일명 등)에 유용합니다.
- 코드 예시 — DatasetAlias 기본 사용
from airflow.datasets import DatasetAlias, Dataset from airflow.decorators import dag, task from datetime import datetime # --- Producer DAG --- @dag(dag_id="alias_producer", schedule="@daily", start_date=datetime(2024, 1, 1)) def alias_producer_dag(): @task(outlets=[DatasetAlias("daily-report-output")]) def generate_report(*, outlet_events, ds): # 런타임에 실제 URI 결정 (날짜별 동적 경로) actual_uri = f"s3://reports/daily/{ds}/report.parquet" outlet_events[DatasetAlias("daily-report-output")].add( Dataset(actual_uri), extra={"row_count": 50000, "generated_at": ds}, ) generate_report() alias_producer_dag() # --- Consumer DAG (Alias를 구독) --- @dag( dag_id="alias_consumer", schedule=[DatasetAlias("daily-report-output")], # 어떤 URI든 트리거 start_date=datetime(2024, 1, 1), ) def alias_consumer_dag(): @task(inlets=[DatasetAlias("daily-report-output")]) def process_report(*, inlet_events): events = inlet_events[DatasetAlias("daily-report-output")] print(f"마지막 이벤트: {events[-1].extra}") process_report() alias_consumer_dag()
Task Instance 실행 이력 추적
태스크가 retry되거나 clear될 때 이전 실행 기록이 보존됩니다. Grid View에서 모든 시도(attempt)의 이력을 조회할 수 있어 간헐적 실패 패턴 분석이 가능합니다.
Airflow 3.0 — 역대 최대 메이저 릴리즈
Task SDK 분리 (apache-airflow-task-sdk)
DAG 작성에 필요한 인터페이스가 독립 패키지
apache-airflow-task-sdk로 분리되었습니다. 모든 DAG은 이제airflow.sdk에서 임포트해야 합니다.# Airflow 2.x (deprecated) from airflow import DAG from airflow.decorators import task # Airflow 3.0+ (권장) from airflow.sdk import DAG, task, dag, task_groupDAG Versioning (AIP-63)
DAG 구조 변경(태스크 추가/삭제, 의존성 변경 등)이 메타데이터 DB에 자동으로 추적됩니다.
- 구조 변경 시마다 새 버전이 자동 생성됩니다
- 각 DAG Run은 실행 당시의 버전에 고정됩니다
- 실행 중인 DAG Run은 새 버전의 영향을 받지 않아 일관성이 보장됩니다
시너지: 과거 특정 버전의 DAG 로직으로 Backfill 실행이 가능합니다.
Asset (Dataset → Asset 이름 변경 및 개선, AIP-74)
Dataset이Asset으로 이름이 변경되었으며AssetWatcher를 통한 이벤트 드리븐 스케줄링이 도입되었습니다.from airflow.sdk import Asset from airflow.timetables.assets import AssetOrTimeSchedule asset_a = Asset("s3://bucket/dataset-a") asset_b = Asset("s3://bucket/dataset-b") @dag(schedule=(asset_a & asset_b), ...) # 둘 다 업데이트 시 실행 def pipeline_and(): ... @dag(schedule=(asset_a | asset_b), ...) # 하나만 업데이트 시 실행 def pipeline_or(): ... # Asset + 시간 복합 스케줄 @dag(schedule=AssetOrTimeSchedule( timetable=CronTriggerTimetable("0 6 * * *", timezone="UTC"), assets=(asset_a,) ), ...) def pipeline_hybrid(): ...Scheduler 관리 Backfill API (AIP-78)
Backfill이 이제 Scheduler가 직접 관리하며 REST API와 UI를 통해 제어합니다.
# CLI airflow backfill create \ --dag-id data_pipeline \ --start-date 2025-01-01 \ --end-date 2025-03-31 \ --max-active-runs 5 # REST API curl -X POST "http://airflow:8080/api/v2/backfills" \ -d '{"dag_id":"data_pipeline","from_date":"2025-01-01T00:00:00Z","to_date":"2025-03-31T00:00:00Z"}'Edge Executor (AIP-69)
태스크를 중앙 Airflow 클러스터 외부의 분산/원격 환경에서 실행하는 Executor입니다. IoT/엣지 컴퓨팅 환경, 네트워크 격리된 내부망에서도 태스크 실행이 가능합니다.
@task(executor="EdgeExecutor", queue="gpu_cluster") def train_model(): # 원격 GPU 서버에서 실행 passReact + FastAPI UI (AIP-84)
Flask AppBuilder 기반의 기존 UI가 완전히 제거되고 React + FastAPI로 재구축되었습니다. REST API도
/api/v1→/api/v2로 교체되었습니다.2.x → 3.0 주요 Breaking Changes
이전 버전에서 정상 작동하던 코드가 3.0에서 동작하지 않을 수 있습니다.
1.
execution_date컨텍스트 키 제거# BEFORE (Airflow 2.x) def my_task(**context): execution_date = context['execution_date'] # 타당 datetime 객체 # AFTER (Airflow 3.x) — execution_date 컨텍스트 키 제거 def my_task(**context): logical_date = context['logical_date'] # 의미 변경: run_after 기준 data_interval_start = context['data_interval_start'] data_interval_end = context['data_interval_end']주의: 2.x에서
logical_date == data_interval_start였지만, 3.x에서는logical_date == run_after(실행 예약 시점)로 의미가 변경됩니다.2. Cron 기본 Timetable 변경
Airflow 2.x에서는
CronDataIntervalTimetable이 기본값이었지만, 3.0에서는CronTriggerTimetable이 기본값이 되었습니다. 동일한 cron 표현식이라도 DAG 실행 시점과 data_interval 범위가 달라질 수 있습니다.# 2.x 동작을 유지하려면 명시적으로 지정 from airflow.timetables.interval import CronDataIntervalTimetable @dag(timetable=CronDataIntervalTimetable("0 0 * * *", timezone="UTC")) def my_dag(): pass3. 주요 제거/변경 항목
항목 변경 내용 SubDagOperator완전 제거 → TaskGroup사용SLA Miss 제거 → Deadline Alerts(3.1)로 대체 XCom Pickle 보안상 기본 비활성화 → JSON 직렬화 사용 airflow.models.DAGimportairflow.sdk.DAG권장FAB Auth Manager 코어에서 분리 → pip install apache-airflow-providers-fabWorker DB 직접 접근 완전 제거 → Execution API만 사용 REST API 경로 /api/v1→/api/v2# 마이그레이션 체크 도구 pip install ruff ruff check --select AIR301,AIR302,AIR311,AIR312 ./dags/ # AIR301/AIR302: Breaking change (import 경로, 제거된 기능) # AIR311/AIR312: 비-breaking 권고로 수정 사항
Airflow 3.1 — Human-Centered Workflows
Human-in-the-Loop (AIP-90)
DAG 실행 중 인간의 승인/결정/입력을 받아 워크플로우를 계속 진행하는 기능입니다. 4가지 Operator (
HITLOperator,ApprovalOperator,HITLBranchOperator,HITLEntryOperator) 모두 Deferrable로 구현되어 Worker 슬롯을 점유하지 않습니다.- 코드 예시 — 보고서 배포 승인 워크플로우
from airflow.providers.standard.operators.hitl import HITLOperator from airflow.sdk import dag, task from datetime import datetime @dag(dag_id="content_approval", schedule="@daily", start_date=datetime(2025, 9, 1)) def content_pipeline(): @task def generate_report() -> dict: return {"content": "월간 매출 보고서", "amount": 1_500_000} # 인간 승인 대기 (Worker 슬롯 미점유) approval = HITLOperator( task_id="manager_approval", subject="보고서 배포 승인 요청", message="아래 보고서를 외부에 배포해도 됩니까?", params={ "decision": { "type": "select", "options": ["승인", "반려", "수정 후 재검토"], "default": "승인", }, "comment": {"type": "text", "label": "의견 (선택사항)"}, }, timeout=86400, # 24시간 타임아웃 ) @task def publish(approval_result: dict): if approval_result["decision"] == "승인": print("보고서 배포 완료") else: print(f"반려: {approval_result.get('comment')}") report = generate_report() publish(approval) content_pipeline()
Deadline Alerts (AIP-86)
DAG Run이 지정된 시간을 초과하면 자동으로 알림을 보내는 기능입니다. 기존 SLA를 대체하며 더 유연한 기준점을 지원합니다.
from airflow.deadline import DeadlineAlert, AsyncCallback from datetime import timedelta @dag( deadline_alerts=[ DeadlineAlert( reference="logical_date", # logical_date / queued_at / fixed_datetime interval=timedelta(hours=2), # 2시간 이내 완료 안 되면 알림 callback=AsyncCallback(my_alert_callback), ) ], ... ) def critical_etl(): ...17개 언어 i18n 지원
Airflow UI가 한국어를 포함한 17개 언어를 지원합니다. 브라우저 설정을 자동 감지하며, 아랍어·히브리어의 RTL 레이아웃도 지원합니다.
주요 기능 심층 분석
Dataset / Asset
Dataset(2.4)은 시간 기반 스케줄링의 한계를 극복하는 데이터 의존성 기반 스케줄링 패러다임입니다.
핵심 패턴: Producer 태스크에
outlets=[my_asset]을 선언하면 해당 태스크 완료 시 Asset이 업데이트됩니다. Consumer DAG는schedule=my_asset으로 Asset 업데이트를 구독합니다.버전 주요 변경사항 2.4 Dataset 도입, Producer/Consumer 패턴, AND 조건만 지원 2.8 Dataset Listener Hook 추가 (이벤트 구독) 2.9 AND/OR 조건부 스케줄링, DatasetOrTimeSchedule 도입 3.0 Dataset → Asset 이름 변경, AssetWatcher 추가 (이벤트 드리븐) 3.1 Asset Partitioning 기초 도입 Dynamic Task Mapping
2.3에서 도입된 Dynamic Task Mapping은 이후 버전에서 지속적으로 개선되었습니다.
- 2.3:
expand(),partial()도입. 기본 사용법 지원 - 2.5: 매핑된 태스크의 출력을 다시 매핑하는 체인 매핑 안정화
- 2.6+:
expand_kwargs()지원, 다양한 엣지 케이스 수정
max_map_length기본값은 1024이며 설정에서 조정할 수 있습니다. 결과값은 lazy sequence로 처리되어 메모리 효율적입니다.Timetable
Timetable은 Airflow 2.2(AIP-39)에서 도입된 스케줄링 추상화 계층으로, cron 표현식의 한계를 Python 코드로 극복합니다.
클래스 도입 버전 용도 CronDataIntervalTimetable2.2 데이터 인터벌 기반 cron (레거시, DAG 활성화 시 이전 구간 run 생성) CronTriggerTimetable2.4 트리거 방식 cron (3.0 기본값), data_interval_start == data_interval_end EventsTimetable2.4 특정 날짜 목록에만 실행 ContinuousTimetable2.6 이전 run 완료 즉시 다음 실행, max_active_runs=1 필수 DeltaTriggerTimetable2.11 트리거 방식 timedelta (3.0에서 timedelta 기본값) MultipleCronTriggerTimetable3.0 복수 cron 표현식 (하루에 여러 다른 시각) AssetOrTimeSchedule3.0 시간+Asset 복합 스케줄 커스텀 Timetable 구현 시
Timetable클래스를 상속하고next_dagrun_info(),serialize(),deserialize()를 구현한 후AirflowPlugin으로 등록합니다. 영업일 스케줄, 음력 기반 스케줄 등 임의의 복잡한 스케줄을 표현할 수 있습니다.Timezone
Airflow는 내부적으로 모든 datetime을 UTC로 저장하며, timezone 처리에 pendulum 라이브러리를 표준으로 채택합니다.
import pendulum from airflow.timetables.trigger import CronTriggerTimetable with DAG( "kst_dag", schedule=CronTriggerTimetable("0 9 * * 1-5", timezone="Asia/Seoul"), start_date=pendulum.datetime(2024, 1, 1, tz="Asia/Seoul"), catchup=False, ) as dag: pass주요 주의사항:
- UTC 사용 권장: 가능하면 UTC로 작성하는 것이 가장 안전합니다
- Naive datetime 금지:
datetime(2024, 1, 1)대신pendulum.datetime(2024, 1, 1, tz="UTC")사용 - DST 자동 처리: pendulum이 일광절약시간 전환을 자동으로 관리합니다
- 템플릿 변수는 UTC 기준이며 자동 변환되지 않습니다
- Airflow 3.0에서
logical_date의 의미가 변경되었습니다 (data_interval_start→run_after기준)
기능 시너지
기능 조합 시너지 효과 Dynamic Task Mapping + Dataset/Asset 동적으로 생성된 각 태스크가 개별 Asset을 업데이트해 하위 DAG를 트리거합니다 Setup/Teardown + Notifiers 클러스터 삭제 실패 시 즉각 알림을 발송해 비용 누수를 방지합니다 ContinuousTimetable + Deferrable Operator Worker 슬롯 점유 없이 이벤트 대기형 파이프라인을 구현합니다 Dataset OR + DatasetOrTimeSchedule 데이터가 오면 즉시 실행, 오지 않으면 정해진 시간에 실행하는 SLA 보장 패턴을 구현합니다 Task SDK + Edge Executor 원격 환경에 Task SDK만 설치하면 됩니다. 컨테이너 이미지를 최소화합니다 HITL + Deadline Alerts 승인 기한 초과 시 자동 알림으로 비즈니스 SLA를 강제 적용합니다 DAG Versioning + Backfill API 과거 특정 버전의 DAG 로직으로 안전하게 재처리할 수 있습니다 Hybrid Executor + OpenTelemetry 어느 Executor에서 지연이 발생하는지 trace로 즉시 식별합니다 Params UI + Setup/Teardown 트리거 시 클러스터 크기, 리전 등을 UI 폼에서 동적 입력합니다
결론