[Airflow] Automatically Deleting Database Logs on a Schedule
Over time, your environment's Airflow database accumulates more and more data. This data includes information and logs related to past DAG runs, tasks, and other Airflow operations.
Periodic DB maintenance with a DAG
You can use the maintenance DAG below to prune (trim) the contents of the database. To keep the database small, the maintenance DAG has to run periodically; Google's Composer documentation recommends running the DAG introduced below once a day. Keeping the database small means deleting logs that have piled up beyond a retention period, which is configured here via DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS. Google's Composer documentation recommends deleting entries older than 30 days.
The DAG below deletes entries older than 30 days from the job, dag_run, task_instance, log, xcom, sla_miss, dags, task_reschedule, task_fail, and import_error tables of the Airflow database. The list is only an example: add any tables you also need to clean, and remove from the code any tables you want to keep. In practice, most of the space savings come from deleting data in the log, task_instance, dag_run, and xcom tables.
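The retention period can be changed without editing the DAG file. Below is a minimal sketch, assuming an Airflow 2.x environment where the metadata database is reachable; the DAG ID used in the comments assumes the file is deployed as airflow_db_cleanup.py (the DAG derives its ID from the file name).

# Minimal sketch: override the default 30-day retention that the DAG reads via
# Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30).
# Run this somewhere the Airflow metadata database is configured.
from airflow.models import Variable

# Keep 60 days of history instead of the default 30.
Variable.set("airflow_db_cleanup__max_db_entry_age_in_days", 60)

# Equivalent from the command line:
#   airflow variables set airflow_db_cleanup__max_db_entry_age_in_days 60
#
# One-off override for a single manual run (read from dag_run.conf by the DAG),
# assuming the file was deployed as airflow_db_cleanup.py:
#   airflow dags trigger airflow_db_cleanup --conf '{"maxDBEntryAgeInDays": 7}'

Note that a conf value below 1 is ignored and the DAG falls back to DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS, as the print_configuration task in the code below checks.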
""" A maintenance workflow that you can deploy into Airflow to periodically clean out the DagRun, TaskInstance, Log, XCom, Job DB and SlaMiss entries to avoid having too much data in your Airflow MetaStore. ## Authors The DAG is a fork of [teamclairvoyant repository.](https://github.com/teamclairvoyant/airflow-maintenance-dags/tree/master/db-cleanup) ## Usage 1. Update the global variables (SCHEDULE_INTERVAL, DAG_OWNER_NAME, ALERT_EMAIL_ADDRESSES and ENABLE_DELETE) in the DAG with the desired values 2. Modify the DATABASE_OBJECTS list to add/remove objects as needed. Each dictionary in the list features the following parameters: - airflow_db_model: Model imported from airflow.models corresponding to a table in the airflow metadata database - age_check_column: Column in the model/table to use for calculating max date of data deletion - keep_last: Boolean to specify whether to preserve last run instance - keep_last_filters: List of filters to preserve data from deleting during clean-up, such as DAG runs where the external trigger is set to 0. - keep_last_group_by: Option to specify column by which to group the database entries and perform aggregate functions. 3. Create and Set the following Variables in the Airflow Web Server (Admin -> Variables) - airflow_db_cleanup__max_db_entry_age_in_days - integer - Length to retain the log files if not already provided in the conf. If this is set to 30, the job will remove those files that are 30 days old or older. 4. Put the DAG in your gcs bucket. """ from datetime import timedelta import logging import os import airflow from airflow import settings from airflow.models import ( DAG, DagModel, DagRun, Log, SlaMiss, TaskInstance, Variable, XCom, ) from airflow.operators.python import PythonOperator from airflow.utils import timezone from airflow.version import version as airflow_version import dateutil.parser from sqlalchemy import and_, func, text from sqlalchemy.exc import ProgrammingError from sqlalchemy.orm import load_only now = timezone.utcnow # airflow-db-cleanup DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "") START_DATE = airflow.utils.dates.days_ago(1) # How often to Run. @daily - Once a day at Midnight (UTC) SCHEDULE_INTERVAL = "@daily" # Who is listed as the owner of this DAG in the Airflow Web Server DAG_OWNER_NAME = "operations" # List of email address to send email alerts to if this job fails ALERT_EMAIL_ADDRESSES = [] # Airflow version used by the environment in list form, value stored in # airflow_version is in format e.g "2.3.4+composer" AIRFLOW_VERSION = airflow_version[: -len("+composer")].split(".") # Length to retain the log files if not already provided in the conf. If this # is set to 30, the job will remove those files that arE 30 days old or older. DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int( Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30) ) # Prints the database entries which will be getting deleted; set to False # to avoid printing large lists and slowdown process PRINT_DELETES = False # Whether the job should delete the db entries or not. Included if you want to # temporarily avoid deleting the db entries. ENABLE_DELETE = True # List of all the objects that will be deleted. Comment out the DB objects you # want to skip. 
DATABASE_OBJECTS = [ { "airflow_db_model": DagRun, "age_check_column": DagRun.execution_date, "keep_last": True, "keep_last_filters": [DagRun.external_trigger.is_(False)], "keep_last_group_by": DagRun.dag_id, }, { "airflow_db_model": TaskInstance, "age_check_column": TaskInstance.start_date if AIRFLOW_VERSION < ["2", "2", "0"] else TaskInstance.start_date, "keep_last": False, "keep_last_filters": None, "keep_last_group_by": None, }, { "airflow_db_model": Log, "age_check_column": Log.dttm, "keep_last": False, "keep_last_filters": None, "keep_last_group_by": None, }, { "airflow_db_model": XCom, "age_check_column": XCom.execution_date if AIRFLOW_VERSION < ["2", "2", "5"] else XCom.timestamp, "keep_last": False, "keep_last_filters": None, "keep_last_group_by": None, }, { "airflow_db_model": SlaMiss, "age_check_column": SlaMiss.execution_date, "keep_last": False, "keep_last_filters": None, "keep_last_group_by": None, }, { "airflow_db_model": DagModel, "age_check_column": DagModel.last_parsed_time, "keep_last": False, "keep_last_filters": None, "keep_last_group_by": None, }, ] # Check for TaskReschedule model try: from airflow.models import TaskReschedule DATABASE_OBJECTS.append( { "airflow_db_model": TaskReschedule, "age_check_column": TaskReschedule.execution_date if AIRFLOW_VERSION < ["2", "2", "0"] else TaskReschedule.start_date, "keep_last": False, "keep_last_filters": None, "keep_last_group_by": None, } ) except Exception as e: logging.error(e) # Check for TaskFail model try: from airflow.models import TaskFail DATABASE_OBJECTS.append( { "airflow_db_model": TaskFail, "age_check_column": TaskFail.start_date, "keep_last": False, "keep_last_filters": None, "keep_last_group_by": None, } ) except Exception as e: logging.error(e) # Check for RenderedTaskInstanceFields model if AIRFLOW_VERSION < ["2", "4", "0"]: try: from airflow.models import RenderedTaskInstanceFields DATABASE_OBJECTS.append( { "airflow_db_model": RenderedTaskInstanceFields, "age_check_column": RenderedTaskInstanceFields.execution_date, "keep_last": False, "keep_last_filters": None, "keep_last_group_by": None, } ) except Exception as e: logging.error(e) # Check for ImportError model try: from airflow.models import ImportError DATABASE_OBJECTS.append( { "airflow_db_model": ImportError, "age_check_column": ImportError.timestamp, "keep_last": False, "keep_last_filters": None, "keep_last_group_by": None, "do_not_delete_by_dag_id": True, } ) except Exception as e: logging.error(e) if AIRFLOW_VERSION < ["2", "6", "0"]: try: from airflow.jobs.base_job import BaseJob DATABASE_OBJECTS.append( { "airflow_db_model": BaseJob, "age_check_column": BaseJob.latest_heartbeat, "keep_last": False, "keep_last_filters": None, "keep_last_group_by": None, } ) except Exception as e: logging.error(e) else: try: from airflow.jobs.job import Job DATABASE_OBJECTS.append( { "airflow_db_model": Job, "age_check_column": Job.latest_heartbeat, "keep_last": False, "keep_last_filters": None, "keep_last_group_by": None, } ) except Exception as e: logging.error(e) default_args = { "owner": DAG_OWNER_NAME, "depends_on_past": False, "email": ALERT_EMAIL_ADDRESSES, "email_on_failure": True, "email_on_retry": False, "start_date": START_DATE, "retries": 1, "retry_delay": timedelta(minutes=1), } dag = DAG( DAG_ID, default_args=default_args, schedule_interval=SCHEDULE_INTERVAL, start_date=START_DATE, ) if hasattr(dag, "doc_md"): dag.doc_md = __doc__ if hasattr(dag, "catchup"): dag.catchup = False def print_configuration_function(**context): logging.info("Loading 
Configurations...") dag_run_conf = context.get("dag_run").conf logging.info("dag_run.conf: " + str(dag_run_conf)) max_db_entry_age_in_days = None if dag_run_conf: max_db_entry_age_in_days = dag_run_conf.get("maxDBEntryAgeInDays", None) logging.info("maxDBEntryAgeInDays from dag_run.conf: " + str(dag_run_conf)) if max_db_entry_age_in_days is None or max_db_entry_age_in_days < 1: logging.info( "maxDBEntryAgeInDays conf variable isn't included or Variable " + "value is less than 1. Using Default '" + str(DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS) + "'" ) max_db_entry_age_in_days = DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS max_date = now() + timedelta(-max_db_entry_age_in_days) logging.info("Finished Loading Configurations") logging.info("") logging.info("Configurations:") logging.info("max_db_entry_age_in_days: " + str(max_db_entry_age_in_days)) logging.info("max_date: " + str(max_date)) logging.info("enable_delete: " + str(ENABLE_DELETE)) logging.info("") logging.info("Setting max_execution_date to XCom for Downstream Processes") context["ti"].xcom_push(key="max_date", value=max_date.isoformat()) print_configuration = PythonOperator( task_id="print_configuration", python_callable=print_configuration_function, provide_context=True, dag=dag, ) def build_query( session, airflow_db_model, age_check_column, max_date, keep_last, keep_last_filters=None, keep_last_group_by=None, ): query = session.query(airflow_db_model).options(load_only(age_check_column)) logging.info("INITIAL QUERY : " + str(query)) if not keep_last: query = query.filter( age_check_column <= max_date, ) else: subquery = session.query(func.max(DagRun.execution_date)) # workaround for MySQL "table specified twice" issue # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41 if keep_last_filters is not None: for entry in keep_last_filters: subquery = subquery.filter(entry) logging.info("SUB QUERY [keep_last_filters]: " + str(subquery)) if keep_last_group_by is not None: subquery = subquery.group_by(keep_last_group_by) logging.info("SUB QUERY [keep_last_group_by]: " + str(subquery)) subquery = subquery.from_self() query = query.filter( and_(age_check_column.notin_(subquery)), and_(age_check_column <= max_date) ) return query def print_query(query, airflow_db_model, age_check_column): entries_to_delete = query.all() logging.info("Query: " + str(query)) logging.info( "Process will be Deleting the following " + str(airflow_db_model.__name__) + "(s):" ) for entry in entries_to_delete: date = str(entry.__dict__[str(age_check_column).split(".")[1]]) logging.info("\tEntry: " + str(entry) + ", Date: " + date) logging.info( "Process will be Deleting " + str(len(entries_to_delete)) + " " + str(airflow_db_model.__name__) + "(s)" ) def cleanup_function(**context): session = settings.Session() logging.info("Retrieving max_execution_date from XCom") max_date = context["ti"].xcom_pull( task_ids=print_configuration.task_id, key="max_date" ) max_date = dateutil.parser.parse(max_date) # stored as iso8601 str in xcom airflow_db_model = context["params"].get("airflow_db_model") state = context["params"].get("state") age_check_column = context["params"].get("age_check_column") keep_last = context["params"].get("keep_last") keep_last_filters = context["params"].get("keep_last_filters") keep_last_group_by = context["params"].get("keep_last_group_by") logging.info("Configurations:") logging.info("max_date: " + str(max_date)) logging.info("enable_delete: " + str(ENABLE_DELETE)) logging.info("session: " + str(session)) logging.info("airflow_db_model: " + 
str(airflow_db_model)) logging.info("state: " + str(state)) logging.info("age_check_column: " + str(age_check_column)) logging.info("keep_last: " + str(keep_last)) logging.info("keep_last_filters: " + str(keep_last_filters)) logging.info("keep_last_group_by: " + str(keep_last_group_by)) logging.info("") logging.info("Running Cleanup Process...") try: if context["params"].get("do_not_delete_by_dag_id"): query = build_query( session, airflow_db_model, age_check_column, max_date, keep_last, keep_last_filters, keep_last_group_by, ) if PRINT_DELETES: print_query(query, airflow_db_model, age_check_column) if ENABLE_DELETE: logging.info("Performing Delete...") query.delete(synchronize_session=False) session.commit() else: dags = session.query(airflow_db_model.dag_id).distinct() session.commit() list_dags = [str(list(dag)[0]) for dag in dags] + [None] for dag in list_dags: query = build_query( session, airflow_db_model, age_check_column, max_date, keep_last, keep_last_filters, keep_last_group_by, ) query = query.filter(airflow_db_model.dag_id == dag) if PRINT_DELETES: print_query(query, airflow_db_model, age_check_column) if ENABLE_DELETE: logging.info("Performing Delete...") query.delete(synchronize_session=False) session.commit() if not ENABLE_DELETE: logging.warn( "You've opted to skip deleting the db entries. " "Set ENABLE_DELETE to True to delete entries!!!" ) logging.info("Finished Running Cleanup Process") except ProgrammingError as e: logging.error(e) logging.error( str(airflow_db_model) + " is not present in the metadata. " "Skipping..." ) finally: session.close() def cleanup_sessions(): session = settings.Session() try: logging.info("Deleting sessions...") count_statement = "SELECT COUNT(*) AS cnt FROM session WHERE expiry < now()::timestamp(0);" before = session.execute(text(count_statement)).one_or_none()["cnt"] session.execute(text("DELETE FROM session WHERE expiry < now()::timestamp(0);")) after = session.execute(text(count_statement)).one_or_none()["cnt"] logging.info("Deleted %s expired sessions.", (before - after)) except Exception as err: logging.exception(err) session.commit() session.close() def analyze_db(): session = settings.Session() session.execute("ANALYZE") session.commit() session.close() analyze_op = PythonOperator( task_id="analyze_query", python_callable=analyze_db, provide_context=True, dag=dag ) cleanup_session_op = PythonOperator( task_id="cleanup_sessions", python_callable=cleanup_sessions, provide_context=True, dag=dag, ) cleanup_session_op.set_downstream(analyze_op) for db_object in DATABASE_OBJECTS: cleanup_op = PythonOperator( task_id="cleanup_" + str(db_object["airflow_db_model"].__name__), python_callable=cleanup_function, params=db_object, provide_context=True, dag=dag, ) print_configuration.set_downstream(cleanup_op) cleanup_op.set_downstream(analyze_op)
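Not part of the DAG above, but a quick way to see which tables actually dominate the database (and to confirm a cleanup run had an effect) is to count rows with the same SQLAlchemy session Airflow provides. A rough sketch, assuming it runs somewhere the Airflow metadata database is configured (for example inside a small ad-hoc PythonOperator task):

# Rough row-count check before/after a cleanup run (illustration only).
from airflow import settings
from airflow.models import DagRun, Log, TaskInstance, XCom

session = settings.Session()
try:
    for model in (Log, TaskInstance, DagRun, XCom):
        # session.query(...).count() issues a SELECT COUNT(*) against the table
        print(model.__name__, session.query(model).count())
finally:
    session.close()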
DATABASE_OBJECTS explained
Each entry in DATABASE_OBJECTS is a dictionary with the following fields (a trimmed example list follows after the field descriptions):
- airflow_db_model: the class from airflow.models that corresponds to a table in the Airflow metadata database
- age_check_column: the column used as the deletion cutoff
    - entries older than the configured retention period, measured against this column, are deleted
- keep_last: whether to preserve the most recently executed instance
    - if cleanup would otherwise remove every entry for a DAG, the entry for the most recent run is kept
- keep_last_filters: a list of filters that protect data from deletion
    - when the cleanup process runs, these filters identify data that must not be deleted (e.g. use this filter to preserve DAG runs whose external trigger is set to 0)
- keep_last_group_by: the column used to group database entries and apply aggregate functions (e.g. group entries by DAG ID and keep only the last run instance in each group)
- do_not_delete_by_dag_id: controls whether deletion is split by dag_id (default: False, i.e. delete DAG by DAG)
    - when set to True, the table's data is removed in a single batch instead of iterating over dag_ids
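As an illustration of these fields, here is what a trimmed-down DATABASE_OBJECTS could look like if you only prune the four tables that usually account for most of the size (log, task_instance, dag_run, xcom). This is a sketch of the dictionary format, not a drop-in replacement: the values mirror the corresponding entries in the full listing above, and Airflow >= 2.2.5 is assumed for XCom.timestamp.

# Sketch: DATABASE_OBJECTS reduced to the tables that typically grow largest.
# Field values are copied from the full DAG above (use XCom.execution_date
# instead of XCom.timestamp on Airflow versions before 2.2.5).
from airflow.models import DagRun, Log, TaskInstance, XCom

DATABASE_OBJECTS = [
    {
        "airflow_db_model": DagRun,
        "age_check_column": DagRun.execution_date,
        "keep_last": True,  # keep the latest scheduled run per DAG
        "keep_last_filters": [DagRun.external_trigger.is_(False)],
        "keep_last_group_by": DagRun.dag_id,
    },
    {
        "airflow_db_model": TaskInstance,
        "age_check_column": TaskInstance.start_date,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None,
    },
    {
        "airflow_db_model": Log,
        "age_check_column": Log.dttm,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None,
    },
    {
        "airflow_db_model": XCom,
        "age_check_column": XCom.timestamp,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None,
    },
]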
Summary
- To keep the Airflow database small, old logs need to be deleted on a regular schedule.
References
https://cloud.google.com/composer/docs/composer-2/cleanup-airflow-database?hl=ko