ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Python] asyncio 파헤치기
    언어/파이썬 & 장고 2021. 3. 1. 20:12

    asyncio는 파이썬 버전 별로 사용하는 형태가 조금씩 다릅니다. 아래 설명에서는 파이썬 3.8 기준으로 작성했습니다.

    asyncio란?

    파이썬 3.5부터 지원하는 asyncio는 비동기 프로그래밍을 위한 모듈입니다. 동기란 빨래를 시작하고 종료가 되면 설거지를 시작하고 완료가 되면 TV를 보는 것처럼 한 번에 하나의 작업을 하는 것이고, 비동기는 빨래를 시작시키고 설거지를 하면서 TV를 보는 것과 같이 여러 작업을 동시에 하는 것과 같은 행동을 하는 것입니다. 하지만 파이썬에서는 GIL때문에 비동기 프로그래밍이 동기 프로그래밍보다 느릴 수도 있습니다.

    asyncio는 이벤트 루프와 코루틴을 기반으로 동작하며 데이터를 요청하고 응답을 기다리는 I/O bound한 작업에서 효율적입니다. 코루틴 기반이므로 멀티 스레드와 비교하여 문맥교환에 따른 비용이 다소 적게 들어갑니다.

    공식문서: https://docs.python.org/ko/3.8/library/asyncio.html

    이벤트 루프란?

    이벤트 루프는 작업들을 반복문 돌면서 하나씩 실행을 시킵니다. 이때, 실행한 작업이 데이터를 요청하고 응답을 기다린다면 다른 작업에게 event loop에 대한 권한을 넘깁니다. 권한을 받은 event loop는 다음 작업을 실행하고 응답을 받은 순서대로 대기하던 작업부터 다시 권한을 가져와 작업을 마무리합니다.

    공식 문서: https://docs.python.org/ko/3.8/library/asyncio-eventloop.html

    코루틴이란?

    https://brownbears.tistory.com/490 에 설명이 되어 있습니다.

    asyncio

    코루틴으로 태스크를 만들었다면, asyncio.get_event_loop함수를 활용해 이벤트 루프를 정의하고 run_until_complete으로 실행시킬 수 있습니다.

    asyncio를 사용하기 위해선 함수 앞에 async를 붙이면 코루틴으로 만들 수 있습니다. I/O가 발생하거나 권한을 다른 작업에 넘기고 싶을 때엔 해당 로직 앞에 await을 붙입니다. 이때 await 뒤에 오는 코드는 코루틴으로 작성된 코드여야 합니다. 만약 await 뒤에 time.sleep과 같이 사용한다면 스레드가 중단되므로 의도한대로 동작하지 않습니다. 따라서 코루틴으로 구현되어 있는 asyncio.sleep을 사용해야 합니다. 밑에서 설명하겠지만 코루틴으로 만들어진 모듈이 아니라면 (requests, psycopg, django orm 등등) await을 붙여도 소용이 없습니다.

    아래는 asyncio를 사용한 예제입니다.

    import asyncio
    import time
    
    async def main():
        print('start')
        await asyncio.sleep(1)
        print('end')
    
    start = time.time()
    asyncio.run(main())
    end = time.time()
    print(end - start)
    
    # start
    # end
    # 1.0051159858703613

    위 예제는 태스크가 1개이므로 동기 프로그래밍과 크게 다르지 않습니다. 아래는 2개의 태스크를 동기와 비동기로 실행한 코드입니다.

    동기

    import time
    
    def sync_task_1():
        print('sync_task_1 시작')
        print('sync_task_1 3초 대기')
        time.sleep(3)
        print('sync_task_1 종료')
    
    def sync_task_2():
        print('sync_task_2 시작')
        print('sync_task_2 2초 대기')
        time.sleep(2)
        print('sync_task_2 종료')
    
    start = time.time()
    sync_task_1()
    sync_task_2()
    end = time.time()
    print(end-start)
    
    # sync_task_1 시작
    # sync_task_1 3초 대기
    # sync_task_1 종료
    # sync_task_2 시작
    # sync_task_2 2초 대기
    # sync_task_2 종료
    # 5.00602912902832

    비동기

    import asyncio
    import time
    
    async def async_task_1():
        print('async_task_1 시작')
        print('sync_task_1 3초 대기')
        await asyncio.sleep(3)
        print('sync_task_1 재시작')
    
    async def async_task_2():
        print('async_task_2 시작')
        print('sync_task_2 2초 대기')
        await asyncio.sleep(2)
        print('sync_task_2 재시작')
    
    async def main():
        start = time.time()
        await async_task_1()
        await async_task_2()
        end = time.time()
        print(f'time taken: {end - start}')
    
    asyncio.run(main())
    
    # async_task_1 시작
    # sync_task_1 3초 대기
    # sync_task_1 재시작
    # async_task_2 시작
    # sync_task_2 2초 대기
    # sync_task_2 재시작
    # time taken: 5.004269123077393

    asyncio를 쓰면 더 빠르게 처리되어야 하는데 시간이 동일하게 걸렸습니다. 위와 같이 코루틴 함수 여러개를 한번에 실행해야 하는데 await 코루틴함수() 이런 식으로 나열하면 코루틴을 호출하는 것이지 다음 태스크를 실행하도록 예약하는 행동은 아닙니다. 여러 코루틴을 동시에 실행하여 원하는 동작을하기 위해선 아래와 같이 create_task()로 등록해야 합니다.

    import asyncio
    import time
    
    async def async_task_1():
        print('async_task_1 시작')
        print('sync_task_1 3초 대기')
        await asyncio.sleep(3)
        print('sync_task_1 재시작')
    
    async def async_task_2():
        print('async_task_2 시작')
        print('sync_task_2 2초 대기')
        await asyncio.sleep(2)
        print('sync_task_2 재시작')
    
    async def main():
        start = time.time()
        task1 = asyncio.create_task(async_task_1())
        task2 = asyncio.create_task(async_task_2())
        await task1
        await task2
        end = time.time()
        print(f'time taken: {end - start}')
    
    asyncio.run(main())
    
    # async_task_1 시작
    # sync_task_1 3초 대기
    # async_task_2 시작
    # sync_task_2 2초 대기
    # sync_task_2 재시작
    # sync_task_1 재시작
    # time taken: 3.0023632049560547

    현재 파이썬 3.8을 사용해서 손쉽게 asyncio.run(메인함수())를 사용하는데 이하 버전에서는 이벤트 루프에서 이벤트 루프를 가져온 다음 실행시켜야 합니다. asyncio.run() 함수 내부를 보면 다음과 같습니다.

    def run(main, *, debug=None):
        """Execute the coroutine and return the result.
    
        This function runs the passed coroutine, taking care of
        managing the asyncio event loop and finalizing asynchronous
        generators.
    
        This function cannot be called when another asyncio event loop is
        running in the same thread.
    
        If debug is True, the event loop will be run in debug mode.
    
        This function always creates a new event loop and closes it at the end.
        It should be used as a main entry point for asyncio programs, and should
        ideally only be called once.
    
        Example:
    
            async def main():
                await asyncio.sleep(1)
                print('hello')
    
            asyncio.run(main())
        """
        if events._get_running_loop() is not None:
            raise RuntimeError(
                "asyncio.run() cannot be called from a running event loop")
    
        if not coroutines.iscoroutine(main):
            raise ValueError("a coroutine was expected, got {!r}".format(main))
    
        loop = events.new_event_loop()
        try:
            events.set_event_loop(loop)
            if debug is not None:
                loop.set_debug(debug)
            return loop.run_until_complete(main)
        finally:
            try:
                _cancel_all_tasks(loop)
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                events.set_event_loop(None)
                loop.close()

    다른 asyncio 이벤트 루프가 동일한 스레드에서 실행 중일 때, asyncio.run() 함수를 호출한다면 에러가 발생합니다. 해당 함수를 호출하면 항상 새 이벤트 루프를 만들고 다 사용한 다음, 이벤트 루프를 종료 시킵니다. 따라서 asyncio 프로그램을 짤 때, 해당 함수는 메인 진입 지점으로 사용하고 1번만 호출하는 것이 좋습니다.

    동시에 실행하기

    위 예제 중, asyncio.create_task()로 각 awaitable 함수를 등록해서 하는 방법도 있지만 등록해야 하는 함수가 많다면 아래와 같이 asyncio.gather()를 사용하는 것이 좋습니다.

    import asyncio
    
    async def factorial(name, number):
        f = 1
        for i in range(2, number + 1):
            print(f"Task {name}: Compute factorial({i})...")
            await asyncio.sleep(1)
            f *= i
        print(f"Task {name}: factorial({number}) = {f}")
        return f
    
    async def main():
        # 동시에 3개를 예약
        result = await asyncio.gather(factorial("A", 2), factorial("B", 3), factorial("C", 4))
        # 또는
        # result = await asyncio.gather(*[factorial("A", 2), factorial("B", 3), factorial("C", 4)])
    
        print(result)
    
    asyncio.run(main())
    
    # Task A: Compute factorial(2)...
    # Task B: Compute factorial(2)...
    # Task C: Compute factorial(2)...
    # Task A: factorial(2) = 2
    # Task B: Compute factorial(3)...
    # Task C: Compute factorial(3)...
    # Task B: factorial(3) = 6
    # Task C: Compute factorial(4)...
    # Task C: factorial(4) = 24
    # [2, 6, 24]

    등록된 모든 코루틴 함수가 성공했다면 등록된 순서대로 결과값이 리스트로 반환됩니다. asyncio.gather() 함수의 세번째 파라미터는 return_exceptions로 기본값은 False이며 등록된 코루틴 함수 중 1개라도 에러가 발생하면 즉시 중단시키고 에러를 발생시킵니다. True로 설정하면 에러가 난 코루틴 함수는 실행되지 않고 결과 리스트에 에러 정보를 담아서 반환합니다.

    import asyncio
    
    async def factorial(name, number):
        f = 1
        if number == 4:
            raise ValueError('에러')
        for i in range(2, number + 1):
            print(f"Task {name}: Compute factorial({i})...")
            await asyncio.sleep(1)
            f *= i
        print(f"Task {name}: factorial({number}) = {f}")
        return f
    
    async def main():
        # 동시에 3개를 예약
        result = await asyncio.gather(
            factorial("A", 2),
            factorial("B", 3),
            factorial("C", 4),
            return_exceptions=True
        )
    
        print(result)
    
    asyncio.run(main())
    
    # Task A: Compute factorial(2)...
    # Task B: Compute factorial(2)...
    # Task A: factorial(2) = 2
    # Task B: Compute factorial(3)...
    # Task B: factorial(3) = 6
    # [2, 6, ValueError('에러')]

    시간제한두기

    만약 코루틴 함수에 시간제한을 두고 싶다면 asyncio.wait_for() 함수를 사용하여 아래와 같이 사용할 수 있습니다.

    import asyncio
    
    async def eternity():
        await asyncio.sleep(4)
        print('살았다.')
    
    async def main():
        try:
            # 대기 시간이 1초가 넘어가면 에러처리
            await asyncio.wait_for(eternity(), timeout=1.0)
        except asyncio.TimeoutError:
            print('timeout!')
    
    asyncio.run(main())
    # timeout!

    해당 함수는 실제로 취소될 때까지 대기하므로 총 대기 시간이 timeout을 넘을 수도 있습니다.

    awaitable

    await 표현식에서 사용할 수 있는 객체를 awaitable 객체라고 칭합니다. 이러한 객체는 코루틴(coroutine), 태스크(task), 퓨처(future)가 있습니다.

    코루틴

    코루틴은 awaitable 객체이므로 다른 코루틴에서 호출할 수 있습니다.

    async def nested():
        return 42
    
    async def main():
        # 코루틴 함수를 await을 안붙이고 호출하면 호출되지 않음
        # 코루틴은 생성이 되지만 await하지 않음 -> 그래서 아무것도 실행하지 않는다.
        nested()
    
        # 42 반환됨
        print(await nested()) 
    
    asyncio.run(main())

    여기서 코루틴이란 용어는 코루틴 함수와 코루틴 객체라는 두 의미를 내포하는 것을 알 수 있습니다.

    • 코루틴 함수: async def ~~ 로 정의된 함수
    • 코루틴 객체: 코루틴 함수를 호출하고 반환되는 객체

    asyncio는 이전 버전인 제너레이터 기반 코루틴도 지원한다고 하지만 3.10부터는 사라질 기술이므로 사용하지 않는 것이 좋아 보입니다.

    태스크

    태스크는 코루틴을 동시에 예약하는데 사용됩니다. 위 예제에서 사용했던 asyncio.create_task()와 같은 함수를 사용해 코루틴이 실행되도록 자동으로 예약합니다.

    async def nested():
        return 42
    
    async def main():
        # nested()함수가 동시에 실행되도록 예약
        task = asyncio.create_task(nested())
    
        # task 변수를 취소하거나 완료될때까지 대기
        await task
    
    asyncio.run(main())

    퓨처

    퓨처는 비동기 연산의 최종 결과를 나타내는 저수준 awaitable 객체입니다. 현재까지 asyncio를 사용하는데 퓨처 객체를 어디서 사용해야 되는지 정확하게 이해가 되지 않습니다. asyncio와 퓨처를 사용하는 좋은 예는 loop.run_in_executor() 입니다.

    아래 예제는 run_in_executor() 예제인데 보면 알겠지만 asyncio를 사용한 코루틴 동작이 아닌, 멀티 스레드나 멀티 프로세스를 활용한 방식입니다. 코루틴이 아니므로 당연히 문맥 교환 비용이 발생합니다. 장점이라면 일반 함수를 수정하지 않고 비동기로 동작하게끔 만들 수 있습니다.

    • 아직까지 퓨처 객체만 사용해서 스레드나 프로세스를 사용하는 방식보다 asyncio + 퓨처를 사용하는데에 이점이 무엇인지를 모르겠음..
    import asyncio
    import concurrent.futures
    
    def blocking_io():
        # 로깅과 같은 파일 작업은 이벤트 루프를 차단할 수 있으므로 스레드 풀에서 실행합니다.
        with open('/dev/urandom', 'rb') as f:
            return f.read(100)
    
    def cpu_bound():
        # CPU bound 작업은 이벤트 루프를 차단하므로 프로세스 풀에서 실행하는 것이 좋습니다.
        return sum(i * i for i in range(10 ** 7))
    
    async def main():
        loop = asyncio.get_running_loop()
    
        # 첫 번째 인자가 None이면 자체적으로 ThreadPoolExecutor를 생성 - 테스트 했을 때, worker가 41개 등록됐음 
        result = await loop.run_in_executor(None, blocking_io)
        print('default thread pool', result)
    
        with concurrent.futures.ThreadPoolExecutor() as pool:
            result = await loop.run_in_executor(pool, blocking_io)
            print('custom thread pool', result)
    
        with concurrent.futures.ProcessPoolExecutor() as pool:
            result = await loop.run_in_executor(pool, cpu_bound)
            print('custom process pool', result)
    
    asyncio.run(main())

    관련 문서: https://docs.python.org/ko/3.8/library/asyncio-future.html#asyncio.Future

    asyncio를 사용해 API 호출하기

    많이 사용하는 requests 모듈은 코루틴 기반으로 만들어진 라이브러리가 아니기 때문에 asyncio를 사용해 API 호출을 하려면 aiohttp 라는 새로운 라이브러리를 사용해야 합니다.

    아래는 aiohttp를 사용해 API를 호출한 예시입니다. 아래에 나오는 uri는 경로에 숫자를 입력하면 입력된 숫자(초 단위) 만큼 결과 반환이 지연됩니다.

    from time import time
    import aiohttp
    import asyncio
    
    async def call(session, delay):
        print(f'call {delay}짜리 시작')
        # 전달 받은 session으로 async 처리
        async with session.get(f'http://httpbin.org/delay/{delay}') as response:
            result = await response.json()
            print(f'call {delay}짜리 끝')
            return result
    
    async def main():
        # 타이머 시작
        start = time()
    
        # 비동기 처리 세션 생성
        async with aiohttp.ClientSession() as session:
            # 작업 동시 예약
            two_task = asyncio.create_task(call(session, 2))
            three_task = asyncio.create_task(call(session, 3))
    
            # await asyncio.sleep(1)
            # 다른 작업도 가능
    
            result1 = await two_task
            result2 = await three_task
    
            end = time()
            print(end - start)
    
    asyncio.run(main())
    
    # call 2짜리 시작
    # call 3짜리 시작
    # call 2짜리 끝
    # call 3짜리 끝
    # 3.465127944946289

    더 자세한 예시는 https://docs.aiohttp.org/en/stable/에 잘 설명되어 있습니다.

    asyncio를 사용해 Database 사용하기

    현재 asyncio를 지원하는 라이브러리는 PostgreSQL, MySQL, SQLite 3가지입니다. 여기서 asyncpg, aiomysql, aiosqlite 와 같이 raw 쿼리를 작성해서 사용할 수도 있고 이를 한 번 더 감싼 https://github.com/encode/databases 모듈로 간편하게 사용할 수도 있습니다.

    아래는 https://github.com/encode/databases 에서 sqlite를 접속하여 사용하는 예시입니다.

    # Create a database instance, and connect to it.
    from databases import Database
    database = Database('sqlite:///example.db')
    await database.connect()
    
    # Create a table.
    query = """CREATE TABLE HighScores (id INTEGER PRIMARY KEY, name VARCHAR(100), score INTEGER)"""
    await database.execute(query=query)
    
    # Insert some data.
    query = "INSERT INTO HighScores(name, score) VALUES (:name, :score)"
    values = [
        {"name": "Daisy", "score": 92},
        {"name": "Neil", "score": 87},
        {"name": "Carol", "score": 43},
    ]
    await database.execute_many(query=query, values=values)
    
    # Run a database query.
    query = "SELECT * FROM HighScores"
    rows = await database.fetch_all(query=query)
    print('High Scores:', rows)

    댓글