ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow] SLAs 사용하기
    공부/데이터 2023. 4. 10. 00:22

    airflow의 DAG에서 각 task 단계는 일정 시간 내에 실행되어야 합니다. 이 때, SLA(Service Level Agreement)를 사용하여 작업 단계가 제때 실행되는지 확인할 수 있습니다. 아래에서 SLA가 무엇인지, 어떻게 사용하는지와 현재 airflow에서 SLA의 한계까지 설명합니다.

    • 여기서 airflow 2.3.4 버전 기준으로 설명합니다.

    SLA란?

    Service Level Agreement (SLA)는 서비스 수준 협약으로, 서비스 제공자와 이용자 간에 서비스 제공에 대한 약속 내용을 명시한 문서입니다. 이를 통해 서비스 제공자는 이용자에게 최소한의 서비스 수준을 보장하며, 이용자는 이에 대한 보상을 받을 수 있습니다.

    Airflow SLA 사용하기

    SLA는 task 단위로 설정하거나 dag 단위로 설정할 수 있습니다.

    task에 sla 적용하기

    task 파라미터로 sla 옵션을 입력하면 됩니다.

    load = PythonOperator(
                    task_id="load_delta",
                                    ......,
                    sla=timedelta(minutes=5),
    )
    
    delete_delta = BigQueryDeleteTableOperator(
                    task_id="delete_delta",
                                    ......,
                    sla=timedelta(minutes=15),
                )
    load >> delete_delta

    여기서 sla 입력 방식은 다소 직관적이지 않습니다. sla는 dag가 실행한 시점을 기준으로 적용이 됩니다. 위 예시에서 load는 dag가 시작한지 5분이 지나면 task가 지연이라고 sla miss가 발생합니다. delete_delta의 경우, dag가 시작한지 15분이 지나면 task가 지연이라고 sla miss가 발생합니다.

    dag에 sla 적용하기

    sla를 task 별로 설정하는 것이 아닌 dag 단위로 일괄로 적용할 수 있습니다. 이 때, default_args에 sla 파라미터를 추가해서 사용해야 합니다. default_args가 아닌 DAG의 파라미터로 사용한다면 오류가 발생합니다.

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2022, 1, 1),
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        'sla': timedelta(hours=1)
    }
    

    위 예시에서 dag가 시작한지 1시간이 지나면 오류를 발생합니다. 이 때, 해당 dag의 sla miss가 발생하는 것이 아닌, 1시간이 지난 시점에 완료가 되지 않은 모든 task가 sla miss를 발생시킵니다. 예를 들어, 100개의 task 중, dag가 시작하고 1시간이 지난 시점에서 50개만 성공을 했다면 50개의 task는 동시에 sla miss를 발생시킵니다.

    sla miss 커스터마이징하기 (sla miss callback 수정하기)

    airflow에서 제공하는 sla miss 오류를 그대로 사용할 수 있지만 사용자가 메시지를 수정하거나 알림을 이메일이 아닌 슬랙으로도 줄 수 있습니다.

    DAG 의 파라미터로 sla_miss_callback 에 커스텀 함수명을 입력하고 커스텀 함수에서 원하는대로 수정하면 됩니다.

    def sla_miss_callback(
        dag: DAG,
        task_list: str,
        blocking_task_list: str,
        slas: List[SlaMiss],
        blocking_tis: List[TaskInstance],
    ):
        delay_task_instance = blocking_tis[0]
    
        dag_id = delay_task_instance.dag_id
        task_id = delay_task_instance.task_id
        run_id = delay_task_instance.run_id
        execution_date = str(delay_task_instance.execution_date)
        state = delay_task_instance.state
    
            # 아래는 슬랙으로 지연 task 알림을 주는 커스텀 함수 예시
        delay_task_slack_notification(dag_id, task_id, run_id, execution_date, state)
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2022, 1, 1),
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        'sla': timedelta(hours=1),
    }
    
    with DAG(
        default_args=default_args,
        start_date=datetime(2021, 6, 30),
        schedule_interval="0 * * * *",
        dag_id='test_dag',
        catchup=False,
        sla_miss_callback=sla_miss_callback,
    ) as dag:
        .....

    여기서 중요한 점은 sla_miss_callback에 들어가는 파라미터는 위 예시처럼 고정으로 받을 수 있도록 명시해야 합니다.

    SLA Miss 확인하기

    airflow UI 좌측 상단에 Browse > SLA Misses 를 들어가면 sla miss인 리스트를 확인할 수 있습니다.

    또한 airflow checksla 명령어를 사용하여 SLA 미스가 발생한 DAG를 확인할 수 있습니다.

    SLA Miss로 슬랙 알림 보내기 한계

    여기까지 설명하면 해당 기능으로 dag나 특정 task가 일정 시간동안 끝나지 않을 때, 슬랙으로 알림을 바로 받을 수 있는 것처럼 보이지만 실제로는 그렇지가 않습니다.

    예를 들어, 30분 이상 지연 시, sla miss를 발생시키고 1시간 단위로 실행되는 dag 정보와 아래와 같이 있다고 가정합니다.

    def sla_miss_callback(
        dag: DAG,
        task_list: str,
        blocking_task_list: str,
        slas: List[SlaMiss],
        blocking_tis: List[TaskInstance],
    ):
        delay_task_instance = blocking_tis[0]
    
        dag_id = delay_task_instance.dag_id
        task_id = delay_task_instance.task_id
        run_id = delay_task_instance.run_id
        execution_date = str(delay_task_instance.execution_date)
        state = delay_task_instance.state
    
            # 아래는 슬랙으로 지연 task 알림을 주는 커스텀 함수 예시
        delay_task_slack_notification(dag_id, task_id, run_id, execution_date, state)
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2022, 1, 1),
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        'sla': timedelta(minutes=30),
    }
    
    with DAG(
        default_args=default_args,
        start_date=datetime(2021, 6, 30),
        schedule_interval="0 * * * *",
        dag_id='test_dag',
        catchup=False,
        sla_miss_callback=sla_miss_callback,
    ) as dag:
        .....

    2023-04-10 15:00:00에 dag가 실행된다면 dag의 실행 시간은 2023-04-10 14:00:00이 됩니다.

    • start_date: 2023-04-10 15:00:00
    • execution_date(logical_date): 2023-04-10 14:00:00

    30분이 지난 시점에도 dag가 계속 실행 중이면 성공하지 않은 task를 모두 sla miss라고 등록하게 됩니다. 이후 슬랙 알림이 바로 올 것으로 예상하지만 즉시 오지 않고 해당 dag의 다음 스케줄인 2023-04-10 16:00:00에 sla miss 슬랙 알림이 오게 됩니다.

    정확하게 왜 이런지는 파악하지 못했지만 sla miss에 등록된 task를 자세히 보면 이상한 부분을 확인할 수 있습니다.

    2023-04-10 15:00:00에 실행되고 30분 이상 지연되어 sla miss로 등록된 task를 확인해보면 logical_date는 2023-04-10 14:00:00가 아닌 2023-04-10 15:00:00로 등록됩니다. 또한 이 시점에선 notification sent로 False 상태입니다. 따라서 sla miss로 등록은 됐지만 logical date가 미래 시점인 1시간 뒤이므로 슬랙 알림을 바로 보내는 것이 아닌, 1시간 뒤인 2023-04-10 15:00:00에 일괄로 보내는 것으로 예상됩니다.

    dag의 스케줄 주기가 짧다면 큰 문제가 되지 않지만 1시간, 1일 이상 주기라면 사실상 지연 알림이 제기능을 하지 못합니다.

    요약

    dag, task 단위로 지연 알림을 줄 수 있다.

    하지만 버그가 존재해 지연된 task는 그 다음 스케줄 시간에 일괄 발송된다.

    지연 시, 준실시간으로 알림을 보내고자 한다면 sla 기능은 사용하지 못한다.

    댓글