-
[Airflow] Operator공부/데이터 2021. 10. 31. 19:12
operator는 DAG 내에서 task를 보다 쉽게 구현하도록 제공되는 템플릿입니다.
with DAG("my-dag") as dag: ping = SimpleHttpOperator(endpoint="http://example.com/update/") email = EmailOperator(to="admin@example.com", subject="Update complete") ping >> email
airflow 내에서 제공하는 operator 뿐만 아니라 provider에서 제공하는 operator도 존재합니다.
BashOperator
PythonOperator
EmailOperator
SimpleHttpOperator
SqliteOperator
MySqlOperator
PostgresOperator
MsSqlOperator
OracleOperator
JdbcOperator
DockerOperator
HiveOperator
S3FileTransformOperator
PrestoToMySqlOperator
SlackAPIOperator
이외에도 더 많은 operator가 존재하며 providers 에서 확인할 수 있습니다.
Jinja Templating
Airflow는 Jinja 템플릿을 활용하며 매크로 기능과 함께 사용하고 있습니다.
# The execution date as YYYY-MM-DD date = "{{ ds }}" t = BashOperator( task_id='test_env', bash_command='/tmp/test.sh ', dag=dag, env={'EXECUTION_DATE': date})
{{ ds }}
는 매크로이고BashOperator
의 env 매개변수는 Jinja로 템플릿화 되어 있기 때문에 실행 날짜는EXECUTION_DATE
라는 이름으로 bash script에서 환경 변수로 사용할 수 있습니다.문서 상에서 "templated" 으로 표시된 모든 파라미터와 같이 Jinja 템플릿을 사용할 수 있습니다. 이러한 템플릿 대체는 operator의
pre_execute
함수가 호출되기 전에 발생합니다.class MyDataReader: template_fields = ['path'] def __init__(self, my_path): self.path = my_path # [additional code here...] t = PythonOperator( task_id='transform_data', python_callable=transform_data op_args=[ MyDataReader('/tmp/{{ ds }}/my_file') ], dag=dag, )
MyDataReader 클래스에 변수를 전달하기 전에 {{ ds }} 매크로에 해당되는 날짜 값이 대체가 된 다음, 전달하게 됩니다.
/tmp/yyyy-mm-dd/my_file
형식으로 대체- MyDataReader클래스의 첫 번째 인자로 1번 결과 전달
template_fields
는 클래스, 인스턴스 변수 둘 다 사용할 수 있습니다.
또한 DAG를 생성할 때, 사용자가 custom 값을 Jinja 환경에 전달할 수 있습니다. 아래는 Jinja가 템플릿 문자열에서 줄 바꿈을 삭제하지 않는 옵션을 전달하는 예시입니다.
my_dag = DAG( dag_id='my-dag', jinja_environment_kwargs={ 'keep_trailing_newline': True, # some other jinja2 Environment options here }, )
파이썬 객체로 필드 렌더링
기본적으로 모든
template_fields
은 문자열로 렌더링 됩니다.Xcom으로
{"1001": 301.27, "1002": 433.21, "1003": 502.22}
값이 전달되었다고 가정했을 때, 아래 task가 실행되면 order_data 값으로는'{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
와 같이 문자열이 전달됩니다.transform = PythonOperator( task_id="transform", op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"}, python_callable=transform )
만약 렌더링된 템플릿 필드가 파이썬 객체 (dict, int 등)가 반환되기를 원한다면 DAG 속성에
render_template_as_native_obj=True
와 같이 선언해야 합니다.dag = DAG( dag_id="example_template_as_python_object", schedule_interval=None, start_date=days_ago(2), render_template_as_native_obj=True, ) def extract(): data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' return json.loads(data_string) def transform(order_data): print(type(order_data)) for value in order_data.values(): total_order_value += value return {"total_order_value": total_order_value} extract_task = PythonOperator( task_id="extract", python_callable=extract ) transform_task = PythonOperator( task_id="transform", op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"}, python_callable=transform ) extract_task >> transform_task