-
[Airflow] Templates공부/데이터 2021. 11. 8. 00:37
Airflow에서는 변수, macro, filter를 템플릿에서 사용할 수 있습니다. 이러한 템플릿은 런타임에 task 인스턴스에 동적으로 정보를 전달하는 Airflow의 강력한 개념입니다.
아래는 작업을 실행할 때마다 요일을 인쇄하는 예시입니다.
BashOperator( task_id="print_day_of_week", bash_command="echo Today is {{ execution_date.format('dddd') }}", )
여기서
{{ execution_date.format('dddd') }}
가 템플릿 코드이고 위 코드를 수요일에 실행하면"Today is Wednesday"
가 출력됩니다.이러한 템플릿은 dag가 실행될 때, 실행날짜 별 파일을 만들어야 할 때, 유용하게 사용할 수 있습니다.
airflow는 Jinja 템플릿 엔진을 활용하고 있습니다. 여기서는 템플릿 활용에 대해서 설명을 하고 간략한 설명은 https://brownbears.tistory.com/586의 Jinja Templating 부분에 작성되어 있습니다.
Variables
airflow에서 제공하는 기본 변수들은 아래 링크에서 확인할 수 있습니다.
https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#variables
흔히 사용하는
{{ execution_date }}
나{{ next_ds }}
는 현재 stable 버전인 2.2.1에서 deprecated 예정이므로 사용하지 않는 것을 권장합니다.위의 변수 중, task, ti, dag 등은 객체로 . 을 사용해 객체 내에 있는 변수나 메소드를 호출할 수 있습니다.
task
task는 현재 operator를 의미합니다.
task1 = BashOperator( task_id='task1', bash_command='echo "{{ task }}"', # echo "<Task(BashOperator): task1> dag=dag, ) def task2_function(t): print(t) # <Task(PythonOperator): task2> task2 = PythonOperator( task_id='task2', python_callable=task2_function, op_args=['{{ task }}'], dag=dag, )
즉,
task1.owner
와{{ task.owner }}
는 동일한 의미를 가지므로 템플릿에 해당 operator의 변수 및 메소드를 사용할 수 있습니다.아래 링크에서 어떤 변수, 메소드를 제공하는지 설명되어 있으며 이를 템플릿으로 사용할 수 있습니다.
task_instance = ti
task instance를 의미합니다.
task1 = BashOperator( task_id='task1', bash_command='echo "{{ ti }}"', # echo "<TaskInstance: templates_test.task1 2021-11-07T11:14:01.675163+00:00 [running]> dag=dag, ) def task2_function(t): print('task_id:', t) # task_id: task2 task2 = PythonOperator( task_id='task2', python_callable=task2_function, op_args=['{{ ti.task_id }}'], dag=dag, )
위의 task와 동일하고 아래 링크에서 어떤 변수, 메소드를 제공하는지 설명되어 있으며 이를 템플릿으로 사용할 수 있습니다.
var
var 변수를 사용하면 airflow UI에서 정의한 변수를 접근할 수 있습니다.
task1 = BashOperator( task_id='task1', bash_command='echo "{{ var }}"', # echo "{\'json\': None, \'value\': None}" dag=dag, ) def task2_function(t): print('var: ', t, type(t)) # {'json': None, 'value': None} <class 'str'> task2 = PythonOperator( task_id='task2', python_callable=task2_function, op_args=['{{ var }}'], dag=dag, ) # Raw value # echo {{ var.value.<variable_name> }} # Auto-deserialize JSON value # echo {{ var.json.<variable_name> }}
variable에 관한 문서는 아래에서 확인할 수 있습니다.
UI상에서 variable 관리하는 방법
conn (2.2.0 이상 버전만)
위의
var
와 유사하게 airflow의 connection 데이터를 가져올 수 있습니다.task2 = PythonOperator( task_id='task2', python_callable=task2_function, op_args=['{{ conn }}'], dag=dag, )
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/connection/index.html
filter(2.2.0 이상 버전만)
airflow는 파이프라인을 사용하여 값의 포맷을 지정할 수 있습니다.
task3 = BashOperator( task_id='task3', bash_command="echo {{ execution_date }} {{ execution_date | ts_nodash_with_tz }}", dag=dag, )
airflow에서 제공해주는 형식은 아래에서 확인할 수 있습니다.
클래스 및 operator에 템플릿 적용하기
operator
operator의 기본인 BaseOperator 클래스는 아래와 같이 3가지 템플릿을 허용합니다.
@functools.total_ordering class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta): template_fields: Iterable[str] = () # Defines which files extensions to look for in the templated fields template_ext: Iterable[str] = () # Template field renderers indicating type of the field, for example sql, json, bash template_fields_renderers: Dict[str, str] = {}
- template_fields: 템플릿으로 사용할 변수를 정의
- template_ext: 템플릿으로 사용할 파일 확장자를 정의
- template_fields_renderers: 템플릿 필드의 유형을 표시
모든 operator들은 모든 템플릿을 적용하여 사용하고 있지 않습니다. 예를 들어, PythonOperator의 경우에 2가지, BashOperator는 3가지 템플릿을 허용하고 있습니다.
class PythonOperator(BaseOperator): template_fields = ('templates_dict', 'op_args', 'op_kwargs') template_fields_renderers = {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"} ... class BashOperator(BaseOperator): template_fields = ('bash_command', 'env') template_fields_renderers = {'bash_command': 'bash', 'env': 'json'} template_ext = ( '.sh', '.bash', )
template_fields
는 템플릿화 할 수 있는 변수 목록을 갖고 있으며 airflow UI에서 찾을 수 있습니다.template_ext
는 런타임에 템플릿화 할 수 있는 파일 확장자 목록을 포함합니다. 즉, 파일 내에 템플릿을 런타임때 변경할 수 있도록 합니다.# test.sh echo "{{ execution_date.format('dddd') }}" # py task1 = BashOperator( task_id='task1', bash_command='test.sh', dag=dag, )
bash_command에 파일명을 작성하여 .sh 파일 확장자 내에 있는 템플릿을 동적으로 변경하고 실행합니다.
추가적으로 DAG에 옵션을 주어 파일을 찾는 경로를 제한할 수 있습니다.
with DAG(..., template_searchpath="/tmp") as dag: run_this = BashOperator(task_id="run_this", bash_command="script.sh")
airflow에서 해당 DAG가 실행되면 /tmp를 기본 경로로 설정하고 파일을 찾습니다.
클래스
사용자가 정의한 클래스에도 템플릿을 적용할 수 있습니다.
def transform_data(cls): print(2) print(cls.path) class MyDataReader: template_fields = ['path'] def __init__(self, my_path1): print(1111) self.path = my_path1 t = PythonOperator( task_id='transform_data', python_callable=transform_data, op_args=[MyDataReader('/tmp/{{ ds }}/my_file')], dag=dag, )
만약 template_fields에 선언한 변수명이 init메소드 내에 없다면 에러가 발생합니다.
class MyDataReader: template_fields = ['path'] def __init__(self, my_path1): print(1111) self.path1 = my_path1 # AttributeError: 'MyDataReader' object has no attribute 'path'
템플릿에서 사용자 정의 함수 및 변수 사용
airflow에서 제공하는 변수 및 함수를 제외하고 별도의 함수 및 변수를 사용하기 위해선 DAG에
user_defined_macros
을 주어야 합니다.def days_to_now(starting_date): return (datetime.now() - starting_date).days class TestEcho: def echo(self): return 'echo' dag = DAG( dag_id='templates_test', default_args={'owner': 'brownbear'}, start_date=datetime(2021, 11, 7, 0, 0, 0), schedule_interval="@once", tags=['templates'], user_defined_macros={ 'starting_date': datetime(2021, 11, 1), 'days_to_now': days_to_now, 'test_echo': TestEcho() } ) task1 = BashOperator( task_id='task1', bash_command="echo Days since {{ starting_date }} is {{ days_to_now(starting_date) }}, {{ test_echo.echo() }}", dag=dag, ) # Days since 2021-11-01 00:00:00 is 6, echo
함수 뿐만 아니라 클래스도 선언하여 사용할 수 있습니다.
또한 DAG 옵션으로
user_defined_filters
를 주어 사용자가 정의한 필터를 적용할 수도 있습니다.dag = DAG( dag_id='templates_test', default_args={'owner': 'brownbear'}, start_date=datetime(2021, 11, 7, 0, 0, 0), schedule_interval="@once", tags=['templates'], user_defined_macros={ 'starting_date': datetime(2021, 11, 1), 'days_to_now': days_to_now, }, user_defined_filters={"days_to_now": days_to_now}, ) task1 = BashOperator( task_id='task1', bash_command="echo Days since {{ starting_date }} is {{ starting_date | days_to_now }}", dag=dag, ) # Days since 2021-11-01 00:00:00 is 6
파이썬 객체로 필드 렌더링
https://brownbears.tistory.com/586의
파이썬 객체로 필드 렌더링
처럼 Xcoms와 동일하게 DAG trigger로 config ({"numbers": [1,2,3]}
)를 전달한다면 전부 문자열로 인식하게 됩니다.def sum_numbers(*args): total = 0 for val in args: total += val print(total) return total sumnumbers = PythonOperator( task_id="sumnumbers", python_callable=sum_numbers, op_args=[1,2,3], dag=dag ) # 6 sumnumbers1 = PythonOperator( task_id="sumnumbers1", python_callable=sum_numbers, op_args="{{ dag_run.conf['numbers'] }}", dag=dag ) # TypeError: unsupported operand type(s) for +=: 'int' and 'str'
2.1.0 버전부터 나온
render_template_as_native_obj
DAG 옵션을 추가하면 파이썬 타입을 온전히 사용할 수 있습니다.