注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
ウィンドウ関数を使用すると、集約に関与していない行を失うことなく、データフレームの行に集約や他の値を追加することができます。
transactions
データセットがあり、それぞれが customer
の取引を表していて、各顧客の average_spend
を計算して各取引に追加したいとしましょう。これを結合を使用して行うためには、平均を得るためにグループ化を行い、その後オリジナルのテーブルに戻して結合する必要があります:
Copied!1 2 3 4 5 6 7 8 9 10
# 'transactions'から'customer'をキーにしてグループ化します averages = transactions\ .groupBy('customer')\ .agg( # 各'customer'の'spend'の平均値を計算し、'average_spend'という名前をつけます F.avg('spend').alias('average_spend') ) # 'averages'を'transactions'に左外部結合します。結合キーは'customer'です transactions = transactions.join(averages, ['customer'], 'left_outer')
もし最大の支出を取得したい場合、このロジックはさらに複雑になります。なぜなら、平均ではなく最大値を計算し、その最大値に戻すための結合が必要になるからです。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# 'transactions'から'customer'ごとにグループ化します maximums = transactions\ .groupBy('customer')\ # その後、各顧客の平均'spend'の最大値を求めます .max( F.avg('spend').alias('max_spend') ) # それぞれのトランザクションについて、顧客と最大の消費額が一致する場合に限り、 # 'maximums'と結合します(左外部結合) transactions = transactions\ .join( averages, (transactions.customer == maximums.customer) &\ (transactions.spend == maximums.max_spend), 'left_outer' # 結合後、'maximums.customer'列は不要になるので、削除します ).drop(maximums.customer)
しかし、ウィンドウ関数を使用すると、まずウィンドウを定義し、そのウィンドウ「上」で集約を計算することにより、このコードを簡略化できます:
Copied!1 2 3 4 5 6 7 8 9 10 11
from pyspark.sql.window import Window # ウィンドウを定義し、顧客ごとにパーティションを作成し、支出額で並び替えます。 window = Window()\ .partitionBy('customer')\ .orderBy('spend') # 取引データに、顧客ごとの平均支出額と最大支出額の列を追加します。 transactions = transactions\ .withColumn('average_spend', F.avg('spend').over(window)) # 平均支出額の列を追加 .withColumn('max_spend', F.max('spend').over(window)) # 最大支出額の列を追加
さらに、ウィンドウでのみ使用できるいくつかの関数があります。これらはウィンドウ関数として知られており、次のセクションで説明されています。
dense_rank()
lag(col, count=1, default=None)
lead(col, count=1, default=None)
ntile(n)
percent_rank()
rank()
row_number()
window(timeColumn, windowDuration, slideDuration=None, startTime=None)