ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Python] 병렬처리 (Concurrent.futures)
    언어/파이썬 & 장고 2017. 2. 15. 17:13

    Python3.2 에서 concurrent.futures 모듈이 추가되었습니다. 이 모듈은 멀티스레드와 멀티 프로세스에 대한 고수준 API를 포함하고 있으며 스레드 풀/프로세스 풀 기반의 동작을 기본적으로 지원합니다.

    파이썬 제약 : GIL

    Python은 두 개 이상의 스레드가 동시에 실행될 때 두 개 이상의 스레드가 하나의 자원을 동시에 액세스할 때 발생할 수 있는 문제점을 방지하기 위해 GIL(Global Internal Lock)이라는 것을 도입했습니다. 즉, 스레드가 실행될 때, 프로그램 내의 리소스 전체에 락이 걸립니다. 결국 Python 구현에서는 동시에 몇 개의 스레드가 실행이 되던 간에 GIL에 의해서 한 번에 하나의 스레드만 실행됩니다. 멀티 스레드의 경우 문맥교환(Context Switch)에 필요한 리소스까지 고려하면 단일 스레드보다 성능이 떨어지게 되는 것을 확인할 수 있습니다. GIL은 아주 오래전부터 파이썬의 약점으로 지적받았습니다. GIL 때문에 멀티스레드를 통한 분산처리는 파이썬 내에서 의미가 없으며, 분산처리를 통해 성능의 이익을 보려면 멀티 프로세스를 사용해야 합니다. 하지만 프로세스를 추가로 생성하는 것은 OS입장에서는 매우 비용이 많이 드는 일이며, 특히 윈도우 환경에서는 어지간히 커다란 작업이 아니면 프로세스 생성에 드는 시간이 가장 큰 병목이 될 가능성이 있습니다.

    concurrent.futures 모듈

     concurrent.futures 모듈은 별도 규격의 스레드 객체를 작성하지 않고 함수 호출을 객체화하여 다른 스레드나 다른 프로세스에서 이를 실행할 수 있게 해줍니다. 이때 중심 역할을 하는 것이 Executor 클래스입니다. Executor 클래스는 다시 ThreadPoolExecutor와 ProcessPoolExecutor로 나뉘는데 두 클래스의 차이는 동시성 작업을 멀티 스레드로 처리하느냐, 멀티 프로세스로 처리하느냐만 있지 거의 동일한 기능을 제공합니다.

    Future 

     executor를 이용한 동시성 처리는 호출해야 할 함수와 그에 전달될 인자들을 executor에 넘겨주는 것으로 시작되는데, executor의 해당 메소드는 다른 스레드의 리턴을 기다릴 필요가 없으므로 바로 리턴하게 됩니다. 이 때 리턴되는 객체가 Future 객체이며, 이 객체의 상태를 조사하여 완료 여부를 확인하거나, 해당 객체 내 작업이 완료되기를 기다리거나 혹은 미리 콜백을 넘겨놓아둘 수도 있습니다.

    Executor  

     Executor 객체는 풀 기반으로 작업을 관리합니다. 초기화 시에 몇 개의 worker가 사용될 것인지를 정해주면 전달되는 작업들을 큐에 넣고 worker pool에서 사용 가능한 worker로 하여금 작업을 처리하게 합니다.

    submit(fn, *args, **kwargs)

    함수 fn에 대해 주어진 인자들을 전달하여 실행할 수 있는 Future 객체를 리턴합니다. 해당 함수는 호출 즉시 스케줄링됩니다. 

    with ThreadPoolExcutor(max_workers=1) as executor:
        future = executor.submit(pow, 323, 1235)
        print(future.result())

    map(func, *iterables, timeout=None)

     일반함수 map과 동일하나, 각 호출은 병렬적으로 일어납니다. 만약 타임아웃 값이 지정된 경우, 맵핑 작업이 완료되지 않은 호출이 있으면 TimeoutError가 일어납니다. 입력데이터와 동작함수를 짝지어서 바로 스케줄링하도록 합니다. map()함수는 이터레이터를 리턴하는데, 이는 각 개별 작업이 동시에 실행된 후, 먼저 종료된 작업부터 내놓는 리턴값을 내놓게 됩니다.

     shutdown(wait=True)

     executor에게 종료 시그널을 보냅니다. 시그널을 받은 executor는 실행 중 및 대기 중인 모든 future에 대해 리소스를 정리합니다. shutdown 후에 submit이나 map을 호출하면 런타임에러가 발생합니다.만약 wait 값이 True로 정해지면 진행 및 대기중이던 작업이 종료된 후에 shutdown이 일어나고, 그 때까지 해당 함수는 리턴을 보류하게 됩니다. 만약 강제 shutdown을 피하고 싶다면 with 구문 내에서 사용합니다.

    import shutil
    with ThreadPoolExcutor(max_workers=4) as e:
        e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
        e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
        e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
        e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

    예제

    아래는 0.1초마다 처리해야할 데이터를 수집하고 이를 멀티프로세스에서 처리하는 예제입니다.

    from concurrent.futures import ProcessPoolExecutor
    from time import sleep
    
    def process(*args):
        result = # 입력받은 인자들을 처리한다...
        time.sleep(1)
        print(result)
    
    def main():
    	exe = ProcessPoolExecutor(max_workers=4) as exe:
    	while true:
        	cont, x = collect_arguments()
    	    if not cont:
        	    break
    	    if x:
        	    exe.submit(process, *x)
    	    else:
        	    sleep(0.1)
    	exe.shutdown(wait=True)
     
    if __name__ == "__main__":
    	main()

    위 코드는 일종의 데몬으로 다음과 같이 동작합니다.

    1. 매 입력된 데이터를 처리하는 함수는 process(). 이 함수는 처리가 끝나면 1초간 대기했다가 결과를 화면에 출력.
    2. 최대 4개까지의 프로세스를 사용할 수 있도록 executor를 생성
    3. 데이터를 수집하여 계속 진행할 것인지 여부를 확인하고, 진행한다면 수집된 값들을 처리하도록 executor에게 요청
    4. 무한 루프 내에서도 약간 텀을 줌
    5. 더이상 처리하지 않는다면 루프에서 빠져나오는데, 이 때 executor는 아직 실행되고 있는 작업들이 있으면 이들이 완료될 때까지 기다린 후 종료
    6. 프로그램 종료


    여기서 중요한 부분은 if __name__ == "__main__": 부분인데, 자식 프로세스에서 실행되는 worker는 작업에 필요한 함수 정보를 얻기 위해서 본 파일을 import 하게 됩니다. 따라서 __main__ 모듈과 그렇지 않은 모듈의 행동양식이 구분되어야 합니다. 그리고 반드시 __main__ 모듈은 있어야 하기 때문에 REPL 환경에서는 멀티프로세스 코드를 실행할 수 없습니다. 

    간단하게 말해 멀티프로세스를 실행하기 위해선 해당 파일을 직접적으로 실행해야 합니다. 다른 파일에서 호출하는 형식으로 진행하면 예상치 않게 동작할 수도 있습니다. (현재 확인한 바로는 shutdown()나 wait()이 없으면 전체가 실행이 되긴 합니다. 근데 shutdown()나 wait()이 존재하면 shutdown()나 wait()등 이 동작하지 않고 해당 부분에서 멈춥니다.) 


    만약 분산처리와 같이 worker가 처리해준 작업의 결과들을 다시 메인 스레드에서 넘겨받아 사용하려면 다음 예제에서 확인할 수 있습니다.

    from concurrent import futures
    total_result = 0
    with futures.ThreadPoolExecutor(max_workers=4) as exe:
        fs = exe.map(process, list(range(1, 10000, 1000)))
        for done in futures.as_completed(fs):
            result = done.result()
            print(result)
            total_result += result
    print(total_result)

     

    executor의 map() 메소드는 처리용 함수와 입력 값의 집합을 이용해서 한꺼번에 작업을 생성합니다. 그리고 Future의 집합을 리턴하는데, as_completed 함수는 작업들을 기다리면서, 집합 내에서 종료된 것들부터 차례로 이터레이션해줍니다.

    위 예제는 개별로 요청을 확인하는 예제이고, 아래는 요청한 작업 전체를 기다리다가 한꺼번에 처리하는 예제입니다.

    with futures.ThreadPoolExecutor(max_workers=10) as exe:
        fs = exe.map(process, ARGUMENTS)
        (done, not_done) = futures.wait(fs, timeout=2)
        total_result = sum([f.result() for f in done])

    주의점

    concurrent.futures 모듈로 멀티프로세스를 사용할 때 주의해야할 점은 객체를 생성한 후 하나의 객체를 여러 프로세스에서 접근하지 않도록 해야 합니다. 만약 하나의 객체에 여러 프로세스가 접근할 경우 ssl error decryption failed or bad record mac 가 발생하게 됩니다. 

    from concurrent.futures import ProcessPoolExecutor 
     
    def concurrent(list):
        for i in list:
            query = 'insert into test (id) values ({0})'.format(list[i])
            cursor.execute(query)
        conn.commit()
     
    db = Database() 
    cursor, conn = db.connect() # 입력된 정보를 바탕으로 db 연결을 하여 cursor, connection을 반환해주는 함수가 있다고 가정
    _list = ['1','11','111','1111','11111','111111']
     
     
    pool = ProcessPoolExecutor(max_workers=4)
    pool.map(concurrent, _list)
     
    # 실행 시 ssl error decryption failed or bad record mac 에러 발생


    아래와 같이 여러 프로세스가 각자의 객체를 갖도록 코딩해야 합니다.

    from concurrent.futures import ProcessPoolExecutor 
    def concurrent(list):
        cursor, conn = db.connect() # 각 프로세스마다 다른 데이터베이스 객체 생성
        for i in list:
            query = 'insert into test (id) values ({0})'.format(list[i])
            cursor.execute(query)
        conn.commit()
      
    db = Database()
     
    _list = ['1','11','111','1111','11111','111111']
     
    pool = ProcessPoolExecutor(max_workers=4)
    pool.map(concurrent, _list)


    댓글