ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow] Templates
    공부/데이터 2021. 11. 8. 00:37

    Airflow에서는 변수, macro, filter를 템플릿에서 사용할 수 있습니다. 이러한 템플릿은 런타임에 task 인스턴스에 동적으로 정보를 전달하는 Airflow의 강력한 개념입니다.

    아래는 작업을 실행할 때마다 요일을 인쇄하는 예시입니다.

    BashOperator(
        task_id="print_day_of_week",
        bash_command="echo Today is {{ execution_date.format('dddd') }}",
    )

    여기서 {{ execution_date.format('dddd') }} 가 템플릿 코드이고 위 코드를 수요일에 실행하면 "Today is Wednesday" 가 출력됩니다.

    이러한 템플릿은 dag가 실행될 때, 실행날짜 별 파일을 만들어야 할 때, 유용하게 사용할 수 있습니다.

    airflow는 Jinja 템플릿 엔진을 활용하고 있습니다. 여기서는 템플릿 활용에 대해서 설명을 하고 간략한 설명은 https://brownbears.tistory.com/586의 Jinja Templating 부분에 작성되어 있습니다.

    Variables

    airflow에서 제공하는 기본 변수들은 아래 링크에서 확인할 수 있습니다.

    https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#variables

    흔히 사용하는 {{ execution_date }}{{ next_ds }} 는 현재 stable 버전인 2.2.1에서 deprecated 예정이므로 사용하지 않는 것을 권장합니다.

    위의 변수 중, task, ti, dag 등은 객체로 . 을 사용해 객체 내에 있는 변수나 메소드를 호출할 수 있습니다.

    task

    task는 현재 operator를 의미합니다.

    task1 = BashOperator(
        task_id='task1',
        bash_command='echo "{{ task }}"', # echo "<Task(BashOperator): task1>
        dag=dag,
    )
    
    def task2_function(t):
        print(t) # <Task(PythonOperator): task2>
    
    task2 = PythonOperator(
        task_id='task2',
        python_callable=task2_function,
        op_args=['{{ task }}'],
        dag=dag,
    )

    즉, task1.owner{{ task.owner }} 는 동일한 의미를 가지므로 템플릿에 해당 operator의 변수 및 메소드를 사용할 수 있습니다.

    아래 링크에서 어떤 변수, 메소드를 제공하는지 설명되어 있으며 이를 템플릿으로 사용할 수 있습니다.

    task_instance = ti

    task instance를 의미합니다.

    task1 = BashOperator(
        task_id='task1',
        bash_command='echo "{{ ti }}"', # echo "<TaskInstance: templates_test.task1 2021-11-07T11:14:01.675163+00:00 [running]>
        dag=dag,
    )
    
    def task2_function(t):
        print('task_id:', t) # task_id:  task2
    
    task2 = PythonOperator(
        task_id='task2',
        python_callable=task2_function,
        op_args=['{{ ti.task_id }}'],
        dag=dag,
    )

    위의 task와 동일하고 아래 링크에서 어떤 변수, 메소드를 제공하는지 설명되어 있으며 이를 템플릿으로 사용할 수 있습니다.

    var

    var 변수를 사용하면 airflow UI에서 정의한 변수를 접근할 수 있습니다.

    task1 = BashOperator(
        task_id='task1',
        bash_command='echo "{{ var }}"', # echo "{\'json\': None, \'value\': None}"
        dag=dag,
    )
    
    def task2_function(t):
        print('var: ', t, type(t)) # {'json': None, 'value': None} <class 'str'>
    
    task2 = PythonOperator(
        task_id='task2',
        python_callable=task2_function,
        op_args=['{{ var }}'],
        dag=dag,
    )
    
    # Raw value
    # echo {{ var.value.<variable_name> }}
    
    # Auto-deserialize JSON value
    # echo {{ var.json.<variable_name> }}

    variable에 관한 문서는 아래에서 확인할 수 있습니다.

    UI상에서 variable 관리하는 방법

    conn (2.2.0 이상 버전만)

    위의 var 와 유사하게 airflow의 connection 데이터를 가져올 수 있습니다.

    task2 = PythonOperator(
        task_id='task2',
        python_callable=task2_function,
        op_args=['{{ conn }}'],
        dag=dag,
    )

    https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/connection/index.html

    filter(2.2.0 이상 버전만)

    airflow는 파이프라인을 사용하여 값의 포맷을 지정할 수 있습니다.

    task3 = BashOperator(
        task_id='task3',
        bash_command="echo {{ execution_date }} {{ execution_date | ts_nodash_with_tz }}",
        dag=dag,
    )

    airflow에서 제공해주는 형식은 아래에서 확인할 수 있습니다.

    클래스 및 operator에 템플릿 적용하기

    operator

    operator의 기본인 BaseOperator 클래스는 아래와 같이 3가지 템플릿을 허용합니다.

    @functools.total_ordering
    class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta):
            template_fields: Iterable[str] = ()
        # Defines which files extensions to look for in the templated fields
        template_ext: Iterable[str] = ()
        # Template field renderers indicating type of the field, for example sql, json, bash
        template_fields_renderers: Dict[str, str] = {}
    • template_fields: 템플릿으로 사용할 변수를 정의
    • template_ext: 템플릿으로 사용할 파일 확장자를 정의
    • template_fields_renderers: 템플릿 필드의 유형을 표시

    모든 operator들은 모든 템플릿을 적용하여 사용하고 있지 않습니다. 예를 들어, PythonOperator의 경우에 2가지, BashOperator는 3가지 템플릿을 허용하고 있습니다.

    class PythonOperator(BaseOperator):
            template_fields = ('templates_dict', 'op_args', 'op_kwargs')
        template_fields_renderers = {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}
    
    ...
    
    class BashOperator(BaseOperator):
            template_fields = ('bash_command', 'env')
        template_fields_renderers = {'bash_command': 'bash', 'env': 'json'}
        template_ext = (
            '.sh',
            '.bash',
        )

    template_fields는 템플릿화 할 수 있는 변수 목록을 갖고 있으며 airflow UI에서 찾을 수 있습니다.

    template_ext 는 런타임에 템플릿화 할 수 있는 파일 확장자 목록을 포함합니다. 즉, 파일 내에 템플릿을 런타임때 변경할 수 있도록 합니다.

    # test.sh
    echo "{{ execution_date.format('dddd') }}"
    
    # py
    task1 = BashOperator(
        task_id='task1',
        bash_command='test.sh',
        dag=dag,
    )

    bash_command에 파일명을 작성하여 .sh 파일 확장자 내에 있는 템플릿을 동적으로 변경하고 실행합니다.

    추가적으로 DAG에 옵션을 주어 파일을 찾는 경로를 제한할 수 있습니다.

    with DAG(..., template_searchpath="/tmp") as dag:
        run_this = BashOperator(task_id="run_this", bash_command="script.sh")

    airflow에서 해당 DAG가 실행되면 /tmp를 기본 경로로 설정하고 파일을 찾습니다.

    클래스

    사용자가 정의한 클래스에도 템플릿을 적용할 수 있습니다.

    def transform_data(cls):
        print(2)
        print(cls.path)
    
    class MyDataReader:
        template_fields = ['path']
    
        def __init__(self, my_path1):
            print(1111)
            self.path = my_path1
    
    t = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
        op_args=[MyDataReader('/tmp/{{ ds }}/my_file')],
        dag=dag,
    )

    만약 template_fields에 선언한 변수명이 init메소드 내에 없다면 에러가 발생합니다.

    class MyDataReader:
        template_fields = ['path']
    
        def __init__(self, my_path1):
            print(1111)
            self.path1 = my_path1
    
    # AttributeError: 'MyDataReader' object has no attribute 'path'

    템플릿에서 사용자 정의 함수 및 변수 사용

    airflow에서 제공하는 변수 및 함수를 제외하고 별도의 함수 및 변수를 사용하기 위해선 DAG에 user_defined_macros 을 주어야 합니다.

    def days_to_now(starting_date):
        return (datetime.now() - starting_date).days
    
    class TestEcho:
        def echo(self):
            return 'echo'
    
    dag = DAG(
        dag_id='templates_test', default_args={'owner': 'brownbear'}, start_date=datetime(2021, 11, 7, 0, 0, 0),
        schedule_interval="@once", tags=['templates'],
        user_defined_macros={
            'starting_date': datetime(2021, 11, 1),
            'days_to_now': days_to_now,
            'test_echo': TestEcho()
        }
    )
    
    task1 = BashOperator(
        task_id='task1',
        bash_command="echo Days since {{ starting_date }} is {{ days_to_now(starting_date) }}, {{ test_echo.echo() }}",
        dag=dag,
    )
    
    # Days since 2021-11-01 00:00:00 is 6, echo

    함수 뿐만 아니라 클래스도 선언하여 사용할 수 있습니다.

    또한 DAG 옵션으로 user_defined_filters 를 주어 사용자가 정의한 필터를 적용할 수도 있습니다.

    dag = DAG(
        dag_id='templates_test', default_args={'owner': 'brownbear'}, start_date=datetime(2021, 11, 7, 0, 0, 0),
        schedule_interval="@once", tags=['templates'],
        user_defined_macros={
            'starting_date': datetime(2021, 11, 1),
            'days_to_now': days_to_now,
        },
        user_defined_filters={"days_to_now": days_to_now},
    )
    
    task1 = BashOperator(
        task_id='task1',
        bash_command="echo Days since {{ starting_date }} is {{ starting_date | days_to_now }}",
        dag=dag,
    )
    
    # Days since 2021-11-01 00:00:00 is 6

    파이썬 객체로 필드 렌더링

    https://brownbears.tistory.com/586의 파이썬 객체로 필드 렌더링 처럼 Xcoms와 동일하게 DAG trigger로 config ({"numbers": [1,2,3]})를 전달한다면 전부 문자열로 인식하게 됩니다.

    def sum_numbers(*args):
        total = 0
        for val in args:
            total += val
    
        print(total)
        return total
    
    sumnumbers = PythonOperator(
        task_id="sumnumbers",
        python_callable=sum_numbers,
        op_args=[1,2,3],
        dag=dag
    )
    # 6
    
    sumnumbers1 = PythonOperator(
        task_id="sumnumbers1",
        python_callable=sum_numbers,
        op_args="{{ dag_run.conf['numbers'] }}",
        dag=dag
    )
    # TypeError: unsupported operand type(s) for +=: 'int' and 'str'

    2.1.0 버전부터 나온 render_template_as_native_obj DAG 옵션을 추가하면 파이썬 타입을 온전히 사용할 수 있습니다.

    댓글