import psycopg2.extras
self.conn = psycopg2.connect(host='127.0.0.1', dbname='postgres', user='postgres', password='postgres', port=5432)
self.cur = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)


python 3.6 이하 버전에서는 datetime을 생성하면 timezone 정보가 없어 astimezone 같은 메소드를 실행하면 에러가 발생합니다. 지금부터 삽질해서 찾은 방법을 설명합니다. 


python 3.6은 datetime을 생성할 때 timezone을 지정하지 않아도 기본으로 내장으로 설정이 되어 있어 astimezone()를 실행해도 에러가 발생하지 않습니다. 하지만 3.6이하 버전은 datetime을 생성할 때 timezone을 지정하지 않으면 timezone이 None이므로 astimezone()을 실행하면 에러가 발생합니다. 따라서 결론은 datetime을 생성할 때 timezone을 지정하는 것입니다.


하지만 이미 생성된 datetime이라면 어떨까..

여기서 방법은 2가지로 나뉩니다.

datetime → time → datetime 으로 변환

먼저 datetime을 time타입으로 변환 시킨 다음, 이 값으로 timezone을 지정하여 datetime객체를 새로 생성하는 방법입니다.

이 방법은 아래에서 설명할 replace()함수가 지원되지 않는 python 버전에서 사용할 수 있습니다.

import datetime
import time

now_dt = datetime.datetime.now()
print(now_dt)

now_t = time.mktime(now_dt.timetuple()) + now_dt.microsecond / 1E6

print(now_t)

c_now_dt = datetime.datetime.fromtimestamp(now_t, datetime.timezone.utc)
print(c_now_dt)

# 결과
# 2017-10-15 18:45:32.357988
# 1508060732.357988
# 2017-10-15 09:45:32.357988+00:00

replace() 사용 (pyhton 3.3 이상)

위 방법은 상당히 귀찮으므로 python 버전이 3.3 이상이면 replace()함수를 사용합니다.

import datetime


now_dt = datetime.datetime.now()
print(now_dt)

c_now_dt = now_dt.replace(tzinfo=datetime.timezone.utc)
print(c_now_dt)

# 결과
# 2017-10-15 18:48:32.402106
# 2017-10-15 18:48:32.402106+00:00

주의점

위 두 결과를 보고 확인이 되듯이 replace()함수를 사용해 timezone을 입력하면 timezone은 변경이 되지만 utc 시간대로 변경되지 않습니다(!!)

아래는 python 3.6대에서 astimezone()도 추가하여 비교한 코드입니다.

import datetime
import time

now_dt = datetime.datetime.now()
print('현재 시간: ', now_dt)

now_t = time.mktime(now_dt.timetuple()) + now_dt.microsecond / 1E6
c_now_dt = datetime.datetime.fromtimestamp(now_t, datetime.timezone.utc)

print('datetime-> time -> datetime 변환시간: ', c_now_dt)

print('replace() 변환시간: ',now_dt.replace(tzinfo=datetime.timezone.utc))
print('astimezone() 변환시간: ',now_dt.astimezone(datetime.timezone.utc))


# 현재 시간:  2017-10-16 19:09:44.162545
# datetime-> time -> datetime 변환시간:  2017-10-16 10:09:44.162545+00:00
# replace() 변환시간:  2017-10-16 19:09:44.162545+00:00
# astimezone() 변환시간:  2017-10-16 10:09:44.162545+00:00


이것처럼 replace()를 사용할 때, utc 시간대로 변경되지 않는 다는 점을 숙지하고 사용해야 합니다.

결론

datetime 생성할떄 timezone과 timedeltal를 이용해서 필요한 타임존 설정을 해주면 모든게 해결됨. (근데 귀찮다)

파이썬 프로그래밍의 대부분은 데이터를 담은 클래스들을 정의하고 이 객체들이 연계되는 방법을 명시하는 일입니다. 모든 파이썬 클래스는 일종의 컨테이너로, 속성과 기능을 함께 캡슐화합니다. 파이썬은 데이터 관리용 내장 컨테이너 타입(리스트, 튜플, 세트, 딕셔너리)도 제공합니다.

시퀀스처럼 쓰임새가 간단한 클래스를 설계할 때는 파이썬의 내장 list 타입에서 상속받으려고 하는 게 당연합니다. 예를 들어 멤버의 빈도를 세는 메서드를 추가로 갖춘 커스텀 리스트 타입을 생성한다고 가정합니다.

class FrequencyList(list):
    def __init__(self, members):
        super().__init__(members)

    def frequency(self):
        counts = {}
        for item in self:
            counts.setdefault(item, 0)
            counts[item] += 1
        return counts


# list에서 상속받아 서브클래스를 만들어서 list의 표준 기능을 모두 갖춰 파이썬 프로그래머에게 시맨틱을 유지
# 또한 추가한 메서드로 필요한 커스텀 동작을 더할 수 있음


foo = FrequencyList(['a', 'b', 'a', 'c', 'b', 'a', 'd'])
print('Length is ', len(foo))
foo.pop()
print('After pop: ', repr(foo))
print('Frequency: ', foo.frequency())

# 결과
# Length is  7
# After pop:  ['a', 'b', 'a', 'c', 'b', 'a']
# Frequency:  {'a': 3, 'b': 2, 'c': 1}


이제 list의 서브클래스는 아니지만 인덱스로 접근할 수 있게 해서  list처럼 보이는 객체를 제공한다고 합니다. 예를 들어 바이너리 트리 클래스에 (list나 tuple같은) 시퀀스 시맨틱을 제공한다고 가정합니다.

class BinaryNode:
    def __init__(self, value, left=None, right=None):
        self.value = value
        self.left = left
        self.right = right


# 이 클래스가 시퀀스 타입처럼 동작하기 위해 어떻게 해야 할까?
#  파이썬은 특별한 이름을 붙인 인스턴스 메서드로 컨테이너 동작을 구현

bar = [1, 2, 3]
bar[0]

# 위와 같이 시퀀스의 아이템을 인덱스로 접근하면 다음과 같이 해석
bar.__getitem__(0)


# BinaryNode클래스가 시퀀스처럼 동작하게 하려면 객체의 트리를 깊이 우선으로 탐색하는 __getitem__을 구현

class IndexableNode(BinaryNode):
    def _search(self, count, index):
        # ...
        # (found, count) 반환
        pass

    def __getitem__(self, index):
        found, _ = self._search(0, index)

        if not found:
            raise IndexError('Index out of range')

        return found.value

tree = IndexableNode(10,
                     left=IndexableNode(5,
                                        left=IndexableNode(2),
                                        right=IndexableNode(6,
                                                            right=IndexableNode(7)
                                                            )
                                        ),
                     right=IndexableNode(15, left=IndexableNode(11)))

# 트리탐색은 물론 list처럼 접근 가능

print(tree.left.right.right.value)
print(tree[0])


# 결과
7
2


문제는 __getitem__을 구현한 것만드로 기대하는 시퀀스 시맨틱을 모두 제공하지 못합니다.

len(tree)# 에러

이러한 문제처럼 count, index 메서드 등 프로그래머들이 사용하는 기본 메서드를 전부 정의하기는 생각보다 어렵습니다. 


이런 어려움을 피하기 위해 내장 collections.abc 모듈은 각 컨테이너 타입에 필요한 일반적인 메서드를 모두 제공하는 추상 기반 클래스들을 정의합니다. 이 추상 기반 클래스들에서 상속받아 서브클래스를 만들다가 깜빡 잊고 필수 메서드를 구현하지 않으면, 모듈이 에러를 뱉어냅니다.

from collections.abc import Sequence

class BadType(Sequence):
    pass

foo = BadType()

# 결과
# TypeError: Can't instantiate abstract class BadType with abstract methods __getitem__, __len__


앞에서 다룬 SequenceNode처럼 추상 기반 클래스가 요구하는 메서드를 모두 구혀ㅑㄴ하면 별도로 작업하지 않아도 클래스가 index와 count 같은 부가적인 메서드를 모두 제공합니다.

요약

쓰임새가 간단할 때는 list나 dict같은 파이썬의 컨테이너 타입에서 직접 상속

커스텀 컨테이너 타입을 올바르게 구현하는 데 필요한 많은 메서드에 주의

커스텀 컨테이너 타입이 collections.abc에 정의된 인터페이스에서 상속받게 만들어서 클래스가 필요한 인터페이스, 동작과 일치도록


사전작업

virtualenv로 django를 설치한 경우

Pycharm 내의 터미널을 사용하실 경우 선택된 interpreter에 따라 자동으로 변경되므로 아래와 같은 작업을 할 필요가 없습니다.

예시) djangoDjango라는 virtualenv를 생성한 경우

$ cd virtualenv
$ source bin/activate
(virtualDjango) $         # virtualDjango라는 virtualenv 터미널에 접속한 상태
# virtualenv 접속해제는 deactivate
$ 프로젝트로 이동

터미널

settings 파일이 쪼개져 있지 않은 경우

$ python3 manage.py inspectdb > models.py (아무이름이나 가능)

settings 파일이 쪼개져 있는 경우

$ python3 manage.py inspectdb --settings=djangotest(프로젝트이름).settings.dev > models.py (아무이름이나 가능)

결과

# This is an auto-generated Django model module.
# You'll have to do the following manually to clean this up:
#   * Rearrange models' order
#   * Make sure each model has one field with primary_key=True
#   * Make sure each ForeignKey has `on_delete` set to the desired behavior.
#   * Remove `managed = False` lines if you wish to allow Django to create, modify, and delete the table
# Feel free to rename the models, but don't rename db_table values or field names.
from __future__ import unicode_literals

from django.db import models



class TCart(models.Model):
    cart_no = models.BigAutoField(primary_key=True)
    user_no = models.BigIntegerField()
    guest_session_key = models.CharField(max_length=-1)
    save_mileage_amount = models.BigIntegerField()
    delivery_amount = models.DecimalField(max_digits=16, decimal_places=2)
    coupon_sale_amount = models.DecimalField(max_digits=16, decimal_places=2)
    coupon_save_amount = models.DecimalField(max_digits=16, decimal_places=2)
    pay_mileage = models.BigIntegerField()
    pay_deposit = models.BigIntegerField()
    device_type = models.IntegerField()
    insert_timestamp = models.DateTimeField()
    updated_timestamp = models.DateTimeField()
    is_deleted = models.CharField(max_length=1)

    class Meta:
        managed = False
        db_table = 't_cart'
        unique_together = (('user_no', 'guest_session_key'),)

....

주의사항

Django로 가져온 테이블의 컬럼 필드와 postgreSQL 내 테이블의 컬럼 필드가 다를 경우가 존재합니다. (예를 들어, varchar(20)인데 text필드로 migrate가 되었다던지..) 따라서 생성한 후 확인 하는 것을 권장합니다.


len()
은 컨테이너에 포함된 항목의 수를 계산합니다. 다시 말해 문자열일 경우 문자의 길이를 반환하고 컨테이너타입인 튜플, 딕셔너리, 리스트의 경우는 속해있는 값의 개수를 반환합니다.

반면 sys.getsizeof()는 객체의 메모리 사이즈를 바이트 단위로 반환합니다. 객체는 모든 유형이 될 수 있습니다.

파이썬 문자열 객체는 문자 당 1바이트의 간단한 문자 시퀀스가 아닙니다. 특히, sys.getsizeof()함수에는 가비지 컬렉터의 오버헤드(아마 1바이트)가 포함되어 출력됩니다. 

getsizeof()는 object의 __sizeof__ 메서드를 호출하고 object가 가비지 컬렉터에서 관리되는 경우에는 가비지 컬렉터의 오버 헤드를 추가합니다.

예시

# python 3.4
import sys

en = 'a'
ko = 'ㅁ'

print('len()')
print(len(en))
print(len(ko))

# 결과
# 1
# 1

print('encode - utf8')
print(en.encode('utf-8'))
print(ko.encode('utf-8'))


# 결과
# b'a'
# b'\xe3\x85\x81'

print('sys.getsizeof()')
print(sys.getsizeof(''))
print(sys.getsizeof(en))
print(sys.getsizeof(ko))
 
# 결과
# 49
# 50
# 76


len()함수로 알파벳과 한글을 비교하면 길이가 1이 나옵니다. 파이썬3.x는 기본 유니코드를 utf-8로 사용합니다. 따라서 한글이나 다른 문자가 있으면 utf8로 해석을 합니다. 

이 문자열들을 utf-8로 encode하면 바이트 타입으로 변환이되고 알파벳은 1바이트, 한글은 3바이트로 표현이 됩니다.

마지막으로 sys.getsizeof()는 메모리에 실제로 올라가는 크기입니다. ''와 같이 빈 문자열도 49바이트가 차지하게 됩니다.


유니 코드 문자열의 경우 문자 별 크기는 최대 2 또는 4가됩니다 (컴파일 옵션에 따라 다름). Python 3.3 및 이후 버전에서 유니 코드 문자열은 문자열의 내용에 따라 문자 당 1에서 4 바이트를 차지합니다.

pyximport는 Cython의 한 부분입니다. import를 해서 사용할 수 있습니다. 만약 특별한 C 라이브러리를 요구하지 않거나 특별한 setup을 빌드하기를 원하지 않으면 pyximport 모듈을 사용하면 됩니다. pyximport을 import하면 setup.py을 작성하지 않고 .pyx파일을 직접 로드할 수 있습니다.

import pyximport; pyximport.install()
>>> import helloworld
Hello World


파이썬이 아닌 다른 언어를 사용했다면 아마도 urllib와 urllib2는 사용하기 쉽고, 코드가 많지 않으며, 성능이 저 좋다는 생각이 들 것입니다. 그래서 생각했던 것입니다. 그러나 Requests 패키지는 믿을 수 없을 만큼 유용하고 짧아서 모든 사람이 사용해야 하는 패키지 중 하나입니다.


requests 모듈을 Rest API를 지원합니다.

import requests
...

resp = requests.get('http://www.mywebsite.com/user')
resp = requests.post('http://www.mywebsite.com/user')
resp = requests.put('http://www.mywebsite.com/user/put')
resp = requests.delete('http://www.mywebsite.com/user/delete')


GET / POST가 매개 변수를 다시 인코딩 할 필요가 없는지 여부와 관계없이 dictionary를 인수로 사용하기만하면 됩니다.

userdata = {"firstname": "John", "lastname": "Doe", "password": "jdoe123"}
resp = requests.post('http://www.mywebsite.com/user', data=userdata)


게다가 json 디코더가 내장되어 있습니다 (다시말해, json.loads()와 같이 추가적으로 json 모듈을 사용하지 않아도 됩니다)

resp.json()


또는 response 데이터가 text라면 아래와 같이 사용할 수 있습니다.

resp.text


위의 설명은 아주 간단한 것만 설명한 내용입니다. 다음은 requests의 기능 list입니다.

  • International Domains and URLs
  • Keep-Alive & Connection Pooling
  • Sessions with Cookie Persistence
  • Browser-style SSL Verification
  • Basic/Digest Authentication
  • Elegant Key/Value Cookies
  • Automatic Decompression
  • Unicode Response Bodies
  • Multipart File Uploads
  • Connection Timeouts
  • .netrc support
  • List item
  • Python 2.6—3.4
  • Thread-safe.


PostgreSQL은 데이터를 관리, 구성, 질의 및 검색하는 데 매우 뛰어나지만 Insert 자체가 매우 느릴 수 있습니다. PostgreSQL에서 가장 빠른 Insert 방법은 COPY문을 사용하는 것입니다. 응용 프로그램이 PostgreSQL을 사용할 수 있는 권한이 있다고 해도 소프트웨어 구성 요소 간에 엄격한 기능 분리를 유지하는 관점에 있어서는 COPY문을 사용하는 것을 권하지 않습니다. 다음은 COPY문을 제외한 100,000개의 row들을 insert할 때 찾은 효율적인 방법을 소개합니다.

Test 구성

테스트는 아래의 테이블에 insert하는 데 걸리는 시간을 기준으로 합니다.

CREATE TABLE upload_time_test(
        uuid uuid primary key default uuid_generate_v4(),
        created timestamp with time zone not null default now(),
        text text not null,
        properties hstore not null default ''::hstore
    );

    GRANT ALL ON upload_time_test TO test;


다음은 psycopg2의 연결을 담당하는 함수 부분입니다.

def connect():
    connection= psycopg2.connect(host=HOST,database=DATABASE,user=USER,password=PASSWORD)
    psycopg2.extras.register_hstore(connection)
    return connection
def execute(sql,params={}):
    with connect() as connection:
        with connection.cursor() as cursor:
            cursor.execute(sql,params)

1000개 row insert

아래 Tester 클래스는 인스턴스화 할 때마다 샘플 테이블을 파괴하고 다시 만들면서 진행합니다. 데이터베이스에 row를 insert하는 세 가지 방법을 설명합니다. 각 기능은 서로 다른 기술을 기반으로 설명되어 있습니다.

slowInsert()는 각 행에 대해 새 데이터베이스 연결을 만들기 때문에 가장 느립니다.

insert()는 보통 사용하는 방법입니다. 하나의 연결을 만들고 각 연결에 다시 연결하는 방식입니다. 이 방법은 보통 psycopg2의 executemany()가 수행하는 작업입니다.

fastInsert()는 새로운 방법으로 unnest()를 사용하여 psycopg2를 통해 전달된 배열 집합을 푸는 방식입니다.

class Tester():
    SINGLE_INSERT = """
        INSERT INTO upload_time_test(text,properties)
         VALUES (%(text)s, %(properties)s)
        """
    def __init__(self, count):
        execute(SETUP_SQL)
        self.count = count
        self.data = [
            {
                'text': 'Some text',
                'properties': {"key": "value"},
            }
            for i in range(count)
        ]
    def slowInsert(self):
        '''
            Creates a new connection for each insertion
        '''
        for row in self.data:
            text = row['text']
            properties = row['properties']
            execute(SINGLE_INSERT, locals())
    def insert(self):
        '''
            One connection.
            Multiple queries.
        '''
        with connect() as connection:
            with connection.cursor() as cursor:
                for row in self.data:
                    text = row['text']
                    properties = row['properties']
                    cursor.execute(SINGLE_INSERT, locals())
    def fastInsert(self):
        '''
            One connection, one query.
        '''
        sql = '''
            INSERT INTO upload_time_test(text,properties)
              SELECT unnest(ARRAY '%(texts)s' ) ,
                     unnest(ARRAY '%(properties)s')
        '''
        texts = [r['text'] for r in self.data]
        properties = [r['properties'] for r in self.data]
        execute(sql, locals())

tester = Tester(1000)
    with timer('slow'):
        tester.slowInsert()
    with timer('normal'):
        tester.insert()
    with timer('fast'):
        tester.fastInsert()
 
# 결과
# slow: 7.160489320755005 second(s)
# normal: 0.1441025733947754 second(s)
# fast: 0.042119503021240234 second(s)

위 테스트는 1000개의 데이터를 insert한다고 가정하고 진행했습니다. 

연결을 매 insert마다 새로이 하는 것보다 하나를 연결한 다음 재사용할 시 50배정도 빠른 것을 확인할 수 있습니다. 그리고 unnest를 사용하면 이보다 3배정도 빠른 것을 확인할 수 있습니다.

100,000개 row insert

slowInsert()는 매우 느리다는 것을 확인되었으니 insert()와 fastInsert()만 확인하도록 합니다.

ester=Tester(count=100000)
with timer('normal'):
    tester.insert()

tester=Tester(count=100000)
with timer('fast'):
    tester.fastInsert()
 
# 결과
# normal: 14.866096019744873 second(s)
# fast: 3.9566986560821533 second(s)

100,000개의 row를 insert할 때, unnest를 사용한 fastInsert()가 기본 insert()보다 4배정도 빠른 것을 확인이 가능합니다.


아래는 삽입개수와 삽입 속도를 나타내는 표입니다. (count / 총 소요시간)


 countbulknormal
504485.6947303867.856879
10010159.3896094847.897547
20015212.1862766057.106548
50027340.8427207081.049689
100033248.5453827694.657609
200035640.6957677070.777670
500041223.2004738027.790910
1000040948.7231067785.005392
2000042604.3879147568.314015
5000040795.2334707291.552509
10000027014.3541196872.935483

결론

동시에 다수의 row들을 insert하는데 unnest를 사용하면 다음과 같은 장점이 존재합니다.

  • 보통 수천 개의 행을 insert 할 때 일반 insert 반복문보다 상당히 빠름
  • unnest() 사용의 이점은 적어도 최대 50,000 개의 행을 증가시킴
  • 문자열 연결없이 (합리적으로) 직접 매개 변수화된 SQL을 작성할 수 있습니다. (리스트를 그대로 사용 가능)
  • 원격 데이터베이스에서 이 unnest를 시도하면, 네트워크를 통해 전송되는 데이터의 양이 현저히 줄어듬


먼저 프로젝트 구조가 아래와 같이 구성되어 있다고 가정합니다.

project	
	-- test
		+-- sub1
			-- __init__.py
			-- aa.py
			-- bb.py
		+-- sub2
			-- __init__.py
			-- cc.py
			-- dd.py
		-- ee.py
		-- ff.py
		-- __init__.py
	-- gg.py

ee.py에서 다른 모듈 참조 (하위 폴더 내 파일, 동일 폴더 내 파일 참조)

이와 같은 방법은 간단합니다.

# aa.py를 참조할 경우
from sub1 import aa
 
# ff.py를 참조할 경우
import ff 
# 또는
from . import ff # from . 은 동일폴더라는 의미를 나타냄.

상위 폴더 내 파일 참조

예를들어 aa.py에서 sub2에 있는 cc.py을 참조하는 방법은 두 가지가 있습니다.

1. 부모폴더의 절대경로를 참조 path에 추가

모듈의 시작부분의 import에 아래와 같은 코드를 추가하면 문제는 해결됩니다.

# aa.py
 
import os
import sys
sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
 
import cc.py
현재 모듈의 절대경로를 알아내어 상위 폴더 절대경로를 참조 path에 추가하는 방식입니다. 위에 있는 코드는 1단계 상위 폴더의 경로를 추가할 때 사용합니다.
만약 aa.py에서 gg.py를 참조한다고 하면 2단계 상위 폴더 경로를 추가해야 하므로 아래와 같이 코드가 길어집니다.


# gg.py

import os
import sys

sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))))

import gg.py


2. 시스템의 환경변수 PYTHONPATH에 프로젝트 추가

이 방법은 모든 파이썬 프로젝트에서 built-in 모듈을 그냥 import 할 수 있는 것과 마찬가지의 원리로 시스템의 파이썬 컴파일러가 기본적으로 참조하게 될 패키지 모듈에 자신의 프로젝트를 추가하는 형태입니다.

윈도우

제어판 - 시스템 - 고급 - 환경변수 에 가서 PYTHON_PATH를 편집하여 자신의 프로젝트 홈 폴더를 맨 뒤에 붙여주면 됩니다.

리눅스

추가할 프로젝트의 절대경로가 /home/user/project일 경우, 홈 폴더의 .bash_profile 에 아래와 같은 코드를 추가하여 시스템 환경변수를 변경하는 것입니다.

$ vi ~/bash_profile

========= bash_profile =========
...
PYTHONPATH=$PYTHONPATH:/home/user/test
export PYTHONPATH
============================
$ source ~/bash_profile


  1. dd 2018.03.09 02:16 신고

    코딩으로 블로그할꺼면 티스토리 복붙은 좀 해제하고 해라

Python3.2 에서 concurrent.futures 모듈이 추가되었습니다. 이 모듈은 멀티스레드와 멀티 프로세스에 대한 고수준 API를 포함하고 있으며 스레드 풀/프로세스 풀 기반의 동작을 기본적으로 지원합니다.

파이썬 제약 : GIL

Python은 두 개 이상의 스레드가 동시에 실행될 때 두 개 이상의 스레드가 하나의 자원을 동시에 액세스할 때 발생할 수 있는 문제점을 방지하기 위해 GIL(Global Internal Lock)이라는 것을 도입했습니다. 즉, 스레드가 실행될 때, 프로그램 내의 리소스 전체에 락이 걸립니다. 결국 Python 구현에서는 동시에 몇 개의 스레드가 실행이 되던 간에 GIL에 의해서 한 번에 하나의 스레드만 실행됩니다. 멀티 스레드의 경우 문맥교환(Context Switch)에 필요한 리소스까지 고려하면 단일 스레드보다 성능이 떨어지게 되는 것을 확인할 수 있습니다. GIL은 아주 오래전부터 파이썬의 약점으로 지적받았습니다. GIL 때문에 멀티스레드를 통한 분산처리는 파이썬 내에서 의미가 없으며, 분산처리를 통해 성능의 이익을 보려면 멀티 프로세스를 사용해야 합니다. 하지만 프로세스를 추가로 생성하는 것은 OS입장에서는 매우 비용이 많이 드는 일이며, 특히 윈도우 환경에서는 어지간히 커다란 작업이 아니면 프로세스 생성에 드는 시간이 가장 큰 병목이 될 가능성이 있습니다.

concurrent.futures 모듈

 concurrent.futures 모듈은 별도 규격의 스레드 객체를 작성하지 않고 함수 호출을 객체화하여 다른 스레드나 다른 프로세스에서 이를 실행할 수 있게 해줍니다. 이때 중심 역할을 하는 것이 Executor 클래스입니다. Executor 클래스는 다시 ThreadPoolExecutor와 ProcessPoolExecutor로 나뉘는데 두 클래스의 차이는 동시성 작업을 멀티 스레드로 처리하느냐, 멀티 프로세스로 처리하느냐만 있지 거의 동일한 기능을 제공합니다.

Future 

 executor를 이용한 동시성 처리는 호출해야 할 함수와 그에 전달될 인자들을 executor에 넘겨주는 것으로 시작되는데, executor의 해당 메소드는 다른 스레드의 리턴을 기다릴 필요가 없으므로 바로 리턴하게 됩니다. 이 때 리턴되는 객체가 Future 객체이며, 이 객체의 상태를 조사하여 완료 여부를 확인하거나, 해당 객체 내 작업이 완료되기를 기다리거나 혹은 미리 콜백을 넘겨놓아둘 수도 있습니다.

Executor  

 Executor 객체는 풀 기반으로 작업을 관리합니다. 초기화 시에 몇 개의 worker가 사용될 것인지를 정해주면 전달되는 작업들을 큐에 넣고 worker pool에서 사용 가능한 worker로 하여금 작업을 처리하게 합니다.

submit(fn, *args, **kwargs)

함수 fn에 대해 주어진 인자들을 전달하여 실행할 수 있는 Future 객체를 리턴합니다. 해당 함수는 호출 즉시 스케줄링됩니다. 

with ThreadPoolExcutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

map(func, *iterables, timeout=None)

 일반함수 map과 동일하나, 각 호출은 병렬적으로 일어납니다. 만약 타임아웃 값이 지정된 경우, 맵핑 작업이 완료되지 않은 호출이 있으면 TimeoutError가 일어납니다. 입력데이터와 동작함수를 짝지어서 바로 스케줄링하도록 합니다. map()함수는 이터레이터를 리턴하는데, 이는 각 개별 작업이 동시에 실행된 후, 먼저 종료된 작업부터 내놓는 리턴값을 내놓게 됩니다.

 shutdown(wait=True)

 executor에게 종료 시그널을 보냅니다. 시그널을 받은 executor는 실행 중 및 대기 중인 모든 future에 대해 리소스를 정리합니다. shutdown 후에 submit이나 map을 호출하면 런타임에러가 발생합니다.만약 wait 값이 True로 정해지면 진행 및 대기중이던 작업이 종료된 후에 shutdown이 일어나고, 그 때까지 해당 함수는 리턴을 보류하게 됩니다. 만약 강제 shutdown을 피하고 싶다면 with 구문 내에서 사용합니다.

import shutil
with ThreadPoolExcutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

예제

아래는 0.1초마다 처리해야할 데이터를 수집하고 이를 멀티프로세스에서 처리하는 예제입니다.

from concurrent.futures import ProcessPoolExecutor
from time import sleep

def process(*args):
    result = # 입력받은 인자들을 처리한다...
    time.sleep(1)
    print(result)

def main():
	exe = ProcessPoolExecutor(max_workers=4) as exe:
	while true:
    	cont, x = collect_arguments()
	    if not cont:
    	    break
	    if x:
    	    exe.submit(process, *x)
	    else:
    	    sleep(0.1)
	exe.shutdown(wait=True)
 
if __name__ == "__main__":
	main()

위 코드는 일종의 데몬으로 다음과 같이 동작합니다.

  1. 매 입력된 데이터를 처리하는 함수는 process(). 이 함수는 처리가 끝나면 1초간 대기했다가 결과를 화면에 출력.
  2. 최대 4개까지의 프로세스를 사용할 수 있도록 executor를 생성
  3. 데이터를 수집하여 계속 진행할 것인지 여부를 확인하고, 진행한다면 수집된 값들을 처리하도록 executor에게 요청
  4. 무한 루프 내에서도 약간 텀을 줌
  5. 더이상 처리하지 않는다면 루프에서 빠져나오는데, 이 때 executor는 아직 실행되고 있는 작업들이 있으면 이들이 완료될 때까지 기다린 후 종료
  6. 프로그램 종료


여기서 중요한 부분은 if __name__ == "__main__": 부분인데, 자식 프로세스에서 실행되는 worker는 작업에 필요한 함수 정보를 얻기 위해서 본 파일을 import 하게 됩니다. 따라서 __main__ 모듈과 그렇지 않은 모듈의 행동양식이 구분되어야 합니다. 그리고 반드시 __main__ 모듈은 있어야 하기 때문에 REPL 환경에서는 멀티프로세스 코드를 실행할 수 없습니다. 

간단하게 말해 멀티프로세스를 실행하기 위해선 해당 파일을 직접적으로 실행해야 합니다. 다른 파일에서 호출하는 형식으로 진행하면 예상치 않게 동작할 수도 있습니다. (현재 확인한 바로는 shutdown()나 wait()이 없으면 전체가 실행이 되긴 합니다. 근데 shutdown()나 wait()이 존재하면 shutdown()나 wait()등 이 동작하지 않고 해당 부분에서 멈춥니다.) 


만약 분산처리와 같이 worker가 처리해준 작업의 결과들을 다시 메인 스레드에서 넘겨받아 사용하려면 다음 예제에서 확인할 수 있습니다.

from concurrent import futures
total_result = 0
with futures.ThreadPoolExecutor(max_workers=4) as exe:
    fs = exe.map(process, list(range(1, 10000, 1000)))
    for done in futures.as_completed(fs):
        result = done.result()
        print(result)
        total_result += result
print(total_result)

 

executor의 map() 메소드는 처리용 함수와 입력 값의 집합을 이용해서 한꺼번에 작업을 생성합니다. 그리고 Future의 집합을 리턴하는데, as_completed 함수는 작업들을 기다리면서, 집합 내에서 종료된 것들부터 차례로 이터레이션해줍니다.

위 예제는 개별로 요청을 확인하는 예제이고, 아래는 요청한 작업 전체를 기다리다가 한꺼번에 처리하는 예제입니다.

with futures.ThreadPoolExecutor(max_workers=10) as exe:
    fs = exe.map(process, ARGUMENTS)
    (done, not_done) = futures.wait(fs, timeout=2)
    total_result = sum([f.result() for f in done])

주의점

concurrent.futures 모듈로 멀티프로세스를 사용할 때 주의해야할 점은 객체를 생성한 후 하나의 객체를 여러 프로세스에서 접근하지 않도록 해야 합니다. 만약 하나의 객체에 여러 프로세스가 접근할 경우 ssl error decryption failed or bad record mac 가 발생하게 됩니다. 

from concurrent.futures import ProcessPoolExecutor 
 
def concurrent(list):
    for i in list:
        query = 'insert into test (id) values ({0})'.format(list[i])
        cursor.execute(query)
    conn.commit()
 
db = Database() 
cursor, conn = db.connect() # 입력된 정보를 바탕으로 db 연결을 하여 cursor, connection을 반환해주는 함수가 있다고 가정
_list = ['1','11','111','1111','11111','111111']
 
 
pool = ProcessPoolExecutor(max_workers=4)
pool.map(concurrent, _list)
 
# 실행 시 ssl error decryption failed or bad record mac 에러 발생


아래와 같이 여러 프로세스가 각자의 객체를 갖도록 코딩해야 합니다.

from concurrent.futures import ProcessPoolExecutor 
def concurrent(list):
    cursor, conn = db.connect() # 각 프로세스마다 다른 데이터베이스 객체 생성
    for i in list:
        query = 'insert into test (id) values ({0})'.format(list[i])
        cursor.execute(query)
    conn.commit()
  
db = Database()
 
_list = ['1','11','111','1111','11111','111111']
 
pool = ProcessPoolExecutor(max_workers=4)
pool.map(concurrent, _list)


+ Recent posts