데이터 통합PythonPySpark Reference개요

본 번역은 검증되지 않았습니다. AIP를 통해 영문원문으로부터 번역되었습니다.

개요

PySpark는 Apache Spark 백엔드와 인터페이스하여 데이터를 빠르게 처리할 수 있는 래퍼 언어입니다. Spark는 매우 큰 데이터셋에서 분산 네트워크 서버간에 작동할 수 있으며, 올바르게 사용되면 성능과 신뢰성 측면에서 큰 이점을 제공합니다. 그러나 관계형 데이터베이스 시스템인 SQL과 같은 시스템에 익숙한 경우 일부 제한 사항이 있을 수 있습니다. 예를 들어 Spark는 행이 어떤 서버에 있는지 정확히 알 수 없으므로 특정 행을 직접 선택하여 업데이트하거나 삭제할 수 없습니다. 데이터베이스를 이런 방식으로 생각하는 데 익숙하다면 데이터셋을 전체로 생각하고 행이 아닌 열을 기준으로 데이터를 처리하는 개념 모델로 조정해야 합니다.

  • DataFrame: 명명된 열 아래의 행 모음
    • 구조적으로는 SQL 데이터베이스와 비슷하지만 비관계형입니다.
    • 불변: DataFrame은 생성된 후 변경할 수 없지만, 새로운 DataFrame으로 변환할 수 있습니다(결과적으로 원본 DataFrame과 변환된 DataFrame 2개가 됩니다). 데이터셋은 덮어쓸 수 있지만, Foundry는 버전 기록을 추적하여 이전 빌드를 탐색하고 언제든지 되돌릴 수 있습니다.
    • 지연 평가: 일련의 변환 작업이 단일(결합된) 액션으로 평가되며, 빌드가 트리거될 때 수행됩니다.
  • Resilient Distributed Datasets: (RDD)는 DataFrame의 기본 데이터 구조입니다. DataFrame을 여러 개의 교차하지 않는 부분 집합으로 분할함으로써, 클러스터(네트워크)의 여러 컴퓨터(노드)에서 변환을 병렬로 평가할 수 있습니다. 이 모든 작업은 내부적으로 수행되지만, PySpark에서 작성할 때는 이를 염두에 두어야 합니다.
  • 공유 변수: 기본적으로 Spark는 효율성을 위해 변환 작업에 사용되는 각 변수의 별도로 관리되는 복사본을 각 병렬 컴퓨터(노드)에 전송합니다. 작업 간에 변수를 공유해야 하는 경우 Spark는 두 가지 유형의 공유 변수를 지원합니다:
    1. Broadcast 변수: 클러스터 내 모든 컴퓨터(노드)에 전송되어 메모리(RAM)에 캐시(저장)되는 값입니다.
    2. Accumulators: 더하거나 집계할 수 있는 변수로, 카운터와 합계 등을 포함합니다. 이 개념은 GroupedData와 관련이 있으며, 통계 계산에 유용합니다.
  • 왜 DataFrame을 사용하나요?: Spark DataFrame은 대량의 구조화된 데이터(페타바이트 이상)를 처리하도록 설계되고 최적화되어 있습니다.
  • 왜 PySpark 코드를 작성해야 하나요?: PySpark를 사용하면 Code Repositories 및 Code Workbook에서 데이터셋을 변환하는 방법을 Contour 또는 Blacksmith만 사용하는 것보다 더 복잡하고 유연하게 사용자 정의할 수 있습니다.
  • PySpark의 용도가 아닌 것은 무엇인가요?: PySpark는 데이터셋을 변환하는 데 사용되도록 설계되었으며, 개별 값을 직접 참조할 수 없습니다. 합계와 평균을 계산할 수는 있지만, 데이터를 직접 참조해서는 안 되고 할 수도 없습니다.

SQL과 달리 쿼리가 "뷰"(가상 테이블 결과 세트)를 생성하는 반면, PySpark로 데이터셋을 처리하면 완전히 새로운 데이터셋이 생성됩니다. 이를 통해 파생된 데이터셋을 기반으로 새 데이터셋을 빌드할 뿐만 아니라, 조직의 다른 구성원도 중간 데이터셋을 자신의 데이터 처리 작업에 재사용할 수 있습니다. Palantir Foundry에서는 데이터 운영 시스템에서 데이터셋이 자동으로 부모-자식(또는 원본-결과) 방향성 트리 관계를 통해 연결됩니다. 이를 통해 누구나 Spark 변환의 Data Lineage를 추적하기 쉽게 할 수 있습니다. 즉, 데이터셋 종속성이 어떻게 구축되고 어디서 나오는지를 탐색할 수 있습니다. 또한 조직의 다른 구성원들이 데이터셋을 어떻게 사용했는지도 알 수 있어 예제에서 배울 수 있거나 중복 작업을 효과적으로 줄일 수 있습니다.

PySpark 코드 이해하기

시작 코드 기본 사항

Code Workbook에서 함수는 다음과 같이 보일 수 있습니다:

Copied!
1 2 3 4 def new_frame(old_frame): df = old_frame # df에 변환 작업을 수행 return df
  • old_frame: Foundry에 저장된 데이터셋을 나타내는 DataFrame을 참조합니다. old_frame은 이 new_frame 함수 내에서 수정할 수 없는 불변입니다. 어떤 면에서, 모든 중간 변환 단계는 새로운 불변 데이터 프레임을 생성하며, 이를 다시 변환하거나 그대로 반환할 수 있습니다. 이것이 완전히 사실은 아니지만, 인지 모델로서 코드 구성에 도움이 됩니다.
  • new_frame: 이 함수 내에서 old_frame에 적용하고자 하는 일련의 변환을 정의할 수 있습니다. return 문은 DataFrame을 반환해야 합니다(이 예제에서는 df라고 했습니다). 내부적으로, DataFrame에 적용한 모든 변환은 결합되고 최적화되어 입력 데이터셋에 적용됩니다. 코드로 빌드를 트리거하면 결과가 Foundry의 새로운 데이터셋 파일에 저장되며, 빌드 완료 후 탐색할 수 있습니다.

DataFrame 내의 데이터는 ArrayDictionary가 아니므로 직접 참조할 수 없습니다. 실제로, 모든 파티셔닝과 셔플링 때문에 어느 시점에 어떤 데이터가 어디에 있는지 결정하는 것은 불가능합니다. 데이터셋 필터링이나 집계를 하지 않는 한, 작성하는 코드는 데이터셋 내용에 상대적으로 무관해야 합니다. 정렬은 일반적으로 비용이 많이 들고 느리므로 모든 행이 무작위로 정렬되어 있다고 가정하고, 열, 필터, 집계, 창의적인 문제 해결 기술에 툴셋을 제한하세요.

경고

PySpark는 타입이 안전하지 않아 모든 변환 작업을 평가하려고 하고, 런타임 중 작업이 실패할 때 중단합니다. 따라서 열 스키마를 추적하는 것이 매우 중요합니다.

문자열이나 날짜에 수학 함수를 수행하거나 숫자에 대한 문자열 작업을 하거나 정수에 대한 날짜 조작을 하지 마세요. 충돌하는 유형의 동작은 예측하기 어렵습니다.

작업을 수행하기 전에 값의 유형을 올바른 유형으로 캐스트하세요.

명명된 열

DataFrame의 각 열은 이름이 지정되어 있고 (및 이름을 변경할 수 있습니다). 열 이름은 고유하고 대소문자를 구분합니다. Foundry 데이터셋에 대해 다음 가이드라인을 따르세요:

  • 항상 소문자 또는 숫자를 사용하세요.
  • _ (밑줄)로 단어를 구분하세요(공백은 허용되지 않기 때문입니다).
  • 관례상 camelCasedColumnNames를 피하세요.
  • (, ), &와 같은 특수 문자를 사용하지 마세요.
  • 집계 함수는 때때로 열에 특수 문자가 있는 이름을 자동으로 제공합니다. 변환에서 데이터 프레임을 반환하기 전에 별칭을 제공하거나 열 이름을 변경해야 합니다.

변환 연결

기존 코드로 들어가면 DataFrame을 참조하는 변수 이름에 대해 엄격한 규칙이 없다는 것을 알게 될 것입니다. 이 치트 시트에서는 DataFrame을 df로 참조하지만, 다른 예제에서는 raw, out, input, table, something_specific일 수 있습니다. 일이 완료되는 한 무엇이든 가능합니다.

또한 이 패턴에 유의하세요:

Copied!
1 2 3 4 5 6 7 8 9 10 # 데이터프레임의 "firstName"과 "age" 열을 선택합니다. df = df.select("firstName", "age") # "age" 열의 데이터 타입을 정수(integer)로 변환합니다. df = df.withColumn("age", df.age.cast("integer")) # 21세 이상의 데이터만 필터링합니다. df = df.filter(df.age > 21) # "firstName" 열의 이름을 "first_name"으로 변경합니다. df = df.withColumnRenamed("firstName", "first_name") # 결과 데이터프레임을 반환합니다. return df

또는 (같은 내용을 다르게 표현한 것):

Copied!
1 2 3 4 5 6 7 8 # 데이터 프레임(df)에서 "firstName"과 "age" 열을 선택합니다. return df.select("firstName", "age") \ # "age" 열의 데이터 타입을 "integer"로 변환합니다. .withColumn("age", df.age.cast("integer")) \ # "age" 열의 값이 21보다 큰 데이터만 필터링합니다. .filter(df.age > 21) \ # "firstName" 열의 이름을 "first_name"으로 변경합니다. .withColumnRenamed("firstName", "first_name")

코딩에 익숙하지 않은 경우: = 왼쪽의 df는 오른쪽의 df에 적용된 변환 결과가 저장되는 곳입니다. 이 예에서는 각 단계 후에 df가 포함하는 것을 재정의하여 동일한 이름의 변수에 결과를 저장했습니다. DataFrame 변환 결과를 저장할 다른 이름을 사용할 수 있지만 대부분의 경우 변수 이름을 재정의하고 진행하는 것이 괜찮습니다. 각 변환 함수의 끝에서는 새 데이터프레임을 변수로 반환하거나(첫 번째 예에서) 마지막 변환의 결과로 반환해야 합니다(두 번째 예에서).

두 예제 모두 동일한 작업을 수행합니다:

  1. 변환된 데이터셋에 포함시키려는 df의 2개 열만 선택
  2. age 열을 캐스팅하여 문자열이 아닌 정수로 확인
  3. 데이터셋의 행을 필터링하여 age > 21인 항목만 포함
  4. firstName 열의 이름을 first_name으로 변경

결과 데이터셋에는 first_name, age 두 개의 열만 있고 21세 이하의 사람들은 제외됩니다. 이것이 마지막에 df에 포함되어 있으며, 이를 return하거나 더 많은 변환을 적용할 수 있습니다. 이러한 변환에 대해 다음 섹션에서 자세히 알아보겠습니다.

Foundry에서 PySpark 작성하기

Foundry에서 PySpark를 작성하는 데 사용되는 두 가지 도구: Code RepositoriesCode Workbook이 있습니다.

Code Repositories에서는 다음과 같은 import 문을 작성해야 대부분의 함수를 사용할 수 있습니다. 이 import 문은 .py 문서 맨 위에 선언해야 합니다:

Copied!
1 2 # pyspark.sql의 함수들을 F로 불러오기 from pyspark.sql import functions as F

Code Workbook에서 이미 포함된 전역 import가 있으므로 추가 구성 없이 대부분의 함수를 사용할 수 있습니다.

이 참조는 완전한 목록을 제공하지 않으며 일반적인 패턴과 모범 사례에 대한 지침을 제공하는 데 중점을 둘 것입니다. pySpark SQL 함수의 전체 목록은 공식 Apache Spark 문서를 참조할 수 있습니다.