ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow] Test
    공부/데이터 2021. 11. 13. 18:03

    스크립트 실행

    만든 스크립트에 문법적 오류가 없는지 확인을 하기 위해 스크립트를 실행해 봅니다. 만약 예외가 발생하지 않았다면 어느 정도 문법적인 오류가 없다라고 볼 수 있지만 스크립트 특성 상, 호출되지 않은 함수 내의 문법적 오류는 존재할 수 있습니다.

    명령어를 통한 메타데이터 유효성 검사

    아래 명령어를 통해 DAG 및 task가 의도한대로 등록되었는지, 구조가 잘 잡혔는지 확인할 수 있습니다.

    # 등록된 dag 목록 출력
    $ airflow dags list
    
    dag_id                | filepath                 | owner     | paused
    ======================+==========================+===========+=======
    dag_decorator_test    | temp.py                  | airflow   | True  
    example_dag_decorator | example_dag_decorator.py | airflow   | True  
    jinja_test            | jinja_test.py            | brownbear | True  
    macro_test            | macro_test.py            | brownbear | True  
    templates_test        | template_test.py         | brownbear | False
    
    # DAG 내의 task 목록 출력
    # airflow tasks list [dag_id]
    $ airflow tasks list templates_test
    
    sumnumbers
    sumnumbers1
    task4
    # DAG 내의 task 목록을 tree 형식으로 출력
    # airflow tasks list [dag_id] --tree
    $ airflow tasks list templates_test --tree
    
    <Task(PythonOperator): sumnumbers>
        <Task(PythonOperator): sumnumbers1>
    <Task(DummyOperator): task4>

    명령어를 통한 dag와 task 테스트

    airflow에서 테스트는 임시 DB나 데이터로 테스트를 하는 것이 아닌 Task 단위로 실제 인스턴스를 호출하는 테스트를 뜻합니다. Task 단위 테스트를 진행할 때 execution_date 를 주어야 하는데 현재 날짜보다 이전의 시간을 주면 바로 동작하지만 미래의 날짜를 입력하면 실행되지 않습니다.

    # 입력된 dag의 task를 execution_date에 실행
    # airflow tasks test [dag_id] [task_id] [execution_date]
    $ airflow tasks test templates_test sumnumbers 2021-06-01

    이러한 airflow tasks test 명령어는 단일 task 인스턴스를 작업할 수 있으며 task 인스턴스를 로컬에서 실행하고 로그를 표준출력으로 보여줍니다. 또한 task의 종속성을 고려하지 않고 실패, 성공과 같은 상태값을 DB에 저장하지 않습니다.

    만약 Task 단위가 아닌 DAG를 테스트 하고자 하면 아래와 같이 실행합니다.

    # 입력된 dag를 execution_date에 실행
    # airflow dags test [dag_id] [execution_date]
    $ airflow tasks test templates_test 2021-06-01

    DAG 단위로 테스트를 하면 task의 종속성을 고려하지만 task 단위의 test와 동일하게 상태값을 DB에 저장하지 않습니다.

    test 모듈을 사용한 test

    pytest

    테스트 관련된 라이브러리는 무엇을 사용하던지 상관은 없지만 여기서는 pytest를 사용하여 DAG를 테스트 하는 것을 설명합니다.

    pytest를 설치한 다음, 아래와 같이 작성한 후 실행하면 통과하는 것을 볼 수 있습니다.

    $ pip install pytest
    from airflow.models import DagBag
    
    def test_no_import_errors():
      dag_bag = DagBag()
      assert len(dag_bag.import_errors) == 0, "No Import Failures"

    모든 DAG 파일 내에 import 에러가 있는지 확인하는 예제입니다.

    # $ pytest [test 파일 또는 폴더]
    # 또는
    # $ python3 -m pytest [test 파일 또는 폴더]
    
    $ python3 -m pytest dags/test

    또 다른 예제로 아래와 같이 팀에서 모든 DAG는 각 task에 대해 2번 재시도를 해야된다는 규칙이 있는 경우, 해당 규칙을 적용하기 위해 다음과 같은 테스트를 작성할 수 있습니다.

    def test_retries_present():
      dag_bag = DagBag()
      for dag in dag_bag.dags:
          retries = dag_bag.dags[dag].default_args.get('retries', [])
          error_msg = 'Retries not set to 2 for DAG {id}'.format(id=dag)
          assert retries == 2, error_msg

    만약 2번 재시도라는 옵션이 설정되어 있지 않은 DAG가 있다면 아래와 같이 오류가 발생합니다.

    이러한 테스트와 git action을 결합하여 CI/CD workflow를 구축할 수 있습니다.

    unittest

    단위 테스트는 소스 코드를 개별적으로 테스트하여 의도한대로 작동하는지 확인하는 테스트 방법입니다.

    airflow에서는 DAG의 모든 부분에 대한 단위 테스트를 진행할 수 있지만 custom hook나 custom operator를 검증하는데 자주 사용됩니다.

    아래는 숫자가 짝수인지 확인하는 custom operator를 검증하는 예제입니다.

    # 짝수 검증 operator
    from airflow.models import BaseOperator
    
    class EvenNumberCheckOperator(BaseOperator):
        def __init__(self, my_operator_param, *args, **kwargs):
            self.operator_param = my_operator_param
            super().__init__(*args, **kwargs)
    
        def execute(self, context):
            if not self.operator_param % 2:
                return True
            return False
    import unittest
    from datetime import datetime
    
    from airflow import DAG
    from airflow.models import TaskInstance
    
    from custom_operator import EvenNumberCheckOperator
    
    class TestEvenNumberCheckOperator(unittest.TestCase):
        def setUp(self):
            super().setUp()
            self.dag = DAG('test_dag', default_args={'owner': 'airflow', 'start_date': datetime(2021, 1, 1)})
    
        def test_even(self):
            """짝수 테스트"""
            task = EvenNumberCheckOperator(my_operator_param=10, task_id='even', dag=self.dag)
            ti = TaskInstance(task=task, execution_date=datetime.now())
            result = task.execute(ti.get_template_context())
            assert result is True
    
        def test_odd(self):
            """홀수 테스트"""
            task = EvenNumberCheckOperator(my_operator_param=12, task_id='odd', dag=self.dag)
            ti = TaskInstance(task=task, execution_date=datetime.now())
            result = task.execute(ti.get_template_context())
            assert result is False
    
    if __name__ == '__main__':
        unittest.main()

    명령어로 위의 파일을 실행한다면 test_odd() 테스트에 짝수를 넣어 아래와 같이 에러가 발생합니다.

    # python3 [test 파일]

    custom hook이나 custom operator 뿐만 아니라 PythonOperator가 실행하는 함수가 있다면 해당 함수에도 단위 테스트 코드를 작성하는 것이 좋습니다.

    만약 위에서 설명한 pytest의 함수를 unittest로 작성한다면 다음과 같이 작성할 수 있습니다.

    from airflow.models import DagBag
    import unittest
    
    class TestDAGs(unittest.TestCase):
        def setUp(self):
            self.dagbag = DagBag()
    
        def test_no_import_errors(self):
            dag_bag = self.dagbag
            assert len(dag_bag.import_errors) == 0, "No Import Failures"
    
        def test_retries_present(self):
            for dag in self.dagbag.dags:
                retries = self.dagbag.dags[dag].default_args.get('retries', [])
                error_msg = 'Retries not set to 2 for DAG {id}'.format(id=dag)
                assert retries == 2, error_msg
    
    if __name__ == '__main__':
        unittest.main()

    데이터 무결성 테스트

    데이터 무결성 테스트는 데이터 문제가 파이프라인을 고장내거나 다운스트림 시스템이 동작하지 않도록 테스트하는 것입니다. 이러한 테스트는 DAG의 task가 주어진 데이터를 처리할 때 예상되는 출력을 생성하는지 확인하는데 사용할 수 있습니다. 데이터는 정적이지 않기 때문에 앞에서 설명한 코드 테스트와는 성격이 다릅니다.

    이러한 데이터 무결성 테스트를 구현하는데 간단한 방법은 DAG에 task로 직접 작성하는 것입니다. airflow의 종속성을 사용하여 잘못된 데이터를 관리할 수 있습니다.

    댓글