이 내용은 learn.palantir.com ↗에서도 확인할 수 있으며 접근성을 위해 여기에 제공됩니다.
약간의 변형을 포함하여 이전 튜토리얼에서 다룬 프로세스를 이용해 1) 이 클리닝 단계의 코드를 작성하고, 2) 결과물을 미리보고, 3) 브랜치에 빌드합니다. 클리닝 단계의 주요 기능은 매핑 파일을 항공편 경보 데이터셋에 결합하여 숫자 우선 순위 및 상태 값 대신 문자열과 동일하게 사용할 수 있게끔 합니다. 즉, Low
, Medium
, High
대신 3
, 2
, 1
값을 가진 priority
의 flight_alerts
데이터셋을 원합니다. 따라서 PySpark 변환은 여러 입력을 사용하는 고유한 구조를 가집니다.
flight_alerts_clean.py
파일을 엽니다.#
)를 제거하여 코드에서 pyspark.sql의 functions
모듈을 가져옵니다.source_df
를 별칭으로 사용하면 더 이상 충분하지 않습니다. 대신 flight_alerts
를 입력합니다.flight_alerts_preprocessed
의 RID로 바꾸어 주세요. 이는 flight_alerts_preprocessed.py
파일에서 정의된 결과물에서 얻을 수 있습니다.priority_mapping
및 status_mapping
입력에 대한 두 개의 새로운 행을 작성하고, 각각 priority_mapping_preprocessed
와 status_mapping_preprocessed
의 RID를 제공하십시오. 아래 예제에서 결과물 경로와 RID는 설명 목적으로 축소되어 있습니다. 입력 목록의 마지막에 쉼표를 남겨두십시오.Copied!1 2 3 4 5 6 7 8 9 10
@transform_df( # "../Temporary Training Artifacts/..." 경로에 출력을 설정합니다. Output("../Temporary Training Artifacts/..."), # "ri.foundry.main.dataset..."로부터 flight_alerts 데이터를 입력받습니다. flight_alerts=Input("ri.foundry.main.dataset..."), # "ri.foundry.main.dataset..."로부터 priority_mapping 데이터를 입력받습니다. priority_mapping=Input("ri.foundry.main.dataset..."), # "ri.foundry.main.dataset..."로부터 status_mapping 데이터를 입력받습니다. status_mapping=Input("ri.foundry.main.dataset..."), )
@transform_df
데코레이터의 닫힌 )
이후의 모든 것)를 아래의 코드 블록으로 교체하세요. 코드 로직에 대한 간단한 검토는 다음 작업에서 진행할 예정입니다.코드 블록
def compute(flight_alerts, priority_mapping, status_mapping):
# priority_mapping 데이터프레임을 조인하기 위해 준비
priority_mapping = priority_mapping.select(
F.col('value').alias('priority'),
F.col('mapped_value').alias('priority_value')
)
# status_mapping 데이터프레임을 조인하기 위해 준비
status_mapping = status_mapping.select(
F.col('value').alias('status'),
F.col('mapped_value').alias('status_value')
)
# flight_alerts를 priority_mapping 및 status_mapping과 조인하여 priority와 status의 인간 친화적 이름 가져오기
df = flight_alerts.join(priority_mapping, on='priority', how='left')
df = df.join(status_mapping, on='status', how='left')
# 조인 후 컬럼 선택
df = df.select(
'alert_display_name',
'flight_id',
'flight_display_name',
'flight_date',
'rule_id',
'rule_name',
'category',
F.col('priority_value').alias('priority'),
F.col('status_value').alias('status'),
)
# 미래의 워크플로에서 댓글과 사용자 이름을 저장하기 위한 빈 자리 표시자 컬럼 추가
# `None`이 타입이 없기 때문에 필요한 캐스트
df = df.withColumn('comment', F.lit(None).cast('string'))
df = df.withColumn('assignee', F.lit(None).cast('string'))
return df