注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
PySparkは、Apache Sparkバックエンドとのインターフェースを提供し、データを迅速に処理するためのラッパー言語です。Sparkは、分散サーバーネットワーク上の非常に大きなデータセットで動作することができ、正しく使用された場合には、パフォーマンスと信頼性の大きな利点を提供します。しかし、PySparkの構文はSparkのJVM遺産に基づいているため、経験豊富なPython開発者であっても挑戦的な面があります。なぜなら、それは慣れないコードパターンを実装するからです。
このPySparkコードスタイルに関する意見形成的なガイドは、私たちが経験したPySparkのリポジトリ全体で最も頻繁に再発するトピックに基づいたベストプラクティスを提示します。
一貫したコードスタイルを強制するために、各メインリポジトリにはPylintを有効にするべきです。同じ設定で。私たちは、このドキュメントに記載されているルールに一致するように、追加でPylintに含めることができるPySpark特有のチェッカーを提供しています。Pythonリポジトリ用の組み込みPylintプラグインに関する詳細は、スタイルチェックの有効化に関するドキュメンテーションをご覧ください。
PySpark特有の事項を超えて、クリーンなコードの一般的な実践は、PySparkリポジトリにおいて重要です。GoogleのPyGuideは良い出発点です。
Copied!1 2 3 4 5 6
# 悪い例 df = df.select(F.lower(df1.colA), F.upper(df2.colB)) # 良い例 # F.col()を使用して列を参照することで、コードの可読性を向上させています。 df = df.select(F.lower(F.col("colA")), F.upper(F.col("colB")))
優先すべきオプションはより複雑で、長く、汚染されているように見えるかもしれません - それは正しいです。実際、F.col()を全く使用しない方がよいです。しかし、それを使用する、あるいは明示的な選択をすることが避けられない状況もあります。しかし、最初の例よりも二つ目の例を優先する非常に良い理由があります。
最初のケースのように明示的な行を使用すると、データフレームの名前とスキーマが明示的にデータフレーム変数に結びつけられます。これは、df1
が削除されたり名前が変更されたりすると、参照df1.colA
が壊れることを意味します。
対照的に、F.col("colA")
は常に操作中のデータフレーム内の“colA”
という名前の行を参照します。この場合、その名前がdf
です。他のデータフレームの状態を全く追跡する必要がないため、コードはよりローカルになり、「距離を隔てた不気味な相互作用」に対して脆弱になります。これらは、デバッグが非常に難しいことがよくあります。
最初のケースを避けるための他の良い理由:
df1["colA"]
はF.col(“colA”)
と同じくらい書きにくくなります。F.col("prod_status") == 'Delivered'
のような抽象的な表現を変数に割り当てると、複数のデータフレームで再利用できます。一方、df.prod_status == 'Delivered'
は常にdfに束縛されます。幸いなことに、通常はF.col()
による複雑な表現は必要ありません。唯一の例外はF.lower
、F.upper
、... そしてこれらです。
一部のコンテキストでは、複数のデータフレームから行にアクセスでき、名前が重複する可能性があります。一般的な例は、df.join(df2, on=(df.key == df2.key), how='left'
のようなマッチング表現です。このような場合、行を直接データフレームで参照するのは問題ありません。また、データフレームのエイリアスを使用して結合を明確にすることもできます(このガイドの結合セクションで詳しく説明します)。
一般的に、forループはSparkで非効率的です。これは、Sparkが遅延評価され、一度に一つのforループしか処理しないためです。ループのすべての部分が一度に処理できる場合、実行時間が遅くなる可能性がありますし、ドライバーのメモリ不足エラー(OOM)が発生する可能性があります。データセットのすべての行の名前を大文字から小文字に変更するには、以下の最初の例(# bad
とラベル付けされています)の代わりに、リスト内包表記を使用することをお勧めします(二つ目の例(# good
とラベル付けされています)のように):
Copied!1 2 3 4
# よくない for colm in df.columns: # 列名を小文字に変更 df = df.withColumnRenamed(colm, colm.lower())
Copied!1 2 3 4 5
# 良い df = df.select( # df.columnsのすべての列名を小文字にして、新しいエイリアスとして選択します *[F.col(colm).alias(colm.lower()) for colm in df.columns] )
# good
という例のようにリスト内包表記を使用すると、上記で説明したパフォーマンスの低下やクエリプランの問題を回避しながら、同じ期待される結果を得ることができます。
論理演算は、通常 .filter()
や F.when()
の内部に存在し、読みやすくなければなりません。 我々は関数チェーンと同じルールを適用し、ロジック式は同じコードブロック内に 最大 3つの式 を保持します。それ以上になると、コードを単純化したり抽出したりできる兆候です。複雑な論理演算を変数や関数に抽出すると、コードの読みやすさと推論能力が向上し、バグも減少します。
Copied!1 2 3 4 5 6 7
# 悪い例 F.when( (df.prod_status == 'Delivered') | (((F.datediff(df.deliveryDate_actual, df.current_date) < 0) & # 上のコードは、商品のステータスが「Delivered(配送済み)」であるか、または実際の配達日と現在の日付との差が0未満で、 # 現在の登録情報が正規表現「.+」に一致するか、または実際の配達日と現在の日付との差が0未満で、 # 元のオペレータが正規表現「.+」に一致するか現在のオペレータが正規表現「.+」に一致する場合に「In Service」を返す ((df.currentRegistration.rlike('.+')) | ((F.datediff(df.deliveryDate_actual, df.current_date) < 0) & (df.originalOperator.rlike('.+') | df.currentOperator.rlike('.+')))))), 'In Service')
上記のコードは、いくつかの方法で単純化できます。まず、いくつかの名前付き変数でロジックステップをグループ化することに焦点を当てましょう。Pysparkでは、式は括弧でラップする必要があります。これは、実際の括弧とロジカル操作をグループ化することが可読性に悪影響を与えることがあります。たとえば、上記のコードには冗長な (F.datediff(df.deliveryDate_actual, df.current_date) < 0)
がありますが、元の著者が非常に見つけにくいため気付かなかったものです。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# より良い # originalOperatorまたはcurrentOperatorに一文字以上の文字列が存在するかどうかを判断する has_operator = (df.originalOperator.rlike('.+') | df.currentOperator.rlike('.+')) # 実際の配送日が現在の日付よりも前であるかどうかを判断する delivery_date_passed = (F.datediff(df.deliveryDate_actual, df.current_date) < 0) # currentRegistrationに一文字以上の文字列が存在するかどうかを判断する has_registration = (df.currentRegistration.rlike('.+')) # 商品のステータスが「配送済み」であるかどうかを判断する is_delivered = (df.prod_status == 'Delivered') # 商品が配送済みであるか、または配送日が過ぎており、登録またはオペレーターが存在する場合は、「サービス中」と表示する F.when(is_delivered | (delivery_date_passed & (has_registration | has_operator)), 'In Service')
上記の例は、読みやすくなっており、冗長な表現も削除されています。さらに、操作の数を減らすことで、より改善することができます。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
# 良い # originalOperatorまたはcurrentOperatorが存在する(何らかの文字列が含まれる)場合にTrueを返す has_operator = (df.originalOperator.rlike('.+') | df.currentOperator.rlike('.+')) # 実際の配送日が現在の日付よりも過去である場合にTrueを返す delivery_date_passed = (F.datediff(df.deliveryDate_actual, df.current_date) < 0) # 現在の登録が存在する(何らかの文字列が含まれる)場合にTrueを返す has_registration = (df.currentRegistration.rlike('.+')) # 商品のステータスが「Delivered」(配送済み)である場合にTrueを返す is_delivered = (df.prod_status == 'Delivered') # currentRegistrationまたはoperatorのどちらかが存在する場合にTrueを返す is_active = (has_registration | has_operator) # 商品が配送済み、または配送日が過ぎて商品がアクティブ(登録またはオペレーターが存在する)である場合、'In Service'を返す F.when(is_delivered | (delivery_date_passed & is_active), 'In Service')
F.when
式がすっきりと読みやすくなり、このコードの目的がレビューする人に明確に伝わるようになったことに注目してください。エラーがあると疑われる場合にのみ、読者は個々の式を訪れる必要があります。また、ユニットテストがコードに含まれていて、それらを関数として抽象化したい場合、各ロジックのチャンクをテストしやすくします。
最終的な例にはまだコードの重複が存在します:その重複を削除する方法は読者に演習として残されています。
select
文を使用するPySpark 変換の始まり、または返す前に select を行うことは、良い実践と考えられています。この select
文は、入力と出力のための期待されるデータフレームスキーマに関するコードと読者の両方との契約を指定します。
どんな select も、次の変換ステップでデータフレームを消費する準備としてのクリーニング操作と見なされるべきです。
常に select 文を可能な限りシンプルに保つことを目指してください。一般的な SQL の慣用表現により、選択された行ごとに spark.sql.function
から 1つ の関数が使用され、オプションで .alias()
を使ってそれに意味のある名前を付けることが許可されます。これは控えめに使用するべきであり、同じ select で 3つ 以上のそのような使用がある場合は、操作をカプセル化するためにそれを clean_<dataframe name>()
という別の関数にリファクタリングします。
データフレームを超えた表現、または .when()
のような条件操作が select で使用されることは 絶対に 許可されません。
Copied!1 2 3 4 5 6 7 8 9 10 11 12
# 悪い例 aircraft = aircraft.select( 'aircraft_id', # 航空機のID 'aircraft_msn', # 航空機の製造番号 F.col('aircraft_registration').alias('registration'), # 航空機の登録番号(エイリアスとして'registration'を使用) 'aircraft_type', # 航空機のタイプ F.avg('staleness').alias('avg_staleness'), # stalenessの平均値(エイリアスとして'avg_staleness'を使用) F.col('number_of_economy_seats').cast('long'), # エコノミーシートの数をlong型にキャスト F.avg('flight_hours').alias('avg_flight_hours'), # 飛行時間の平均値(エイリアスとして'avg_flight_hours'を使用) 'operator_code', # 運航者のコード F.col('number_of_business_seats').cast('long'), # ビジネスシートの数をlong型にキャスト )
同じタイプの操作をまとめてください。すべての個々の行は前もってリストアップされるべきで、spark.sql.function
からの関数呼び出しは別々の行に記述するべきです。
Copied!1 2 3 4 5 6 7 8 9 10 11 12
# 良い aircraft = aircraft.select( 'aircraft_id', # 航空機ID 'aircraft_msn', # 航空機msn 'aircraft_type', # 航空機のタイプ 'operator_code', # 運行者コード F.col('aircraft_registration').alias('registration'), # 航空機登録をエイリアスとして「登録」に変更 F.col('number_of_economy_seats').cast('long'), # 経済クラスの席数を長い型にキャスト F.col('number_of_business_seats').cast('long'), # ビジネスクラスの席数を長い型にキャスト F.avg('staleness').alias('avg_staleness'), # stalenessの平均値をエイリアスとして「avg_staleness」に変更 F.avg('flight_hours').alias('avg_flight_hours'), # 飛行時間の平均値をエイリアスとして「avg_flight_hours」に変更 )
select()
ステートメントは、その性質上、データフレームのスキーマを再定義するため、古いものも新しいものも含む行の含有または除外を自然にサポートし、既存の行の再定義も可能です。このようなすべての操作を一つのステートメントに集約することで、最終的なスキーマを特定しやすくなり、コードの読みやすさが向上します。また、コードがわずかに簡潔になります。
withColumnRenamed()
を呼び出す代わりに、エイリアスを使用してください:
Copied!1 2 3 4 5
# 悪い例 df.select('key', 'comments').withColumnRenamed('comments', 'num_comments') # 'comments'という列名を'num_comments'に変更します。 # 良い例 df.select('key', F.col('comments').alias('num_comments')) # F.colを使って、'comments'という列名を'num_comments'という別名で選択します。
タイプを再定義するために withColumn()
を使用する代わりに、セレクト内でキャストします:
Copied!1 2 3 4 5
# 悪い例 df.select('comments').withColumn('comments', F.col('comments').cast('double')) # 良い例 df.select(F.col('comments').cast('double'))
Copied!1 2 3 4 5 6 7
# 悪い例 df.select('comments').withColumn('comments', F.col('comments').cast('double')) # コメント列を選択し、その列をdouble型に変換しています。 # 良い例 df.select(F.col('comments').cast('double')) # コメント列を直接double型に変換して選択しています。
しかし、シンプルに保つことが大切です:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# 悪い例 df.select( # 'closed_at'のunixタイムスタンプを取得し、そこから'created_at'のunixタイムスタンプを引きます。 # 'closed_at'がnullの場合は、現在のunixタイムスタンプを使用します。 # 結果の差を86400(1日の秒数)で割ります。これにより、'created_at'から'closed_at'までの日数が算出されます。 ((F.coalesce(F.unix_timestamp('closed_at'), F.unix_timestamp()) - F.unix_timestamp('created_at')) / 86400).alias('days_open') ) # 良い例 df.withColumn( 'days_open', # 'closed_at'のunixタイムスタンプを取得し、そこから'created_at'のunixタイムスタンプを引きます。 # 'closed_at'がnullの場合は、現在のunixタイムスタンプを使用します。 # 結果の差を86400(1日の秒数)で割ります。これにより、'created_at'から'closed_at'までの日数が算出されます。 (F.coalesce(F.unix_timestamp('closed_at'), F.unix_timestamp()) - F.unix_timestamp('created_at')) / 86400 )
select ステートメントに含まれるカラムが未使用のままである場合、それらを含めるのを避け、代わりに明示的なカラムのセットを選択することを推奨します - これはスキーマの変更が予期しないカラムがデータフレームを肥大化させることを保証するため、.drop()
を使用するよりも好ましい代替手段です。それにもかかわらず、カラムをドロップすることが全てのケースで潜在的に非推奨というわけではありません。例えば、ジョインが冗長なカラムを導入することが一般的であるため、ジョイン後にカラムをドロップすることはよくあります。
最後に、select ステートメントを用いて新たなカラムを追加するのではなく、.withColumn()
を使用することを推奨します。
スキーマを満たすために空の行を追加する必要がある場合は、常にその行を満たすために F.lit(None)
を使用してください。空の文字列や空の値を示す他の文字列(NA
など)を使用しないでください。
これが意味論的に正しいだけでなく、実用的な理由としては、空の文字列や null、'NA'
などを確認する必要がなく、isNull
のようなユーティリティを使用できるという点が挙げられます。
Copied!1 2 3 4 5 6 7 8
# 悪い例 df = df.withColumn('foo', F.lit('')) # 悪い例 df = df.withColumn('foo', F.lit('NA')) # 良い例 - `None`は型が無いため、適切な型を選択してキャストする必要があります。使用予定に基づいて適切な型を選択します。 df = df.withColumn('foo', F.lit(None).cast('string'))
コメントはコードに有用な洞察を提供することができますが、その可読性を向上させるためにコードをリファクタリングする方がより価値があります。コードはそれ自体で読み取り可能であるべきです。ステップバイステップでロジックを説明するためにコメントを使用している場合、それをリファクタリングすべきです。
Copied!1 2 3 4 5 6 7 8
# よくない # タイムスタンプの列をキャストする cols = ["start_date", "delivery_date"] for c in cols: # from_unixtime関数を使用して、ミリ秒単位のUNIX時間を秒単位に変換し、 # その後、TimestampTypeにキャストしています。 df = df.withColumn(c, F.from_unixtime(df[c] / 1000).cast(TimestampType()))
上記の例では、それらの行がタイムスタンプにキャストされていることがわかります。コメントはあまり価値がありません。さらに、コードに既に存在する情報しか提供しない場合、より詳細なコメントが役に立たないかもしれません。例えば:
Copied!1 2 3 4 5 6 7
# 良くない例 # 各列を通過し、ミリ秒のために1000を除去し、タイムスタンプにキャストします cols = ["start_date", "delivery_date"] for c in cols: df = df.withColumn(c, F.from_unixtime(df[c] / 1000).cast(TimestampType())) # 各列に対して、1000で割り(ミリ秒を秒に変換)、UNIX時間を日付に変換し、それをタイムスタンプ型にキャストします
コードのロジックを説明するだけのコメントではなく、コードを書く際に行った意思決定の「なぜ」を説明する文脈を提供するコメントを残すことを目指すべきです。これは、特に PySpark において重要であり、読者はユーザーのコードを理解できますが、多くの場合、PySpark 変換にデータが供給される文脈を持っていません。ロジックの小さな部分には、正しい動作を理解するためにデータを調べるのに何時間もかかることがあり、その場合、その理由を説明するコメントが特に価値があるでしょう。
Copied!1 2 3 4 5 6 7 8 9
# 良い # このデータセットの利用者は、日付ではなくタイムスタンプが必要であり、 # 元のデータソースがミリ秒単位でこれらを格納しているため、 # 時刻を1000で調整する必要があります。 # ただし、ドキュメントには実際には日付であると記載されています。 cols = ["start_date", "delivery_date"] for c in cols: df = df.withColumn(c, F.from_unixtime(df[c] / 1000).cast(TimestampType()))
UDFは、ネイティブのPySparkと比べてパフォーマンスが大幅に低いため、すべての状況での使用を強く推奨していません。ほとんどの状況では、UDFが必要と思われるロジックは、実際にはネイティブのPySpark関数のみを使用してリファクタリングすることができます。
データをSparkドライバに集める関数の使用は常に避けてください。例えば:
DataFrame.collect()
DataFrame.first()
DataFrame.head(...)
DataFrame.take(...)
DataFrame.show(...)
これらの関数を使用すると、Sparkのような分散フレームワークの利点が失われ、パフォーマンスが低下したりメモリ不足のエラーが発生したりします。代わりに、以下を強く推奨します:
Joinには注意が必要です。左側のjoinを行い、右側にキーに対する複数のマッチがある場合、その行はマッチした回数だけ重複します。これは "join explosion" と呼ばれ、データセットのサイズを大幅に膨らませる可能性があります。常に仮定を二重チェックし、joinしているキーがユニークであることを確認してください。除非、その乗算を予期しています。
不適切なjoinは、デバッグが難しい多くの問題の原因です。how
を明示的に指定するなど、いくつかの対策が助けになります。たとえそれがデフォルト値(inner
)であっても:
Copied!1 2 3 4 5 6 7 8
# 悪い例 flights = flights.join(aircraft, 'aircraft_id') # 'aircraft_id'をキーにしてflightsとaircraftを結合します。結合方法は指定されていません。 # また悪い例 flights = flights.join(aircraft, 'aircraft_id', 'inner') # 'aircraft_id'をキーにしてflightsとaircraftを内部結合します。しかし、結合方法を明示的に示していません。 # 良い例 flights = flights.join(aircraft, 'aircraft_id', how='inner') # 'aircraft_id'をキーにしてflightsとaircraftを内部結合します。結合方法が明示的に指定されています。
また、right
結合も避けてください。right
結合を使用しようとする場合は、データフレームの順序を入れ替えて、代わりにleft
結合を使用してください。これは、操作を行っているデータフレームが結合の中心にあるため、直感的に理解しやすいです。
Copied!1 2 3 4 5
# 悪い例 flights = aircraft.join(flights, 'aircraft_id', how='right') # 航空機データとフライトデータをaircraft_idで結合(右外部結合) # 良い例 flights = flights.join(aircraft, 'aircraft_id', how='left') # フライトデータと航空機データをaircraft_idで結合(左外部結合)
データフレームを結合する際、出力で行を複製する表現の使用を避けてください:
Copied!1 2 3 4 5
# 悪い例 - aircraft_id カラムが出力結果で重複してしまう output = flights.join(aircraft, flights.aircraft_id == aircraft.aircraft_id, how='inner') # 良い例 output = flights.join(aircraft, 'aircraft_id', how='inner')
すべての列の名前を変更して衝突を避けるのではなく、データフレーム全体に別名を付けて、最終的にどの列を選択したいかをその別名を使用して選択できます。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
# 悪い例 columns = ["start_time", "end_time", "idle_time", "total_time"] for col in columns: flights = flights.withColumnRenamed(col, 'flights_' + col) # flightsの各列をリネームします parking = parking.withColumnRenamed(col, 'parking_' + col) # parkingの各列をリネームします flights = flights.join(parking, on="flight_code", how="left") # flightsとparkingをflight_codeで結合します flights = flights.select( F.col("flights_start_time").alias("flight_start_time"), # 列名を変更して選択します F.col("flights_end_time").alias("flight_end_time"), # 列名を変更して選択します F.col("parking_total_time").alias("client_parking_total_time") # 列名を変更して選択します ) # 良い例 flights = flights.alias("flights") # flightsのエイリアスを設定します parking = parking.alias("parking") # parkingのエイリアスを設定します flights = flights.join(parking, on="flight_code", how="left") # flightsとparkingをflight_codeで結合します flights = flights.select( F.col("flights.start_time").alias("flight_start_time"), # 列名を変更して選択します F.col("flights.end_time").alias("flight_end_time"), # 列名を変更して選択します F.col("parking.total_time").alias("client_parking_total_time") # 列名を変更して選択します )
ただし、以下のことを念頭に置いてください:
結合についての最後の言葉として、.dropDuplicates()
や .distinct()
を杖にしないでください。予期しない重複行が観察された場合、その重複行が表示される背後にはほとんど常に理由があります。 .dropDuplicates()
を追加するだけで、この問題を隠蔽し、ランタイムにオーバーヘッドを追加します。
式のチェーン化は議論の余地がありますが、チェーン化の使用については一部の制限を推奨します。この推奨の背後にある理由については、このセクションの 結論 を参照してください。
異なる型の多行式への式のチェーン化を避けてください。特に、それらが異なる動作やコンテキストを持つ場合です。例えば、行の作成や結合を選択やフィルター処理することと混合することなどです。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
# 悪い例 df = ( df .select("a", "b", "c", "key") # "a", "b", "c", "key"を選択 .filter(df.a == "truthiness") # "a"列が"truthiness"の行だけを抽出 .withColumn("boverc", df.b / df.c) # 新しい列"boverc"を作成、"b"列を"c"列で割った結果を格納 .join(df2, "key", how="inner") # "key"でdf2と内部結合 .join(df3, "key", how="left") # "key"でdf3と左結合 .drop('c') # "c"列を削除 ) # 良い例 (ステップを分ける) # 最初に:必要なデータを選択し、絞り込む # その次に:必要な列を作成する # 最後に:他のデータフレームと結合する df = ( df .select("a", "b", "c", "key") # "a", "b", "c", "key"を選択 .filter(df.a == "truthiness") # "a"列が"truthiness"の行だけを抽出 ) df = df.withColumn("boverc", df.b / df.c) # 新しい列"boverc"を作成、"b"列を"c"列で割った結果を格納 df = ( df .join(df2, "key", how="inner") # "key"でdf2と内部結合 .join(df3, "key", how="left") # "key"でdf3と左結合 .drop('c') # "c"列を削除 )
各表現のグループをそれぞれの論理的なコードブロックに隔てることで、可読性が向上し、関連するロジックを見つけやすくなります。
例えば、以下のコードを読むユーザーは、データフレームが割り当てられている場所 df = df...
へと目を遷移させるでしょう。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# 悪い例 df = ( df .select('foo', 'bar', 'foobar', 'abc') .filter(df.abc == 123) # abc列が123のものをフィルタリング .join(another_table, 'some_field') # some_fieldをキーにして別のテーブルと結合 ) # より良い例 df = ( df .select('foo', 'bar', 'foobar', 'abc') .filter(F.col('abc') == 123) # F.colを使ってabc列が123のものをフィルタリング ) # 結合処理は別に行う df = df.join(another_table, 'some_field', how='inner') # some_fieldをキーにして別のテーブルと内部結合
式を連鎖させる正当な理由があります。これらは通常、原子的な論理ステップを表し、許容されます。コードを読みやすくするために、同じブロック内で最大数の連鎖式を適用するルールを適用してください。私たちは3-5のステートメントの連鎖を推奨します。
もし、より長い連鎖を作っていると感じる場合、または変数のサイズのせいで問題が起きている場合は、ロジックを別の関数に抽出することを検討してください。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
# 良くない customers_with_shipping_address = ( customers_with_shipping_address .select("a", "b", "c", "key") # "a", "b", "c", "key"を選択 .filter(F.col("a") == "truthiness") # "a"の列が"truthiness"と一致する行をフィルタリング .withColumn("boverc", F.col("b") / F.col("c")) # 新しい列"boverc"を作成、"b"の列を"c"の列で割った値を設定 .join(df2, "key", how="inner") # "key"でdf2を内部結合 ) # これも良くない customers_with_shipping_address = customers_with_shipping_address.select("a", "b", "c", "key") # "a", "b", "c", "key"を選択 customers_with_shipping_address = customers_with_shipping_address.filter(df.a == "truthiness") # "a"の列が"truthiness"と一致する行をフィルタリング customers_with_shipping_address = customers_with_shipping_address.withColumn("boverc", F.col("b") / F.col("c")) # 新しい列"boverc"を作成、"b"の列を"c"の列で割った値を設定 customers_with_shipping_address = customers_with_shipping_address.join(df2, "key", how="inner") # "key"でdf2を内部結合 # これが良い def join_customers_with_shipping_address(customers, df_to_join): # customersとdf_to_joinを結合する関数 customers = ( customers .select("a", "b", "c", "key") # "a", "b", "c", "key"を選択 .filter(df.a == "truthiness") # "a"の列が"truthiness"と一致する行をフィルタリング ) customers = customers.withColumn("boverc", F.col("b") / F.col("c")) # 新しい列"boverc"を作成、"b"の列を"c"の列で割った値を設定 customers = customers.join(df_to_join, "key", how="inner") # "key"でdf2を内部結合 return customers # 結果を返す
実際には、3つ以上のステートメントを連鎖させることは、すでにカプセル化され、孤立したロジックブロックがあるため、別々の適切に名前付けられた関数に分解するのに適した候補です。
これらの連鎖に制限を設ける背後にはいくつかの理由があります。
式を連鎖させることができるのは、PySpark が JVM 言語から派生した Spark から開発されたためです。これにより、チェイン可能性という設計パターンが輸送されました。
ただし、Python は複数行の式をうまくサポートしておらず、明示的な改行を提供するか、式を括弧で囲む以外の代替手段がありません。連鎖がルートノードで発生する場合にのみ、明示的な改行を提供する必要があります。例えば:
Copied!1 2 3 4 5 6 7 8 9
# `\`が必要 df = df.filter(F.col('event') == 'executing')\ .filter(F.col('has_tests') == True)\ .drop('has_tests') # ルートノードにチェーンがないので、`\`は必要ありません df = df.withColumn('safety', F.when(F.col('has_tests') == True, 'is safe') .when(F.col('has_executed') == True, 'no tests but runs') .otherwise('not safe'))
これにより、一貫性を保つために、全体の式を1つの括弧ブロックにラップし、\
を使用しないようにしてください。
Copied!1 2 3 4 5 6 7 8 9 10 11
# 悪い例 df = df.filter(F.col('event') == 'executing')\ .filter(F.col('has_tests') == True)\ .drop('has_tests') # 良い例 df = ( df .filter(F.col('event') == 'executing') # イベントが"executing"のものをフィルター .filter(F.col('has_tests') == True) # テストがあるものをフィルター .drop('has_tests') # 'has_tests'列を削除 )
.otherwise(value)
を一般的なフォールバックとして使用しないでください。キーのリストを値のリストにマッピングしていて、未知のキーがいくつか現れる場合、otherwise
を使用すると、これら全てを一つの値にマスクしてしまいます。types
と functions
があります。from pyspark.sql import types as T, functions as F
。