아래의 지시사항은 간단한 Python 데이터 변환을 단계별로 설명합니다. 데이터 변환을 처음 시작하는 경우, 먼저 Pipeline Builder 또는 Code Repositories의 배치 파이프라인 튜토리얼을 진행해 보는 것을 고려해 보세요.
이 튜토리얼은 최근의 운석 발견에 대한 스프레드시트를 분석 가능한 데이터셋으로 변환하는 방법을 Transforms Python을 사용하여 설명합니다.
이 튜토리얼은 NASA의 Open Data Portal에서 제공하는 데이터를 사용합니다. 다음의 샘플 데이터셋으로 직접 Code Repository를 따라해 볼 수 있습니다:
이 데이터셋은 지구에서 발견된 운석에 대한 데이터를 포함하고 있습니다. 데이터는 작업하기 쉽도록 정리되어 있습니다.
데이터셋에는 각 운석의 이름, 질량, 분류, 그리고 다른 식별 정보가 포함되어 있으며, 발견된 연도와 발견된 위치의 좌표 또한 포함되어 있습니다. Foundry에 업로드하기 전에 CSV를 열어서 데이터를 미리 검토하는 것이 좋습니다.
Python Code Repositories를 생성하여 시작하세요.
또는, 다음의 단계를 따라서 로컬 Python 저장소를 Code Repositories로 복사할 수도 있습니다:
git remote remove origin
git remote add origin <repository_url>
GitHub 인터페이스의 오른쪽 상단에서 Code Repositories URL을 찾을 수 있습니다. 초록색 Clone 버튼을 선택한 후 Git 원격 URL을 복사합니다.
git remote -v
를 실행하여 Code Repositories URL을 확인합니다.
master
브랜치 (또는 선택한 다른 브랜치)를 로컬 브랜치로 병합합니다: git merge master
refusing to merge unrelated histories
에 대한 오류가 발생하면, 명령어 git merge master --allow-unrelated-histories
를 실행합니다. 이렇게 하면 이전 원격 GitHub 저장소와 관련된 현재 Git 히스토리가 제거됩니다.
이 병합은 Code Repositories에서 커밋과 변경을 만들기 위해 필요한 핵심 파일들을 로컬 저장소로 가져옵니다.
develop
): git checkout develop
.git push
를 실행하고, 새로운 브랜치가 Code Repositories 인터페이스에 표시되는지 확인합니다. 검사가 성공적으로 완료되었는지 확인합니다.Code Repositories에서 로컬 개발에 대해 더 알아보세요.
Transforms Python 저장소로 이동합니다. 기본 examples.py
파일에는 시작하는 데 도움이 되는 예제 코드가 포함되어 있습니다.
src/myproject/datasets
에 새 파일을 생성하고, 분석을 구성하기 위해 meteor_analysis.py
라고 이름을 지정하세요. 필요한 함수와 클래스를 반드시 가져와야 합니다. meteor_landings
데이터셋을 입력으로 받아 meteor_landings_cleaned
를 결과물로 만드는 변환을 정의하세요:
Copied!1 2 3 4 5 6 7 8 9 10 11
from transforms.api import transform_df, Input, Output from pyspark.sql import functions as F @transform_df( # 출력 데이터셋 경로를 여기에 입력하세요 Output("/Users/jsmith/meteorite_landings_cleaned"), # 입력 데이터셋 경로를 여기에 입력하세요 meteorite_landings=Input("/Users/jsmith/meteorite_landings"), ) def clean(meteorite_landings): # 여기에 데이터 변환 로직을 입력하세요
이제 입력 데이터셋을 1950년 이후에 발생한 "유효한" 메테오라이트로 필터링하려고 가정해 봅시다. nametype
과 year
로 메테오라이트를 필터링하기 위해 데이터 변환 로직을 업데이트합니다:
Copied!1 2 3 4 5 6 7 8
def clean(meteorite_landings): # 'Valid'라는 이름 유형을 가진 기록만 필터링 return meteorite_landings.filter( meteorite_landings.nametype == 'Valid' # 1950년 이후의 기록만 필터링 ).filter( meteorite_landings.year >= 1950 )
결과물 데이터셋을 빌드하려면 변경 사항을 커밋하고 데이터셋을 빌드하기 위해 이동합니다. Code Repositories에서 데이터셋을 빌드하는 방법에 대한 자세한 내용은 간단한 배치 파이프라인 생성하기 튜토리얼을 참조하세요.
Python Transforms를 사용하면 하나의 Python 파일에서 여러 개의 결과물 데이터셋을 생성할 수 있습니다.
예를 들어, 각 운석 유형에 대해 특히 큰 운석만 필터링하려는 경우 다음 작업이 필요합니다.
먼저, meteor_analysis.py
에 각 운석 유형의 평균 질량을 찾는 데이터 변환을 추가합니다. 이 변환은 meteor_landings
데이터셋을 입력으로 사용하고 meteorite_stats
를 결과물로 생성합니다:
Copied!1 2 3 4 5 6 7 8 9 10
@transform_df( # 출력 데이터셋 이름은 고유해야 합니다. Output("/Users/jsmith/meteorite_stats"), meteorite_landings=Input("/Users/jsmith/meteorite_landings"), ) def stats(meteorite_landings): # "class"에 따라 그룹화하고, 각 그룹의 "mass" 평균을 계산합니다. 계산 결과의 이름을 "avg_mass_per_class"로 지정합니다. return meteorite_landings.groupBy("class").agg( F.mean("mass").alias("avg_mass_per_class") )
다음으로, 각 운석의 질량을 해당 운석 유형의 평균 질량과 비교하는 데이터 변환을 생성합니다. 이 변환에 필요한 정보는 지금까지 이 튜토리얼에서 생성한 meteorite_landings
및 meteorite_stats
표에 분산되어 있습니다. 두 데이터셋을 함께 결합한 다음 평균 이상의 질량을 가진 운석을 찾기 위해 필터링해야 합니다:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# 이 데이터 변환은 두 개의 입력 데이터셋에 기반합니다. @transform_df( Output("/Users/jsmith/meteorite_enriched"), # meteorite_landings 데이터셋 경로 meteorite_landings=Input("/Users/jsmith/meteorite_landings"), # meteorite_stats 데이터셋 경로 meteorite_stats=Input("/Users/jsmith/meteorite_stats") ) def enriched(meteorite_landings, meteorite_stats): # meteorite_landings와 meteorite_stats를 'class' 기준으로 결합합니다. enriched_together=meteorite_landings.join( meteorite_stats, "class" ) # greater_mass 컬럼을 추가하고, 해당 행의 mass가 avg_mass_per_class보다 큰 경우 True로 설정합니다. greater_mass=enriched_together.withColumn( 'greater_mass', (enriched_together.mass > enriched_together.avg_mass_per_class) ) # greater_mass가 True인 행들만 필터링하여 반환합니다. return greater_mass.filter("greater_mass")
이제 Contour에서 결과물인 meteorite_enriched
데이터셋을 탐색하여 더 자세히 분석할 수 있습니다.
지금까지 평균 질량보다 큰 모든 유형의 운석을 포함하는 데이터셋을 생성했습니다. 각 운석 유형별로 별도의 데이터셋을 생성하려면 어떻게 해야 할까요? Transforms Python을 사용하면 for loop를 사용하여 같은 데이터 변환을 각 운석 유형에 적용할 수 있습니다. 같은 데이터 변환을 다른 입력에 적용하는 방법에 대한 자세한 정보는 Transform generation 섹션을 참조하세요.
src/myproject/datasets
에 새 파일을 생성하고 meteor_class.py
라고 이름을 지으세요. 데이터 변환 코드를 계속해서 meteor_analysis.py
파일에 작성할 수 있지만, 이 튜토리얼에서는 데이터 변환 로직을 분리하기 위해 새 파일을 사용합니다.
각 운석 유형별로 별도의 데이터셋을 생성하기 위해, meteorite_enriched
데이터셋을 클래스별로 필터링할 것입니다. 분석하고자 하는 각 운석 유형에 동일한 데이터 변환 로직을 적용하는 transform_generator
함수를 정의하세요:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
from transforms.api import transform_df, Input, Output def transform_generator(sources): transforms = [] for source in sources: @transform_df( # 이 코드는 각각의 운석 유형에 대해 다른 출력 데이터셋을 생성합니다. Output('/Users/jsmith/meteorite_{source}'.format(source=source)), my_input=Input('/Users/jsmith/meteorite_enriched') ) # "source=source"는 이 함수의 범위에서 source 변수의 값을 캡처합니다. def filter_by_source(my_input, source=source): return my_input.filter(my_input["class"] == source) transforms.append(filter_by_source) return transforms # 이 코드는 위의 데이터 변환 로직을 제공된 세 가지 운석 유형에 적용합니다. TRANSFORMS = transform_generator(["L6", "H5", "H4"])
이것은 우리의 운석 데이터셋을 클래스별로 필터링하는 변환을 생성합니다. 함수의 범위에서 source
파라미터를 캡처하기 위해 filter_by_source
함수에 source=source
를 전달해야 한다는 점을 주의하세요.
meteor_analysis.py
파일에서 생성된 초기 데이터 변환의 경우, 프로젝트의 Pipeline에 Transforms를 추가하기 위해 추가적인 설정을 할 필요가 없었습니다. 이는 기본 Python 프로젝트 구조가 자동 등록을 사용하여 datasets
폴더 내의 모든 Transform 오브젝트를 찾기 때문입니다.
자동 등록을 사용하여 이 마지막 변환을 프로젝트의 Pipeline에 추가하려면 생성된 변환을 목록으로 변수에 추가해야 합니다. 위의 예에서는 TRANSFORMS
라는 변수를 사용했습니다. 자동 등록 및 변환 생성기에 대한 자세한 정보는 Transforms Python 문서의 Transforms generation 섹션을 참조하세요.