윈도우 함수를 사용하면 사용자가 데이터프레임의 행에 집계 및 기타 값들을 추가할 수 있으며, 집계에 참여하지 않는 열을 잃지 않게 됩니다.
transactions
데이터셋이 있고 각 customer
가 한 거래를 가지고 있으며, 각 고객의 average_spend
를 계산하고 각 거래에 추가하고 싶다고 가정해봅시다. 조인을 사용하여 이를 수행하려면 평균을 얻기 위해 그룹화를 수행한 다음 원본 표에 다시 조인해야 합니다:
Copied!1 2 3 4 5 6 7 8 9
# 거래내역을 고객별로 그룹화합니다. averages = transactions\ .groupBy('customer')\ .agg( # 각 고객의 평균 지출액을 계산합니다. F.avg('spend').alias('average_spend') ) # 평균 지출액 정보를 거래내역 테이블에 왼쪽 외부 조인합니다. transactions = transactions.join(averages, ['customer'], 'left_outer')
최대 지출을 얻고 싶다면, 이 로직은 최대값을 계산하고 그 최대값에 다시 조인해야 하므로 더욱 복잡해집니다:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
# 고객별로 그룹화하여 평균 지출('spend')의 최대값을 계산합니다. maximums = transactions\ .groupBy('customer')\ .max( F.avg('spend').alias('max_spend') # 'max_spend'라는 별칭으로 평균 지출의 최대값을 찾습니다. ) # transactions 데이터와 maximums 데이터를 조인합니다. # 조인 조건은 고객 ID가 같고, 지출이 최대 지출과 같아야 합니다. # 'left_outer' 조인을 사용하므로 transactions의 모든 행과 일치하는 maximums의 행이 결과에 포함됩니다. transactions = transactions\ .join( averages, (transactions.customer == maximums.customer) &\ (transactions.spend == maximums.max_spend), 'left_outer' ).drop(maximums.customer) # 조인 후에는 maximums의 고객 ID 열을 삭제합니다.
윈도우 함수를 사용하면 먼저 윈도우를 정의한 다음 윈도우 "위"에서 집계를 계산하여 이 코드를 단순화할 수 있습니다:
Copied!1 2 3 4 5 6 7 8 9 10 11 12
from pyspark.sql.window import Window # Window 함수를 사용하여 고객별로 데이터를 분할하고, 소비액에 따라 정렬합니다. window = Window()\ .partitionBy('customer')\ .orderBy('spend') # 새로운 컬럼 'average_spend'와 'max_spend'를 생성합니다. # 'average_spend'는 고객별 평균 소비액을, 'max_spend'는 고객별 최대 소비액을 나타냅니다. transactions = transactions\ .withColumn('average_spend', F.avg('spend').over(window)) .withColumn('max_spend', F.max('spend').over(window))
또한, 윈도우와 함께만 사용할 수 있는 여러 가지 기능들이 있습니다. 이들은 Window Functions로 알려져 있으며 다음 섹션에서 설명하겠습니다.
dense_rank()
lag(col, count=1, default=None)
lead(col, count=1, default=None)
ntile(n)
percent_rank()
rank()
row_number()
window(timeColumn, windowDuration, slideDuration=None, startTime=None)