ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow] Sensor의 reschedule과 poke 실행 모드 동작 원리
    공부/데이터 2023. 4. 16. 23:17

    airflow 공식 블로그에서도 각 모드에 대해 어떻게 동작이 달라지는지 설명이 나와 있지 않아 실제 코드를 보며 설명을 해보겠습니다

    poke 모드

    sensor에서 poke을 설정하고 해당 sensor가 실행되면 airflow UI에서 다음과 같은 로그를 볼 수 있습니다.

    [2023-04-10, 11:22:17 UTC] {external_task.py:184} INFO - Poking for tasks None in dag  on 2023-04-10T10:00:00+00:00 ... 
    [2023-04-10, 11:23:17 UTC] {external_task.py:184} INFO - Poking for tasks None in dag  on 2023-04-10T10:00:00+00:00 ... 

    poke모드의 경우, Sensor가 작업을 감지할 때까지 pool의 slot을 계속 점유하면서 일정 시간 간격으로 Sensor를 실행하며, 작업이 완료될 때까지 이러한 행동을 반복합니다.

    다음은 sensor 클래스에서 poke이 어떻게 작업을 멈췄다가 다시 진행하는지 해당하는 코드입니다.

    # airflow/sensors/base.py
    
    class BaseSensorOperator(BaseOperator, SkipMixin):
    ...
    def poke(self, context: Context) -> Union[bool, PokeReturnValue]:
            """
            Function that the sensors defined while deriving this class should
            override.
            """
            raise AirflowException('Override me.')
    
    def execute(self, context: Context) -> Any:
            started_at: Union[datetime.datetime, float]
    
            if self.reschedule:
    
                # If reschedule, use the start date of the first try (first try can be either the very
                # first execution of the task, or the first execution after the task was cleared.)
                first_try_number = context['ti'].max_tries - self.retries + 1
                task_reschedules = TaskReschedule.find_for_task_instance(
                    context['ti'], try_number=first_try_number
                )
                if not task_reschedules:
                    start_date = timezone.utcnow()
                else:
                    start_date = task_reschedules[0].start_date
                started_at = start_date
    
                def run_duration() -> float:
                    # If we are in reschedule mode, then we have to compute diff
                    # based on the time in a DB, so can't use time.monotonic
                    return (timezone.utcnow() - start_date).total_seconds()
    
            else:
                started_at = start_monotonic = time.monotonic()
    
                def run_duration() -> float:
                    return time.monotonic() - start_monotonic
    
            try_number = 1
            log_dag_id = self.dag.dag_id if self.has_dag() else ""
    
            xcom_value = None
            while True:
                poke_return = self.poke(context)
                if poke_return:
                    if isinstance(poke_return, PokeReturnValue):
                        xcom_value = poke_return.xcom_value
                    break
    
                if run_duration() > self.timeout:
                    # If sensor is in soft fail mode but times out raise AirflowSkipException.
                    if self.soft_fail:
                        raise AirflowSkipException(f"Snap. Time is OUT. DAG id: {log_dag_id}")
                    else:
                        raise AirflowSensorTimeout(f"Snap. Time is OUT. DAG id: {log_dag_id}")
                if self.reschedule:
                    next_poke_interval = self._get_next_poke_interval(started_at, run_duration, try_number)
                    reschedule_date = timezone.utcnow() + timedelta(seconds=next_poke_interval)
                    if _is_metadatabase_mysql() and reschedule_date > _MYSQL_TIMESTAMP_MAX:
                        raise AirflowSensorTimeout(
                            f"Cannot reschedule DAG {log_dag_id} to {reschedule_date.isoformat()} "
                            f"since it is over MySQL's TIMESTAMP storage limit."
                        )
                    raise AirflowRescheduleException(reschedule_date)
                else:
                    time.sleep(self._get_next_poke_interval(started_at, run_duration, try_number))
                    try_number += 1
            self.log.info("Success criteria met. Exiting.")
            return xcom_value

    BaseSensorOperator를 상속하는 sensor 오퍼레이터가 실행된다면(예: ExternalSensorOperator) execute 메소드가 호출되며 무한 루프로 poke 메소드를 실행하게 됩니다.

    poke 메소드에서 작업이 완료되지 않았다면 사용자가 설정한 시간만큼 time.sleep을 실행하게 됩니다. 따라서 slot을 계속 점유하면서 인터벌까지 멈춰있는 상태입니다.

    또한 인터벌이 도달하여 다시 루프문을 돈다면 poke 메소드를 호출하므로 사용자가 custom sensor operator를 만든다고 했을 때, poke 메소드를 상속받고 인스턴스 변수로 몇 번 실행이 되었는지도 노출할 수도 있습니다.

    [2023-04-10, 11:22:17 UTC] {logging_mixin.py:115} INFO - 1번째 시도 중
    [2023-04-10, 11:22:17 UTC] {external_task.py:184} INFO - Poking for tasks None in dag  on 2023-04-10T10:00:00+00:00 ... 
    [2023-04-10, 11:23:17 UTC] {logging_mixin.py:115} INFO - 2번째 시도 중
    [2023-04-10, 11:23:17 UTC] {external_task.py:184} INFO - Poking for tasks None in dag  on 2023-04-10T10:00:00+00:00 ...

    sensor 모드

    sensor에서 reschedule을 설정하고 해당 sensor가 실행되면 airflow UI에서 다음과 같은 로그를 볼 수 있습니다.

    --------------------------------------------------------------------------------
    [2023-04-10, 04:16:02 UTC] {taskinstance.py:1370} INFO - Starting attempt 1 of 1441
    [2023-04-10, 04:16:02 UTC] {taskinstance.py:1371} INFO - 
    --------------------------------------------------------------------------------
    .....
    --------------------------------------------------------------------------------
    [2023-04-10, 04:17:02 UTC] {taskinstance.py:1370} INFO - Starting attempt 1 of 1441
    [2023-04-10, 04:17:02 UTC] {taskinstance.py:1371} INFO - 
    --------------------------------------------------------------------------------
    .....
    --------------------------------------------------------------------------------
    [2023-04-10, 04:18:02 UTC] {taskinstance.py:1370} INFO - Starting attempt 1 of 1441
    [2023-04-10, 04:18:02 UTC] {taskinstance.py:1371} INFO - 
    --------------------------------------------------------------------------------
    .....--------------------------------------------------------------------------------
    [2023-04-10, 04:19:02 UTC] {taskinstance.py:1370} INFO - Starting attempt 1 of 1441
    [2023-04-10, 04:19:02 UTC] {taskinstance.py:1371} INFO - 
    --------------------------------------------------------------------------------

    reschedule 모드의 경우, sensor가 작업을 감지하지 못하면 다시 작업을 예약합니다. 예약한 작업 시간이 되기 전까지는 sensor의 동작을 일시 중지하기 때문에 pool의 slot도 차지하지 않습니다.

    다음은 sensor 클래스에서 reschedule이 어떻게 작업을 다시 예약하는지 해당하는 코드입니다.

    # airflow/sensors/base.py
    
    class BaseSensorOperator(BaseOperator, SkipMixin):
    ...
    def poke(self, context: Context) -> Union[bool, PokeReturnValue]:
            """
            Function that the sensors defined while deriving this class should
            override.
            """
            raise AirflowException('Override me.')
    
    def execute(self, context: Context) -> Any:
            started_at: Union[datetime.datetime, float]
    
            if self.reschedule:
    
                # If reschedule, use the start date of the first try (first try can be either the very
                # first execution of the task, or the first execution after the task was cleared.)
                first_try_number = context['ti'].max_tries - self.retries + 1
                task_reschedules = TaskReschedule.find_for_task_instance(
                    context['ti'], try_number=first_try_number
                )
                if not task_reschedules:
                    start_date = timezone.utcnow()
                else:
                    start_date = task_reschedules[0].start_date
                started_at = start_date
    
                def run_duration() -> float:
                    # If we are in reschedule mode, then we have to compute diff
                    # based on the time in a DB, so can't use time.monotonic
                    return (timezone.utcnow() - start_date).total_seconds()
    
            else:
                started_at = start_monotonic = time.monotonic()
    
                def run_duration() -> float:
                    return time.monotonic() - start_monotonic
    
            try_number = 1
            log_dag_id = self.dag.dag_id if self.has_dag() else ""
    
            xcom_value = None
            while True:
                poke_return = self.poke(context)
                if poke_return:
                    if isinstance(poke_return, PokeReturnValue):
                        xcom_value = poke_return.xcom_value
                    break
    
                if run_duration() > self.timeout:
                    # If sensor is in soft fail mode but times out raise AirflowSkipException.
                    if self.soft_fail:
                        raise AirflowSkipException(f"Snap. Time is OUT. DAG id: {log_dag_id}")
                    else:
                        raise AirflowSensorTimeout(f"Snap. Time is OUT. DAG id: {log_dag_id}")
                if self.reschedule:
                    next_poke_interval = self._get_next_poke_interval(started_at, run_duration, try_number)
                    reschedule_date = timezone.utcnow() + timedelta(seconds=next_poke_interval)
                    if _is_metadatabase_mysql() and reschedule_date > _MYSQL_TIMESTAMP_MAX:
                        raise AirflowSensorTimeout(
                            f"Cannot reschedule DAG {log_dag_id} to {reschedule_date.isoformat()} "
                            f"since it is over MySQL's TIMESTAMP storage limit."
                        )
                    raise AirflowRescheduleException(reschedule_date)
                else:
                    time.sleep(self._get_next_poke_interval(started_at, run_duration, try_number))
                    try_number += 1
            self.log.info("Success criteria met. Exiting.")
            return xcom_value

    BaseSensorOperator를 상속하는 sensor 오퍼레이터가 실행된다면(예: ExternalSensorOperator) execute 메소드가 호출되며 무한 루프로 poke 메소드를 실행하게 됩니다.

    하지만 reschedule의 경우, 작업을 감지 못했다면 무한 루프 아랫 부분의 if self.reschedule: 에서 다음 실행 스케줄을 계산하고 계산한 값을 AirflowRescheduleException 에 파라미터로 전달하여 오류처리를 진행합니다. 여기서 중요한 점은 해당 클래스가 실제 파이썬 exception 기능이 아닌, 스케줄을 다시 등록하는 용도로 쓰입니다.

    # airflow/models/taskinstance.py
    
    class TaskInstance(Base, LoggingMixin):
    ....
    @provide_session
        @Sentry.enrich_errors
        def _run_raw_task(
            self,
            mark_success: bool = False,
            test_mode: bool = False,
            job_id: Optional[str] = None,
            pool: Optional[str] = None,
            error_file: Optional[str] = None,
            session=NEW_SESSION,
        ) -> None:
        ....
        except AirflowRescheduleException as reschedule_exception:
                self._handle_reschedule(actual_start_date, reschedule_exception, test_mode, session=session)
                session.commit()
                return

    다시 말해, reschedule 모드도 poke 모드와 동일하게 무한 루프 안에서 poke 메소드를 호출하지만 작업이 완료되지 않았다면 AirflowRescheduleException 를 호출하여 다음 인터벌을 등록하고 무한 루프를 빠져나갑니다. 그러므로 실행되는 로그를 보면 Starting attempt 1 of 1441 와 같이 시도 횟수가 1에서 증가하지 않습니다.

    • AirflowRescheduleException 에서 이름과 다른 동작(다음 스케줄을 등록)을 진행하기 때문에 코드가 명확하지 않음

    요약

    • poke, reschedule 모드 둘 다 execute메소드 내 무한루프로 감싸져 있고 내부에서 poke 메소드를 호출
    • poke 모드는 sensor 작업이 완료되지 않았다면 주어진 인터벌만큼 sleep을 함 (=pool의 slot을 계속 차지)
    • reschedule 모드는 sensor 작업이 완료되지 않았다면 다음 스케줄을 등록한 후, 무한루프를 빠져나가고 다음 인터벌에서 다시 execute 메소드를 호출
    • reschedule 모드의 동작 코드는 이상하게 짜여져 있음
    • custom sensor operator를 만든다면 poke모드가 더 쉽고 원하는대로 할 수 있음

    댓글