ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow] configuration 설명 및 파라미터 튜닝
    공부/데이터 2022. 6. 23. 01:54

    간단하게 사용한다면 airflow 기본 설정으로도 가능하지만 많은 dag나 task들을 병렬로 처리하고자 한다면 튜닝이 진행되어야 합니다.

    튜닝을 진행할 때엔 아래와 같이 3가지 단계로 진행할 수 있습니다.

    • Airflow 환경-level
    • DAG-level
    • Task-level

    파라미터 조정

    Airflow에는 성능에 영향을 미치는 많은 파라미터가 존재하는데 이러한 설정을 조정하면 DAG, task 스케쥴링 성능, 병렬처리 등에 영향을 줄 수 있습니다.

    Airflow 환경-level

    airflow 환경을 조정하는 것이므로 모든 DAG에 영향을 주는 설정입니다. airflow.cfg 라는 설정파일을 수정하거나 재정의하여 적용할 수 있습니다. 해당 파일의 기본값이나 설명은 https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html 에서 확인할 수 있습니다.

    현재 적용된 airflow의 환경설정값 확인은 airflow UI에서 Admin > Configuration 에서 확인할 수 있습니다.

    core

    core 영역은 동시에 실행되는 프로세스의 수와 해당 프로세스를 실행할 수 있는 시간을 제어합니다.

    parallelism : 단일 Airflow 환경 내에서 동시에 실행할 수 있는 최대 task의 수입니다. 예를 들어 이 설정을 32로 설정하면 모든 DAG에서 한 번에 32개 이하의 task를 실행할 수 있습니다. task가 오랫동안 queuing상태면 이 값을 늘려야 할 수 있습니다. 기본값은 32입니다.

    max_active_tasks_per_dag(이전 dag_concurrency): DAG당 한 번에 스케줄링되는 최대 task의 수를 결정합니다. 이 설정을 너무 높게 설정하면 하나의 DAG가 병렬 처리 또는 풀에서 사용 가능한 슬롯을 너무 많이 차지하지 하여 다른 DAG의 task가 대기 상태에 빠질 수 있으니 잘 조절하는 것이 중요합니다. 기본값은 16입니다.

    Airflow에 사용할 수 있는 리소스(예: Celery worker 또는 Kubernetes 리소스)의 수를 늘렸는데 task가 늘린 수 만큼 실행되지 않는 경우, parallelismmax_active_tasks_per_dag의 값도 같이 증가시켜야 합니다.

    max_active_runs_per_dag: Airflow Scheduler가 주어진 시간에 생성할 수 있는 활성화된 DAG실행(DAG당)의 최대 수를 결정합니다. Airflow에서 DAG 실행 은 task 인스턴스가 task의 인스턴스화를 나타내는 것처럼 시간에 따른 DAG의 인스턴스화를 나타냅니다. 이 매개변수는 Airflow가 backfill이라고도 하는 누락된 DAG 실행을 따라잡아야 하는 경우 가장 관련이 있습니다.

    간단하게 말해 1개의 DAG 안에서 최대 활성화되는 DAG의 수를 제어하는 값입니다. 기본값은 16입니다.

    dag_file_processor_timeout: DAG 파일을 처리 하는 DagFileProcessor가 시간 초과되기 전에 실행할 수 있는 시간입니다. 기본값은 50초입니다.

    dagbag_import_timeout: DAG 개체를 가져올 수 있는 dagbag시간(초)이며 dag_file_processor_timeout에 설정된 값보다 낮아야 합니다. DAG 처리 로그에 시간 초과가 표시되거나 DAG가 DAG 목록이나 가져오기 오류에 모두 표시되지 않으면 이 값을 늘려 보십시오. task가 실행될 때, worker는 dagbag 을 채워야 하므로 만약 task가 실행되지 않는 경우 이 값을 늘려서 해결할 수도 있습니다. 기본값은 30초입니다.

    scheduler

    스케줄러는 DAG 파일을 분석하고 실행을 생성하는 방법을 제어합니다.

    min_file_process_interval: 각 DAG 파일은 min_file_process_interval 초 단위로 구문 분석됩니다. DAG에 대한 업데이트는 설정된 해당 값 이후에 반영됩니다. 해당 설정값이 낮으면 스케줄러의 CPU 사용량이 증가합니다. 복잡한 코드로 생성된 동적 DAG가 있는 경우 스케줄러 성능에 부정적인 영향을 미치지 않도록 이 값을 늘릴 수 있습니다. 기본값은 30초입니다.

    dag_dir_list_interval: DAG 디렉터리에서 새 파일을 검색하는 빈도(초)입니다. 값이 낮을수록 새 DAG가 더 빨리 처리되지만 스케줄러의 CPU 사용량은 높아집니다. 기본값은 300초입니다.

    min_file_process_intervaldag_dir_list_interval 중 어떤 값을 선택할지 알기 위해서 DAG(dag_processing.total_parse_time)를 구문 분석하는 데 걸리는 시간안다면 도움이 됩니다. dag_dir_list_interval이 각 DAG를 구문 분석하는 데 걸리는 시간보다 짧으면 성능 문제가 발생할 수 있습니다.

    parsing_processes(이전 max_threads): 스케줄러는 DAG를 구문 분석하기 위해 여러 프로세스를 병렬로 실행할 수 있습니다. 이 설정은 병렬로 실행할 수 있는 프로세스 수를 정의합니다. 사용 가능한 vCPU의 2배 값을 설정하는 것이 좋습니다. 이 값을 늘리면 DAG가 많은 경우 DAG를 보다 효율적으로 직렬화할 수 있습니다. 여러 스케줄러를 실행하는 경우 이 값이 각각 에 적용됩니다. 기본값은 2입니다.

    file_parsing_sort_mode: 스케줄러가 구문 분석 순서를 결정하기 위해 DAG 파일을 나열하고 정렬하는 방법을 결정합니다. modified_time, random_seeded_by_host, alphabetical 중 하나로 설정합니다. 기본 값은 modified_time입니다.

    scheduler_heartbeat_sec: 이 설정은 새 작업을 트리거하기 위해 스케줄러를 실행해야 하는 빈도(초)를 정의합니다. 기본값은 5초입니다.

    scheduler가 새로운 task를 찾는데 대기하는 시간으로 해당 값이 크다면 airflow UI에서 아래와 같은 화면을 볼 수도 있습니다.

    max_dagruns_to_create_per_loop: 스케줄러 loop당 Dag 실행을 생성할 최대 DAG 수입니다. 이 값을 값을 줄여 task 예약을 위한 리소스를 확보할 수 있습니다. 기본값은 10초입니다.

    max_tis_per_query: 이 파라미터는 기본 스케줄링 loop에서 메타스토어에 대한 쿼리의 배치 크기를 변경합니다. 값이 높으면 tis쿼리당 더 많이 처리할 수 있지만 쿼리가 너무 복잡해지고 성능 병목 현상이 발생할 수 있습니다. 기본값은 512 쿼리입니다.

    DAG-level

    DAG 수준 설정은 특정 DAG에만 적용되며 DAG 코드에 정의됩니다. DAG 수준과 환경 수준 모두에 설정이 있는 경우 DAG 수준 설정이 우선합니다.

    max_active_runs: DAG에 허용되는 활성 DAG 실행의 최대 수입니다. 설정한 값 이상으로 스케줄러는 새로운 활성 DAG 실행을 생성하지 않습니다. 이 설정이 정의되지 않은 경우 airflow 환경 설정값인 max_active_runs_per_dag이 적용됩니다.

    DAG를 사용 하는 backfill경우 실수로 많은 수의 DAG 실행을 트리거하지 않도록 이 매개변수를 정의하는 것이 좋습니다.

    DAG에서 catchup 을 사용하여 backfill을 할 때, 너무 많은 DAG가 실행되지 않도록 해당 값을 설정하는 것이 좋습니다.

    max_active_tasks: 하나의 DAG가 실행될 때 대해 동시에 실행할 수 있는 총 task의 수입니다. DAG 내의 병렬 처리를 제어합니다. 이 설정이 정의되지 않은 경우 airflow 환경 설정값 max_active_tasks_per_dag이 적용됩니다.

    concurrency: 모든 DAG 실행에서 동시에 실행할 수 있는 최대 task 인스턴스 수입니다. 이 설정이 정의되지 않은 경우 airflow 환경 설정값 max_active_tasks_per_dag이 적용됩니다.

    Task-level

    task 수준 설정은 task 연산자에서 정의되며 성능에 대한 보다 세부적인 제어를 할 수 있습니다.

    max_active_tis_per_dag (이전 task_concurrency): 이는 동일한 task가 모든 DAG 실행에서 동시에 실행할 수 있는 최대 횟수입니다. 예를 들어 작업이 데이터 테이블과 같이 한 번에 여러 작업에서 수정해서는 안 되는 경우 이 값을 1로 설정해 조절할 수 있습니다.

    pool: 이 설정은 task에서 사용할 수 있는 pool의 사이즈를 정의합니다. pool은 임의의 task group의 동시 인스턴스 수를 제한하는 방법입니다. 이 설정은 많은 worker 또는 DAG가 병렬로 실행되지만 API 속도 제한을 피하려고 할 때 유용합니다.

    Executor 확장

    celery와 kubernetes executor를 확장할 때 확인이 필요한 설정값들을 아래에서 설명합니다.

    celery executor

    Celery executor는 task를 실행하기 위해 항상 worker를 실행하고 있습니다. Celery executor를 확장하려면 Airflow에서 사용할 수 있는 worker의 수와 크기를 모두 고려해야 합니다. worker들을 스케일 아웃하거나 스케일업을 한다면 동시에 task들을 더 많이 실행할 수 있습니다.

    각 Celery worker는 주어진 시간에 실행할 수 있는 작업의 수를 결정하는 worker_concurrency를 조정할 수도 있습니다. worker_concurrency를 늘리면 worker를 위해 CPU 나 메모리를 프로비저닝해야 할 수도 있습니다. 기본값은 16입니다.

    kubernetes executor

    Kubernetes executor는 각 task 대해 Kubernetes 클러스터에서 pod를 시작합니다. 각 task는 자체 pod에서 실행되므로 개별적인 task 수준에서 리소스를 지정할 수 있습니다.

    Kubernetes executor로 성능을 조정할 때 Kubernetes 클러스터의 지원 인프라를 고려하는 것이 중요합니다. 많은 사용자가 클러스터에서 자동 크기 조정을 활성화하여 Kubernetes의 탄력성의 이점을 얻을 수 있습니다.

    스케줄러 loop당 생성할 수 있는 pod 수를 결정하는 airflow 환경-level인 worker_pods_creation_batch_size를 조정할 수도 있습니다. AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE기본값은 1이지만 동시에 실행되는 task가 있는 경우, 더 나은 성능을 위해 이 숫자를 조절할 것입니다. 값을 얼마나 높일 수 있는지는 Kubernetes 클러스터의 허용 오차에 따라 다릅니다.

    문제 해결 및 확장 방법

    task의 스케줄링이 너무 오래 걸림

    task를 스케줄링하기 위해 DAG를 분석할 리소스가 부족할 수도 있습니다.

    아래의 값을 조절하도록 합니다.

    • parallelism
    • worker_concurrency(celery를 사용하는 경우)

    DAG가 queue 상태에서 멈춰서 running으로 넘어가지 않음

    • 잠재적 원인: 예약 중인 작업 수가 Airflow 인프라의 용량을 초과했을 수도 있습니다.

    아래와 같이 확인을 해 봅니다.

    • Kubernetes executor를 사용하는 경우, 사용 가능한 리소스가 있는지 확인하고 worker_pods_creation_batch_size를 늘릴 수 있는지 확인
    • Celery executor를 사용하는 경우, worker_concurrency증가할 수 있는지 확인

    개별 DAG는 task를 병렬로 실행하는데 문제가 있지만 다른 DAG에는 영향을 주지 않음

    DAG 수준의 병목현상일 수도 있습니다.

    아래의 값을 조절하도록 합니다.

    • max_active_task_per_dag
    • pool (사용하고 있는 경우)
    • parallelism

    airflow 구성 확장

    여기서는 celery executor를 사용한다 가정하고 설명합니다. 동시에 실행하는 task 및 dag 수를 제어하는 값을 조절하는 것이 airflow를 확장하는 것에 핵심입니다.

    [celery]worker_concurrency

    [celery]worker_concurrency 매개변수는 Airflow worker가 동시에 실행할 수 있는 task의 최대 개수를 제어합니다. 이 값은 추가로 설명되는 [core]parallelism Airflow 구성 옵션에 따라 제한됩니다.

    [core]max_active_runs_per_dag

    [core]max_active_runs_per_dag Airflow 구성 옵션은 DAG당 최대 활성 DAG 실행 수를 제어합니다. 이 한도에 도달하면 스케줄러에서 DAG 실행을 추가로 만들지 않습니다.

    이 매개변수를 잘못 설정하면 지정된 시점에 DAG 실행 인스턴스를 더 만들 수 없으므로 스케줄러가 DAG 실행을 제한하는 문제가 발생할 수 있습니다.

    [core]max_active_tasks_per_dag

    [core]max_active_tasks_per_dag Airflow 구성 옵션은 각 DAG에서 동시에 실행될 수 있는 최대 task 인스턴스 수를 제어합니다. DAG 수준 매개변수입니다.

    이 매개변수를 잘못 설정하면 지정된 시점에 실행할 수 있는 DAG task 수가 제한적이므로 단일 DAG 인스턴스 실행에 문제가 발생할 수 있습니다.

    해결 방법: [core]max_active_tasks_per_dag를 늘립니다.

    [core]parallelism 및 pool

    [core]parallelism Airflow 구성 옵션은 이러한 task의 모든 종속 항목이 충족된 후 Airflow 스케줄러가 Executor 큐에서 큐에 추가할 수 있는 task의 수를 제어합니다.

    전체 Airflow 설정에 대한 전역 매개변수입니다.

    task가 한 pool에서 큐에 추가되고 실행됩니다. pool의 크기로 스케줄러가 특정 시점에 실행을 위해 큐에 추가할 수 있는 task 수가 제어됩니다. pool 크기가 너무 작으면 [core]parallelism 구성 옵션 및 [celery]worker_concurrency 구성 옵션과 Airflow worker 수를 곱한 값으로 정의되는 기준에 아직 도달하지 않았더라도 스케줄러에서 실행을 위한 task를 큐에 추가할 수 없습니다.

    Airflow UI에서 pool 크기를 구성할 수 있습니다.

    DAG 수준에서의 튜닝

    https://stackoverflow.com/questions/56370720/how-to-control-the-parallelism-or-concurrency-of-an-airflow-installation/63955004#63955004

     

    추가적으로 보면 좋은 문서

    https://airflow.apache.org/docs/apache-airflow/stable/faq.html

    https://cloud.google.com/composer/docs/composer-2/troubleshooting-scheduling?hl=ko

    https://cloud.google.com/composer/docs/composer-2/troubleshooting-dags?hl=ko

    https://cloud.google.com/composer/docs/composer-2/known-issues?hl=ko

    레퍼런스

    https://www.astronomer.io/guides/airflow-scaling-workers/

    https://jybaek.tistory.com/923

    https://stackoverflow.com/questions/56370720/how-to-control-the-parallelism-or-concurrency-of-an-airflow-installation/63955004#63955004

    https://cloud.google.com/composer/docs/composer-2/troubleshooting-dags?hl=ko

    댓글