Pyspark에서 Header가 존재하지 않는 csv 파일을 Dataframe으로 불러올 때에 Schema를 지정하는 방법은 다음과 같습니다.
1. StructType을 이용하여 Schema를 정의하고, 이를 적용하는 방법
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# 스키마 정의
schema = StructType([
StructField("column1", StringType(), True),
StructField("column2", IntegerType(), True),
StructField("column3", StringType(), True)
])
# CSV 파일을 DataFrame으로 읽어오기
df = spark.read.format("csv") \
.option("header", False) \
.schema(schema) \
.load("path/to/csv/file.csv")
2. SparkSession의 createDataFrame() 함수를 사용하여 Column 이름을 지정하는 방법
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# SparkSession 객체 생성
spark = SparkSession.builder.appName("myApp").getOrCreate()
# CSV 파일을 RDD로 읽어오기
rdd = spark.sparkContext.textFile("path/to/csv/file.csv")
# 스키마 정의
schema = StructType([
StructField("column1", StringType(), True),
StructField("column2", IntegerType(), True),
StructField("column3", StringType(), True)
])
# RDD를 DataFrame으로 변환하며 스키마 적용
df = spark.createDataFrame(
rdd.map(lambda x: x.split(",")),
schema=schema
)
헤더가 있는 경우에는 잘 알려진대로, inferschema 옵션을 true로 설정해주면 된다.
'Study > Data Engineering' 카테고리의 다른 글
[Apache Spark] "Python3" 명령어 실행 불가로 인한 오류 (0) | 2023.03.11 |
---|---|
[Apache Spark] 로컬 환경에서 Apache Spark 설치하기 (0) | 2023.02.14 |
[데이터 파이프라인 핵심 가이드] 2. 최신 데이터 인프라 (0) | 2023.02.08 |
[airflow] Docker에서 airflow 환경 설정 및 실행하기 (1) | 2023.02.06 |
[요리고 Airflow 도입기] 0. Airflow 사용을 결정한 이유 (0) | 2023.02.01 |