-
[Airflow] configuration 설명 및 파라미터 튜닝공부/데이터 2022. 6. 23. 01:54
간단하게 사용한다면 airflow 기본 설정으로도 가능하지만 많은 dag나 task들을 병렬로 처리하고자 한다면 튜닝이 진행되어야 합니다.
튜닝을 진행할 때엔 아래와 같이 3가지 단계로 진행할 수 있습니다.
- Airflow 환경-level
- DAG-level
- Task-level
파라미터 조정
Airflow에는 성능에 영향을 미치는 많은 파라미터가 존재하는데 이러한 설정을 조정하면 DAG, task 스케쥴링 성능, 병렬처리 등에 영향을 줄 수 있습니다.
Airflow 환경-level
airflow 환경을 조정하는 것이므로 모든 DAG에 영향을 주는 설정입니다.
airflow.cfg
라는 설정파일을 수정하거나 재정의하여 적용할 수 있습니다. 해당 파일의 기본값이나 설명은 https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html 에서 확인할 수 있습니다.현재 적용된 airflow의 환경설정값 확인은 airflow UI에서 Admin > Configuration 에서 확인할 수 있습니다.
core
core 영역은 동시에 실행되는 프로세스의 수와 해당 프로세스를 실행할 수 있는 시간을 제어합니다.
parallelism
: 단일 Airflow 환경 내에서 동시에 실행할 수 있는 최대 task의 수입니다. 예를 들어 이 설정을 32로 설정하면 모든 DAG에서 한 번에 32개 이하의 task를 실행할 수 있습니다. task가 오랫동안 queuing상태면 이 값을 늘려야 할 수 있습니다. 기본값은 32입니다.max_active_tasks_per_dag
(이전dag_concurrency
): DAG당 한 번에 스케줄링되는 최대 task의 수를 결정합니다. 이 설정을 너무 높게 설정하면 하나의 DAG가 병렬 처리 또는 풀에서 사용 가능한 슬롯을 너무 많이 차지하지 하여 다른 DAG의 task가 대기 상태에 빠질 수 있으니 잘 조절하는 것이 중요합니다. 기본값은 16입니다.Airflow에 사용할 수 있는 리소스(예: Celery worker 또는 Kubernetes 리소스)의 수를 늘렸는데 task가 늘린 수 만큼 실행되지 않는 경우,
parallelism
및max_active_tasks_per_dag
의 값도 같이 증가시켜야 합니다.max_active_runs_per_dag
: Airflow Scheduler가 주어진 시간에 생성할 수 있는 활성화된 DAG실행(DAG당)의 최대 수를 결정합니다. Airflow에서 DAG 실행 은 task 인스턴스가 task의 인스턴스화를 나타내는 것처럼 시간에 따른 DAG의 인스턴스화를 나타냅니다. 이 매개변수는 Airflow가 backfill이라고도 하는 누락된 DAG 실행을 따라잡아야 하는 경우 가장 관련이 있습니다.간단하게 말해 1개의 DAG 안에서 최대 활성화되는 DAG의 수를 제어하는 값입니다. 기본값은 16입니다.
dag_file_processor_timeout
: DAG 파일을 처리 하는DagFileProcessor
가 시간 초과되기 전에 실행할 수 있는 시간입니다. 기본값은 50초입니다.dagbag_import_timeout
: DAG 개체를 가져올 수 있는dagbag
시간(초)이며dag_file_processor_timeout
에 설정된 값보다 낮아야 합니다. DAG 처리 로그에 시간 초과가 표시되거나 DAG가 DAG 목록이나 가져오기 오류에 모두 표시되지 않으면 이 값을 늘려 보십시오. task가 실행될 때, worker는dagbag
을 채워야 하므로 만약 task가 실행되지 않는 경우 이 값을 늘려서 해결할 수도 있습니다. 기본값은 30초입니다.scheduler
스케줄러는 DAG 파일을 분석하고 실행을 생성하는 방법을 제어합니다.
min_file_process_interval
: 각 DAG 파일은min_file_process_interval
초 단위로 구문 분석됩니다. DAG에 대한 업데이트는 설정된 해당 값 이후에 반영됩니다. 해당 설정값이 낮으면 스케줄러의 CPU 사용량이 증가합니다. 복잡한 코드로 생성된 동적 DAG가 있는 경우 스케줄러 성능에 부정적인 영향을 미치지 않도록 이 값을 늘릴 수 있습니다. 기본값은 30초입니다.dag_dir_list_interval
: DAG 디렉터리에서 새 파일을 검색하는 빈도(초)입니다. 값이 낮을수록 새 DAG가 더 빨리 처리되지만 스케줄러의 CPU 사용량은 높아집니다. 기본값은 300초입니다.min_file_process_interval
와dag_dir_list_interval
중 어떤 값을 선택할지 알기 위해서 DAG(dag_processing.total_parse_time
)를 구문 분석하는 데 걸리는 시간안다면 도움이 됩니다.dag_dir_list_interval
이 각 DAG를 구문 분석하는 데 걸리는 시간보다 짧으면 성능 문제가 발생할 수 있습니다.parsing_processes
(이전max_threads
): 스케줄러는 DAG를 구문 분석하기 위해 여러 프로세스를 병렬로 실행할 수 있습니다. 이 설정은 병렬로 실행할 수 있는 프로세스 수를 정의합니다. 사용 가능한 vCPU의 2배 값을 설정하는 것이 좋습니다. 이 값을 늘리면 DAG가 많은 경우 DAG를 보다 효율적으로 직렬화할 수 있습니다. 여러 스케줄러를 실행하는 경우 이 값이 각각 에 적용됩니다. 기본값은 2입니다.file_parsing_sort_mode
: 스케줄러가 구문 분석 순서를 결정하기 위해 DAG 파일을 나열하고 정렬하는 방법을 결정합니다.modified_time
,random_seeded_by_host
,alphabetical
중 하나로 설정합니다. 기본 값은modified_time
입니다.scheduler_heartbeat_sec
: 이 설정은 새 작업을 트리거하기 위해 스케줄러를 실행해야 하는 빈도(초)를 정의합니다. 기본값은 5초입니다.scheduler가 새로운 task를 찾는데 대기하는 시간으로 해당 값이 크다면 airflow UI에서 아래와 같은 화면을 볼 수도 있습니다.
max_dagruns_to_create_per_loop
: 스케줄러 loop당 Dag 실행을 생성할 최대 DAG 수입니다. 이 값을 값을 줄여 task 예약을 위한 리소스를 확보할 수 있습니다. 기본값은 10초입니다.max_tis_per_query
: 이 파라미터는 기본 스케줄링 loop에서 메타스토어에 대한 쿼리의 배치 크기를 변경합니다. 값이 높으면tis
쿼리당 더 많이 처리할 수 있지만 쿼리가 너무 복잡해지고 성능 병목 현상이 발생할 수 있습니다. 기본값은 512 쿼리입니다.DAG-level
DAG 수준 설정은 특정 DAG에만 적용되며 DAG 코드에 정의됩니다. DAG 수준과 환경 수준 모두에 설정이 있는 경우 DAG 수준 설정이 우선합니다.
max_active_runs
: DAG에 허용되는 활성 DAG 실행의 최대 수입니다. 설정한 값 이상으로 스케줄러는 새로운 활성 DAG 실행을 생성하지 않습니다. 이 설정이 정의되지 않은 경우 airflow 환경 설정값인max_active_runs_per_dag
이 적용됩니다.DAG를 사용 하는
backfill
경우 실수로 많은 수의 DAG 실행을 트리거하지 않도록 이 매개변수를 정의하는 것이 좋습니다.DAG에서
catchup
을 사용하여 backfill을 할 때, 너무 많은 DAG가 실행되지 않도록 해당 값을 설정하는 것이 좋습니다.max_active_tasks
: 하나의 DAG가 실행될 때 대해 동시에 실행할 수 있는 총 task의 수입니다. DAG 내의 병렬 처리를 제어합니다. 이 설정이 정의되지 않은 경우 airflow 환경 설정값max_active_tasks_per_dag
이 적용됩니다.concurrency
: 모든 DAG 실행에서 동시에 실행할 수 있는 최대 task 인스턴스 수입니다. 이 설정이 정의되지 않은 경우 airflow 환경 설정값max_active_tasks_per_dag
이 적용됩니다.Task-level
task 수준 설정은 task 연산자에서 정의되며 성능에 대한 보다 세부적인 제어를 할 수 있습니다.
max_active_tis_per_dag
(이전task_concurrency
): 이는 동일한 task가 모든 DAG 실행에서 동시에 실행할 수 있는 최대 횟수입니다. 예를 들어 작업이 데이터 테이블과 같이 한 번에 여러 작업에서 수정해서는 안 되는 경우 이 값을 1로 설정해 조절할 수 있습니다.pool
: 이 설정은 task에서 사용할 수 있는 pool의 사이즈를 정의합니다. pool은 임의의 task group의 동시 인스턴스 수를 제한하는 방법입니다. 이 설정은 많은 worker 또는 DAG가 병렬로 실행되지만 API 속도 제한을 피하려고 할 때 유용합니다.Executor 확장
celery와 kubernetes executor를 확장할 때 확인이 필요한 설정값들을 아래에서 설명합니다.
celery executor
Celery executor는 task를 실행하기 위해 항상 worker를 실행하고 있습니다. Celery executor를 확장하려면 Airflow에서 사용할 수 있는 worker의 수와 크기를 모두 고려해야 합니다. worker들을 스케일 아웃하거나 스케일업을 한다면 동시에 task들을 더 많이 실행할 수 있습니다.
각 Celery worker는 주어진 시간에 실행할 수 있는 작업의 수를 결정하는
worker_concurrency
를 조정할 수도 있습니다.worker_concurrency
를 늘리면 worker를 위해 CPU 나 메모리를 프로비저닝해야 할 수도 있습니다. 기본값은 16입니다.kubernetes executor
Kubernetes executor는 각 task 대해 Kubernetes 클러스터에서 pod를 시작합니다. 각 task는 자체 pod에서 실행되므로 개별적인 task 수준에서 리소스를 지정할 수 있습니다.
Kubernetes executor로 성능을 조정할 때 Kubernetes 클러스터의 지원 인프라를 고려하는 것이 중요합니다. 많은 사용자가 클러스터에서 자동 크기 조정을 활성화하여 Kubernetes의 탄력성의 이점을 얻을 수 있습니다.
스케줄러 loop당 생성할 수 있는 pod 수를 결정하는 airflow 환경-level인
worker_pods_creation_batch_size
를 조정할 수도 있습니다.AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE
기본값은 1이지만 동시에 실행되는 task가 있는 경우, 더 나은 성능을 위해 이 숫자를 조절할 것입니다. 값을 얼마나 높일 수 있는지는 Kubernetes 클러스터의 허용 오차에 따라 다릅니다.문제 해결 및 확장 방법
task의 스케줄링이 너무 오래 걸림
task를 스케줄링하기 위해 DAG를 분석할 리소스가 부족할 수도 있습니다.
아래의 값을 조절하도록 합니다.
parallelism
worker_concurrency
(celery를 사용하는 경우)
DAG가 queue 상태에서 멈춰서 running으로 넘어가지 않음
• 잠재적 원인: 예약 중인 작업 수가 Airflow 인프라의 용량을 초과했을 수도 있습니다.
아래와 같이 확인을 해 봅니다.
- Kubernetes executor를 사용하는 경우, 사용 가능한 리소스가 있는지 확인하고
worker_pods_creation_batch_size
를 늘릴 수 있는지 확인 - Celery executor를 사용하는 경우,
worker_concurrency
증가할 수 있는지 확인
개별 DAG는 task를 병렬로 실행하는데 문제가 있지만 다른 DAG에는 영향을 주지 않음
DAG 수준의 병목현상일 수도 있습니다.
아래의 값을 조절하도록 합니다.
max_active_task_per_dag
- pool (사용하고 있는 경우)
parallelism
airflow 구성 확장
여기서는 celery executor를 사용한다 가정하고 설명합니다. 동시에 실행하는 task 및 dag 수를 제어하는 값을 조절하는 것이 airflow를 확장하는 것에 핵심입니다.
[celery]worker_concurrency
[celery]worker_concurrency
매개변수는 Airflow worker가 동시에 실행할 수 있는 task의 최대 개수를 제어합니다. 이 값은 추가로 설명되는[core]parallelism
Airflow 구성 옵션에 따라 제한됩니다.[core]max_active_runs_per_dag
[core]max_active_runs_per_dag
Airflow 구성 옵션은 DAG당 최대 활성 DAG 실행 수를 제어합니다. 이 한도에 도달하면 스케줄러에서 DAG 실행을 추가로 만들지 않습니다.이 매개변수를 잘못 설정하면 지정된 시점에 DAG 실행 인스턴스를 더 만들 수 없으므로 스케줄러가 DAG 실행을 제한하는 문제가 발생할 수 있습니다.
[core]max_active_tasks_per_dag
[core]max_active_tasks_per_dag
Airflow 구성 옵션은 각 DAG에서 동시에 실행될 수 있는 최대 task 인스턴스 수를 제어합니다. DAG 수준 매개변수입니다.이 매개변수를 잘못 설정하면 지정된 시점에 실행할 수 있는 DAG task 수가 제한적이므로 단일 DAG 인스턴스 실행에 문제가 발생할 수 있습니다.
해결 방법:
[core]max_active_tasks_per_dag
를 늘립니다.[core]parallelism 및 pool
[core]parallelism
Airflow 구성 옵션은 이러한 task의 모든 종속 항목이 충족된 후 Airflow 스케줄러가 Executor 큐에서 큐에 추가할 수 있는 task의 수를 제어합니다.전체 Airflow 설정에 대한 전역 매개변수입니다.
task가 한 pool에서 큐에 추가되고 실행됩니다. pool의 크기로 스케줄러가 특정 시점에 실행을 위해 큐에 추가할 수 있는 task 수가 제어됩니다. pool 크기가 너무 작으면
[core]parallelism
구성 옵션 및[celery]worker_concurrency
구성 옵션과 Airflow worker 수를 곱한 값으로 정의되는 기준에 아직 도달하지 않았더라도 스케줄러에서 실행을 위한 task를 큐에 추가할 수 없습니다.Airflow UI에서 pool 크기를 구성할 수 있습니다.
DAG 수준에서의 튜닝
추가적으로 보면 좋은 문서
https://airflow.apache.org/docs/apache-airflow/stable/faq.html
https://cloud.google.com/composer/docs/composer-2/troubleshooting-scheduling?hl=ko
https://cloud.google.com/composer/docs/composer-2/troubleshooting-dags?hl=ko
https://cloud.google.com/composer/docs/composer-2/known-issues?hl=ko
레퍼런스
https://www.astronomer.io/guides/airflow-scaling-workers/
https://jybaek.tistory.com/923
https://cloud.google.com/composer/docs/composer-2/troubleshooting-dags?hl=ko