사용자 정의 함수(User Defined Functions)는 PySpark에서 임의의 파이썬 코드를 사용할 수 있게 해줍니다. 예를 들어, UDF를 사용하여 데이터셋의 각 행에서 복잡한 텍스트 형식의 정보를 파싱할 수 있습니다.
선언 후에는 UDF가 concat
, date_diff
, trim
등의 PySpark 내장 함수와 유사하게 작동합니다.
직관적이지 않게도 일반적인 상황에서 데이터는 실제로 파이썬 코드로 가져오지 않습니다. PySpark를 사용하여 데이터 프레임을 조작하면, Spark 클러스터가 분산된 병렬 방식으로 최종 데이터 프레임을 얻기 위해 취해야 할 단계를 설명합니다. 이를 통해 Spark와 Foundry는 거의 무한대로 확장할 수 있지만, 클러스터 내에서 실제 데이터로 실행할 코드를 주입하기 위한 UDF 설정이 약간 도입됩니다. PySpark는 UDF 코드를 쿼리를 실행하는 각 서버에 전송합니다.
Python의 오버헤드와 비교하여 Spark의 최적화된 내장 기능으로 인해 UDF는 상대적으로 느립니다. PySpark 내장 기능을 사용하여 로직을 표현해 보십시오.
Copied!1
# "날씨 보고서: 비 55-62"
다음과 같은 날씨 형식에서 낮은 온도, 즉 55
를 얻고 싶다고 가정해 봅시다. 우리는 다음과 같은 일반적인 파이썬 함수를 작성할 수 있습니다.
Copied!1 2 3
def extract_low_temperature(weather_report): # 날씨 보고서에서 최저 기온을 추출하는 함수 return int(weather_report.split(' ')[-1].split('-')[0])
우리의 함수 extract_low_temperature
를 PySpark 쿼리에 통합하기 위해 UDF를 생성할 것입니다. UDF를 생성하는 것은 함수와 PySpark 타입 시스템에서 예상되는 반환 유형을 제공하는 것을 포함합니다.
Copied!1 2 3 4 5
# 필요한 유형을 가져옵니다 from pyspark.sql.types import IntegerType # 우리의 함수를 UDF (사용자 정의 함수)로 래핑합니다 low_temp_udf = F.udf(extract_low_temperature, IntegerType())
UDF는 사용자 정의 함수를 의미합니다. 이 코드는 extract_low_temperature라는 사용자 정의 함수를 IntegerType() 형식으로 PySpark의 UDF로 래핑하는 과정을 보여줍니다. 이렇게 하면 이 함수를 Spark 데이터프레임에 적용할 수 있습니다. 이제 UDF는 DataFrame에서 사용될 수 있으며, 인수로 전체 열을 사용할 수 있습니다.
Copied!1 2 3
# 'df' 데이터프레임에 'low'라는 새로운 컬럼을 추가합니다. # 이 'low' 컬럼은 'weather_report' 컬럼의 데이터를 'low_temp_udf' 사용자 정의 함수(UDF)를 통해 변환한 결과입니다. df = df.withColumn('low', low_temp_udf(F.col('weather_report')))
id | 날씨_보고 | 최저 |
---|---|---|
1 | 날씨 보고: 비 55-62 | 55 |
2 | 날씨 보고: 햇빛 69-74 | 69 |
3 | 날씨 보고: 구름 31-34 | 31 |
UDF는 임의의 열 인수를 취할 수 있습니다. 열 인수는 function 인수에 해당합니다.
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
from pyspark.sql.types import StringType # 날씨의 품질을 평가하는 함수 정의 # 온도가 70 이상이고, 바람이 불지 않으면 "good", 그렇지 않으면 "bad" 반환 def weather_quality(temperature, windy): if temperature > 70 and windy == False: return "good" else: return "bad" # 위에서 정의한 함수를 이용하여 사용자 정의 함수(UDF) 생성 # 반환 타입은 StringType()으로 설정 weather_udf = F.udf(weather_quality, StringType()) # 데이터프레임 df에 'quality'라는 새로운 컬럼 추가 # 'quality' 컬럼은 'temp'와 'wind' 컬럼을 인자로 하는 weather_udf 함수의 결과 df = df.withColumn('quality', weather_udf(F.col('temp'), F.col('wind')))
id | temp | wind | quality |
---|---|---|---|
1 | 73 | false | 좋음 |
2 | 36 | false | 나쁨 |
3 | 90 | true | 나쁨 |