注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

コンセプト:ユーザー定義関数

ユーザー定義関数は、PySparkでユーザーの任意のPythonコードを使用することができます。たとえば、データセットの各行に含まれる複雑なテキスト形式から情報を解析するためのUDFを使用することができます。

定義した後、UDFはconcatdate_difftrimなどのPySparkの組み込み関数と同様に動作します。

動機

直感的ではありませんが、通常の状況ではデータは実際にはPythonコードに取り込まれません。PySparkを使用してDataFramesを操作するとき、Sparkクラスタが分散並列処理で最終的なDataFrameを取得するための手順を記述しています。これにより、SparkとFoundryはほぼ無限にスケールすることができますが、実際のデータ上でコードを実行するためのUDFのマイナーセットアップが導入されます。PySparkは、ユーザーのクエリを実行している各サーバーにUDFコードを送信します。

UDFの使用を控えることを検討する

Pythonのオーバーヘッドは、Sparkの最適化された組み込み機能に比べてUDFを比較的遅くします。PySparkの組み込み機能でロジックを表現することを検討してください。

Copied!
1 # "天気予報:雨 55-62"

次の天気フォーマットから低温度を取得したいとします、この場合は 55 です。普通の Python 関数を以下のように書くことができます。

Copied!
1 2 3 4 5 def extract_low_temperature(weather_report): # 'weather_report'という文字列から最低気温を取り出す関数 # 文字列をスペースで分割し、最後の要素を取得する # その後、その要素を'-'で分割し、最初の要素(最低気温)を整数として返す return int(weather_report.split(' ')[-1].split('-')[0])

関数 extract_low_temperature を囲む UDF を作成し、それを PySpark クエリに統合します。UDF を作成するには、関数とその予想される戻り値の型を PySpark の型システムで提供する必要があります。

Copied!
1 2 3 4 5 # 必要な型をインポートする from pyspark.sql.types import IntegerType # 関数をUDF(ユーザー定義関数)としてラップする low_temp_udf = F.udf(extract_low_temperature, IntegerType())

これで、UDF を DataFrame に使用でき、引数として行全体を取ることができます。

Copied!
1 2 # 'weather_report'という名前の列を使用して、新しい'low'という名前の列を作成します。この新しい列は、'low_temp_udf'というユーザー定義関数を適用した結果となります。 df = df.withColumn('low', low_temp_udf(F.col('weather_report')))
idweather_reportlow
1天気予報: 雨 55-6255
2天気予報: 晴れ 69-7469
3天気予報: 曇り 31-3431

複数の列から読み込む

UDF は任意の列引数を取ることができます。列引数は関数引数に対応します。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from pyspark.sql.types import StringType # 天候の良し悪しを判断する関数 def weather_quality(temperature, windy): # 気温が70度より高く、風がなければ良い天気 if temperature > 70 and windy == False: return "good" # それ以外の場合は悪い天気 else: return "bad" # UDF関数として登録 weather_udf = F.udf(weather_quality, StringType()) # データフレームに新しいカラム'quality'を追加し、UDF関数を適用 df = df.withColumn('quality', weather_udf(F.col('temp'), F.col('wind')))
idtempwindquality
173false良い
236false悪い
390true悪い