ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow] Taskflow API
    공부/데이터 2021. 11. 13. 19:45

    버전 2.X대에서 도입된 Taskflow API를 사용하여 데이터 파이프라인 튜토리얼을 설명하면서 기존 1.X대 버전과 비교를 해봅니다.

    Taskflow API

    taskflow는 간단하게 데코레이터를 사용해 DAG와 Task를 구성하는 방식입니다.

    전체 코드

    import json
    
    from airflow.decorators import dag, task
    from airflow.utils.dates import days_ago
    
    default_args = {
        'owner': 'airflow',
    }
    
    @dag(default_args=default_args, description='Taskflow API ETL DAG tutorial',
         schedule_interval=None, start_date=days_ago(2), tags=['taskflow api'])
    def tutorial_taskflow_api_etl():
        """
        ### TaskFlow API Tutorial Documentation
        This is a simple ETL data pipeline example which demonstrates the use of
        the TaskFlow API using three simple tasks for Extract, Transform, and Load.
        Documentation that goes along with the Airflow TaskFlow API tutorial is
        located
        [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
        """
    
        @task()
        def extract():
            """
            #### Extract task
            A simple Extract task to get data ready for the rest of the data
            pipeline. In this case, getting data is simulated by reading from a
            hardcoded JSON string.
            """
            data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    
            order_data_dict = json.loads(data_string)
            return order_data_dict
    
        @task(multiple_outputs=True)
        def transform(order_data_dict: dict):
            """
            #### Transform task
            A simple Transform task which takes in the collection of order data and
            computes the total order value.
            """
            total_order_value = 0
    
            for value in order_data_dict.values():
                total_order_value += value
    
            return {"total_order_value": total_order_value}
    
        @task()
        def load(total_order_value: float):
            """
            #### Load task
            A simple Load task which takes in the result of the Transform task and
            instead of saving it to end user review, just prints it out.
            """
    
            print(f"Total order value is: {total_order_value:.2f}")
    
        order_data = extract()
        order_summary = transform(order_data)
        load(order_summary["total_order_value"])
    
    # DAG 호출
    tutorial_etl_dag = tutorial_taskflow_api_etl()

    DAG 구성

    default_args = {
        'owner': 'airflow',
    }
    
    @dag(default_args=default_args, description='Taskflow API ETL DAG tutorial',
         schedule_interval=None, start_date=days_ago(2), tags=['taskflow api'])
    def tutorial_taskflow_api_etl():
        """
        ### TaskFlow API Tutorial Documentation
        This is a simple ETL data pipeline example which demonstrates the use of
        the TaskFlow API using three simple tasks for Extract, Transform, and Load.
        Documentation that goes along with the Airflow TaskFlow API tutorial is
        located
        [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
        """
    

    여기서는 retry나 복잡한 스케쥴링 없이 단순한 형태로 DAG를 구성한 케이스입니다. 물론 데코레이터 옵션으로 retry, scheduler 등등을 줄 수 있습니다. 여기서 dag_id는 데코레이터가 달린 함수의 이름이 됩니다. 또한 위의 형식처럼 함수 내 주석을 추가하면 Airflow UI 내에서 document으로 인식하여 노출됩니다.

    Task 구성

    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    
        order_data_dict = json.loads(data_string)
        return order_data_dict

    taskflow에서 task는 task 데코레이터를 사용하여 함수를 구성하면 됩니다. dag 데코레이터가 달린 함수 내에 내부 함수로 구성하며 dag와 동일하게 데코레이터가 달린 함수의 이름이 task_id가 됩니다.

    또한 함수 내, 주석을 추가하면 dag와 마찬가지로 document로 인식하여 UI에서 노출이 됩니다.

    또한 taskflow에서는 함수 내에서 반환한 값을 이후 task에서 사용할 수 있습니다.

    다중 반환

    @task(multiple_outputs=True)
        def transform(order_data_dict: dict):
            """
            #### Transform task
            A simple Transform task which takes in the collection of order data and
            computes the total order value.
            """
            total_order_value = 0
    
            for value in order_data_dict.values():
                total_order_value += value
    
            return {"total_order_value": total_order_value}

    multiple_outputs 옵션을 true로 설정하면 반환된 값은 여러 XCom 값으로 저장됩니다. 또한 list와 tuple은 언롤링되어 저장됩니다. dict의 경우엔 dict의 key를 XCom에서 key로 사용하고 값을 저장합니다.

    아래와 같이 typing의 Dict로 선언한다면 multiple_outputs 값을 수동으로 사용하지 않지 않아도 자동으로 추론하여 multiple_outputs 값을 true로 인식하게 됩니다.

    @task
    def identity_dict(x: int, y: int) -> Dict[str, int]:
        return {"x": x, "y": y}

    DAG의 flow

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])

    함수 기반으로 작성한 task를 위와 같이 flow를 나타낼 수 있습니다.

    여기서 extract() 를 호출한 다음, 반환된 값을 transform(order_data) 함수 내에 파라미터 값으로 전달하여 변형한 뒤 여기서 반환된 값을 다시 load(order_summary["total_order_value"] 함수의 파라미터로 전달하는 로직입니다.

    데이터에 전달, 접근, 사용에 대한 Xcom 사용 1.X 버전과 동일하지만 airflow 2.0의 DAG author로부터 추상화되어 있기 때문에 편리하게 사용할 수 있습니다.

    Xcom을 사용하여 데이터를 전달하는 방식은 동일하기 때문에 airflow UI의 Admin → XComs 에서 확인할 수 있습니다.

    아래에서 설명하겠지만 기존 1.X 버전에서는 PythonOperator 를 사용한 task에서 다른 task에게 값을 전달하려면 Xcoms를 사용해야만 하지만 Taskflow API에서는 다른 노드 위에서 실행될 수 있는 task간 데이터 전달을 airflow가 처리합니다.

    tutorial_etl_dag = tutorial_taskflow_api_etl()

    그 다음, 글로벌 영역에서 dag 데코레이터가 붙은 함수를 호출하여 airflow에서 dag를 인식하게끔 만들면 모든 작업은 끝이 납니다.

    종속성 직접 설정하기

    위에선 파이썬 기반 task간 종속성을 자동으로 생성하는 것을 설명했는데 task가 실행되기 전 BashOperator나 Sensor 기반 task가 먼저 실행해야 된다면 기존과 같은 방법으로 선언하면 됩니다.

    @task()
    def extract_from_file():
        """
        #### Extract from file task
        A simple Extract task to get data ready for the rest of the data
        pipeline, by reading the data from a file into a pandas dataframe
        """
        order_data_file = '/tmp/order_data.csv'
        order_data_df = pd.read_csv(order_data_file)
    
    file_task = FileSensor(task_id='check_file', filepath='/tmp/order_data.csv')
    order_data = extract_from_file()
    
    file_task >> order_data

    기존 DAG 구성

    이제 기존 DAG를 구성하는 방법으로 위의 로직을 구현해보도록 합니다.

    """
    ### ETL DAG Tutorial Documentation
    This ETL DAG is compatible with Airflow 1.10.x (specifically tested with 1.10.12) and is referenced
    as part of the documentation that goes along with the Airflow Functional DAG tutorial located
    [here](https://airflow.apache.org/tutorial_decorated_flows.html)
    """
    import json
    from textwrap import dedent
    
    from airflow import DAG
    
    from airflow.operators.python import PythonOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
        'owner': 'airflow',
    }
    
    with DAG(
        'tutorial_etl_dag',
        default_args=default_args,
        description='ETL DAG tutorial',
        schedule_interval=None,
        start_date=days_ago(2),
        tags=['example1'],
    ) as dag:
        dag.doc_md = __doc__
    
        def extract(**kwargs):
            ti = kwargs['ti']
            data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
            ti.xcom_push('order_data', data_string)
    
        def transform(**kwargs):
            ti = kwargs['ti']
            extract_data_string = ti.xcom_pull(task_ids='extract', key='order_data')
            order_data = json.loads(extract_data_string)
    
            total_order_value = 0
            for value in order_data.values():
                total_order_value += value
    
            total_value = {"total_order_value": total_order_value}
            total_value_json_string = json.dumps(total_value)
            ti.xcom_push('total_order_value', total_value_json_string)
    
        def load(**kwargs):
            ti = kwargs['ti']
            total_value_string = ti.xcom_pull(task_ids='transform', key='total_order_value')
            total_order_value = json.loads(total_value_string)
    
            print(total_order_value)
    
        extract_task = PythonOperator(
            task_id='extract',
            python_callable=extract,
        )
        extract_task.doc_md = dedent(
            """\
        #### Extract task
        A simple Extract task to get data ready for the rest of the data pipeline.
        In this case, getting data is simulated by reading from a hardcoded JSON string.
        This data is then put into xcom, so that it can be processed by the next task.
        """
        )
    
        transform_task = PythonOperator(
            task_id='transform',
            python_callable=transform,
        )
        transform_task.doc_md = dedent(
            """\
        #### Transform task
        A simple Transform task which takes in the collection of order data from xcom
        and computes the total order value.
        This computed value is then put into xcom, so that it can be processed by the next task.
        """
        )
    
        load_task = PythonOperator(
            task_id='load',
            python_callable=load,
        )
        load_task.doc_md = dedent(
            """\
        #### Load task
        A simple Load task which takes in the result of the Transform task, by reading it
        from xcom and instead of saving it to end user review, just prints it out.
        """
        )
    
        extract_task >> transform_task >> load_task

    Dag클래스를 context문으로 열고 그 하위에 task를 선언한 기존의 DAG 구성 방식입니다.

    여기서 transform() 에서 처리되는 데이터는 Xcom으로 전달된 변수를 사용하고 있습니다. 이렇게 다른 task에 데이터를 전달하기 위해 Xcom을 직접 구현, 선언하는 방식은 Taskflow 보다는 불편해 보입니다.

    또한 문서화도 큰 차이가 보이는데 1.x 버전에서는 문서화 작업이 코드 가독성을 떨어트리는 것을 볼 수 있습니다.

    extract_task = PythonOperator(
        task_id='extract',
        python_callable=extract,
    )
    extract_task.doc_md = dedent(
        """\
    #### Extract task
    A simple Extract task to get data ready for the rest of the data pipeline.
    In this case, getting data is simulated by reading from a hardcoded JSON string.
    This data is then put into xcom, so that it can be processed by the next task.
    """
    )

    task의 flow를 지정하는 방식도 2.0에서는 함수 호출 순으로 자동으로 생성하지만 1.x대 버전은 직접 명시를 해줘야합니다.

    extract_task >> transform_task >> load_task

    가상 환경을 사용한 Taskflow API 사용하기

    airflow 2.0.3부터 가상환경에서 Taskflow API를 사용할 수 있습니다. 이 기능은 airflow worker 및 시스템 라이브러리에 영향을 받지 않으므로 자유롭게 구성할 수 있습니다.

    import json
    
    from airflow.decorators import dag, task
    from airflow.utils.dates import days_ago
    
    default_args = {
        'owner': 'airflow',
    }
    
    @dag(default_args=default_args, description='Taskflow API ETL DAG tutorial',
         schedule_interval=None, start_date=days_ago(2), tags=['taskflow api'])
    def tutorial_taskflow_api_etl_virtualenv():
        """
        ### TaskFlow API Tutorial Documentation
        This is a simple ETL data pipeline example which demonstrates the use of
        the TaskFlow API using three simple tasks for Extract, Transform, and Load.
        Documentation that goes along with the Airflow TaskFlow API tutorial is
        located
        [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
        """
        @task.virtualenv(
            use_dill=True,
            system_site_packages=False,
            requirements=['funcsigs'],
        )
        def extract():
            """
            #### Extract task
            A simple Extract task to get data ready for the rest of the data
            pipeline. In this case, getting data is simulated by reading from a
            hardcoded JSON string.
            """
            data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    
            order_data_dict = json.loads(data_string)
            return order_data_dict
    
        @task.virtualenv(multiple_outputs=True)
        def transform(order_data_dict: dict):
            """
            #### Transform task
            A simple Transform task which takes in the collection of order data and
            computes the total order value.
            """
            total_order_value = 0
    
            for value in order_data_dict.values():
                total_order_value += value
    
            return {"total_order_value": total_order_value}
    
        @task()
        def load(total_order_value: float):
            """
            #### Load task
            A simple Load task which takes in the result of the Transform task and
            instead of saving it to end user review, just prints it out.
            """
    
            print(f"Total order value is: {total_order_value:.2f}")
    
        order_data = extract()
        order_summary = transform(order_data)
        load(order_summary["total_order_value"])
    
    tutorial_etl_dag = tutorial_taskflow_api_etl_virtualenv()

    task를 가상환경에서 실행하려면 @task.virtualenv 를 추가하면 됩니다. 해당 데코레이터를 추가하면 해당 task는 사용자가 정의한 라이브러리를 사용하여 virtualenv를 생성하고 함수를 실행하기 위한 다른 파이썬 버전을 만들 수도 있습니다.

    이 기능을 사용하면 단순하고 파이써닉한 코드를 작성할 수 있습니다.

    댓글