많은 작업을 동시에 실행하는 파이썬 프로그램에서는 종종 작업들을 조율해 줘야합니다. 가장 유용한 병행 작업 방식 중 하나는 함수의 파이프라인입니다.

파이프라인은 제조 공장에서 사용하는 조립 라인처럼 작동합니다. 파이프라인은 일렬로 이어진 단계들로 구성되며, 각 단계에는 특정 함수가 연결되어 있습니다. 새 작업 요소는 끊임없이 파이프라인의 앞쪽에 추가됩니다. 각 함수는 동시에 자신이 속한 단계에 배정된 작업 요소를 처리할 수 있습니다. 남아 있는 단계가 더는 없을 때까지, 각 함수에서 처리를 완료할 때마다 작업은 다음 단계로 이동합니다. 이 방법은 파이썬으로 쉽게 병렬화할 수 있는 블로킹 I/O나 서브프로세스를 이용하는 작업에 특히 잘 맞습니다.


예를 들어 디지털 카메라에서 끊임없이 이미지들을 가져와 리사이즈하고 온라인 포토 갤러리에 추가하는 시스템을 구축한다고 가정합니다. 이런 프로그램에서는 파이프라인을 세 단계로 나눌 수 있습니다. 첫 번째 단계에서 새 이미지를 추출합니다. 두 번째 단계에서는 리사이즈 함수로 다운로드된 이미지를 처리합니다. 마지막 단계에서는 업로드 함수로 리사이즈된 이미지를 소비합니다.

이 세 단계를 실행하는 download, resize, upload 함수를 이미 작성했다고 할 때, 작업을 동시에 처리하려면 파이프라인을 어떻게 조립해야 될까요?

가장 먼저 필요한 건 파이프라인 단계 사이에서 작업을 전달할 방법입니다.

이 방법은 스레드 안전 생산자-소비자 큐(thread-safe producer-consumer queue)로 모델링할 수 있습니다.

from threading import Thread
from threading import Lock
from collections import deque
import time


class MyQueue:
    def __init__(self):
        self.items = deque()
        self.lock = Lock()

    # 생산자(producer)인 디지털 카메라는 새 이미지를 대기 아이템 리스트의 끝에 추가
    def put(self, item):
        with self.lock:
            self.items.append(item)

    # 소비자(consumer)인 처리 파이프라인의 첫 번쨰 단계에서는 대기 아이템 리스트의 앞쪽에서 이미지 추출
    def get(self):
        with self.lock:
            return self.items.popleft()


"""여기서는 이러한 큐에서 작업을 꺼내와서 함수를 실행한 후 결과를 또 다른 큐에 넣는 파이썬 스레드로 파이프라인의 각 단계를 표현
또한 작업 스레드가 새 입력을 몇 번이나 체크하고 작업을 얼마나 완료하는지 추적"""


class Worker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0

    # 가장 까다로운 부분은 이전 단계에서 아직 작업을 완료하지 않아 입력 큐가 비어 있는 경우를 작업스레드에서 적절히 처리하는 것
    # 다음 코드에서 IndexError 예외를 잡는 부분이 이에 해당. 이 경우를 조립 라인이 정체된 상황이라고 보면 됨

    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                time.sleep(0.01)  # 처리할 아이템이 ㅇㅂㅅ음
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done += 1


# 이제 작업 조율용 큐와 그에 해당하는 작업 스레드를 생성해 세 단계를 연결하면 됨

download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()
threads = [
    Worker(download, download_queue, resize_queue),
    Worker(resize, resize_queue, upload_queue),
    Worker(upload, upload_queue, done_queue),
]

# 스레드를 시작하고 파이프라인의 첫 번째 단계에 많은 작업을 추가. 다음 코드에서 download 함수에 실제 데이터 대신 일반 object 인스턴스를 사용

for thread in threads:
    thread.start()
for _ in range(1000):
    download_queue.get(object())

while len(download_queue.items) < 1000:
    # 기다리는 동안 유용한 작업을 수행
    # .....
    pass

# 이 코드는 제대로 동작하지만, 입력 큐에서 새 작업을 가져오는 스레드가 일으키는 흥미로운 부작용이 존재.
# 바로 run 메서드에서 IndexError 예외를 잡는 그 까다로운 부분이 매우 많이 실행됨

processed = len(done_queue.items)
polled = sum(t.polledcount for t in threads)

print('Processed', processed, 'items after polling', polled, 'times')

# 예상결과
# Prcessed 1000 items after polling 3030 times


작업 수행 함수의 실행 속도가 제각각이면 초기 단계가 후속 단계의 진행을 막아 파이프라인이 정체될 수 있습니다. 그러면 후속단계에서 처리할 것이 없어서 지속적으로 새 작업을 가져오려고 짧은 주기로 입력 큐를 확인하게 됩니다. 결국 작업 스레드는 유용한 작업을 전혀 하지 않으면서 CPU 시간을 낭비합니다.(끊임없이 IndexError 예외를 일으키고 잡는일만 합니다.)

하지만 이건 잘못된 구현의 시작일 뿐입니다. 피해야 할 문제가 세 가지 더 있습니다.

첫 번째로 입력 작업을 모두 완료했는지 판단하려면 done_queue에 결과가 모두 쌓일 때까지 기다려야 합니다.

두 번째로 Worker의 run 메서드는 루프에서 끊임없이 실행됩니다. 루프를 빠져나오도록 작업 스레드에 신호를 줄 방법이 없습니다.

세 번째로 최악의 문제로 파이프라인이 정체되면 프로그램이 제멋대로 고장날 수 있습니다. 첫 번째 단계는 빠르게 진행하지만 두 번째 단계가 느리게 진행되면 첫 번째와 두 번째 단계를 연결하는 큐의 크기가 계속 증가합니다. 두 번째 단계는 큐가 증가하는 속도를 따라잡지 못합니다. 충분한 시간과 충분한 입력 데이터가 있다면 프로그램은 결국 메모리 부족으로 죽습니다.

여기서 알 수 있는 사실은 파이프라인이 나쁘다는 것이 아닌 생산자-소비자 큐를 직접 만들기가 어렵다는 것입니다.

Queue로 문제 해결하기

내장 모듈 queue에 들어있는 Queue클래스는 이런 문제를 해결하는 데 필요한 기능을 모두 제공합니다.

Queue는 새 데이터가 생길 때까지 get 메서드가 블록되게 하여 작업 스레드가 계속해서 데이터가 있는지 체크하는 상황(busy waiting, 바쁜 대기)을 없애줍니다. 예를 들어 다음은 큐에서 입력 데이터를 기다리는 스레드를 시작합니다.

from threading import Thread
from queue import Queue

queue = Queue()


def consumer():
    print('Consumer waiting')  # 뒤에 나오는 put() 이후에 실행함
    queue.get()
    print('Consumer done')


thread = Thread(target=consumer)
thread.start()

# 스레드가 처음으로 실행할 때도 Queue 인스턴스에 아이템이 들어가서 get 메서드에서 반환할 아이템이 생기기 전에는 마치지 못함
print('Producer putting')
queue.put(object())  # 앞에 나온 get() 이전에 실행
thread.join()
print('Producer done')

# 결과
# Consumer waiting
# Producer putting
# Consumer done
# Producer done


파이프라인 정체 문제를 해결하려면 두 단계 사이에서 대기할 작업의 최대 개수를 Queue에 설정해야 합니다. 큐가 이미 이 버퍼 크기만큼 가득 차 있으면 put 호출이 블록됩니다. 예를 들어 다음 코드는 큐를 소비하기 전에 잠시 대기하는 스레드를 정의합니다.

from threading import Thread
from queue import Queue
import time

queue = Queue(1)  # 크기가 1인 버퍼


def consumer():
    time.sleep(0.1)  # 대기
    queue.get()  # 두 번째로 실행함
    print('Consumer got 1')
    queue.get()  # 네 번째로 실행함
    print('Consumer got 2')


thread = Thread(target=consumer)
thread.start()

""" 대기 결과로 consumer 스레드에서 get을 호출하기 전에 생산 스레드에서 put으로 객체 두 개를 큐에 집어넣는 동작이 일어나야 함
하지만 Queue의 크기가 1임. 다시 말해 두 번째 put 호출이 블록된 상태에서 빠져나와서 두 번째 아이템을 큐에 넣을 수 있으려면, 큐에 아이템을
추가하는 생산자는 소비 스레드에서 적어도 한 번은 get을 호출하기를 기다려야 함"""

queue.put(object())  # 첫 번째로 실행
print('Producer put 1')
queue.put(object())  # 세 번째로 실행
print('Producer put 2')
thread.join()
print('Producer done')

# 결과
# Producer put 1
# Consumer got 1
# Producer put 2
# Consumer got 2
# Producer done


Queue 클래스는 task_done 메서드로 작업 진행을 추적할 수도 있습니다. 작업 진행을 추적하면 특정 단계의 입력 큐가 빌 때까지 기다릴 수 있으므로 파이프라인의 끝에서 done_queue를 폴링하지 않아도 됩니다. 예를 들어 다음은 아이템으로 작업을 완료하면 task_done을 호출하는 소비 스레드를 정의합니다.

from threading import Thread
from queue import Queue
import time

in_queue = Queue()  # 크기가 1인 버퍼


def consumer():
    print('Consumer waiting')
    work = in_queue.get()  # 두 번째로 완료
    print('Consumer working')
    # 작업 수행
    # ...
    print('Consumer done')
    in_queue.task_done()  # 세 번째로 완려


thread = Thread(target=consumer).start()

""" 이제 생산자는 조인으로 소비 스레드를 대기하거나 폴링하지 않아도 됨. 그냥 Queue 인스턴스의 join을 호출해 in_queue가 완료하기를
기다리면 됨 심지어 큐가 비더라도 in_queue의 join메서드는 이미 큐에 추가된 모든 아이템에 task_done을 호출할 때까지 완료하지 않음"""

in_queue.put(object())  # 첫 번째로 완료
print('Producer waiting')
in_queue.join()  # 네 번째로 완료
print('Producer done')

# 결과
# Consumer waiting
# Producer waiting
# Consumer working
# Consumer done
# Producer done

요약

파이프라인은 여러 파이썬 스레드를 사용하여 동시에 실행하는 작업의 순서를 구성하기에 아주 좋은 방법

병행 파이프라인을 구축할 때는 많은 문제(바쁜 대기, 작업자 중단, 메모리 부족)가 일어날 수 있다는 점을 주의

Queue 클래스는 연산 블로킹, 버퍼 크기, 조인 등 견고한 파이프라인을 만드는 데 필요한 기능을 모두 갖춤

+ Random Posts