ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spark] 스파크 Structured Streaming 정리
    공부 2026. 4. 26. 23:37

    1. 개요

    Spark Structured Streaming은 Apache Spark 2.0에서 도입되고 2.2에서 정식 안정화된 확장 가능한 스트림 처리 엔진입니다. 기존 배치 처리에서 사용하던 DataFrame/Dataset API를 그대로 스트리밍에 적용할 수 있어 "한 번 작성, 배치/스트리밍 모두 실행"이 가능합니다.

    핵심 설계 철학: Unbounded Table 모델

    스파크 Structured Streaming은 입력 데이터 스트림을 무한히 추가되는 테이블로 개념화합니다.

    • 새로 도착하는 레코드 = 테이블에 추가되는 새 행(row)
    • 이 입력 테이블에 대한 쿼리가 결과 테이블(Result Table) 을 생성
    • 결과 테이블은 주기적으로 싱크(Sink)에 기록됨
    • 이 추상화 덕분에 배치 쿼리와 완전히 동일한 로직으로 스트리밍 처리 가능
    # 기본 예시: 소켓에서 단어 수 집계
    spark = SparkSession.builder.appName("WordCount").getOrCreate()
    
    lines = spark.readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()
    
    wordCounts = lines.select(explode(split(lines.value, " ")).alias("word")) \
                      .groupBy("word").count()
    
    query = wordCounts.writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()
    
    query.awaitTermination()

    2. Spark Structured Streaming 장단점

    장점

    • Unified Batch/Streaming API: 동일한 DataFrame/Dataset API로 배치와 스트리밍 코드를 작성할 수 있습니다. 기존 배치 파이프라인을 스트리밍으로 전환할 때 코드 변경이 최소화됩니다.
    • Exactly-Once 보장: 마이크로 배치 모드에서 end-to-end exactly-once 처리를 기본 제공합니다. Delta Lake와 조합 시 멱등 쓰기(idempotent write)로 중복 레코드를 방지할 수 있습니다.
    • Fault Tolerance (체크포인트): 체크포인트를 통한 자동 장애 복구를 지원합니다. 최근에는 changelog 기반 체크포인팅이 도입되어 전체 상태 스냅샷 대신 변경분만 저장해 I/O가 대폭 감소했습니다.
    • Catalyst Optimizer 활용: 스트리밍 쿼리에도 Catalyst 쿼리 최적화 엔진이 적용됩니다. 조건 푸시다운, 파티션 프루닝 등 배치 최적화 기법이 스트리밍에도 자동 적용됩니다.
    • Delta Lake 완벽 통합: Delta Lake를 소스/싱크로 직접 사용할 수 있습니다. ACID 트랜잭션, 스키마 진화, 타임 트래블 등 Lakehouse 기능을 풀로 활용할 수 있습니다.
    • 높은 확장성: 수십~수백 노드 클러스터로 수평 확장이 가능하며, 안정 조건에서 초당 최대 100만 이벤트 처리를 달성할 수 있습니다.

    단점

    • Micro-Batch 레이턴시: 기본 마이크로 배치 모델의 레이턴시는 수백 ms~수 초 수준으로, Flink의 ms급에 비해 불리합니다. 다만 2025년 12월 Spark 4.1에서 Real-Time Mode(RTM) 가 공개되어 p99 레이턴시 한 자릿수 ms가 달성되었습니다.
    • 복잡한 Stateful 연산: mapGroupsWithState/flatMapGroupsWithState 사용 시 상태 스키마나 타임아웃 타입을 변경하면 기존 체크포인트가 무효화됩니다. 또한 shuffle.partitions 수를 스트림 시작 후에는 변경할 수 없습니다.
    • 메모리 관리 복잡성: State Store가 메모리와 디스크 리소스를 점유하며, 상태 크기가 커질수록 GC 압박 및 지연이 증가합니다.
    • 복잡한 Checkpoint 관리: 입력 소스 추가, Kafka 토픽 변경, stateful 연산 타입 변경 등이 발생하면 새 체크포인트가 필요합니다. 장기 운영 시 체크포인트 디렉토리 용량이 증가하여 주기적인 관리가 필요합니다.
    • 운영 비용: 스트리밍 특성상 클러스터를 24/7 상시 운영해야 하므로 배치 대비 고정 비용이 높습니다.

    Spark Structured Streaming vs Apache Flink

    항목 Spark Structured Streaming Apache Flink
    처리 모델 마이크로 배치 (기본) / RTM(Spark 4.1+) 네이티브 이벤트 스트리밍
    레이턴시 수백 ms ~ 수 초 (RTM: 단 자릿수 ms) 기본 ms 이하
    Exactly-Once 기본 지원 기본 지원
    Stateful 처리 제한적 (스키마 변경 불가) 강력 (세밀한 시간/상태 제어)
    배치 통합 완전 통합 (Unified API) 별도 배치 API (제한적)
    Lakehouse 통합 Delta Lake 완벽 통합 상대적으로 약함

    선택 기준: 레이턴시 허용 + Lakehouse/배치 통합 필요 → Spark Structured Streaming / ms급 레이턴시 필수 + 복잡한 stateful 연산 → Apache Flink


    3. 핵심 개념 및 내부 아키텍처

    처리 모드

    Micro-Batch (기본)

    데이터를 시간 간격 또는 데이터 가용성에 따라 이산(discrete) 배치로 처리합니다. 드라이버가 매 배치마다 소스를 폴링하고 새 오프셋을 결정하여 플랜을 생성하고 실행합니다. end-to-end 레이턴시는 최소 ~100ms이며 exactly-once를 보장합니다.

    Continuous Processing (Spark 2.3+)

    레코드가 도착하는 즉시 처리하며, epoch 기반 체크포인팅을 사용합니다. 레이턴시는 ~1ms이지만 at-least-once만 보장하며, stateless 단순 변환(map, filter)만 지원합니다.

    Real-Time Mode (Spark 4.1, 2025년 12월 출시)

    마이크로 배치와 달리 처리 스테이지가 동시에 실행되며, 셔플 파일이 생성되는 즉시 리듀서가 시작됩니다. epoch 경계는 복구 북마크로만 사용되고, p99 레이턴시 한 자릿수 ms를 달성했습니다.

    Trigger 모드

    Trigger 구문 동작
    Default .trigger() 없음 이전 배치가 끝나는 즉시 새 배치 시작
    ProcessingTime processingTime="2 seconds" 고정 시간 간격 스케줄링
    Once Trigger.Once() 가용 데이터 처리 후 종료 (Deprecated)
    AvailableNow availableNow=True 가용 데이터를 증분 배치로 처리 후 종료
    Continuous continuous="1 second" 연속 처리, N초마다 체크포인트

    4. Output Modes

    결과 테이블의 어느 행이 싱크에 기록될지를 결정합니다.

    모드 동작 사용 시기
    Append (기본) 마지막 트리거 이후 새로 추가된 행만 기록 집계 없는 쿼리, 워터마크가 있는 집계
    Update 마지막 트리거 이후 변경된 행만 기록 집계 (쓰기 볼륨 최소화)
    Complete 전체 결과 테이블을 매 트리거마다 재기록 집계 (전체 상태가 항상 필요한 경우)
    # Update 모드 예시
    wordCounts.writeStream.outputMode("update").format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "output").start()

    5. Watermarking과 Late Data 처리

    이벤트는 네트워크 지연, 재시도 등으로 인해 순서가 뒤바뀌어 도착할 수 있습니다. Watermark 없이는 엔진이 모든 이벤트 타임 윈도우의 상태를 영구적으로 유지해야 하므로 메모리가 무한히 증가합니다.

    동작 원리

    • withWatermark(이벤트 시간 컬럼, 지연 임계값) 으로 설정
    • 엔진이 수신된 모든 이벤트에서 최대 이벤트 시간을 추적
    • 워터마크 = 최대 이벤트 시간 - 임계값
    • 이벤트 시간이 워터마크보다 이전인 데이터는 "너무 늦은 데이터"로 간주되어 드롭
    • 워터마크를 지난 윈도우의 상태는 자동 정리
    from pyspark.sql.functions import window
    
    # 최대 10분까지 지연 도착을 허용
    windowedCounts = events \
        .withWatermark("eventTime", "10 minutes") \
        .groupBy(
            window("eventTime", "10 minutes", "5 minutes"),  # 10분 윈도우, 5분 슬라이드
            "userId"
        ) \
        .count()

    핵심 규칙: withWatermark는 반드시 groupBy 이전에 적용해야 합니다. 다중 스트리밍 소스가 있는 쿼리에서 전역 워터마크는 개별 소스 워터마크 중 최솟값으로 결정됩니다.


    6. Stateful Operations (상태 기반 연산)

    내장 Stateful 연산자

    • groupBy().count(), groupBy().agg() — 실행 집계
    • dropDuplicates() — 중복 제거
    • join() (스트림-스트림 조인) — 매칭을 위한 버퍼 상태

    임의 상태 처리 (Arbitrary Stateful Processing)

    mapGroupsWithState (Spark 2.2+)

    그룹 키별 사용자 정의 상태를 유지하며, 트리거마다 그룹당 정확히 하나의 출력 행을 반환해야 합니다. 실행 중 총합, 고정 출력 형태의 세션 추적 등에 활용합니다.

    flatMapGroupsWithState (Spark 2.2+)

    mapGroupsWithState와 유사하지만 그룹당 0개 이상의 행을 반환할 수 있습니다. 세션 윈도우, 복잡 이벤트 처리(CEP) 등에 활용합니다.

    transformWithState (Spark 4.0+ — 차세대 API)

    mapGroupsWithState/flatMapGroupsWithState를 대체하는 차세대 API로, 객체 지향 방식의 StatefulProcessor 클래스를 구현합니다. ValueState, ListState, MapState 등 풍부한 상태 프리미티브와 TTL 기반 상태 소멸, 키별 다중 타이머, 스키마 진화를 지원합니다.


    7. Sources & Sinks

    주요 소스

    소스 Format 주요 옵션
    Kafka "kafka" kafka.bootstrap.servers, subscribe, startingOffsets
    Delta Lake "delta" path, maxFilesPerTrigger, ignoreChanges
    File "parquet", "json" path, maxFilesPerTrigger
    Rate "rate" rowsPerSecond (테스트용)

    주요 싱크

    싱크 Format 출력 모드
    Kafka "kafka" Append, Update, Complete
    Delta Lake "delta" Append, Complete
    File "parquet" Append only
    ForeachBatch 모두 지원 (가장 유연)
    # ForeachBatch: 다중 싱크에 동시 쓰기
    def write_to_multiple(batch_df, batch_id):
        batch_df.persist()
        batch_df.write.format("delta").mode("append").save("/delta/output")
        batch_df.write.format("jdbc").mode("append") \
            .option("url", "jdbc:postgresql://...").save()
        batch_df.unpersist()
    
    df.writeStream.foreachBatch(write_to_multiple) \
        .option("checkpointLocation", "/checkpoints/multi") \
        .start()

    8. Exactly-Once 보장 및 Fault Tolerance

    Spark Structured Streaming은 세 가지 핵심 요소로 exactly-once를 보장합니다.

    1. 재생 가능한 소스(Replayable Sources): Kafka는 특정 오프셋부터, Delta Lake는 특정 트랜잭션 버전부터 데이터를 재생할 수 있습니다.
    2. 멱등 싱크(Idempotent Sinks): 동일 데이터를 여러 번 기록해도 결과가 같아야 합니다. 파일 싱크는 원자적 파일 생성(임시 파일 → 이름 변경)으로, Delta Lake는 트랜잭션 로그로 이를 보장합니다.
    3. 체크포인팅 + Write-Ahead Log(WAL): 오프셋 로그(소스에서 읽은 위치), 커밋 로그(싱크에 완전히 커밋된 배치), 상태 스토어(집계·조인 상태)를 체크포인트에 저장합니다.
    query = df.writeStream \
        .format("delta") \
        .option("checkpointLocation", "/checkpoints/my-query") \  # 필수
        .start("/delta/output")

    장애 복구 흐름

    장애 감지 → 체크포인트에서 쿼리 재시작 → 마지막 커밋 오프셋부터 소스 재생 → 변환 재실행 → 멱등 싱크에 기록

    9. Spark Structured Streaming vs DStream (레거시)

    항목 DStream (Spark Streaming) Structured Streaming
    API RDD 기반 (DStream[T]) DataFrame/Dataset API
    도입 Spark 0.7 Spark 2.0 (안정 2.2)
    상태 Deprecated 현행 권장
    최적화 없음 (수동 RDD 연산) Catalyst + Tungsten
    Event-Time 미지원 기본 지원
    Watermarking 없음 withWatermark() 내장
    보장 수준 At-least-once Exactly-once (마이크로 배치)
    최소 레이턴시 ~500ms ~100ms (마이크로 배치) / ~1ms (연속 처리)
    스트림-스트림 조인 미지원 기본 지원

    10. EMR에서 사용 예시

    배포 방식 선택

    방식 장점 단점 권장 시나리오
    EMR on EC2 Spot 활용 최대화, 세밀한 제어 클러스터 관리 부담 대용량 안정 워크로드, 비용 최적화 최우선
    EMR on EKS K8s 환경 통합, 멀티 테넌시 K8s 운영 지식 필요 EKS 인프라를 이미 보유한 팀
    EMR Serverless 인프라 Zero, 자동 스케일 콜드 스타트, 일부 제약 빠른 도입, 예측 불가 워크로드

    EMR Serverless 클러스터 생성 및 잡 제출

    # 1단계: EMR Serverless Application 생성
    aws emr-serverless create-application \
      --name "streaming-app" \
      --type SPARK \
      --release-label emr-7.2.0 \
      --initial-capacity '{
        "DRIVER": {"workerCount": 1, "workerConfiguration": {"cpu": "4vCPU", "memory": "16GB"}},
        "EXECUTOR": {"workerCount": 10, "workerConfiguration": {"cpu": "4vCPU", "memory": "16GB"}}
      }'
    
    # 2단계: STREAMING 모드로 잡 제출
    aws emr-serverless start-job-run \
      --application-id <APPLICATION_ID> \
      --execution-role-arn arn:aws:iam::ACCOUNT_ID:role/EMRServerlessRole \
      --mode 'STREAMING' \
      --job-driver '{
        "sparkSubmit": {
          "entryPoint": "s3://my-bucket/scripts/kafka_to_s3.py",
          "sparkSubmitParameters": "--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,software.amazon.msk:aws-msk-iam-auth:2.2.0"
        }
      }' \
      --retry-policy '{"maxFailedAttemptsPerHour": 5}'

    MSK Kafka → S3 파이프라인 (PySpark)

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, from_json, year, month, dayofmonth
    from pyspark.sql.types import *
    
    spark = SparkSession.builder.appName("KafkaToS3").getOrCreate()
    
    schema = StructType([
        StructField("order_id", StringType(), True),
        StructField("customer_id", StringType(), True),
        StructField("amount", DoubleType(), True),
        StructField("event_time", TimestampType(), True),
    ])
    
    # MSK Kafka 소스 (IAM 인증)
    raw_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "<MSK_BOOTSTRAP>:9098") \
        .option("subscribe", "orders") \
        .option("kafka.security.protocol", "SASL_SSL") \
        .option("kafka.sasl.mechanism", "AWS_MSK_IAM") \
        .option("kafka.sasl.jaas.config",
                "software.amazon.msk.auth.iam.IAMLoginModule required;") \
        .option("kafka.sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler") \
        .load()
    
    parsed_df = raw_df \
        .select(from_json(col("value").cast("string"), schema).alias("data")) \
        .select("data.*") \
        .withWatermark("event_time", "10 minutes") \
        .withColumn("year", year("event_time")) \
        .withColumn("month", month("event_time")) \
        .withColumn("day", dayofmonth("event_time"))
    
    query = parsed_df.writeStream \
        .format("parquet") \
        .option("path", "s3://my-bucket/output/") \
        .option("checkpointLocation", "s3://my-bucket/checkpoints/") \
        .partitionBy("year", "month", "day") \
        .trigger(processingTime="5 minutes") \
        .start()
    
    query.awaitTermination()

    Kinesis Data Streams 연동

    EMR 7.1.0부터 Kinesis Connector가 릴리즈 이미지에 내장되어 별도 JAR가 필요 없습니다. GetRecords(공유 처리량)와 SubscribeToShard(Enhanced Fan-Out, 저지연 전용 처리량) 두 가지 모드를 지원합니다.

    kinesis_df = spark.readStream \
        .format("aws-kinesis") \
        .option("kinesis.region", "ap-northeast-2") \
        .option("kinesis.streamName", "sensor-data-stream") \
        .option("kinesis.consumerType", "GetRecords") \
        .option("kinesis.startingposition", "LATEST") \
        .load()

    AWS 서비스 통합 요약

    AWS 서비스 연동 방식
    MSK (Kafka) spark-sql-kafka + aws-msk-iam-auth JAR, IAM 인증
    Kinesis Data Streams aws-kinesis 커넥터 (EMR 7.1+ 내장)
    S3 EMRFS(S3A 내장), s3:// 경로 직접 사용
    Glue Data Catalog Hive Metastore Factory Class 설정

    모니터링: CloudWatch 연동

    from pyspark.sql.streaming import StreamingQueryListener
    import boto3
    
    class CloudWatchListener(StreamingQueryListener):
        def __init__(self):
            self.cw = boto3.client("cloudwatch", region_name="ap-northeast-2")
    
        def onQueryProgress(self, event):
            progress = event.progress
            self.cw.put_metric_data(
                Namespace="SparkStreaming/Production",
                MetricData=[
                    {"MetricName": "InputRowsPerSecond",
                     "Value": progress.inputRowsPerSecond or 0,
                     "Unit": "Count/Second",
                     "Dimensions": [{"Name": "JobName", "Value": "kafka-to-s3"}]},
                ]
            )
        def onQueryStarted(self, event): pass
        def onQueryTerminated(self, event): pass
    
    spark.streams.addListener(CloudWatchListener())

    11. Dataproc에서 사용 예시

    클러스터 생성

    gcloud dataproc clusters create my-streaming-cluster \
      --region=us-central1 \
      --master-machine-type=n2-standard-4 \
      --num-workers=4 \
      --worker-machine-type=n2-standard-4 \
      --image-version=2.2-debian12 \
      --enable-component-gateway \
      --metric-sources=spark \
      --properties="spark:spark.streaming.stopGracefullyOnShutdown=true" \
      --project=my-project

    주의: Dataproc의 Autoscaling은 Spark Structured Streaming을 지원하지 않습니다. 스트리밍 클러스터에서는 autoscaling 없이 고정 클러스터를 운영하는 것이 권장됩니다.

    Managed Kafka → BigQuery 파이프라인 (PySpark)

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, from_json, current_timestamp
    from pyspark.sql.types import *
    
    spark = SparkSession.builder.appName("KafkaToBigQuery").getOrCreate()
    
    schema = StructType([
        StructField("user_id", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("amount", DoubleType(), True),
        StructField("event_time", LongType(), True),
    ])
    
    raw_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers",
                "bootstrap.CLUSTER_ID.us-central1.managedkafka.PROJECT.cloud:9092") \
        .option("kafka.security.protocol", "SASL_SSL") \
        .option("kafka.sasl.mechanism", "OAUTHBEARER") \
        .option("subscribe", "user-events") \
        .option("startingOffsets", "latest") \
        .load()
    
    parsed_df = raw_df \
        .select(from_json(col("value").cast("string"), schema).alias("data")) \
        .select("data.*") \
        .withColumn("ingested_at", current_timestamp())
    
    def write_to_bigquery(batch_df, epoch_id):
        batch_df.write \
            .format("bigquery") \
            .option("writeMethod", "direct") \
            .option("table", "my-project.analytics.events") \
            .mode("append") \
            .save()
    
    query = parsed_df.writeStream \
        .foreachBatch(write_to_bigquery) \
        .option("checkpointLocation", "gs://my-bucket/checkpoints/kafka-to-bq") \
        .trigger(processingTime="30 seconds") \
        .start()
    
    query.awaitTermination()

    GCS에 Parquet으로 쓰기

    query = df.writeStream \
        .format("parquet") \
        .outputMode("append") \
        .option("path", "gs://my-bucket/data/events/") \
        .option("checkpointLocation", "gs://my-bucket/checkpoints/gcs-parquet") \
        .trigger(processingTime="5 minutes") \
        .partitionBy("process_date", "process_hour") \
        .start()

    GCP 서비스 통합 요약

    GCP 서비스 연동 방식 비고
    Managed Kafka 표준 Kafka 커넥터, OAUTHBEARER 인증 Pub/Sub Lite 대체 권장 (2026년 종료 예정)
    BigQuery spark-bigquery-connector, writeMethod=direct Dataproc 2.1+ 내장
    GCS gs:// 경로 직접 사용 체크포인트/싱크 모두 지원
    Bigtable spark-bigtable-connector (v1.1+) Data Boost로 분석 부하 분리 가능

    잡 제출

    gcloud dataproc jobs submit pyspark gs://my-bucket/jobs/streaming.py \
      --cluster=my-streaming-cluster \
      --region=us-central1 \
      --packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
      --properties=spark.executor.memory=4g,spark.executor.cores=2,spark.executor.instances=8

    12. 데이터브릭스에서 사용 예시

    DLT vs 일반 Structured Streaming 선택 기준

    Structured Streaming 권장 Delta Live Tables (DLT) 권장
    foreachBatch로 외부 DB 쓰기 멀티-스테이지 ETL 파이프라인
    세밀한 클러스터 제어 필요 데이터 품질 거버넌스 필요
    SLA-민감 레이턴시 파이프라인 자동 체크포인트·스케일링 필요
    특수 소스/싱크 필요 Unity Catalog 기반 거버넌스

    Auto Loader (cloudFiles)

    Auto Loader는 S3, ADLS, GCS에 새로 도착하는 파일을 자동으로 감지하여 점진적으로 처리하는 Databricks 전용 Structured Streaming 소스입니다. 시간당 수백만 개 파일 처리가 가능하며, Directory Listing Mode(기본)와 File Notification Mode(대규모 권장) 두 가지를 지원합니다.

    # Auto Loader: S3 JSON → Delta Lake
    streaming_df = (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.schemaLocation", "s3://my-bucket/checkpoints/schema/")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("recursiveFileLookup", "true")
            .load("s3://my-bucket/raw/events/")
    )
    
    query = (
        streaming_df.writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", "s3://my-bucket/checkpoints/events/")
            .option("mergeSchema", "true")
            .trigger(processingTime="30 seconds")
            .toTable("catalog.bronze.raw_events")
    )

    Kafka → Delta Lake 파이프라인 (전체 예시)

    from pyspark.sql.functions import from_json, col, current_timestamp
    from pyspark.sql.types import *
    
    event_schema = StructType([
        StructField("order_id", StringType(), False),
        StructField("user_id", StringType(), True),
        StructField("price", StringType(), True),
        StructField("event_time", TimestampType(), True),
    ])
    
    kafka_options = {
        "kafka.bootstrap.servers": "kafka-broker:9092",
        "subscribe": "orders-topic",
        "startingOffsets": "latest",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        # Databricks Secret 사용 (하드코딩 금지)
        "kafka.sasl.jaas.config": (
            "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
            f"username='{dbutils.secrets.get('kafka', 'username')}' "
            f"password='{dbutils.secrets.get('kafka', 'password')}'; "
        ),
        "maxOffsetsPerTrigger": "50000",
        "failOnDataLoss": "false",
    }
    
    raw_kafka_df = spark.readStream.format("kafka").options(**kafka_options).load()
    
    parsed_df = (
        raw_kafka_df
            .select(from_json(col("value").cast("string"), event_schema).alias("data"),
                    col("offset"), col("timestamp").alias("kafka_timestamp"))
            .select("data.*", "offset", "kafka_timestamp", current_timestamp().alias("ingested_at"))
            .filter(col("order_id").isNotNull())
    )
    
    query = (
        parsed_df.writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", "s3://my-bucket/checkpoints/kafka-orders/")
            .option("mergeSchema", "true")
            .trigger(processingTime="10 seconds")
            .toTable("catalog.bronze.orders_raw")
    )

    Delta MERGE INTO (Upsert / CDC)

    def upsert_to_delta(micro_batch_df, batch_id):
        micro_batch_df.createOrReplaceTempView("batch_updates")
        micro_batch_df.sparkSession.sql("""
            MERGE INTO catalog.silver.customers AS target
            USING (
                SELECT * FROM (
                    SELECT *, ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY updated_at DESC) AS rn
                    FROM batch_updates
                ) WHERE rn = 1
            ) AS source
            ON target.customer_id = source.customer_id
            WHEN MATCHED AND source.op = 'D' THEN DELETE
            WHEN MATCHED THEN UPDATE SET *
            WHEN NOT MATCHED AND source.op != 'D' THEN INSERT *
        """)
    
    spark.readStream.table("catalog.bronze.customer_cdc") \
        .writeStream \
        .foreachBatch(upsert_to_delta) \
        .option("checkpointLocation", "s3://my-bucket/checkpoints/customers/") \
        .trigger(processingTime="2 minutes") \
        .start()

    Delta Live Tables (DLT) 파이프라인

    import dlt
    from pyspark.sql.functions import col, current_timestamp
    
    # Bronze: 원본 인제스트
    @dlt.table(name="bronze_orders", comment="Auto Loader로 S3에서 수집한 원본 주문 데이터")
    def bronze_orders():
        return (
            spark.readStream.format("cloudFiles")
                .option("cloudFiles.format", "json")
                .option("cloudFiles.schemaLocation", "/pipelines/bronze_orders/schema")
                .load("s3://my-bucket/raw/orders/")
                .withColumn("_ingested_at", current_timestamp())
        )
    
    # Silver: 정제 + 품질 검증
    @dlt.table(name="silver_orders")
    @dlt.expect("order_id_not_null", "order_id IS NOT NULL")
    @dlt.expect_or_drop("valid_amount", "amount > 0")
    @dlt.expect_or_fail("valid_event", "event_type IN ('CREATE', 'UPDATE', 'CANCEL')")
    def silver_orders():
        return (
            dlt.read_stream("bronze_orders")
                .withColumn("amount", col("amount").cast("decimal(18,2)"))
                .withColumn("event_date", col("event_time").cast("date"))
        )
    
    # Gold: 집계
    @dlt.table(name="gold_daily_revenue")
    def gold_daily_revenue():
        return (
            dlt.read("silver_orders")
                .filter(col("event_type") == "CREATE")
                .groupBy("event_date")
                .agg({"amount": "sum", "order_id": "count"})
        )

    비용 최적화 전략

    전략 효과 방법
    Job Cluster 사용 유휴 비용 제거 All-purpose 대신 Job Cluster로 실행
    Spot Instances 최대 80% 절감 Worker 노드에 Spot 적용, Driver는 On-demand
    availableNow Trigger 배치화로 비용 최소화 연속 스트리밍 대신 주기적 실행 후 자동 종료
    DLT Enhanced Autoscaling 부하에 따라 자동 축소 DLT 파이프라인 설정에서 활성화
    # availableNow: 배치 패턴으로 비용 최적화
    query = (
        spark.readStream.format("cloudFiles").option("cloudFiles.format", "parquet")
            .load("s3://my-bucket/raw/")
        .writeStream.format("delta")
            .option("checkpointLocation", "/checkpoints/batch/")
            .trigger(availableNow=True)  # 처리 완료 후 자동 종료 → 클러스터 자동 종료
            .toTable("catalog.silver.processed")
    )
    query.awaitTermination()

    권장 런타임: Databricks Runtime 17.3 LTS (현재 최신 LTS, 2025.10 출시). 프로덕션 스트리밍에는 LTS 버전 사용을 강력 권장합니다.

    댓글