ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Django] 장고에서 concurrent.futures의 process 사용하기
    언어/파이썬 & 장고 2020. 3. 7. 17:39

    장고의 ORM을 사용하지 않고 DB 패키지인 (여기서는 PG를 선택) psycopg2를 사용하여 구현한다면 아래와 같은 코드로 예시를 들 수 있습니다.

    from concurrent.futures import ProcessPoolExecutor
    
    import psycopg2
    import psycopg2.extras
    from psycopg2.extensions import new_type, DECIMAL
    
    
    class MultiProcessTest:
    
        def __init__(self):
            db = {}  # DB 정보
            self.dsn = f"host='{db['HOST']}' port='{db['PORT']}' dbname='{db['NAME']}' user='{db['USER']}' password='{db['PASSWORD']}'"
    
        def __connect(self):
            # decimal 타입 제거
            DEC2FLOAT = new_type(DECIMAL.values, 'DEC2FLOAT',
                                 lambda value, curs: float(value) if value is not None else None)
            psycopg2.extensions.register_type(DEC2FLOAT)
            conn = psycopg2.connect(self.dsn)
            # dictionary 형태
            cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    
            return conn, cursor
    
        def multi_process_test(self, i):
            conn, cursor = self.__connect()
            cursor.execute(f"select * from item where item = {i}")
    
            result = cursor.fetchall()
    
            # 기타 작업
    
            cursor.close()
            conn.close()
    
        def run(self):
            _list = ['1', '11', '111', '1111', '11111', '111111']
            with ProcessPoolExecutor(max_workers=4) as pool:
                pool.map(self.multi_process_test, _list)
    
    
    MultiProcessTest().run()


    만약 위처럼 raw 쿼리를 사용하지 않고 ORM을 사용한다면 아래와 같이 사용할 수 있습니다.

    import os
    import signal
    from concurrent.futures import ProcessPoolExecutor
    
    from django import db
    
    
    class MultiProcessTest:
        PID = os.getpid()
    
        def multi_process_test(self, i):
    		self.PID = os.getpid()
            db.connections.close_all()
            result = Item.objects.filter(item=i)
            # 기타 작업
            temp = result[0]
            # ...
    
        def run(self):
            _list = ['1', '11', '111', '1111', '11111', '111111']
            with ProcessPoolExecutor(max_workers=4) as pool:
                pool.map(self.multi_process_test, _list, chunksize=200)
    
        def __del__(self):
            # 부모가 먼저 죽어서 자식 프로세스가 고아가 될 경우, 자식 프로세스 전부 삭제
            if os.getppid() == 1:
                os.killpg(os.getpgid(self.PID), signal.SIGKILL)
    
    
    MultiProcessTest().run()


    위와 같이 ORM을 사용하여 multi process를 구현할 때 주의해야 할 점이 있습니다.

    1. 복제된 프로세스는 부모 프로세스의 DB connection도 복제하여 가지고 있음

    프로세스를 fork 뜰 때, 부모 프로세스의 DB connection까지 복제가 됩니다. 따라서 멀티 프로세스 함수 내에서 동일한 DB connection을 맺고 있는 형태이므로 ORM query 시, 오류가 발생하게 됩니다. 

    이를 해결하기 위해 최상단에 연결되어 있는 DB connection을 전부 해제한 다음, ORM query를 통해 새로 커넥션을 맺도록 진행합니다. 새롭게 맺어진 DB connection은 프로세스 종료 시, 종료가 되므로 함수 마지막에 명시하지 않아도 됩니다.

    만약 부모의 프로세스에서 DB 커넥션이 1개라고 확신할 수 있으면 db.connections.close_all() 대신 db.connection.close()를 명시해도 괜찮습니다.

    2. 부모 프로세스가 먼저 죽을 경우, 고아 프로세스가 발생

    멀티 프로세스 코드를 작성한 다음, 실행 중에 사용자 또는 예기치 않은 문제로 인해 해당 프로세스를 종료할 때 부모 프로세스가 먼저 죽는 경우가 발생할 수 있습니다. 이러한 고아 프로세스가 발생하면 직접 서버에 들어가 프로세스를 종료하는 수 밖에 없습니다. 이러한 문제를 방지하기 위해서 고아 프로세스가 발생할 경우, 부모 프로세스에서 fork된 모든 자식 프로세스를 종료하도록 처리를 합니다.


    먼저 복제된 자식 프로세스의 PPID는 부모의 PID값과 동일합니다. 이 때, 부모가 먼저 죽을 경우, 해당 PPID값이 1로 변경됩니다. 따라서 PPID가 1일 경우, 부모가 먼저 죽은 고아 프로세스입니다. 이를 해결하기 위해서 프로세스가 종료될 때마다 __del__ 함수를 호출하는데 이 때, PPID가 1인지 확인한 다음 PPID가 1일 경우에 PGID를 찾아 부모, 자식 프로세스를 전부 죽입니다.

    Process ID (PID) :프로세스 고유ID

    Parent Process ID (PPID) : 부모 프로세스 PID

    Process Group ID (PGID): 프로세스 그룹 ID. 모든 프로세스는 하나의 그룹에 속하게 되며 PGID는 해당 그룹의 리더 프로세스 PID.


    여기서 2번의 경우는 예기치 못한 상황을 방지하는 해결책입니다. 따라서 정상적인 종료가 발생한다면 실행에는 문제가 없지만 1번의 문제를 간과한다면 실행조차 되지 않으므로 1번의 문제는 반드시 해결하고 코드를 작성해야 합니다.

    댓글