데이터셋을 Foundry에서 작업하는 것이 가장 쉽습니다. Foundry는 데이터셋에 CSV 또는 JSON 파일이 포함되어 있을 경우 수동으로 스키마를 추가할 수 있도록 스키마 적용 버튼을 제공합니다. 스키마 적용 버튼은 데이터의 일부를 기반으로 자동으로 스키마를 추론합니다. 스키마가 적용되면 데이터셋 보기에서 스키마 편집을 선택하여 열 유형을 수정하거나 추가 파싱 옵션을 적용하여 불규칙한 행을 제거하고, 인코딩을 변경하거나, 파일 경로, 행의 바이트 오프셋, 가져오기 타임스탬프 또는 행 번호와 같은 추가 열을 추가할 수 있습니다.
초기 데이터셋의 파일을 기반으로 정적으로 적용된 스키마는 데이터가 변경되면 최신 상태가 아닐 수 있습니다. 따라서 반정형 데이터에 대한 변환 파이프라인의 첫 번째 단계로 스파크가 동적으로 스키마를 추론하도록 하는 것이 도움이 될 수 있습니다.
파이프라인 빌드마다 동적으로 스키마를 추론하는 것은 성능 비용이 들기 때문에 이 기술은 드물게 사용해야 합니다(예를 들어, 스키마가 변경될 수 있는 경우).
아래에는 CSV 및 JSON 입력값에 대한 예제가 있습니다.
Parquet은 변환의 기본 출력 파일 형식입니다. 이 형식은 자동으로 추론된 스키마에 있는 특정 특수 문자를 허용하지 않습니다. 따라서 아래 예제와 같이 sanitize_schema_for_parquet
를 사용하여 잠재적 문제를 방지하는 것이 좋습니다.
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
from transforms.api import transform, Input, Output from transforms.verbs.dataframes import sanitize_schema_for_parquet @transform( output=Output("/Company/sourceA/parsed/data"), # 출력 경로 설정 raw=Input("/Company/sourceA/raw/data_csv"), # 입력 경로 설정 ) def read_csv(ctx, raw, output): filesystem = raw.filesystem() # 파일 시스템 접근 hadoop_path = filesystem.hadoop_path # Hadoop 경로 획득 files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()] # 파일 목록 획득 df = ( ctx .spark_session # Spark 세션 시작 .read # 데이터 읽기 시작 .option("encoding", "UTF-8") # 인코딩을 UTF-8로 설정. 기본값은 UTF-8입니다. .option("header", True) # 첫 번째 행을 헤더로 설정 .option("inferSchema", True) # 스키마 자동 인식 설정 .csv(files) # csv 파일로부터 데이터 프레임 생성 ) output.write_dataframe(sanitize_schema_for_parquet(df)) # 데이터 프레임을 parquet 형식으로 쓰기
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
# 필요한 라이브러리를 임포트합니다. from transforms.api import transform, Input, Output from transforms.verbs.dataframes import sanitize_schema_for_parquet # transform 데코레이터를 이용해 함수를 선언합니다. 이 함수는 입력으로 받은 JSON 파일을 읽어 데이터프레임으로 변환하고, # 이를 parquet 형식으로 쓰는 역할을 합니다. @transform( output=Output("/Company/sourceA/parsed/data"), # 출력 경로 raw=Input("/Company/sourceA/raw/data_json"), # 입력 경로 ) def read_json(ctx, raw, output): filesystem = raw.filesystem() # 파일 시스템 인스턴스를 가져옵니다. hadoop_path = filesystem.hadoop_path # Hadoop 경로를 가져옵니다. files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()] # Hadoop 경로에 있는 모든 파일을 리스트로 가져옵니다. df = ( ctx .spark_session # Spark 세션을 시작합니다. .read # 읽기 작업을 시작합니다. .option("multiline", False) # 각 파일이 개행으로 구분된 여러 JSON 객체를 포함하면 False를, 한 파일에 하나의 JSON 객체만 있다면 True를 사용합니다. .json(files) # JSON 파일을 읽어 데이터프레임으로 변환합니다. ) # sanitize_schema_for_parquet 함수를 이용해 스키마를 정리하고, 이를 parquet 형식으로 씁니다. output.write_dataframe(sanitize_schema_for_parquet(df))