ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Django] select_for_update를 사용해 안전하게 데이터 수정하기
    언어/파이썬 & 장고 2021. 5. 16. 16:20

    아래에서 설명할 때 사용한 장고는 1.11 버전입니다.

     

    테이블이 아래와 같이 있다고 가정합니다.

    아직 사용하지 않은 토큰을 유저가 사용했을 때, 해당 row에 유저의 id를 입력하도록 만든 테이블입니다.

    이를 위해 아래와 같이 프로그래밍을 했다고 가정합니다.

    from datetime import datetime
    
    class UserToken(models.Model):
        id = models.BigAutoField(primary_key=True, db_column='id')
        token = models.UUIDField()
        user_id = models.BigIntegerField()
        updated_at = models.DateTimeField(auto_now=True)
    
        class Meta:
            managed = False
            db_table = 'user_token'
    
    def update(user_id: int):
        user_token = UserToken.objects.filter(user_no__isnull=True).first()
        user_token.user_id = user_id
        user_token.updated_at = datetime.now()
        user_token.save()

    이렇게 만든 다음, 서버를 띄운 후 해당 함수가 호출이 된다면 user_id가 null인 데이터 중 가장 첫 번째 데이터에 user_id 값을 추가하게 됩니다.

    위와 같은 코드는 트래픽이 몰리지 않은 상황에서는 정상적으로 동작하지만 트래픽이 몰리게 된다면 동시성 문제가 발생할 수 있습니다. 위 테이블에서 user_id가 비어있는 데이터 중, pk가 1이 가장 첫 번째라고 할 때, 사용자 A와 B가 거의 동시에 해당 로직을 접근하게 된다면 user_token = UserToken.objects.filter(user_no__isnull=True).first() 해당 ORM에서 pk 1에 해당되는 객체를 사용자 A와 B 둘 다 가지게 되므로 둘 중 1명은 데이터가 유실됩니다.

    따라서 select_for_update() 메소드를 사용해 락을 사용해 수정해야 합니다.

    장고 1.11 버전에서는 select_for_update()의 파라미터로 nowait과 skip_locked이 있습니다. (장고 3.2에서는 of와 no_key도 있습니다.) nowait과 skip_locked의 기본값은 False로 동작합니다. nowait=False는 조회하고자 하는 데이터에 row 락이 잡혀있다면 해당 락이 풀릴 때까지 대기를 하게 되고 True일 경우, 에러를 발생시킵니다. skip_locked=True는 조회한 데이터가 락이 잡혀있을 경우 무시를 합니다. 이러한 속성 때문에 nowait과 skip_locked는 둘 중 1가지만 True로 사용할 수 있습니다. (둘 다 True는 에러)

    이제 해당 메소드를 사용해 위 로직을 변경해 봅니다. 우리가 원하는 로직은 접근한 데이터에 락이 잡혀있는 경우, user_id가 null인 다음 데이터를 가져오면 성공입니다.

    user_token = UserToken.objects.select_for_update().filter(user_no__isnull=True).first()

    만약 이렇게 변경하게 되면, nowait=False이기 때문에 락이 잡힌 데이터를 만나게 된다면 대기를 하고 락이 풀렸을 때, 해당 데이터에 user_id를 덮어쓰게 됩니다. 즉, 문제가 해결되지 않은 상태입니다. 그러므로 nowait=True로 에러를 발생시켜 다음 데이터로 넘어갈 수 있도록 진행합니다.

    user_token = UserToken.objects.select_for_update(nowait=True).filter(user_no__isnull=True).first()

    이제 접근한 데이터가 이미 선점되어 락이 잡혀 있으면 다음 데이터로 넘어가야 하는데 위의 코드로는 불가능합니다. 여기서 사용한 방법은 먼저 user_id가 null인 데이터들의 pk 리스트를 가져오고, 해당 pk list 순회하면서 다시 조회하는 방법입니다. 조회 비용이 더 들지만 해당 방법이 가장 안전하다 판단하여 진행했습니다.

    def update(user_id: int):
        user_token_id_list = UserToken.objects.filter(user_no__isnull=True).values_list(
            'id', flat=True)
    
        for user_token_id in user_token_id_list:
            try:
                with transaction.atomic(using='default'):
                    user_token = UserToken.objects.select_for_update(nowait=True).get(
                        pk=user_token_id,
                        user_id__isnull=True
                    )
                    user_token.user_id = user_id
                    user_token.updated_at = datetime.now()
                    user_token.save()
                    break
            except OperationalError:
                continue
            except UserToken.DoesNotExist:
                continue
    
        if not user_token:
            print('사용 가능한 데이터가 없음')

    위 코드는 아래와 같은 순서로 진행됩니다.

    1. user_id가 null인 pk 리스트를 생성
    2. 해당 리스트 순회
    3. user_token 테이블에서 2번에서 순회 중인 pk와 user_id가 null인 데이터를 조회
    4. user_id와 update_at 수정 후 저장하고 반복문 종료
    5. 만약 조회한 데이터가 lock이 잡혀 있다면 (OperationalError가 발생) 다음 pk 순회
    6. 만약 조건에 맞는 데이터가 없는 경우(UserToken.DoesNotExist) 해당 pk는 다른 사용자가 이미 사용한 것이기 때문에 다음 pk로 순회

    해당 코드의 문제점은 만약 100명이 순간적으로 몰리게 된다면 100번째 사용자는 1, 2, 3, 4, 5, 6, 7, ...., 100번째까지 반복문이 진행된다는 점입니다. 이러한 문제를 해결하기 위해 리스트를 무작위로 섞어서 사용하도록 했습니다. 이러한 방법에 worst case는 물론 동일하지만 평균적으로는 문제를 해결할 수 있습니다.

    import random
    
    def update(user_id: int):
        user_token_id_list = UserToken.objects.filter(user_no__isnull=True).values_list(
            'id', flat=True)
    
        user_token_id_list = list(user_token_id_list)
        # 추출한 대상을 섞음
        random.shuffle(user_token_id_list)
    
        for user_token_id in user_token_id_list:
            try:
                with transaction.atomic(using='default'):
                    user_token = UserToken.objects.select_for_update(nowait=True).get(
                        pk=user_token_id,
                        user_id__isnull=True
                    )
                    user_token.user_id = user_id
                    user_token.updated_at = datetime.now()
                    user_token.save()
                    break
            except OperationalError:
                continue
            except UserToken.DoesNotExist:
                continue
    
        if not user_token:
            print('사용 가능한 데이터가 없음')
    

    위와 같이 user_id가 null인 리스트를 섞게 된다면 접근한 모든 사용자가 [1,2,3,4,...., 100] 이란 pk를 가지게 되는 것이 아닌, [1, 43, 25, 65, ..., 3], [95, 23, 67, ..., 69] 와 같이 무작위 순서를 가지게 되므로 n 번째 사용자에게 발생하는 기아 현상을 해결할 수 있습니다. (물론 여기서는 테이블에 저장된 데이터의 순서에 맞게 사용 처리한다 라는 조건이 없어서 사용할 수 있었습니다.)

    요약

    1. 동시성 문제가 발생할 것 같다면 락을 사용 (select_for_update)
    2. 테이블 조회 비용을 조금 더 가져가고 데이터 수정을 보장할 수 있는 안전한 코드를 작성하자.

    댓글