  • [Airflow] Automatically deleting database logs on a schedule
    Languages/Python & Django 2024. 3. 1. 18:16

    Over time, your environment's Airflow database stores more and more data. This data includes information and logs related to past DAG runs, tasks, and other Airflow operations.

    Periodic DB maintenance with a DAG

    The maintenance DAG below prunes the database contents and keeps the database fast. To keep the database small, the maintenance DAG needs to run periodically; Google's Composer documentation recommends running the DAG introduced below once a day. Keeping the database small means deleting logs that have accumulated past a certain retention period. The retention period is configured here as DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS, and Google's Composer documentation recommends deleting logs older than 30 days.
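
    As a minimal sketch of how this retention period turns into a deletion cutoff (not part of the DAG below; it assumes an Airflow environment is installed), the value is simply subtracted from the current UTC time, and rows older than the resulting max_date become deletion candidates:

    from datetime import timedelta
    from airflow.utils import timezone

    MAX_DB_ENTRY_AGE_IN_DAYS = 30  # recommended retention period
    # Rows whose age_check_column is older than max_date are deletion candidates.
    max_date = timezone.utcnow() - timedelta(days=MAX_DB_ENTRY_AGE_IN_DAYS)
    print("Delete metadata rows older than:", max_date.isoformat())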

    The DAG below deletes entries older than 30 days from the job, dag_run, task_instance, log, xcom, sla_miss, dags, task_reschedule, task_fail, and import_error tables in the Airflow database. It is only an example: add entries for any other tables you need to clean up, and remove from the code the entries for tables whose data you want to keep. In general, most of the space savings come from deleting data in the log, task_instance, dag_run, and xcom tables.
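
    If you only care about those high-impact tables, one option (a hypothetical sketch; it assumes it is placed after the DATABASE_OBJECTS list in the code below has been defined) is to filter the list down by model name:

    # Hypothetical: keep only the cleanup entries that usually give the
    # biggest space savings; all other tables are left untouched.
    HIGH_IMPACT = {"DagRun", "TaskInstance", "Log", "XCom"}
    DATABASE_OBJECTS = [
        obj for obj in DATABASE_OBJECTS
        if obj["airflow_db_model"].__name__ in HIGH_IMPACT
    ]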

    Code

    """
    A maintenance workflow that you can deploy into Airflow to periodically clean
    out the DagRun, TaskInstance, Log, XCom, Job DB and SlaMiss entries to avoid
    having too much data in your Airflow MetaStore.
    
    ## Authors
    
    The DAG is a fork of [teamclairvoyant repository.](https://github.com/teamclairvoyant/airflow-maintenance-dags/tree/master/db-cleanup)
    
    ## Usage
    
    1. Update the global variables (SCHEDULE_INTERVAL, DAG_OWNER_NAME,
      ALERT_EMAIL_ADDRESSES and ENABLE_DELETE) in the DAG with the desired values
    
    2. Modify the DATABASE_OBJECTS list to add/remove objects as needed. Each
       dictionary in the list features the following parameters:
        - airflow_db_model: Model imported from airflow.models corresponding to
          a table in the airflow metadata database
        - age_check_column: Column in the model/table to use for calculating max
          date of data deletion
        - keep_last: Boolean to specify whether to preserve last run instance
            - keep_last_filters: List of filters to preserve data from deleting
              during clean-up, such as DAG runs where the external trigger is set to 0.
            - keep_last_group_by: Option to specify column by which to group the
              database entries and perform aggregate functions.
    
    3. Create and Set the following Variables in the Airflow Web Server
      (Admin -> Variables)
        - airflow_db_cleanup__max_db_entry_age_in_days - integer - Length to retain
          the log files if not already provided in the conf. If this is set to 30,
          the job will remove those files that are 30 days old or older.
    
    4. Put the DAG in your gcs bucket.
    """
    from datetime import timedelta
    import logging
    import os
    
    import airflow
    from airflow import settings
    from airflow.models import (
        DAG,
        DagModel,
        DagRun,
        Log,
        SlaMiss,
        TaskInstance,
        Variable,
        XCom,
    )
    from airflow.operators.python import PythonOperator
    from airflow.utils import timezone
    from airflow.version import version as airflow_version
    
    import dateutil.parser
    from sqlalchemy import and_, func, text
    from sqlalchemy.exc import ProgrammingError
    from sqlalchemy.orm import load_only
    
    now = timezone.utcnow
    
    # airflow-db-cleanup
    DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")
    START_DATE = airflow.utils.dates.days_ago(1)
    # How often to Run. @daily - Once a day at Midnight (UTC)
    SCHEDULE_INTERVAL = "@daily"
    # Who is listed as the owner of this DAG in the Airflow Web Server
    DAG_OWNER_NAME = "operations"
    # List of email address to send email alerts to if this job fails
    ALERT_EMAIL_ADDRESSES = []
    # Airflow version used by the environment in list form, value stored in
    # airflow_version is in format e.g "2.3.4+composer"
    AIRFLOW_VERSION = airflow_version[: -len("+composer")].split(".")
    # Length to retain the log files if not already provided in the conf. If this
    # is set to 30, the job will remove those files that are 30 days old or older.
    DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int(
        Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30)
    )
    # Prints the database entries which will be getting deleted; set to False
    # to avoid printing large lists and slowdown process
    PRINT_DELETES = False
    # Whether the job should delete the db entries or not. Included if you want to
    # temporarily avoid deleting the db entries.
    ENABLE_DELETE = True
    # List of all the objects that will be deleted. Comment out the DB objects you
    # want to skip.
    DATABASE_OBJECTS = [
        {
            "airflow_db_model": DagRun,
            "age_check_column": DagRun.execution_date,
            "keep_last": True,
            "keep_last_filters": [DagRun.external_trigger.is_(False)],
            "keep_last_group_by": DagRun.dag_id,
        },
        {
            "airflow_db_model": TaskInstance,
            "age_check_column": TaskInstance.start_date
            if AIRFLOW_VERSION < ["2", "2", "0"]
            else TaskInstance.start_date,
            "keep_last": False,
            "keep_last_filters": None,
            "keep_last_group_by": None,
        },
        {
            "airflow_db_model": Log,
            "age_check_column": Log.dttm,
            "keep_last": False,
            "keep_last_filters": None,
            "keep_last_group_by": None,
        },
        {
            "airflow_db_model": XCom,
            "age_check_column": XCom.execution_date
            if AIRFLOW_VERSION < ["2", "2", "5"]
            else XCom.timestamp,
            "keep_last": False,
            "keep_last_filters": None,
            "keep_last_group_by": None,
        },
        {
            "airflow_db_model": SlaMiss,
            "age_check_column": SlaMiss.execution_date,
            "keep_last": False,
            "keep_last_filters": None,
            "keep_last_group_by": None,
        },
        {
            "airflow_db_model": DagModel,
            "age_check_column": DagModel.last_parsed_time,
            "keep_last": False,
            "keep_last_filters": None,
            "keep_last_group_by": None,
        },
    ]
    
    # Check for TaskReschedule model
    try:
        from airflow.models import TaskReschedule
    
        DATABASE_OBJECTS.append(
            {
                "airflow_db_model": TaskReschedule,
                "age_check_column": TaskReschedule.execution_date
                if AIRFLOW_VERSION < ["2", "2", "0"]
                else TaskReschedule.start_date,
                "keep_last": False,
                "keep_last_filters": None,
                "keep_last_group_by": None,
            }
        )
    
    except Exception as e:
        logging.error(e)
    
    # Check for TaskFail model
    try:
        from airflow.models import TaskFail
    
        DATABASE_OBJECTS.append(
            {
                "airflow_db_model": TaskFail,
                "age_check_column": TaskFail.start_date,
                "keep_last": False,
                "keep_last_filters": None,
                "keep_last_group_by": None,
            }
        )
    
    except Exception as e:
        logging.error(e)
    
    # Check for RenderedTaskInstanceFields model
    if AIRFLOW_VERSION < ["2", "4", "0"]:
        try:
            from airflow.models import RenderedTaskInstanceFields
    
            DATABASE_OBJECTS.append(
                {
                    "airflow_db_model": RenderedTaskInstanceFields,
                    "age_check_column": RenderedTaskInstanceFields.execution_date,
                    "keep_last": False,
                    "keep_last_filters": None,
                    "keep_last_group_by": None,
                }
            )
    
        except Exception as e:
            logging.error(e)
    
    # Check for ImportError model
    try:
        from airflow.models import ImportError
    
        DATABASE_OBJECTS.append(
            {
                "airflow_db_model": ImportError,
                "age_check_column": ImportError.timestamp,
                "keep_last": False,
                "keep_last_filters": None,
                "keep_last_group_by": None,
                "do_not_delete_by_dag_id": True,
            }
        )
    
    except Exception as e:
        logging.error(e)
    
    if AIRFLOW_VERSION < ["2", "6", "0"]:
        try:
            from airflow.jobs.base_job import BaseJob
    
            DATABASE_OBJECTS.append(
                {
                    "airflow_db_model": BaseJob,
                    "age_check_column": BaseJob.latest_heartbeat,
                    "keep_last": False,
                    "keep_last_filters": None,
                    "keep_last_group_by": None,
                }
            )
        except Exception as e:
            logging.error(e)
    else:
        try:
            from airflow.jobs.job import Job
    
            DATABASE_OBJECTS.append(
                {
                    "airflow_db_model": Job,
                    "age_check_column": Job.latest_heartbeat,
                    "keep_last": False,
                    "keep_last_filters": None,
                    "keep_last_group_by": None,
                }
            )
        except Exception as e:
            logging.error(e)
    
    default_args = {
        "owner": DAG_OWNER_NAME,
        "depends_on_past": False,
        "email": ALERT_EMAIL_ADDRESSES,
        "email_on_failure": True,
        "email_on_retry": False,
        "start_date": START_DATE,
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
    }
    
    dag = DAG(
        DAG_ID,
        default_args=default_args,
        schedule_interval=SCHEDULE_INTERVAL,
        start_date=START_DATE,
    )
    if hasattr(dag, "doc_md"):
        dag.doc_md = __doc__
    if hasattr(dag, "catchup"):
        dag.catchup = False
    
    def print_configuration_function(**context):
        logging.info("Loading Configurations...")
        dag_run_conf = context.get("dag_run").conf
        logging.info("dag_run.conf: " + str(dag_run_conf))
        max_db_entry_age_in_days = None
        if dag_run_conf:
            max_db_entry_age_in_days = dag_run_conf.get("maxDBEntryAgeInDays", None)
        logging.info("maxDBEntryAgeInDays from dag_run.conf: " + str(dag_run_conf))
        if max_db_entry_age_in_days is None or max_db_entry_age_in_days < 1:
            logging.info(
                "maxDBEntryAgeInDays conf variable isn't included or Variable "
                + "value is less than 1. Using Default '"
                + str(DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS)
                + "'"
            )
            max_db_entry_age_in_days = DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS
        max_date = now() + timedelta(-max_db_entry_age_in_days)
        logging.info("Finished Loading Configurations")
        logging.info("")
    
        logging.info("Configurations:")
        logging.info("max_db_entry_age_in_days: " + str(max_db_entry_age_in_days))
        logging.info("max_date:                 " + str(max_date))
        logging.info("enable_delete:            " + str(ENABLE_DELETE))
        logging.info("")
    
        logging.info("Setting max_execution_date to XCom for Downstream Processes")
        context["ti"].xcom_push(key="max_date", value=max_date.isoformat())
    
    print_configuration = PythonOperator(
        task_id="print_configuration",
        python_callable=print_configuration_function,
        provide_context=True,
        dag=dag,
    )
    
    def build_query(
        session,
        airflow_db_model,
        age_check_column,
        max_date,
        keep_last,
        keep_last_filters=None,
        keep_last_group_by=None,
    ):
        query = session.query(airflow_db_model).options(load_only(age_check_column))
    
        logging.info("INITIAL QUERY : " + str(query))
    
        if not keep_last:
            query = query.filter(
                age_check_column <= max_date,
            )
        else:
            subquery = session.query(func.max(DagRun.execution_date))
            # workaround for MySQL "table specified twice" issue
            # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
            if keep_last_filters is not None:
                for entry in keep_last_filters:
                    subquery = subquery.filter(entry)
    
                logging.info("SUB QUERY [keep_last_filters]: " + str(subquery))
    
            if keep_last_group_by is not None:
                subquery = subquery.group_by(keep_last_group_by)
                logging.info("SUB QUERY [keep_last_group_by]: " + str(subquery))
    
            subquery = subquery.from_self()
    
            query = query.filter(
                and_(age_check_column.notin_(subquery)), and_(age_check_column <= max_date)
            )
    
        return query
    
    def print_query(query, airflow_db_model, age_check_column):
        entries_to_delete = query.all()
    
        logging.info("Query: " + str(query))
        logging.info(
            "Process will be Deleting the following "
            + str(airflow_db_model.__name__)
            + "(s):"
        )
        for entry in entries_to_delete:
            date = str(entry.__dict__[str(age_check_column).split(".")[1]])
            logging.info("\tEntry: " + str(entry) + ", Date: " + date)
    
        logging.info(
            "Process will be Deleting "
            + str(len(entries_to_delete))
            + " "
            + str(airflow_db_model.__name__)
            + "(s)"
        )
    
    def cleanup_function(**context):
        session = settings.Session()
    
        logging.info("Retrieving max_execution_date from XCom")
        max_date = context["ti"].xcom_pull(
            task_ids=print_configuration.task_id, key="max_date"
        )
        max_date = dateutil.parser.parse(max_date)  # stored as iso8601 str in xcom
    
        airflow_db_model = context["params"].get("airflow_db_model")
        state = context["params"].get("state")
        age_check_column = context["params"].get("age_check_column")
        keep_last = context["params"].get("keep_last")
        keep_last_filters = context["params"].get("keep_last_filters")
        keep_last_group_by = context["params"].get("keep_last_group_by")
    
        logging.info("Configurations:")
        logging.info("max_date:                 " + str(max_date))
        logging.info("enable_delete:            " + str(ENABLE_DELETE))
        logging.info("session:                  " + str(session))
        logging.info("airflow_db_model:         " + str(airflow_db_model))
        logging.info("state:                    " + str(state))
        logging.info("age_check_column:         " + str(age_check_column))
        logging.info("keep_last:                " + str(keep_last))
        logging.info("keep_last_filters:        " + str(keep_last_filters))
        logging.info("keep_last_group_by:       " + str(keep_last_group_by))
    
        logging.info("")
    
        logging.info("Running Cleanup Process...")
    
        try:
            if context["params"].get("do_not_delete_by_dag_id"):
                query = build_query(
                    session,
                    airflow_db_model,
                    age_check_column,
                    max_date,
                    keep_last,
                    keep_last_filters,
                    keep_last_group_by,
                )
                if PRINT_DELETES:
                    print_query(query, airflow_db_model, age_check_column)
                if ENABLE_DELETE:
                    logging.info("Performing Delete...")
                    query.delete(synchronize_session=False)
                session.commit()
            else:
                dags = session.query(airflow_db_model.dag_id).distinct()
                session.commit()
    
                list_dags = [str(list(dag)[0]) for dag in dags] + [None]
                for dag in list_dags:
                    query = build_query(
                        session,
                        airflow_db_model,
                        age_check_column,
                        max_date,
                        keep_last,
                        keep_last_filters,
                        keep_last_group_by,
                    )
                    query = query.filter(airflow_db_model.dag_id == dag)
                    if PRINT_DELETES:
                        print_query(query, airflow_db_model, age_check_column)
                    if ENABLE_DELETE:
                        logging.info("Performing Delete...")
                        query.delete(synchronize_session=False)
                    session.commit()
    
            if not ENABLE_DELETE:
                logging.warning(
                    "You've opted to skip deleting the db entries. "
                    "Set ENABLE_DELETE to True to delete entries!!!"
                )
    
            logging.info("Finished Running Cleanup Process")
    
        except ProgrammingError as e:
            logging.error(e)
            logging.error(
                str(airflow_db_model) + " is not present in the metadata. " "Skipping..."
            )
    
        finally:
            session.close()
    
    def cleanup_sessions():
        session = settings.Session()
    
        try:
            logging.info("Deleting sessions...")
            count_statement = "SELECT COUNT(*) AS cnt FROM session WHERE expiry < now()::timestamp(0);"
            before = session.execute(text(count_statement)).one_or_none()["cnt"]
            session.execute(text("DELETE FROM session WHERE expiry < now()::timestamp(0);"))
            after = session.execute(text(count_statement)).one_or_none()["cnt"]
            logging.info("Deleted %s expired sessions.", (before - after))
        except Exception as err:
            logging.exception(err)
    
        session.commit()
        session.close()
    
    def analyze_db():
        session = settings.Session()
        session.execute("ANALYZE")
        session.commit()
        session.close()
    
    analyze_op = PythonOperator(
        task_id="analyze_query", python_callable=analyze_db, provide_context=True, dag=dag
    )
    
    cleanup_session_op = PythonOperator(
        task_id="cleanup_sessions",
        python_callable=cleanup_sessions,
        provide_context=True,
        dag=dag,
    )
    
    cleanup_session_op.set_downstream(analyze_op)
    
    for db_object in DATABASE_OBJECTS:
        cleanup_op = PythonOperator(
            task_id="cleanup_" + str(db_object["airflow_db_model"].__name__),
            python_callable=cleanup_function,
            params=db_object,
            provide_context=True,
            dag=dag,
        )
    
        print_configuration.set_downstream(cleanup_op)
        cleanup_op.set_downstream(analyze_op)
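
    The DAG also accepts a per-run override: print_configuration_function reads "maxDBEntryAgeInDays" from dag_run.conf, so a manual trigger can use a different retention window without editing the code. A hypothetical example of the configuration you could pass when triggering the DAG manually (from the Airflow UI "Trigger DAG w/ config" dialog or with airflow dags trigger --conf):

    # Hypothetical per-run configuration; keys other than "maxDBEntryAgeInDays"
    # are ignored by print_configuration_function above.
    run_conf = {"maxDBEntryAgeInDays": 60}  # retain 60 days for this run only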

    Explanation of DATABASE_OBJECTS

    • airflow_db_model: the class from airflow.models corresponding to a table in the Airflow metadata database
    • age_check_column: the column used as the reference date for deletion
      • Logs older than the configured number of days, measured against this column, are deleted
    • keep_last: whether to preserve the most recently run instance (see the sketch after this list for how keep_last, keep_last_filters, and keep_last_group_by combine)
      • Even if every other log for a DAG is deleted, its most recent run is kept
    • keep_last_filters: a list of filters that protect data from deletion
      • When the cleanup process runs, these filters identify data that must not be deleted (e.g., use this filter to preserve DAG runs whose external trigger is set to 0)
    • keep_last_group_by: the column used to group database entries and apply the aggregate function (e.g., group entries by DAG ID and keep only the last run instance in each group)
    • do_not_delete_by_dag_id: whether deletion is performed per dag_id (default: False)
      • If set to True, the table's data is deleted in a single batch instead of per DAG ID
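
    A minimal sketch of how the keep_last options combine for the DagRun entry (it assumes an Airflow environment and mirrors what build_query in the DAG above produces, without deleting anything):

    from datetime import timedelta
    from airflow import settings
    from airflow.models import DagRun
    from airflow.utils import timezone
    from sqlalchemy import and_, func

    session = settings.Session()
    max_date = timezone.utcnow() - timedelta(days=30)

    # Subquery: the latest non-externally-triggered run per DAG
    # (keep_last_filters + keep_last_group_by).
    latest_scheduled = (
        session.query(func.max(DagRun.execution_date))
        .filter(DagRun.external_trigger.is_(False))
        .group_by(DagRun.dag_id)
    )

    # Deletion candidates: older than the cutoff AND not the latest run of their DAG.
    candidates = session.query(DagRun).filter(
        and_(
            DagRun.execution_date.notin_(latest_scheduled),
            DagRun.execution_date <= max_date,
        )
    )
    print(candidates.count(), "DagRun rows would be deleted")
    session.close()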

    Summary

    • To keep the Airflow database small, logs need to be deleted on a regular schedule

    References

    https://cloud.google.com/composer/docs/composer-2/cleanup-airflow-database?hl=ko
