ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow] Operator
    공부/데이터 2021. 10. 31. 19:12

    operator는 DAG 내에서 task를 보다 쉽게 구현하도록 제공되는 템플릿입니다.

    with DAG("my-dag") as dag:
        ping = SimpleHttpOperator(endpoint="http://example.com/update/")
        email = EmailOperator(to="admin@example.com", subject="Update complete")
    
        ping >> email

    airflow 내에서 제공하는 operator 뿐만 아니라 provider에서 제공하는 operator도 존재합니다.

    • BashOperator
    • PythonOperator
    • EmailOperator
    • SimpleHttpOperator
    • SqliteOperator
    • MySqlOperator
    • PostgresOperator
    • MsSqlOperator
    • OracleOperator
    • JdbcOperator
    • DockerOperator
    • HiveOperator
    • S3FileTransformOperator
    • PrestoToMySqlOperator
    • SlackAPIOperator

    이외에도 더 많은 operator가 존재하며 providers 에서 확인할 수 있습니다.

    Jinja Templating

    Airflow는 Jinja 템플릿을 활용하며 매크로 기능과 함께 사용하고 있습니다.

    # The execution date as YYYY-MM-DD
    date = "{{ ds }}"
    t = BashOperator(
        task_id='test_env',
        bash_command='/tmp/test.sh ',
        dag=dag,
        env={'EXECUTION_DATE': date})

    {{ ds }} 는 매크로이고 BashOperator의 env 매개변수는 Jinja로 템플릿화 되어 있기 때문에 실행 날짜는 EXECUTION_DATE 라는 이름으로 bash script에서 환경 변수로 사용할 수 있습니다.

    문서 상에서 "templated" 으로 표시된 모든 파라미터와 같이 Jinja 템플릿을 사용할 수 있습니다. 이러한 템플릿 대체는 operator의 pre_execute 함수가 호출되기 전에 발생합니다.

    class MyDataReader:
        template_fields = ['path']
    
        def __init__(self, my_path):
            self.path = my_path
    
        # [additional code here...]
    
    t = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data
        op_args=[
            MyDataReader('/tmp/{{ ds }}/my_file')
        ],
        dag=dag,
    )

    MyDataReader 클래스에 변수를 전달하기 전에 {{ ds }} 매크로에 해당되는 날짜 값이 대체가 된 다음, 전달하게 됩니다.

    1. /tmp/yyyy-mm-dd/my_file 형식으로 대체
    2. MyDataReader클래스의 첫 번째 인자로 1번 결과 전달
    3. template_fields 는 클래스, 인스턴스 변수 둘 다 사용할 수 있습니다.

    또한 DAG를 생성할 때, 사용자가 custom 값을 Jinja 환경에 전달할 수 있습니다. 아래는 Jinja가 템플릿 문자열에서 줄 바꿈을 삭제하지 않는 옵션을 전달하는 예시입니다.

    my_dag = DAG(
        dag_id='my-dag',
        jinja_environment_kwargs={
            'keep_trailing_newline': True,
             # some other jinja2 Environment options here
        },
    )

    파이썬 객체로 필드 렌더링

    기본적으로 모든 template_fields 은 문자열로 렌더링 됩니다.

    Xcom으로 {"1001": 301.27, "1002": 433.21, "1003": 502.22} 값이 전달되었다고 가정했을 때, 아래 task가 실행되면 order_data 값으로는 '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' 와 같이 문자열이 전달됩니다.

    transform = PythonOperator(
        task_id="transform", op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
        python_callable=transform
    )

    만약 렌더링된 템플릿 필드가 파이썬 객체 (dict, int 등)가 반환되기를 원한다면 DAG 속성에 render_template_as_native_obj=True 와 같이 선언해야 합니다.

    dag = DAG(
        dag_id="example_template_as_python_object",
        schedule_interval=None,
        start_date=days_ago(2),
        render_template_as_native_obj=True,
    )
    
    def extract():
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        return json.loads(data_string)
    
    def transform(order_data):
        print(type(order_data))
        for value in order_data.values():
            total_order_value += value
        return {"total_order_value": total_order_value}
    
    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract
    )
    
    transform_task = PythonOperator(
        task_id="transform", op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
        python_callable=transform
    )
    
    extract_task >> transform_task

    댓글