데이터 통합파이프라인 빌딩Pipelines on unstructured dataCSV 또는 JSON 파일에 대한 스키마 추론

본 번역은 검증되지 않았습니다. AIP를 통해 영문원문으로부터 번역되었습니다.

CSV 또는 JSON 파일에 대한 스키마 추론

데이터셋을 Foundry에서 작업하는 것이 가장 쉽습니다. Foundry는 데이터셋에 CSV 또는 JSON 파일이 포함되어 있을 경우 수동으로 스키마를 추가할 수 있도록 스키마 적용 버튼을 제공합니다. 스키마 적용 버튼은 데이터의 일부를 기반으로 자동으로 스키마를 추론합니다. 스키마가 적용되면 데이터셋 보기에서 스키마 편집을 선택하여 열 유형을 수정하거나 추가 파싱 옵션을 적용하여 불규칙한 행을 제거하고, 인코딩을 변경하거나, 파일 경로, 행의 바이트 오프셋, 가져오기 타임스탬프 또는 행 번호와 같은 추가 열을 추가할 수 있습니다.

초기 데이터셋의 파일을 기반으로 정적으로 적용된 스키마는 데이터가 변경되면 최신 상태가 아닐 수 있습니다. 따라서 반정형 데이터에 대한 변환 파이프라인의 첫 번째 단계로 스파크가 동적으로 스키마를 추론하도록 하는 것이 도움이 될 수 있습니다.

파이프라인 빌드마다 동적으로 스키마를 추론하는 것은 성능 비용이 들기 때문에 이 기술은 드물게 사용해야 합니다(예를 들어, 스키마가 변경될 수 있는 경우).

아래에는 CSV 및 JSON 입력값에 대한 예제가 있습니다.

Parquet은 변환의 기본 출력 파일 형식입니다. 이 형식은 자동으로 추론된 스키마에 있는 특정 특수 문자를 허용하지 않습니다. 따라서 아래 예제와 같이 sanitize_schema_for_parquet를 사용하여 잠재적 문제를 방지하는 것이 좋습니다.

CSV

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from transforms.api import transform, Input, Output from transforms.verbs.dataframes import sanitize_schema_for_parquet @transform( output=Output("/Company/sourceA/parsed/data"), # 출력 경로 설정 raw=Input("/Company/sourceA/raw/data_csv"), # 입력 경로 설정 ) def read_csv(ctx, raw, output): filesystem = raw.filesystem() # 파일 시스템 접근 hadoop_path = filesystem.hadoop_path # Hadoop 경로 획득 files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()] # 파일 목록 획득 df = ( ctx .spark_session # Spark 세션 시작 .read # 데이터 읽기 시작 .option("encoding", "UTF-8") # 인코딩을 UTF-8로 설정. 기본값은 UTF-8입니다. .option("header", True) # 첫 번째 행을 헤더로 설정 .option("inferSchema", True) # 스키마 자동 인식 설정 .csv(files) # csv 파일로부터 데이터 프레임 생성 ) output.write_dataframe(sanitize_schema_for_parquet(df)) # 데이터 프레임을 parquet 형식으로 쓰기

JSON

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 # 필요한 라이브러리를 임포트합니다. from transforms.api import transform, Input, Output from transforms.verbs.dataframes import sanitize_schema_for_parquet # transform 데코레이터를 이용해 함수를 선언합니다. 이 함수는 입력으로 받은 JSON 파일을 읽어 데이터프레임으로 변환하고, # 이를 parquet 형식으로 쓰는 역할을 합니다. @transform( output=Output("/Company/sourceA/parsed/data"), # 출력 경로 raw=Input("/Company/sourceA/raw/data_json"), # 입력 경로 ) def read_json(ctx, raw, output): filesystem = raw.filesystem() # 파일 시스템 인스턴스를 가져옵니다. hadoop_path = filesystem.hadoop_path # Hadoop 경로를 가져옵니다. files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()] # Hadoop 경로에 있는 모든 파일을 리스트로 가져옵니다. df = ( ctx .spark_session # Spark 세션을 시작합니다. .read # 읽기 작업을 시작합니다. .option("multiline", False) # 각 파일이 개행으로 구분된 여러 JSON 객체를 포함하면 False를, 한 파일에 하나의 JSON 객체만 있다면 True를 사용합니다. .json(files) # JSON 파일을 읽어 데이터프레임으로 변환합니다. ) # sanitize_schema_for_parquet 함수를 이용해 스키마를 정리하고, 이를 parquet 형식으로 씁니다. output.write_dataframe(sanitize_schema_for_parquet(df))