ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow] custom operator, hook, sensor, util 등록하기
    공부/데이터 2021. 11. 14. 20:02

    plugins 분리하기

    실제 동작하는 dag와 custom으로 생성한 operator, hook, alert 등의 작업물을 분리하여 관리하는 방법입니다.

    https://airflow.apache.org/docs/apache-airflow/stable/modules_management.html

    문서에 나와있는 것처럼 plugins 폴더 하위에 작성하면 airflow가 자동으로 경로를 인식합니다.

    /plugins/operators/custom_operator.py 폴더와 파일을 만들고 아래 코드를 추가합니다.

    from airflow.operators.bash import BashOperator
    
    class EvenNumberCheckOperator(BashOperator):
        def __init__(self, my_operator_param, *args, **kwargs):
            self.operator_param = my_operator_param
            super().__init__(*args, **kwargs)
    
        def execute(self, context):
            if not self.operator_param % 2:
                self.bash_command = 'echo even number'
            else:
                self.bash_command = 'echo odd number'
    
            super().execute(context)

    그 다음, 실제 사용하는 /dags/even/even_check_dag.py에서는 아래와 같이 코드를 추가합니다.

    from datetime import datetime
    
    from airflow import DAG
    
    from dags.defendencies.config.base import default_args
    from operators.custom_operator import EvenNumberCheckOperator ## 여기
    
    dag = DAG(
        dag_id='plugin_test', default_args=default_args, start_date=datetime(2021, 11, 6, 0, 0, 0),
        schedule_interval="@hourly", tags=['plugins_test'],
        catchup=False,
    )
    
    a = EvenNumberCheckOperator(
        task_id='eventest',
        my_operator_param=11,
        bash_command='echo ""',
        dag=dag
    )

    여기서 경로가 from plugins.operators.custom_operator import EvenNumberCheckOperator 이 아닌 것을 주의해야 합니다. airflow config 파일 내에 /plugins 이라고 경로가 잡혀있어서 plugins을 명시하면 airflow에서 오류로 인식하기 때문입니다.

    만약 IDE(파이참)에서 위와 같이 인식을 못할 경우, plugins 폴더에 source root를 지정해주면 해결됩니다.

    airflow 버전 2.x대부터 AirflowPlugin 클래스를 상속받아 airflow.{operators,sensors,hooks}.<plugin_name> 같은 형태를 지원하지 않고 일반 파이썬 모듈로 선언하도록 강제하고 있습니다.

    custom python util 생성하기

    파이썬으로 만든 util 파일도 plugins 하위에 관리를 해도 되지만 GCP의 composer 서비스에서는 dags 폴더 하위에 폴더를 생성하라고 가이드하고 있습니다.

    따라서 plugins 폴더에서 관리할지, dag 폴더 하위에서 관리할지는 각자의 선택일 것 같습니다.

    레퍼런스

    https://www.astronomer.io/guides/airflow-importing-custom-hooks-operators
    https://airflow.apache.org/docs/apache-airflow/stable/plugins.html
    https://airflow.apache.org/docs/apache-airflow/stable/modules_management.html
    https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html
    https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies#viewing_installed_python_packages

    댓글