ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [spark] join 전략이란
    공부 2026. 4. 26. 16:01

    스파크에서 조인은 성능에 가장 큰 영향을 미치는 작업 중 하나입니다. 카탈리스트 옵티마이저(Catalyst Optimizer)는 비용 기반 최적화(CBO)와 규칙 기반 최적화를 통해 데이터의 통계 정보(크기, 분포 등)와 설정값을 분석하여 최적의 조인 전략을 자동으로 선택합니다.


    조인 방식의 두 가지 큰 분류

    전체 노드간 통신을 유발하는 전략 (Shuffle Join)

    서로 큰 테이블 간에 조인할 때 사용되는 방식입니다. 셔플 조인은 전체 노드 간 통신이 발생합니다. 조인에 사용한 특정 키나 키 집합을 어떤 노드가 가졌는지에 따라 해당 노드와 데이터를 공유합니다. 이런 통신 방식 때문에 네트워크는 복잡해지고 많은 자원을 사용합니다. 특히 데이터가 잘 나뉘어 있지 않다면 더 심해집니다.

    두 데이터셋을 조인 키를 기준으로 익스큐터로 셔플링하여 일치하는 행이 동일한 익스큐터 노드에 함께 배치되도록 합니다. 조인 키가 균등하게 분산되어 메모리에 맞지 않는 대용량 데이터셋에 적합합니다.

    노드 별 연산 전략 (Broadcast Join)

    DataFrame을 클러스터의 전체 워커 노드에 복제하는 방식입니다. 얼핏 보기에는 자원을 많이 사용할 것처럼 보이지만, 대규모 노드 간 통신이 발생하는 것은 첫 브로드캐스트 때뿐입니다. 그 이후로는 노드 사이에 추가적인 통신이 발생하지 않습니다. 브로드캐스트되는 데이터는 작은 데이터이고 큰 테이블은 익스큐터들로 분산되어 조인이 수행됩니다.

    작은 테이블(<10MB 기본값)을 큰 테이블에 조인할 때, 작은 테이블은 메모리에 로드되어 각 익스큐터로 전송됩니다. 하나 이상의 테이블이 작고 메모리에 저장할 수 있는 데이터셋에 적합합니다.


    조인 전략 종류

    1. 브로드캐스트 해시 조인 (Broadcast Hash Join, BHJ)

    작은 테이블을 각 노드별로 복사하여 파티션별로 조인을 수행합니다. 셔플이 전혀 발생하지 않아 가장 빠른 조인 전략입니다.

    동작 원리:

    1. 작은 테이블 전체를 드라이버가 수집합니다.
    2. 드라이버가 해시 테이블을 생성한 후 모든 익스큐터에 브로드캐스트합니다.
    3. 각 익스큐터는 큰 테이블의 파티션을 스캔하면서 해시 테이블을 조회하여 조인합니다.

    적합한 경우:

    • 한쪽 테이블이 작을 때 (기본값 10MB 이하, spark.sql.autoBroadcastJoinThreshold 설정 가능)
    • 셔플 비용을 줄이고 싶을 때

    부적합한 경우:

    • 브로드캐스트할 테이블이 너무 클 때 (드라이버 OOM 위험)
    • Full Outer Join일 때

    2. 셔플 해시 조인 (Shuffle Hash Join, SHJ)

    동일한 익스큐터 노드에서 동일한 조인 키 값으로 데이터를 이동한 후 해시 조인을 수행합니다. 테이블이 상대적으로 커서 Broadcast를 사용하면 드라이버 및 익스큐터 측 메모리 문제가 발생할 수 있을 때 사용됩니다. Shuffling과 Hashing이 모두 포함되므로 비용이 많이 드는 조인입니다.

    동작 원리:

    1. 두 데이터셋 모두 조인 키를 기준으로 셔플링합니다.
    2. 같은 키를 가진 레코드가 동일한 익스큐터에 모입니다.
    3. 작은 파티션으로 해시 테이블을 구성한 후 큰 파티션을 스캔하며 조인합니다.

    적합한 경우:

    • 두 테이블이 모두 커서 브로드캐스트는 불가하지만 작은 쪽 파티션이 메모리에 들어갈 때
    • 데이터가 조인 키로 이미 파티셔닝되어 있을 때

    부적합한 경우:

    • 셔플 중 데이터 스큐가 발생할 때
    • 해시 테이블을 유지하기 위한 메모리가 부족할 때

    3. 셔플 정렬 병합 조인 (Shuffle Sort-Merge Join, SMJ)

    스파크 2.3부터 기본 조인 전략입니다. 데이터를 셔플하여 동일한 익스큐터에 동일한 조인 키를 일치시킨 다음 정렬 병합 조인 작업을 수행합니다. Shuffle Hash Join에 비해서 클러스터에서 데이터 이동을 최소화하는 경향이 있습니다.

    동작 원리:

    1. 셔플 단계: 두 데이터셋을 조인 키 기준으로 같은 파티션으로 셔플합니다.
    2. 정렬 단계: 각 파티션 내에서 조인 키를 기준으로 정렬합니다.
    3. 병합 단계: 정렬된 두 파티션을 순서대로 스캔하면서 일치하는 키를 병합합니다.

    적합한 경우:

    • 두 테이블 모두 대용량일 때 (가장 일반적인 케이스)
    • 데이터가 정렬되어 있거나 조인 키로 파티셔닝되어 있을 때
    • 모든 조인 타입(INNER, LEFT, RIGHT, FULL OUTER) 지원

    부적합한 경우:

    • 등호 조인(equi-join)이 아닐 때
    • 작은 데이터셋을 조인할 때 (BHJ보다 느림)

    4. 브로드캐스트 중첩 루프 조인 (Broadcast Nested Loop Join, BNLJ)

    가장 작은 데이터셋이 모든 익스큐터에 브로드캐스트되고 두 데이터 사이에 중첩 루프 조인이 수행되는 방식입니다.

    적합한 경우:

    • 등호 조건이 없는 조인(non-equi join)
    • 크로스 조인 또는 세타 조인(theta join)

    특징:

    • O(n × m) 복잡도 (n, m은 두 테이블의 크기)
    • 성능이 매우 나빠질 수 있으므로 주의가 필요합니다.

    5. 카티전 프로덕트 (Cartesian Product / Cross Join)

    Shuffle and Replication Nested Loop Join이라고도 하며, 데이터셋이 브로드캐스트되지 않는다는 점을 제외하면 BNLJ와 매우 유사하게 동작합니다.


    조인 전략 비교

    전략 셔플 발생 정렬 필요 메모리 요구 적합한 상황
    BHJ 없음 없음 높음 (작은 테이블 전체) 한쪽이 소용량 (<10MB)
    SHJ 있음 없음 중간 (파티션 단위) 중간 크기 테이블
    SMJ 있음 있음 낮음 두 테이블 모두 대용량
    BNLJ 없음 없음 높음 Non-equi join
    Cross Join 있음 없음 매우 높음 가급적 사용 금지

    조인 힌트 (Join Hints)

    스파크의 조인 전략에 더 많은 제어를 원한다면 힌트(Hint)를 사용할 수 있습니다. 옵티마이저가 최적의 전략을 자동으로 선택하지 못하는 경우나 명시적으로 전략을 지정하고 싶을 때 유용합니다.

    여러 힌트가 충돌할 경우 우선순위가 높은 힌트가 적용됩니다.

    BROADCAST > MERGE > SHUFFLE_HASH > SHUFFLE_REPLICATE_NL

    힌트 키워드 전략 별칭 우선순위
    BROADCAST Broadcast Hash Join BROADCASTJOIN, MAPJOIN 1 (최고)
    MERGE Sort-Merge Join SHUFFLE_MERGE, MERGEJOIN 2
    SHUFFLE_HASH Shuffle Hash Join - 3
    SHUFFLE_REPLICATE_NL Cartesian Product - 4 (최저)

    SparkSQL 예시

    -- 1. Broadcast Hash Join (BHJ) - 작은 테이블에 힌트 적용
    SELECT /*+ BROADCAST(customers) */
        o.order_id,
        o.amount,
        c.customer_name
    FROM orders o
    JOIN customers c ON o.customer_id = c.customer_id;
    
    -- 별칭 사용 가능 (BROADCASTJOIN, MAPJOIN)
    SELECT /*+ BROADCASTJOIN(customers) */ *
    FROM orders o
    JOIN customers c ON o.customer_id = c.customer_id;
    
    -- 2. Sort-Merge Join (SMJ)
    SELECT /*+ MERGE(orders) */
        o.order_id,
        o.amount,
        c.customer_name
    FROM orders o
    JOIN customers c ON o.customer_id = c.customer_id;
    
    -- 3. Shuffle Hash Join (SHJ)
    SELECT /*+ SHUFFLE_HASH(orders) */
        o.order_id,
        o.amount,
        c.customer_name
    FROM orders o
    JOIN customers c ON o.customer_id = c.customer_id;
    
    -- 4. 여러 테이블에 힌트 동시 적용
    SELECT /*+ BROADCAST(dim_product), BROADCAST(dim_region) */
        f.sales,
        p.product_name,
        r.region_name
    FROM fact_sales f
    JOIN dim_product p ON f.product_id = p.product_id
    JOIN dim_region r  ON f.region_id  = r.region_id;

    DataFrame API 예시 (PySpark)

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import broadcast
    
    spark = SparkSession.builder.appName("JoinExample").getOrCreate()
    
    orders_df    = spark.read.parquet("s3://data/orders/")
    customers_df = spark.read.parquet("s3://data/customers/")  # 작은 테이블
    
    # 1. Broadcast Hash Join - broadcast() 함수 사용 (가장 일반적)
    result_bhj = orders_df.join(
        broadcast(customers_df),
        orders_df["customer_id"] == customers_df["customer_id"],
        "inner"
    )
    
    # 2. Sort-Merge Join - hint() 메서드 사용
    result_smj = orders_df.join(
        customers_df.hint("merge"),
        "customer_id",
        "left"
    )
    
    # 3. Shuffle Hash Join - hint() 메서드 사용
    result_shj = orders_df.hint("shuffle_hash").join(
        customers_df,
        "customer_id",
        "inner"
    )
    
    # 자동 브로드캐스트 임계값 조정 (기본값: 10MB)
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  # 50MB
    
    result_bhj.show()

    DataFrame API 예시 (Scala)

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.broadcast
    
    val spark = SparkSession.builder().appName("JoinExample").getOrCreate()
    import spark.implicits._
    
    val ordersDF    = spark.read.parquet("s3://data/orders/")
    val customersDF = spark.read.parquet("s3://data/customers/")
    
    // 1. Broadcast Hash Join
    val resultBHJ = ordersDF.join(
      broadcast(customersDF),
      ordersDF("customer_id") === customersDF("customer_id"),
      "inner"
    )
    
    // 2. Sort-Merge Join
    val resultSMJ = ordersDF.join(
      customersDF.hint("merge"),
      Seq("customer_id"),
      "left"
    )
    
    // 3. Shuffle Hash Join
    val resultSHJ = ordersDF.hint("shuffle_hash").join(
      customersDF,
      Seq("customer_id"),
      "inner"
    )

    문제점과 해소 방법

    1. 데이터 스큐 (Data Skew)

    데이터 스큐는 특정 조인 키의 데이터가 다른 키에 비해 비정상적으로 많아서 특정 파티션이 과부하 상태가 되는 현상입니다. 일부 태스크만 오랫동안 실행되고 나머지는 대기 상태가 되는 병목이 발생합니다.

    증상:

    • Spark UI에서 특정 파티션의 크기가 비정상적으로 큼
    • 특정 스테이지의 일부 태스크만 오랫동안 실행됨 (나머지는 이미 완료)
    • OOM(Out of Memory) 오류 발생

    해결 방법 1: AQE 스큐 조인 처리 (권장)

    스파크 3.0 이상에서는 AQE(Adaptive Query Execution)가 자동으로 스큐된 파티션을 감지하고 분할하여 처리합니다.

    # AQE 활성화 (Spark 3.2+ 에서 기본값 true)
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
    
    # 스큐 판단 기준: 파티션 크기 256MB 이상 AND 중앙값의 5배 이상
    spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
    spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5.0")

    해결 방법 2: 솔팅 (Salting)

    스큐된 키에 랜덤 접두사를 붙여서 여러 파티션으로 분산시키는 방법입니다.

    from pyspark.sql import functions as F
    from pyspark.sql.types import IntegerType
    
    SALT_NUM = 10  # 솔팅 분산 수
    
    # 큰 테이블(orders)에 랜덤 솔트 접두사 추가
    orders_salted = orders_df.withColumn(
        "salted_key",
        F.concat(
            (F.rand() * SALT_NUM).cast(IntegerType()).cast("string"),
            F.lit("_"),
            F.col("customer_id").cast("string")
        )
    )
    
    # 작은 테이블(customers)을 SALT_NUM배로 복제
    customers_salted = customers_df.withColumn(
        "salt",
        F.explode(F.array([F.lit(i) for i in range(SALT_NUM)]))
    ).withColumn(
        "salted_key",
        F.concat(
            F.col("salt").cast("string"),
            F.lit("_"),
            F.col("customer_id").cast("string")
        )
    )
    
    # 솔팅된 키로 Broadcast 조인 후 불필요 컬럼 제거
    result = orders_salted.join(
        broadcast(customers_salted),
        "salted_key",
        "inner"
    ).drop("salted_key", "salt")

    해결 방법 3: 스큐 데이터 분리 처리

    스큐된 키와 일반 키를 분리하여 각각 다른 전략으로 조인 후 합칩니다.

    SKEWED_KEY = "99999"  # 스큐된 customer_id
    
    # 스큐 데이터와 정상 데이터 분리
    skewed_orders = orders_df.filter(F.col("customer_id") == SKEWED_KEY)
    normal_orders  = orders_df.filter(F.col("customer_id") != SKEWED_KEY)
    
    # 스큐 데이터는 브로드캐스트 조인 (해당 customer 1건만 브로드캐스트)
    skewed_customer = customers_df.filter(F.col("customer_id") == SKEWED_KEY)
    result_skewed   = skewed_orders.join(broadcast(skewed_customer), "customer_id")
    
    # 정상 데이터는 Sort-Merge Join
    result_normal = normal_orders.join(customers_df, "customer_id")
    
    # 결과 합치기
    result = result_skewed.union(result_normal)

    2. OOM (Out of Memory) 오류

    원인 및 해결 방법:

    # 원인 1: 브로드캐스트 테이블이 너무 클 때
    # → autoBroadcastJoinThreshold를 낮추거나 -1로 설정하여 비활성화
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    
    # 원인 2: 셔플 파티션 수가 너무 적을 때
    # → shuffle.partitions를 늘려 파티션당 데이터 크기를 줄임
    spark.conf.set("spark.sql.shuffle.partitions", "800")
    
    # SparkSession 생성 시 통합 설정 예시
    spark = SparkSession.builder \
        .appName("JoinExample") \
        .config("spark.executor.memory", "8g") \
        .config("spark.sql.shuffle.partitions", "400") \
        .config("spark.sql.autoBroadcastJoinThreshold", "10m") \
        .getOrCreate()

    AQE (적응형 쿼리 실행, Adaptive Query Execution)

    AQE는 스파크 3.0부터 도입되고 3.2부터 기본 활성화된 기능으로, 런타임 통계를 기반으로 쿼리 실행 계획을 동적으로 최적화합니다. 정적으로 수립된 초기 실행 계획을 셔플 이후 수집된 실제 데이터 통계를 바탕으로 재최적화합니다.

    AQE의 주요 기능 4가지

      1. Sort-Merge Join → Broadcast Hash Join 자동 변환
      2. 런타임에 조인 한쪽의 크기가 브로드캐스트 임계값보다 작다고 판명되면 자동으로 BHJ로 전환합니다. 이를 통해 불필요한 셔플을 제거하여 성능을 크게 향상시킵니다.
      1. 셔플 후 파티션 자동 합치기 (Dynamic Coalescing)
      2. 셔플 후 파티션 수가 너무 많아 파티션당 데이터가 작은 경우, AQE가 자동으로 작은 파티션을 합쳐 태스크 오버헤드를 줄입니다. spark.sql.adaptive.coalescePartitions.enabled 설정으로 제어합니다.
      1. 데이터 스큐 자동 처리 (Dynamic Skew Handling)
      2. Sort-Merge Join 또는 Shuffle Hash Join 시 스큐된 파티션을 감지하고 자동으로 분할하여 균등하게 처리합니다. 파티션의 절대 크기가 256MB 이상이고 중앙값의 5배 이상인 경우 스큐로 판단합니다.
      1. 빈 관계 전파 (Empty Relation Propagation)
      2. 조인의 한쪽이 비어 있을 경우 나머지 조인을 스킵하여 성능을 향상시킵니다.

    AQE 설정 예시

    # AQE 관련 전체 설정 예시
    spark.conf.set("spark.sql.adaptive.enabled", "true")                               # AQE 활성화 (3.2+ 기본 true)
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")            # 파티션 자동 합치기
    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")                      # 스큐 조인 처리
    spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")  # 스큐 크기 임계값
    spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5.0")         # 스큐 배율 임계값
    spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "10m")             # AQE BHJ 자동 변환 임계값

    조인 전략 선택 가이드

    상황 권장 전략 이유
    작은 테이블 (<10MB) BHJ (broadcast) 셔플 없이 가장 빠름
    두 테이블 모두 대용량 SMJ (기본값) 안정적, 모든 조인 타입 지원
    중간 크기, 파티션 메모리 충분 SHJ 정렬 비용 없이 해시 조인
    Non-equi join (범위, 부등호 조건) BNLJ 등호 조건 없는 조인에만 사용 가능
    데이터 스큐 발생 AQE + Salting 편향된 파티션을 균등하게 분산 처리

    출처:

    댓글