PySpark는 Apache Spark 백엔드와 인터페이스하여 데이터를 빠르게 처리할 수 있는 래퍼 언어입니다. Spark는 매우 큰 데이터셋에서 분산 네트워크 서버간에 작동할 수 있으며, 올바르게 사용되면 성능과 신뢰성 측면에서 큰 이점을 제공합니다. 그러나 관계형 데이터베이스 시스템인 SQL과 같은 시스템에 익숙한 경우 일부 제한 사항이 있을 수 있습니다. 예를 들어 Spark는 행이 어떤 서버에 있는지 정확히 알 수 없으므로 특정 행을 직접 선택하여 업데이트하거나 삭제할 수 없습니다. 데이터베이스를 이런 방식으로 생각하는 데 익숙하다면 데이터셋을 전체로 생각하고 행이 아닌 열을 기준으로 데이터를 처리하는 개념 모델로 조정해야 합니다.
SQL과 달리 쿼리가 "뷰"(가상 테이블 결과 세트)를 생성하는 반면, PySpark로 데이터셋을 처리하면 완전히 새로운 데이터셋이 생성됩니다. 이를 통해 파생된 데이터셋을 기반으로 새 데이터셋을 빌드할 뿐만 아니라, 조직의 다른 구성원도 중간 데이터셋을 자신의 데이터 처리 작업에 재사용할 수 있습니다. Palantir Foundry에서는 데이터 운영 시스템에서 데이터셋이 자동으로 부모-자식(또는 원본-결과) 방향성 트리 관계를 통해 연결됩니다. 이를 통해 누구나 Spark 변환의 Data Lineage를 추적하기 쉽게 할 수 있습니다. 즉, 데이터셋 종속성이 어떻게 구축되고 어디서 나오는지를 탐색할 수 있습니다. 또한 조직의 다른 구성원들이 데이터셋을 어떻게 사용했는지도 알 수 있어 예제에서 배울 수 있거나 중복 작업을 효과적으로 줄일 수 있습니다.
Code Workbook에서 함수는 다음과 같이 보일 수 있습니다:
Copied!1 2 3 4
def new_frame(old_frame): df = old_frame # df에 변환 작업을 수행 return df
old_frame
: Foundry에 저장된 데이터셋을 나타내는 DataFrame을 참조합니다. old_frame
은 이 new_frame
함수 내에서 수정할 수 없는 불변입니다. 어떤 면에서, 모든 중간 변환 단계는 새로운 불변 데이터 프레임을 생성하며, 이를 다시 변환하거나 그대로 반환할 수 있습니다. 이것이 완전히 사실은 아니지만, 인지 모델로서 코드 구성에 도움이 됩니다.new_frame
: 이 함수 내에서 old_frame
에 적용하고자 하는 일련의 변환을 정의할 수 있습니다. return
문은 DataFrame을 반환해야 합니다(이 예제에서는 df
라고 했습니다). 내부적으로, DataFrame에 적용한 모든 변환은 결합되고 최적화되어 입력 데이터셋에 적용됩니다. 코드로 빌드를 트리거하면 결과가 Foundry의 새로운 데이터셋 파일에 저장되며, 빌드 완료 후 탐색할 수 있습니다.DataFrame 내의 데이터는 Array
나 Dictionary
가 아니므로 직접 참조할 수 없습니다. 실제로, 모든 파티셔닝과 셔플링 때문에 어느 시점에 어떤 데이터가 어디에 있는지 결정하는 것은 불가능합니다. 데이터셋 필터링이나 집계를 하지 않는 한, 작성하는 코드는 데이터셋 내용에 상대적으로 무관해야 합니다. 정렬은 일반적으로 비용이 많이 들고 느리므로 모든 행이 무작위로 정렬되어 있다고 가정하고, 열, 필터, 집계, 창의적인 문제 해결 기술에 툴셋을 제한하세요.
PySpark는 타입이 안전하지 않아 모든 변환 작업을 평가하려고 하고, 런타임 중 작업이 실패할 때 중단합니다. 따라서 열 스키마를 추적하는 것이 매우 중요합니다.
문자열이나 날짜에 수학 함수를 수행하거나 숫자에 대한 문자열 작업을 하거나 정수에 대한 날짜 조작을 하지 마세요. 충돌하는 유형의 동작은 예측하기 어렵습니다.
작업을 수행하기 전에 값의 유형을 올바른 유형으로 캐스트하세요.
DataFrame의 각 열은 이름이 지정되어 있고 (및 이름을 변경할 수 있습니다). 열 이름은 고유하고 대소문자를 구분합니다. Foundry 데이터셋에 대해 다음 가이드라인을 따르세요:
_
(밑줄)로 단어를 구분하세요(공백은 허용되지 않기 때문입니다).camelCasedColumnNames
를 피하세요.(
, )
, &
와 같은 특수 문자를 사용하지 마세요.기존 코드로 들어가면 DataFrame을 참조하는 변수 이름에 대해 엄격한 규칙이 없다는 것을 알게 될 것입니다. 이 치트 시트에서는 DataFrame을 df
로 참조하지만, 다른 예제에서는 raw
, out
, input
, table
, something_specific
일 수 있습니다. 일이 완료되는 한 무엇이든 가능합니다.
또한 이 패턴에 유의하세요:
Copied!1 2 3 4 5 6 7 8 9 10
# 데이터프레임의 "firstName"과 "age" 열을 선택합니다. df = df.select("firstName", "age") # "age" 열의 데이터 타입을 정수(integer)로 변환합니다. df = df.withColumn("age", df.age.cast("integer")) # 21세 이상의 데이터만 필터링합니다. df = df.filter(df.age > 21) # "firstName" 열의 이름을 "first_name"으로 변경합니다. df = df.withColumnRenamed("firstName", "first_name") # 결과 데이터프레임을 반환합니다. return df
또는 (같은 내용을 다르게 표현한 것):
Copied!1 2 3 4 5 6 7 8
# 데이터 프레임(df)에서 "firstName"과 "age" 열을 선택합니다. return df.select("firstName", "age") \ # "age" 열의 데이터 타입을 "integer"로 변환합니다. .withColumn("age", df.age.cast("integer")) \ # "age" 열의 값이 21보다 큰 데이터만 필터링합니다. .filter(df.age > 21) \ # "firstName" 열의 이름을 "first_name"으로 변경합니다. .withColumnRenamed("firstName", "first_name")
코딩에 익숙하지 않은 경우: =
왼쪽의 df
는 오른쪽의 df
에 적용된 변환 결과가 저장되는 곳입니다. 이 예에서는 각 단계 후에 df
가 포함하는 것을 재정의하여 동일한 이름의 변수에 결과를 저장했습니다. DataFrame 변환 결과를 저장할 다른 이름을 사용할 수 있지만 대부분의 경우 변수 이름을 재정의하고 진행하는 것이 괜찮습니다. 각 변환 함수의 끝에서는 새 데이터프레임을 변수로 반환하거나(첫 번째 예에서) 마지막 변환의 결과로 반환해야 합니다(두 번째 예에서).
두 예제 모두 동일한 작업을 수행합니다:
age
열을 캐스팅하여 문자열이 아닌 정수로 확인age > 21
인 항목만 포함firstName
열의 이름을 first_name
으로 변경결과 데이터셋에는 first_name
, age
두 개의 열만 있고 21세 이하의 사람들은 제외됩니다. 이것이 마지막에 df
에 포함되어 있으며, 이를 return
하거나 더 많은 변환을 적용할 수 있습니다. 이러한 변환에 대해 다음 섹션에서 자세히 알아보겠습니다.
Foundry에서 PySpark를 작성하는 데 사용되는 두 가지 도구: Code Repositories와 Code Workbook이 있습니다.
Code Repositories에서는 다음과 같은 import 문을 작성해야 대부분의 함수를 사용할 수 있습니다. 이 import 문은 .py
문서 맨 위에 선언해야 합니다:
Copied!1 2
# pyspark.sql의 함수들을 F로 불러오기 from pyspark.sql import functions as F
Code Workbook에서 이미 포함된 전역 import가 있으므로 추가 구성 없이 대부분의 함수를 사용할 수 있습니다.
이 참조는 완전한 목록을 제공하지 않으며 일반적인 패턴과 모범 사례에 대한 지침을 제공하는 데 중점을 둘 것입니다. pySpark SQL 함수의 전체 목록은 공식 Apache Spark 문서를 참조할 수 있습니다.