이 콘텐츠는 learn.palantir.com ↗에서도 제공되며 접근성 목적으로 여기에 제시되어 있습니다.
교육 목적으로, 원시 항공 경보 및 매핑 데이터셋을 사용하여 데이터 변환 소개 자습서에서 수행한 것처럼 passenger_flight_alerts_csv_raw
및 passengers_json_raw
데이터셋의 사본을 datasource 프로젝트 폴더에 생성하기 위해 기본 PySpark 코드를 사용합니다. 결과적으로 승객 datasource 세그먼트 파이프라인에 전용 시작 지점이 생성되지만 일반적으로 Data Connection Sync를 사용하여 소스 파일을 가져온 다음 가져온 데이터셋을 입력으로 사용합니다.
원시 파일로 작업하고 있기 때문에 단순한 PySpark identity 변환으로는 충분하지 않습니다. @transform()
데코레이터와 고유한 Python 라이브러리에 의존하는 특별한 코드를 구현해야 합니다.
⚠️ 출력물을 생성하는데 데이터 소스가 위치한 Foundry Training and Resources 프로젝트가 아닌 다른 위치에서 출력물을 생성하기 때문에, 변환 파일에서 datasource 경로를 조정하고 여기에 설명된 대로 프로젝트 참조를 사용해야 합니다.
화면 왼쪽 상단의 Code Repositories 파일 섹션에서 폴더 구조를 확장하고, /src
아래의 /datasets
폴더에서 마우스 오른쪽 버튼을 클릭하여 새 폴더를 생성하고 /raw
로 명명합니다.
새로운 /raw
폴더에 다음 두 개의 Python 파일을 생성합니다:
passengers_raw.py
passenger_flight_alerts_raw.py
passenger_flight_alerts_raw.py
파일을 열고 6행의 출력 파일 경로에서 대/소문자 구분 공간을 확인합니다. 예를 들어 Output("/thisIsTheSpace/Foundry Training and Resources/...
에디터의 모든 기본 코드를 아래 코드 블록으로 대체합니다. 단, 학습 프로젝트 경로와 자습서 아티팩트 폴더에 맞게 입력 및 출력 경로를 수정해야 합니다.
from transforms.api import transform, Input, Output
from shutil import copyfileobj
@transform(
output=Output("/${space}/Temporary Training Artifacts/${yourName}/Data Engineering Tutorials/Datasource Project: Passengers/data/raw/passenger_flight_alerts_raw"),
raw_file_input=Input("/${space}/Foundry Training and Resources/Example Projects/[Datasource] Passengers/datasets/raw/passenger_flight_alerts_csv_raw"),
)
def compute(output, raw_file_input):
"""
입력 및 출력 데이터셋의 파일시스템에 대한 변수 생성
"""
input_filesystem = raw_file_input.filesystem()
output_filesystem = output.filesystem()
"""
이 함수는 파일 메타데이터 행을 입력으로 사용합니다.
이 함수는 입력 데이터셋에서 출력 데이터셋으로 파일을 복사합니다.
"""
def copy_file_without_doing_anything(files_df_row):
with input_filesystem.open(files_df_row.path, 'rb') as f:
with output_filesystem.open(files_df_row.path, 'wb') as o:
copyfileobj(f, o)
"""
입력 데이터셋 파일시스템의 모든 것에 대한 경로 및 기타 파일 메타데이터를 포함하는 데이터프레임 생성
이 데이터프레임에서 각 행은 입력 데이터셋의 단일 원시 파일을 나타냅니다.
"""
files_df = input_filesystem.files()
"""
Spark에서 병렬로 copy_file_without_doing_anything 실행
이 코드는 드라이버가 아닌 Spark를 활용하기 때문에 잘 확장됩니다.
"""
files_df.foreach(copy_file_without_doing_anything)
코드를 복사한 후 6행 및 7행의 ${space}
를 단계 4에서 확인한 공간으로 변경합니다.
6행의 ${yourName}
을 귀하의 /Tutorial Practice Artifacts
폴더 이름으로 변경합니다(예: .../Foundry Reference Project/Tutorial Practice Artifacts/jmeier/...
).
passengers_raw.py
파일에서 단계 3-6을 반복하고 기본 코드를 아래 코드 블록으로 대체합니다.
from transforms.api import transform, Input, Output
from shutil import copyfileobj
@transform(
output=Output("/${space}/Temporary Training Artifacts/${yourName}/Data Engineering Tutorials/Datasource Project: Passengers/data/raw/passengers_raw"),
raw_file_input=Input("/${space}/Foundry Training and Resources/Example Projects/[Datasource] Passengers/datasets/raw/passengers_json_raw"),
)
def compute(output, raw_file_input):
"""
입력 및 출력 데이터셋의 파일시스템에 대한 변수 생성
"""
input_filesystem = raw_file_input.filesystem()
output_filesystem = output.filesystem()
"""
이 함수는 파일 메타데이터 행을 입력으로 사용합니다.
이 함수는 입력 데이터셋에서 출력 데이터셋으로 파일을 복사합니다.
"""
def copy_file_without_doing_anything(files_df_row):
with input_filesystem.open(files_df_row.path, 'rb') as f:
with output_filesystem.open(files_df_row.path, 'wb') as o:
copyfileobj(f, o)
"""
입력 데이터셋 파일시스템의 모든 것에 대한 경로 및 기타 파일 메타데이터를 포함하는 데이터프레임 생성
이 데이터프레임에서 각 행은 입력 데이터셋의 단일 원시 파일을 나타냅니다.
"""
files_df = input_filesystem.files()
"""
Spark에서 병렬로 copy_file_without_doing_anything 실행
이 코드는 드라이버가 아닌 Spark를 활용하기 때문에 잘 확장됩니다.
"""
files_df.foreach(copy_file_without_doing_anything)
ℹ️ 이 자습서에서는 방금 복사한 코드의 세부 내용을 검토하지 않습니다. 하지만 코드 주석을 읽고 Python 원시 파일 액세스에 대한 문서를 참조하시기 바랍니다.
화면 오른쪽 상단의 미리보기 버튼을 클릭하여 출력물을 미리 봅니다. 미리보기 창에서 입력 파일 구성하라는 메시지가 표시되면 구성...
을 클릭하고 다음 화면에서 passenger_flight_alerts.csv
옆의 상자를 선택합니다.
그런 다음 오른쪽의 파란색 저장 및 미리보기 버튼을 클릭합니다. 미리보기의 결과는 행과 열의 일련이 아니라 원시 CSV 파일의 사본이 생성되는 것을 확인합니다.
의미 있는 메시지(예: feature: add raw transform files)로 코드를 커밋합니다.
feature 브랜치에서 두 데이터셋을 빌드합니다.
빌드가 완료되면 이전 자습서에서 설명한 프로세스를 사용하여 두 변환 파일의 데이터셋 경로를 RID로 교체하십시오.
이전 자습서에서 설명한 PR 프로세스를 사용하여 코드를 Master
브랜치로 병합합니다.
Master
브랜치에서 두 원시 출력을 빌드합니다.