注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
PySpark は、Apache Spark バックエンドと連携してデータを迅速に処理できるラッパー言語です。Spark は、分散ネットワーク上の複数のサーバーにまたがる非常に大規模なデータセットを操作できるため、適切に使用すると、パフォーマンスと信頼性の面で大きな利点が得られます。ただし、SQL などのリレーショナルデータベースシステムに慣れている場合、いくつかの制限があることに注意してください。たとえば、Spark では、どの行がどのサーバーに存在するかを正確に知ることはできないため、特定の行を直接選択して更新または削除することはできません。データベースをこのように考えることに慣れている場合は、データセット全体を考慮し、行ではなく列に基づいてデータを処理するという概念モデルに調整する必要があります。
SQL とは異なり、クエリが「ビュー」(仮想テーブルの結果セット)を生成するのに対し、PySpark を使用してデータセットを処理すると、まったく新しいデータセットが生成されます。これにより、派生したデータセットに基づいて新しいデータセットを構築するだけでなく、組織の他のメンバーも中間データセットを再利用して独自のデータ処理タスクを実行できます。Palantir Foundry では、データオペレーティングシステムが自動的に親子(または、ソース結果)の向き付けられたツリー関係を介してデータセットをリンクします。これにより、Spark 変換のデータラインを容易に追跡できます。言い換えれば、データセットの依存関係がどのように構築され、それらのデータセットがどこから来るのかを調査できます。また、組織の他のメンバーがデータセットをどのように使用しているかを調査し、例から学んだり、重複する作業を効果的に削減したりできます。
Code Workbook では、関数は次のようになることがあります。
Copied!1 2 3 4
def new_frame(old_frame): df = old_frame # dfに対する変換処理を行います return df
old_frame
:Foundry 内に保存されている データセット を表す データフレーム を参照しています。old_frame
は イミュータブル であり、この new_frame
関数内で変更することはできません。ある意味で、変換のすべての中間ステップは新しいイミュータブルなデータフレームを生成し、それを再び変換するか、そのまま返すことができます。これは完全には正確ではありませんが、認知モデルとして、コードをより整理するのに役立ちます。new_frame
:この関数内で、old_frame
に適用したい一連の変換を定義できます。return
ステートメントは、データフレーム(この例では df
と呼んでいます)を返す必要があります。内部的には、そのデータフレームに適用したすべての変換が結合され、最適化され、入力データセットに対して適用されます。コードでビルドをトリガーすると、結果は Foundry の新しいデータセットファイルに保存され、ビルドが完了すると探索できます。データフレーム内のデータは、配列
や 辞書
ではないため、直接参照することはできません。実際には、内部で行われているパーティション分割やシャッフルのため、データがどこにあるかを特定することはほぼ不可能です。データセットをフィルタリングまたは集計していない限り、ユーザーが記述するコードはデータセットの内容に対して比較的 アグノスティック である必要があります。ソートは一般的にコストがかかり遅いため、すべての行がランダムに並んでいると仮定し、ツールセットを行、フィルター、集計、そしてユーザー独自の創造的な問題解決に制約するのが良い方法です。
PySpark は型安全ではなく、すべての変換操作を評価しようとし、実行時に操作が失敗すると中断します。そのため、入力される行のスキーマをしっかりと把握しておくことが非常に重要です。
文字列や日付に対して数学関数を実行したり、数値に対して文字列操作を実行したり、整数に対して日付操作を実行したりしないでください。これは、型の衝突による動作が予測しにくいためです。
操作を行う前に、値を適切な型にキャストしてください。
データフレームの各列には名前が付けられており(名前を変更することもできます)、列名は一意で大文字と小文字が区別されます。Foundry データセットでは、以下のガイドラインに従ってください。
_
(アンダースコア)で区切り、スペースは使用しないでください(スペースは許可されていません)。camelCasedColumnNames
を避けてください。(
, )
, &
などの特殊文字は使用しないでください。既存のコードに飛び込むと、データフレームを参照している変数の名前付けに関して厳密なルールはありません。このチートシートでは、データフレームは df
として参照されますが、他の例では raw
、out
、input
、table
、something_specific
などが使われることがあります。何でも構いません。目的を達成するためのものであれば。
また、このようなパターンに気付くでしょう。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14
# DataFrameを "firstName" と "age" の列のみを選択するように更新します df = df.select("firstName", "age") # "age" 列のデータタイプを integer に変換します df = df.withColumn("age", df.age.cast("integer")) # "age" が21より大きいデータのみをフィルターします df = df.filter(df.age > 21) # "firstName" の列名を "first_name" に変更します df = df.withColumnRenamed("firstName", "first_name") # 更新された DataFrame を返します return df
また(同じことを異なる方法で書いた場合):
Copied!1 2 3 4 5 6 7 8
# "firstName" と "age" の列を選択 return df.select("firstName", "age") \ # "age" 列の型を整数にキャスト(変換) .withColumn("age", df.age.cast("integer")) \ # 年齢が21以上のデータのみをフィルタリング .filter(df.age > 21) \ # "firstName" 列の名前を "first_name" に変更 .withColumnRenamed("firstName", "first_name")
コードに慣れていない場合:=
の左側のdf
は、右側のdf
に適用された変換の結果が保存される場所で、次のコード行に移る前に結果が保存されます。この例では、結果を同じ名前の変数に保存し、各ステップ後のdf
の内容を基本的に上書きします。DataFrame の変換結果を保持するために異なる名前を使用することもできますが、ほとんどの場合、変数名を上書きして進むことは問題ありません。各変換関数の最後には、新しい dataframe を変数として(最初の例)または最後の変換の結果として(2 番目の例)返す必要があります。
両方の例は同じことを達成します:
age
行をキャストして、それが文字列ではなく整数であることを確認します。age > 21
のエントリーのみを含めますfirstName
行の名前を first_name
に変更します結果のデータセットには first_name
、age
の 2 行しか含まれず、21歳以下の人は除外されます。それが最後に df
が含む内容で、それを return
するか、それにさらに変換を適用することができます。これらの変換については、次のセクションで詳しく説明します。
Foundry で PySpark を書くためのツールは 2 つあります:Code Repositories と Code Workbook。
Code Repositories では、ほとんどの関数を使用するためには、.py
ドキュメントの最初に以下のインポート文を宣言する必要があります:
Copied!1 2
# pyspark.sqlからfunctionsをFという名前でインポート from pyspark.sql import functions as F
Code Workbookでは、すでに含まれているグローバルなインポートであるため、追加の設定なしでほとんどの関数を使用できます。
この参考資料は完全なものではなく、一般的なパターンとベストプラクティスについてのガイダンスを提供することに焦点を当てています。 pySpark SQL関数の完全なリストについては、公式の Apache Spark documentation を参照してください。