-
[Spark] Python → Spark 안티패턴 & 올바른 사용법공부 2026. 4. 26. 23:31
1부. 핵심 패러다임
1. 명령형(Imperative) vs 선언형(Declarative)
Spark와 일반 Python 프로그래밍의 가장 큰 차이점은 프로그래밍 패러다임에 있습니다.
구분 일반 Python (for 루프) Spark (DataFrame) 방식 명령형 (Imperative) 선언형 (Declarative) 실행 위치 드라이버 노드 (단일) 워커 노드 (분산) 병렬 처리 불가 가능 메모리 위험 모든 데이터를 드라이버로 로드 각 노드에서 분산 처리 일반 Python (명령형): 각 행을 하나씩 꺼내서 처리하고 결과를 리스트에 담습니다.
result_list = [] for row in data_list: result_list.append({ "table": row["table"], "call_count": row["call_count"], "user": row["user"], }) result_df = pd.DataFrame(result_list)Spark (선언형): 이 DataFrame에서 이런 결과를 원한다고 선언합니다.
from pyspark.sql.functions import col, count result_df = ( source_df .groupBy("table", "user") .agg(count("*").alias("call_count")) .select("table", "call_count", "user") )
2. Spark에서 for 루프를 사용하지 않는 이유
2-1. 분산 병렬 처리 (Distributed Parallel Processing)
Spark의 DataFrame은 클러스터의 여러 워커 노드에 파티션(Partition)으로 나뉘어 분산 저장됩니다. for 루프를 사용하려면 반드시
.collect()로 모든 데이터를 드라이버로 가져와야 하는데, 이는 분산 처리의 이점을 완전히 포기하는 것입니다.2-2. 불변 DataFrame (Immutable DataFrame)
Spark의 DataFrame은 불변(Immutable)입니다. 기존 DataFrame을 수정하는 것이 아니라, 항상 새로운 DataFrame을 생성하는 방식으로만 작업이 이루어집니다. 따라서 빈 DataFrame에 for 루프로 행을 하나씩 추가하는 개념 자체가 Spark에는 존재하지 않습니다.
2-3. 지연 연산 (Lazy Evaluation)
Spark의 트랜스포메이션(
withColumn,select,filter등)은 즉시 실행되지 않습니다. 액션(show(),count(),write())을 호출하는 시점에 전체 실행 계획을 최적화하여 한 번에 실행합니다.# 아래 코드들은 즉시 실행되지 않음 (트랜스포메이션) df1 = source_df.filter(col("status") == "active") df2 = df1.withColumn("upper_name", upper(col("name"))) df3 = df2.select("id", "upper_name") # 이 코드가 실행될 때 비로소 위의 모든 작업이 한꺼번에 실행됨 (액션) df3.show()
2부. 데이터 변환 패턴
3. 기본 패턴: withColumn + select
새로운 테이블을 만들 때 기존 DataFrame에 컬럼을 추가하고 마지막에 필요한 컬럼만 추출하는 방식이 처음에는 어색하게 느껴질 수 있습니다. 하지만 이것이 Spark의 가장 표준적이고 효율적인 방식입니다.
from pyspark.sql.functions import col result_df = ( query_history_df .withColumn("referenced_tables", count_source_tables_udf(col("statement_text"))) .select("user_name", "referenced_tables") )Catalyst Optimizer와 Column Pruning
Spark의 카탈리스트 옵티마이저(Catalyst Optimizer)는 실행 계획을 자동으로 최적화합니다. 최종적으로 필요한 컬럼이
user_name과referenced_tables뿐이라면, 데이터 소스를 읽을 때부터 불필요한 컬럼은 아예 읽지 않습니다. 이 과정을 컬럼 프루닝(Column Pruning)이라고 합니다.논리적 계획 (개발자가 작성한 코드 기준): SELECT user_name, referenced_tables FROM (SELECT *, udf(statement_text) AS referenced_tables FROM query_history_df) 물리적 실행 계획 (Spark 최적화 후 실제 실행): → 디스크에서 user_name, statement_text 두 컬럼만 읽음 → UDF 적용하여 referenced_tables 계산 → 결과 반환
4. 임시 컬럼 패턴 (Temporary Column Pattern)
중간 계산이 필요한 경우, 기존 DataFrame에 임시 컬럼을 추가하고 작업이 끝난 후 제거하는 방식을 사용합니다.
일반 Python 방식 ❌
result_list = [] for row in data_list: intermediate_value = some_heavy_function(row["raw_data"]) final_value = another_function(intermediate_value) result_list.append({"id": row["id"], "final_result": final_value}) result_df = pd.DataFrame(result_list)Spark 방식 ✅
from pyspark.sql.functions import col result_df = ( source_df .withColumn("_tmp_intermediate", some_heavy_udf(col("raw_data"))) # 중간 계산용 임시 컬럼 .withColumn("final_result", another_udf(col("_tmp_intermediate"))) # 최종 결과 계산 .drop("_tmp_intermediate") # 임시 컬럼 제거 .select("id", "final_result") )임시 컬럼 이름 앞에
_tmp_접두사를 붙이는 것은 나중에 제거해야 할 컬럼임을 명시하는 관례입니다.구분 Python for 루프 Spark 임시 컬럼 패턴 실행 위치 드라이버 노드 (단일) 워커 노드 (분산 병렬) 중간 결과 저장 드라이버 메모리 파티션별 분산 메모리 최적화 없음 Catalyst Optimizer 적용 확장성 데이터 증가 시 선형 저하 노드 추가로 스케일 아웃 중간 계산 없이 단일 함수로 바로 결과를 낼 수 있다면 임시 컬럼 없이 더 간결하게 작성할 수 있습니다.
result_df = ( source_df .withColumn("final_result", direct_udf(col("raw_data"))) .select("id", "final_result") )실전 예시: SQL 쿼리에서 참조 테이블 목록 추출
import re import pandas as pd from pyspark.sql.functions import col, pandas_udf from pyspark.sql.types import ArrayType, StringType @pandas_udf(ArrayType(StringType())) def extract_tables(statements: pd.Series) -> pd.Series: def parse_tables(sql): tables = re.findall(r"FROM\s+(\w+)", sql or "", re.IGNORECASE) return list(set(tables)) return statements.apply(parse_tables) result_df = ( query_history_df .withColumn("referenced_tables", extract_tables(col("statement_text"))) .select("user_name", "query_date", "referenced_tables") ) result_df.write.saveAsTable("query_table_references")
5. 여러 컬럼 생성 시 효율적인 방법
A, B 컬럼을 가공하여 C, D, E 세 개의 컬럼을 만들어야 하는 경우를 예로 들겠습니다.
5-1. Spark 내장 함수 사용 시 (권장)
from pyspark.sql.functions import col, upper, length, concat_ws # withColumn 체이닝 방식 result_df = ( source_df .withColumn("C", upper(col("A"))) .withColumn("D", length(col("B"))) .withColumn("E", concat_ws("-", col("A"), col("B"))) .select("C", "D", "E") ) # select에서 한 번에 정의하는 방식 (동일한 성능, 더 간결) result_df = source_df.select( upper(col("A")).alias("C"), length(col("B")).alias("D"), concat_ws("-", col("A"), col("B")).alias("E") )Spark 내장 함수를 사용하면 파이프라이닝(Pipelining) 최적화가 적용됩니다.
withColumn을 3번 호출해도 각 행이 컨베이어 벨트를 단 한 번만 지나가면서 C, D, E를 동시에 계산합니다.5-2. Python UDF 사용 시 — StructType 반환 패턴
UDF를 여러 번 호출하면 JVM-Python 직렬화가 그 횟수만큼 발생합니다. 하나의 UDF가 StructType을 반환하도록 설계하면 직렬화를 단 한 번만 수행합니다.
from pyspark.sql.functions import pandas_udf, col from pyspark.sql.types import StructType, StructField, StringType, IntegerType import pandas as pd return_schema = StructType([ StructField("C", StringType()), StructField("D", IntegerType()), StructField("E", StringType()), ]) @pandas_udf(return_schema) def make_c_d_e(a: pd.Series, b: pd.Series) -> pd.DataFrame: return pd.DataFrame({ "C": a.str.upper(), "D": b.str.len().astype("int32"), "E": a + "-" + b, }) result_df = ( source_df .withColumn("results", make_c_d_e(col("A"), col("B"))) .select( col("results.C").alias("C"), col("results.D").alias("D"), col("results.E").alias("E"), ) )
3부. Python 안티패턴 → Spark 올바른 방식
6. 불필요한 collect() 사용
collect()는 클러스터 전체의 데이터를 드라이버 노드 하나로 가져오는 액션입니다. 꼭 필요한 경우가 아니라면 사용을 피해야 합니다.비효율적인 방식 ❌
rows = df.collect() result = [row for row in rows if row["status"] == "active"]Spark 방식 ✅
result_df = df.filter(col("status") == "active")collect()를 사용해도 되는 경우:- 집계 후 소량의 결과를 드라이버에서 후처리할 때
- 단위 테스트에서 결과를 직접 비교할 때
7. 조건 처리 — if/else → when().otherwise()
비효율적인 방식 ❌
from pyspark.sql.functions import udf from pyspark.sql.types import StringType @udf(StringType()) def classify_score(score): if score >= 90: return "A" elif score >= 80: return "B" elif score >= 70: return "C" else: return "F" df.withColumn("grade", classify_score(col("score")))Spark 방식 ✅
from pyspark.sql.functions import when df.withColumn("grade", when(col("score") >= 90, "A") .when(col("score") >= 80, "B") .when(col("score") >= 70, "C") .otherwise("F") )
8. Null 처리 — == None → isNull() / isNotNull()
Python의
== None비교는 Spark DataFrame에서 올바르게 동작하지 않습니다. SQL의 NULL 직비교 시맨틱 때문입니다.비효율적인 방식 ❌
df.filter(col("name") == None) # 의도한 대로 동작하지 않음 df.filter(col("name") != None) # 마찬가지Spark 방식 ✅
from pyspark.sql.functions import coalesce, lit # Null 여부 확인 df.filter(col("name").isNull()) df.filter(col("name").isNotNull()) # Null 값을 기본값으로 대체 df.fillna({"name": "unknown", "age": 0}) # 특정 컬럼의 Null만 대체 df.withColumn("name", coalesce(col("name"), lit("unknown")))
9. 문자열 및 값 검색 — Python 연산자 → Spark 함수
비효율적인 방식 ❌
from pyspark.sql.functions import udf from pyspark.sql.types import BooleanType @udf(BooleanType()) def contains_keyword(text, keyword): return keyword in (text or "") @udf(BooleanType()) def is_valid_status(status): return status in ["active", "enabled", "on"]Spark 방식 ✅
from pyspark.sql.functions import col # 문자열 포함 여부 df.filter(col("text").contains("keyword")) # 문자열 시작/끝 df.filter(col("name").startswith("Kim")) df.filter(col("name").endswith("_v2")) # 정규식 매칭 df.filter(col("email").rlike(r"^[\w.+-]+@[\w-]+\.[a-zA-Z]{2,}$")) # 값 목록에 포함 여부 (Python의 `in` 연산자 대응) df.filter(col("status").isin(["active", "enabled", "on"])) df.filter(~col("status").isin(["deleted", "banned"])) # NOT IN
10. 그룹별 집계 — for 루프 → groupBy().agg()
비효율적인 방식 ❌
rows = df.collect() group_result = {} for row in rows: key = row["department"] if key not in group_result: group_result[key] = {"total": 0, "count": 0} group_result[key]["total"] += row["salary"] group_result[key]["count"] += 1 result = [(k, v["total"] / v["count"]) for k, v in group_result.items()]Spark 방식 ✅
from pyspark.sql.functions import avg, sum, count, max, min result_df = ( df .groupBy("department") .agg( avg("salary").alias("avg_salary"), sum("salary").alias("total_salary"), count("*").alias("employee_count"), max("salary").alias("max_salary"), ) )
4부. 성능 최적화
11. Python UDF 대신 Spark 내장 함수
Python UDF는 JVM-Python 직렬화 비용 때문에 느립니다. Spark에 이미 동일한 기능의 내장 함수가 있다면 반드시 그것을 사용해야 합니다.
비효율적인 방식 ❌
@udf(StringType()) def to_upper(text): return text.upper() if text else None df.withColumn("upper_name", to_upper(col("name")))Spark 방식 ✅
from pyspark.sql.functions import upper df.withColumn("upper_name", upper(col("name")))자주 쓰는 내장 함수 목록:
작업 Python 방식 Spark 내장 함수 대소문자 변환 .upper(),.lower()upper(),lower()공백 제거 .strip()trim(),ltrim(),rtrim()문자열 분리 .split()split()문자열 결합 f"{a}-{b}"concat(),concat_ws()길이 len()length()날짜 포맷 strftime()date_format()타입 변환 int(),str()cast()반올림 round()round(),floor(),ceil()Null 대체 x or defaultcoalesce(),fillna()
12. Python UDF vs Pandas UDF
구분 Python UDF Pandas UDF 처리 단위 행(Row) 단위 배치(Batch) 단위 직렬화 방식 Pickle (느림) Apache Arrow (빠름) 성능 느림 빠름 사용 시점 단순 로직, 프로토타이핑 복잡한 로직, 대규모 데이터 from pyspark.sql.functions import udf, pandas_udf from pyspark.sql.types import StringType import pandas as pd # Python UDF (행 단위 처리, 느림) @udf(StringType()) def python_udf_func(text: str) -> str: return text.upper() if text else None # Pandas UDF (배치 단위 처리, 빠름) @pandas_udf(StringType()) def pandas_udf_func(text: pd.Series) -> pd.Series: return text.str.upper() # 사용법은 동일 df.withColumn("result", python_udf_func(col("text"))) df.withColumn("result", pandas_udf_func(col("text")))
13. 스키마 자동 추론 → 명시적 스키마 정의
CSV나 JSON을 읽을 때 스키마를 자동 추론하도록 두면 Spark가 전체 데이터를 한 번 스캔해야 합니다. 명시적으로 스키마를 정의하면 이 스캔을 건너뜁니다.
비효율적인 방식 ❌
# inferSchema=True → 전체 파일을 두 번 스캔 (캐스팅 + 실제 로드) df = spark.read.csv("data.csv", header=True, inferSchema=True)Spark 방식 ✅
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType schema = StructType([ StructField("id", IntegerType(), nullable=False), StructField("name", StringType(), nullable=True), StructField("salary", DoubleType(), nullable=True), StructField("department", StringType(), nullable=True), ]) df = spark.read.csv("data.csv", header=True, schema=schema)명시적 스키마를 쓰면:
- 파일 스캔 횟수 감소 (성능 향상)
- 타입 오류를 읽는 시점에서 즉시 발견 가능
- 코드 가독성 향상
14. 소규모 테이블 조인 → broadcast join
한쪽 테이블이 작을 때 일반 join을 사용하면 Spark는 두 테이블을 셔플(shuffle)합니다.
broadcast()를 사용하면 작은 테이블을 모든 워커 노드에 복사하여 셔플 없이 조인할 수 있습니다.비효율적인 방식 ❌
# 양쪽 모두 셔플 발생 result = large_df.join(small_df, on="id", how="left")Spark 방식 ✅
from pyspark.sql.functions import broadcast # 작은 테이블을 broadcast → 셔플 없이 조인 result = large_df.join(broadcast(small_df), on="id", how="left")spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10mb")로 자동 broadcast 임계값을 설정할 수도 있습니다. 기본값은 10MB입니다.
15. 파티션 관리 — repartition() vs coalesce()
파티션 수를 줄일 때
repartition()과coalesce()중 어느 것을 쓰느냐가 성능에 영향을 미칩니다.비효율적인 방식 ❌
# 파티션을 줄이려는데 repartition() 사용 → 불필요한 전체 셔플 발생 df.repartition(10)Spark 방식 ✅
# 파티션 수를 줄일 때: coalesce() 사용 (셔플 없이 인접 파티션 병합) df.coalesce(10) # 파티션 수를 늘리거나, 특정 컬럼 기준으로 재분배할 때: repartition() df.repartition(200) df.repartition(200, col("department"))구분 repartition() coalesce() 셔플 발생 (전체 재분배) 없음 (인접 파티션 병합) 파티션 수 증가 가능 불가 파티션 수 감소 가능 가능 데이터 균형 균일 불균일할 수 있음 사용 시점 늘릴 때, 컬럼 기준 분배 줄일 때
16. cache() / persist() 활용
동일한 DataFrame을 여러 곳에서 재사용하는 경우,
cache()나persist()로 중간 결과를 메모리에 저장하면 재계산을 방지할 수 있습니다.from pyspark import StorageLevel base_df = ( source_df .filter(col("status") == "active") .withColumn("processed", my_udf(col("data"))) .cache() # 기본: 메모리에만 저장 # .persist(StorageLevel.MEMORY_AND_DISK) # 메모리 부족 시 디스크도 사용 ) # base_df를 여러 번 사용해도 재계산하지 않음 count_by_user = base_df.groupBy("user").count() count_by_table = base_df.groupBy("table").count() # 사용이 끝나면 반드시 캐시 해제 base_df.unpersist()주의:
cache()는 액션(Action)이 호출될 때 실제로 캐싱이 이루어집니다. 트랜스포메이션 단계에서는 캐싱이 되지 않으므로, 첫 액션 호출 후부터 캐시가 활성화됩니다.
5부. 디버깅
17. 실행 계획 확인 (explain)
explain()메서드를 사용하면 Spark가 실제로 어떤 실행 계획을 세우는지 확인할 수 있습니다. Column Pruning 등의 최적화가 올바르게 적용되는지 검증할 때 유용합니다.from pyspark.sql.functions import col, upper result_df = ( source_df .withColumn("upper_name", upper(col("name"))) .select("id", "upper_name") ) # 기본 실행 계획 출력 result_df.explain() # 상세 실행 계획 (논리적 계획 포함) result_df.explain(extended=True) # 출력 예시: # == Physical Plan == # *(1) Project [id#0, upper(name#1) AS upper_name#2] # +- *(1) FileScan parquet [id#0, name#1] # ↑ name, id 두 컬럼만 읽음 → Column Pruning 적용 확인 가능