ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spark] 메모리 관리
    공부 2026. 4. 27. 21:21

    1. Unified Memory Manager 구조

    Spark 1.6부터 도입된 UnifiedMemoryManager는 Executor JVM 힙을 세 가지 영역으로 구분합니다.

    메모리 영역 구조

    ┌──────────────────────────────────────────────────────────┐
    │                  Executor JVM Heap (예: 20GB)             │
    ├──────────────────────────────────────────────────────────┤
    │  Reserved Memory       │  300MB (고정, 변경 불가)          │
    ├──────────────────────────────────────────────────────────┤
    │  User Memory           │  Usable × (1 - 0.6) = 40%       │
    │  (사용자 코드, UDF 등)   │                                  │
    ├──────────────────────────────┬───────────────────────────┤
    │  Storage Memory  (30%)       │  Spark Memory             │
    │  캐시, 브로드캐스트 변수       │  Usable × 0.6 = 60%       │
    ├──────────────────────────────┤                           │
    │  Execution Memory (30%)      │                           │
    │  셔플, 조인, 정렬, 집계        │                           │
    └──────────────────────────────┴───────────────────────────┘

    동적 메모리 차용 (Dynamic Borrowing)

    • Execution → Storage 강제 퇴출 가능: Execution 메모리가 부족하면 Storage 캐시 블록을 내보냄
    • Storage → Execution 유휴 공간 활용 가능: 단, 나중에 Execution이 요청하면 즉시 반환해야 함
    • spark.memory.storageFraction은 Storage의 최소 보장선으로 작용

    메모리 계산 공식

    # 실제 예시 (executor.memory = 20GB, 기본값 기준)
    executor_memory    = 20 * 1024  # MB
    reserved_memory    = 300        # MB (고정)
    usable_memory      = executor_memory - reserved_memory  # 19,900MB
    
    user_memory        = usable_memory * (1 - 0.6)   # 7,960MB (~40%)
    spark_memory       = usable_memory * 0.6          # 11,940MB (~60%)
    
    storage_memory     = spark_memory * 0.5           # 5,970MB
    execution_memory   = spark_memory * 0.5           # 5,970MB

    2. 핵심 설정 파라미터

    파라미터 기본값 설명
    spark.executor.memory 1g Executor당 JVM 힙 메모리
    spark.executor.memoryOverhead max(10%, 384MB) JVM 외부 오버헤드 (Metaspace, 네이티브 등)
    spark.driver.memory 1g Driver JVM 힙 메모리
    spark.driver.maxResultSize 1g collect() 결과 최대 크기
    spark.memory.fraction 0.6 힙 중 Spark가 관리하는 비율
    spark.memory.storageFraction 0.5 Spark Memory 중 Storage 최소 보장 비율
    spark.sql.shuffle.partitions 200 셔플 파티션 수
    spark.memory.offHeap.enabled false Off-Heap 메모리 활성화 여부
    spark.memory.offHeap.size 0 Off-Heap 크기 (bytes)

    설정 예시 (PySpark)

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
        .appName("MemoryTuning") \
        .config("spark.executor.memory", "8g") \
        .config("spark.executor.memoryOverhead", "2g") \
        .config("spark.driver.memory", "4g") \
        .config("spark.driver.maxResultSize", "2g") \
        .config("spark.memory.fraction", "0.6") \
        .config("spark.memory.storageFraction", "0.5") \
        .config("spark.sql.shuffle.partitions", "400") \
        .getOrCreate()

    설정 예시 (Scala)

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    
    val conf = new SparkConf()
      .setAppName("MemoryTuning")
      .set("spark.executor.memory", "8g")
      .set("spark.executor.memoryOverhead", "2g")
      .set("spark.driver.memory", "4g")
      .set("spark.memory.fraction", "0.6")
      .set("spark.memory.storageFraction", "0.5")
      .set("spark.sql.shuffle.partitions", "400")
    
    val spark = SparkSession.builder().config(conf).getOrCreate()

    spark-submit 명령어 예시

    spark-submit \
      --class com.example.MyJob \
      --master yarn \
      --deploy-mode cluster \
      --executor-memory 8g \
      --driver-memory 4g \
      --conf "spark.executor.memoryOverhead=2g" \
      --conf "spark.memory.fraction=0.6" \
      --conf "spark.sql.shuffle.partitions=400" \
      myapp.jar

    3. 캐시와 퍼시스트 전략

    StorageLevel 종류 비교

    StorageLevel 메모리 디스크 직렬화 특징
    MEMORY_ONLY O X X RDD 기본값. 메모리 부족 시 재계산
    MEMORY_AND_DISK O O X DataFrame 기본값. 메모리 초과 시 디스크로
    MEMORY_ONLY_SER O (직렬화) X O 메모리 절약, CPU 오버헤드 증가
    MEMORY_AND_DISK_SER O (직렬화) O O 직렬화 + 디스크 조합, 안전한 선택
    DISK_ONLY X O O 메모리 매우 부족할 때 사용
    MEMORY_ONLY_2 O X X 2 복제본 유지, 내결함성 필요 시
    OFF_HEAP Off-heap X O JVM GC 부하 없음, offHeap 설정 필요

    StorageLevel 선택 가이드

    코드 예시 (PySpark)

    from pyspark import StorageLevel
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.appName("CacheDemo").getOrCreate()
    df = spark.range(10_000_000)
    
    # cache() — DataFrame 기본: MEMORY_AND_DISK
    df.cache()
    
    # 명시적 StorageLevel 지정
    df.persist(StorageLevel.MEMORY_ONLY)          # 메모리만, 부족 시 재계산
    df.persist(StorageLevel.MEMORY_AND_DISK)      # 메모리 후 디스크 스필
    df.persist(StorageLevel.MEMORY_ONLY_SER)      # 직렬화로 메모리 절약
    df.persist(StorageLevel.MEMORY_AND_DISK_SER)  # 직렬화 + 디스크 조합
    df.persist(StorageLevel.DISK_ONLY)            # 디스크만
    df.persist(StorageLevel.OFF_HEAP)             # Off-Heap (설정 필요)
    
    # 캐시 해제 (중요: 안 하면 메모리 누수)
    df.unpersist()

    코드 예시 (Scala)

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession.builder().appName("CacheDemo").getOrCreate()
    val df = spark.range(10000000L)
    
    df.cache()
    df.persist(StorageLevel.MEMORY_ONLY)
    df.persist(StorageLevel.MEMORY_AND_DISK)
    df.persist(StorageLevel.MEMORY_ONLY_SER)
    df.persist(StorageLevel.OFF_HEAP)
    
    df.unpersist()
    spark.stop()

    4. Off-Heap 메모리

    전체 메모리 구성

    Total Container Memory
    = spark.executor.memory         (JVM 힙)
    + spark.executor.memoryOverhead (JVM 외부 오버헤드)
    + spark.memory.offHeap.size     (Spark 관리 Off-Heap)
    + pyspark.executor.memory       (Python 프로세스, PySpark만)

    설정 및 사용 예시

    from pyspark.sql import SparkSession
    from pyspark import StorageLevel
    
    spark = SparkSession.builder \
        .appName("OffHeapDemo") \
        .config("spark.executor.memory", "4g") \
        .config("spark.memory.offHeap.enabled", "true") \
        .config("spark.memory.offHeap.size", "4294967296")  # 4GB in bytes
        .getOrCreate()
    
    df = spark.range(5_000_000)
    
    # OFF_HEAP StorageLevel로 캐시
    df.persist(StorageLevel.OFF_HEAP)
    df.count()  # 액션 실행으로 캐시 활성화
    
    print(df.count())  # 캐시에서 읽기
    df.unpersist()

    Off-Heap 권장 상황

    • 매우 큰 데이터셋을 캐시할 때 GC 부하 감소가 필요할 경우
    • 긴 GC pause로 성능이 저하될 때
    • Tungsten 기반 연산 최적화가 필요할 때

    5. GC 튜닝 (G1GC + Kryo 직렬화)

    Kryo 직렬화

    spark = SparkSession.builder \
        .appName("KryoDemo") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.kryoserializer.buffer.max", "512m") \
        .config("spark.kryoserializer.buffer", "64m") \
        .getOrCreate()
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.max", "512m")
      // 커스텀 클래스 등록으로 직렬화 크기 최소화
      .registerKryoClasses(Array(
        classOf[MyDataClass],
        classOf[AnotherClass]
      ))

    G1GC 설정

    Spark 4.0부터 G1GC가 기본 GC입니다. Spark 3.x에서는 명시적으로 설정해야 합니다.

    PySpark에서도 JVM Executor에 그대로 적용됩니다. 단, Python Worker 프로세스는 Python GC로 별도 관리되며 G1GC 설정의 영향을 받지 않습니다.

    spark-submit \
      --conf "spark.executor.extraJavaOptions=\
        -XX:+UseG1GC \
        -XX:G1HeapRegionSize=16m \
        -XX:+UseCompressedOops \
        -XX:InitiatingHeapOccupancyPercent=35 \
        -XX:ConcGCThreads=4 \
        -verbose:gc \
        -XX:+PrintGCDetails" \
      --conf "spark.driver.extraJavaOptions=-XX:+UseG1GC" \
      myapp.jar
    spark = SparkSession.builder \
        .config("spark.executor.extraJavaOptions",
                "-XX:+UseG1GC "
                "-XX:G1HeapRegionSize=16m "
                "-XX:+UseCompressedOops "
                "-XX:InitiatingHeapOccupancyPercent=35") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .getOrCreate()

    GC 진단 가이드

    증상 원인 해결책
    Full GC 빈번 발생 Execution 메모리 부족 spark.memory.fraction 증가
    Minor GC 잦음 Eden 영역 부족 -Xmn 값 증가
    OldGen 거의 꽉 참 캐시 과다 사용 storageFraction 감소
    GC pause > 1초 G1GC 설정 미최적화 G1HeapRegionSize 증가

    6. OOM 해결 방법

    6-1. Executor OOM

    # 원인: 파티션당 데이터 과다, 셔플/조인 시 메모리 부족
    
    # 해결 1: 파티션 수 증가 (파티션당 128~256MB 목표)
    df_repartitioned = df.repartition(500)
    
    # 해결 2: 셔플 파티션 수 증가
    spark.conf.set("spark.sql.shuffle.partitions", "500")
    
    # 해결 3: Executor 메모리 및 Overhead 증가
    spark = SparkSession.builder \
        .config("spark.executor.memory", "12g") \
        .config("spark.executor.memoryOverhead", "2g") \
        .getOrCreate()
    
    # 해결 4: Execution 메모리 비율 증가 (Storage 줄이기)
    spark.conf.set("spark.memory.storageFraction", "0.3")

    6-2. Driver OOM

    # 잘못된 방법 — Driver OOM 위험
    result = df.collect()  # 전체 데이터를 Driver로 가져옴
    
    # 올바른 방법
    result = df.take(100)             # 일부만 가져오기
    df.write.parquet("/output/path")  # 스토리지에 직접 쓰기
    
    # Driver 메모리 설정
    spark = SparkSession.builder \
        .config("spark.driver.memory", "8g") \
        .config("spark.driver.maxResultSize", "4g") \
        .getOrCreate()
    
    # 브로드캐스트 임계값 조정
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100m")

    6-3. Skew (데이터 편향) 처리

    # AQE로 Skew Join 자동 처리 (Spark 3.0+)
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
    spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
    spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
    
    # 수동 Salting 기법 (AQE 미지원 환경)
    from pyspark.sql.functions import col, rand, concat, lit, floor
    
    df_salted = df.withColumn(
        "salted_key",
        concat(col("key"), lit("_"), (floor(rand() * 10)).cast("string"))
    )

    종합 프로덕션 설정 템플릿

    spark = SparkSession.builder \
        .appName("ProductionJob") \
        # === 메모리 기본 설정 ===
        .config("spark.executor.memory", "8g") \
        .config("spark.executor.memoryOverhead", "2g") \
        .config("spark.driver.memory", "4g") \
        .config("spark.driver.maxResultSize", "2g") \
        # === 메모리 비율 ===
        .config("spark.memory.fraction", "0.6") \
        .config("spark.memory.storageFraction", "0.4") \
        # === 직렬화 ===
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.kryoserializer.buffer.max", "512m") \
        # === AQE (Spark 3.0+) ===
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.sql.adaptive.skewJoin.enabled", "true") \
        # === 파티션 ===
        .config("spark.sql.shuffle.partitions", "400") \
        # === GC ===
        .config("spark.executor.extraJavaOptions",
                "-XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:+UseCompressedOops") \
        .getOrCreate()

    7. Spark 3.x 메모리 관리 개선사항

    버전별 주요 변경사항

    • Spark 3.0 — AQE 도입 및 StaticMemoryManager 제거
      • StaticMemoryManager 완전 제거: UnifiedMemoryManager만 남음
      • AQE (Adaptive Query Execution) 정식 도입
      • Off-Heap 분리: memoryOverheadoffHeap.size가 완전히 별개 항목으로 분리
      • REST API 메모리 메트릭 강화: Peak JVM heap, execution/storage 메모리 추적 가능
      • # AQE 활성화 (3.0에서는 기본값 false) spark.conf.set("spark.sql.adaptive.enabled", "true") spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
    • Spark 3.2 — AQE 기본 활성화
      • AQE 기본 활성화: spark.sql.adaptive.enabled=true가 기본값으로 변경
      • AQE 파티션 자동 병합: 소규모 셔플 파티션을 자동으로 코얼레스
      • Push-Based Shuffle: 메모리 효율 향상
      • # 3.2+에서는 별도 설정 없어도 AQE 활성화됨 spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m") spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
    • Spark 3.4 / 3.5 — Bloom Filter 및 ZSTD 압축
      • Bloom Filter Join: 메모리 효율적 조인 전략 도입
      • ZSTD 압축 기본 지원: 셔플 데이터 압축으로 메모리/디스크 절약
      • State Store 메모리 개선: 구조적 스트리밍 상태 저장소 최적화
      • spark = SparkSession.builder \ .config("spark.sql.adaptive.enabled", "true") \ .config("spark.sql.optimizer.runtime.bloomFilter.enabled", "true") \ .config("spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold", "10m") \ .config("spark.io.compression.codec", "zstd") \ .getOrCreate()

    AQE가 OOM을 방지하는 원리


    8. 메모리 관리 체크리스트

    ✨ 설정 단계 체크리스트

    기본 메모리 설정

    • spark.executor.memory 데이터 크기에 맞게 설정
    • spark.executor.memoryOverhead 설정 완료 (executor.memory의 10% 이상 또는 384MB)
    • spark.driver.memory 설정 완료
    • spark.driver.maxResultSize 제한 설정 (collect() 남용 방지)
    • spark.sql.shuffle.partitions 데이터 크기에 맞게 조정 (파티션당 128~256MB 목표)

    직렬화 설정

    • Kryo 직렬화 활성화 (spark.serializer=KryoSerializer) — Scala/Java 권장 (PySpark는 JVM 셔플 일부에만 적용됨)
    • spark.kryoserializer.buffer.max 설정 (512m 권장)
    • Scala 프로젝트라면 코드내 커스텀 클래스 Kryo 등록 여부 확인
    • PySpark라면 Python UDF 대신 Pandas UDF 사용 여부 검토 (Arrow 기반, 성능 대폭 향상)

    GC 설정

    • spark.executor.extraJavaOptions에 G1GC 설정 포함 (-XX:+UseG1GC)
    • spark.memory.fraction 기본값(0.6) 사용 여부 검토
    • spark.memory.storageFraction 케시 비율 요구사항에 맞게 조정

    Off-Heap (필요 시에만)

    • spark.memory.offHeap.enabled=true 설정
    • spark.memory.offHeap.size 적절히 설정
    • 컨테이너 전체 메모리 = executor.memory + memoryOverhead + offHeap.size 카운팅 확인

    💻 코드 단계 체크리스트

    캐시 관리

    • 반복 사용되는 DataFrame/RDD에만 캐시 적용
    • 사용 완료된 캐시는 unpersist() 명시적으로 해제
    • 데이터 크기에 맞는 StorageLevel 선택 (메모리 부족 시 MEMORY_AND_DISK_SER 고려)
    • 사용되지 않는 컨럼은 조기에 select()/drop()으로 제거

    데이터 처리

    • collect() 호출 없음 (대신 write() 또는 take() 사용)
    • 브로드캐스트 변수 크기 확인 (100MB 이하 권장)
    • 파티션 크기 적절함 (파티션당 128~256MB 목표)
    • Skew 위험이 있는 조인에 AQE 또는 Salting 적용 검토

    🔍 운영 모니터링 체크리스트

    Spark UI 확인 항목

    • Executors 탭 → Storage Memory Used 비율 80% 이하 유지
    • Stages 탭 → Spill (Memory) / Spill (Disk) 발생 여부
    • Stages 탭 → GC Time 비율 5% 이하 유지
    • Tasks 탭 → Duration 편차 확인 (Skew 여부)

    메모리 지표 임계값

    지표 정상 경고 위험
    GC Time 비율 < 5% 5 ~ 10% > 10%
    Storage Memory 사용률 < 70% 70 ~ 85% > 85%
    Spill (Disk) 0 가끔 발생 빈번 발생
    Task 최대/평균 Duration 비율 < 2x 2 ~ 5x > 5x (Skew 의심)

    🚨 OOM 발생 시 진단 체크리스트

    Executor OOM 진단

    • 에러 메시지에 Java heap space 포함 여부 확인
    • Spark UI에서 가장 크거나 느린 Task 식별
    • Spill (Memory/Disk) 발생 여부 확인
    • spark.sql.shuffle.partitions 증가 (2배씨 조정)
    • spark.executor.memory 증가
    • spark.executor.memoryOverhead 증가 (네이티브 라이브러리 사용 시)

    Driver OOM 진단

    • 에러 메시지에 Java heap space 포함 여부 확인
    • 코드내 collect() 호출 여부 확인
    • spark.driver.maxResultSize 초과 여부 확인
    • 브로드캐스트 변수 크기 확인 (100MB 이상 제한 검토)
    • spark.driver.memory 증가

    Skew OOM 진단

    • Task Duration 편차가 큰지 Spark UI에서 확인
    • 특정 키에 데이터가 편중되는지 확인
    • AQE skewJoin 활성화 여부 확인
    • 필요 시 Salting 기법 적용 검토

    참고 자료

    댓글