データ統合PythonPySpark 一覧概要

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

概要

PySpark は、Apache Spark バックエンドと連携してデータを迅速に処理できるラッパー言語です。Spark は、分散ネットワーク上の複数のサーバーにまたがる非常に大規模なデータセットを操作できるため、適切に使用すると、パフォーマンスと信頼性の面で大きな利点が得られます。ただし、SQL などのリレーショナルデータベースシステムに慣れている場合、いくつかの制限があることに注意してください。たとえば、Spark では、どの行がどのサーバーに存在するかを正確に知ることはできないため、特定の行を直接選択して更新または削除することはできません。データベースをこのように考えることに慣れている場合は、データセット全体を考慮し、行ではなく列に基づいてデータを処理するという概念モデルに調整する必要があります。

  • DataFrame: 名前付きの列の下にある行のコレクション
    • 構造的には SQL データベースに似ていますが、リレーショナルではありません
    • イミュータブル: DataFrame は作成後に変更することはできませんが、新しい DataFrame に変換することができます(結果として2つの DataFrame が得られます:元のものと変換されたもの)。データセットは上書きされますが、Foundry はバージョン履歴を追跡しているため、いつでも古いビルドを探索して戻ることができます。
    • 遅延評価: 一連の変換タスクは、単一の(組み合わせた)アクションとして評価され、ビルドがトリガーされたときに実行されます。
  • Resilient Distributed Datasets: (RDD) は、DataFrame の基本的なデータ構造です。 DataFrame を複数の非交差サブセットに分割することで、変換はクラスター(ネットワーク)内の複数のコンピューター(ノード)で並行して評価できます。これはすべて内部で行われますが、PySpark で記述するときには考慮に入れておくことが重要です。
  • 共有変数: デフォルトでは、Spark は効率のために、変換タスクで使用される各変数の別々に管理されたコピーを並列コンピューター(ノード)に送信します。タスク間で変数を共有する必要がある場合は、Spark は 2 種類の共有変数をサポートしています。
    1. ブロードキャスト変数: クラスター内のすべてのコンピューター(ノード)にブロードキャストされるメモリ(RAM)にキャッシュ(保存)される値
    2. アキュムレータ: 追加または集約できる変数で、カウンターや合計などが含まれます。この概念は GroupedData に関連し、統計計算に役立ちます。
  • なぜ DataFrame を使用するのか?: Spark の DataFrame は、大規模なコレクション(ペタバイト+)の構造化データを処理するように設計および最適化されています。
  • なぜ PySpark コードを書く必要があるのか?: PySpark を使用すると、Code Repository や Code Workbook でデータセットをどのように変換するかをカスタマイズできます。これにより、Contour や Blacksmith だけでは実現できないより複雑で柔軟な方法でデータセットを変換できます。
  • PySpark の目的は何ではありませんか?: PySpark はデータセットを変換するために設計されていますが、個々の値にアクセスするためではありません。合計や平均を計算することはできますが、データを直接参照することはできませんし、すべきではありません。

SQL とは異なり、クエリが「ビュー」(仮想テーブルの結果セット)を生成するのに対し、PySpark を使用してデータセットを処理すると、まったく新しいデータセットが生成されます。これにより、派生したデータセットに基づいて新しいデータセットを構築するだけでなく、組織の他のメンバーも中間データセットを再利用して独自のデータ処理タスクを実行できます。Palantir Foundry では、データオペレーティングシステムが自動的に親子(または、ソース結果)の向き付けられたツリー関係を介してデータセットをリンクします。これにより、Spark 変換のデータラインを容易に追跡できます。言い換えれば、データセットの依存関係がどのように構築され、それらのデータセットがどこから来るのかを調査できます。また、組織の他のメンバーがデータセットをどのように使用しているかを調査し、例から学んだり、重複する作業を効果的に削減したりできます。

PySpark コードの理解

スターターコードの基本

Code Workbook では、関数は次のようになることがあります。

Copied!
1 2 3 4 def new_frame(old_frame): df = old_frame # dfに対する変換処理を行います return df
  • old_frame:Foundry 内に保存されている データセット を表す データフレーム を参照しています。old_frameイミュータブル であり、この new_frame 関数内で変更することはできません。ある意味で、変換のすべての中間ステップは新しいイミュータブルなデータフレームを生成し、それを再び変換するか、そのまま返すことができます。これは完全には正確ではありませんが、認知モデルとして、コードをより整理するのに役立ちます。
  • new_frame:この関数内で、old_frame に適用したい一連の変換を定義できます。return ステートメントは、データフレーム(この例では df と呼んでいます)を返す必要があります。内部的には、そのデータフレームに適用したすべての変換が結合され、最適化され、入力データセットに対して適用されます。コードでビルドをトリガーすると、結果は Foundry の新しいデータセットファイルに保存され、ビルドが完了すると探索できます。

データフレーム内のデータは、配列辞書ではないため、直接参照することはできません。実際には、内部で行われているパーティション分割やシャッフルのため、データがどこにあるかを特定することはほぼ不可能です。データセットをフィルタリングまたは集計していない限り、ユーザーが記述するコードはデータセットの内容に対して比較的 アグノスティック である必要があります。ソートは一般的にコストがかかり遅いため、すべての行がランダムに並んでいると仮定し、ツールセットを行、フィルター、集計、そしてユーザー独自の創造的な問題解決に制約するのが良い方法です。

警告

PySpark は型安全ではなく、すべての変換操作を評価しようとし、実行時に操作が失敗すると中断します。そのため、入力される行のスキーマをしっかりと把握しておくことが非常に重要です。

文字列や日付に対して数学関数を実行したり、数値に対して文字列操作を実行したり、整数に対して日付操作を実行したりしないでください。これは、型の衝突による動作が予測しにくいためです。

操作を行う前に、値を適切な型にキャストしてください。

名前付き列

データフレームの各列には名前が付けられており(名前を変更することもできます)、列名は一意で大文字と小文字が区別されます。Foundry データセットでは、以下のガイドラインに従ってください。

  • 常に小文字のアルファベットまたは数字を使用してください。
  • 単語間は _(アンダースコア)で区切り、スペースは使用しないでください(スペースは許可されていません)。
  • 慣習により、camelCasedColumnNames を避けてください。
  • ( , ) , & などの特殊文字は使用しないでください。
  • 集計関数は、場合によっては特殊文字を含む列名を自動的に付けます。変換でデータフレームを返す前に、別名を付けたり、列名を変更したりする必要があります。

変換のチェーン化

既存のコードに飛び込むと、データフレームを参照している変数の名前付けに関して厳密なルールはありません。このチートシートでは、データフレームは df として参照されますが、他の例では rawoutinputtablesomething_specific などが使われることがあります。何でも構いません。目的を達成するためのものであれば。

また、このようなパターンに気付くでしょう。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 # DataFrameを "firstName" と "age" の列のみを選択するように更新します df = df.select("firstName", "age") # "age" 列のデータタイプを integer に変換します df = df.withColumn("age", df.age.cast("integer")) # "age" が21より大きいデータのみをフィルターします df = df.filter(df.age > 21) # "firstName" の列名を "first_name" に変更します df = df.withColumnRenamed("firstName", "first_name") # 更新された DataFrame を返します return df

また(同じことを異なる方法で書いた場合):

Copied!
1 2 3 4 5 6 7 8 # "firstName" と "age" の列を選択 return df.select("firstName", "age") \ # "age" 列の型を整数にキャスト(変換) .withColumn("age", df.age.cast("integer")) \ # 年齢が21以上のデータのみをフィルタリング .filter(df.age > 21) \ # "firstName" 列の名前を "first_name" に変更 .withColumnRenamed("firstName", "first_name")

コードに慣れていない場合:=の左側のdfは、右側のdf適用された変換の結果が保存される場所で、次のコード行に移る前に結果が保存されます。この例では、結果を同じ名前の変数に保存し、各ステップ後のdfの内容を基本的に上書きします。DataFrame の変換結果を保持するために異なる名前を使用することもできますが、ほとんどの場合、変数名を上書きして進むことは問題ありません。各変換関数の最後には、新しい dataframe を変数として(最初の例)または最後の変換の結果として(2 番目の例)返す必要があります。

両方の例は同じことを達成します:

  1. 変換されたデータセットに含めるために df の 2 行だけを選択します
  2. age 行をキャストして、それが文字列ではなく整数であることを確認します。
  3. データセットの行をフィルター処理して、age > 21のエントリーのみを含めます
  4. firstName 行の名前を first_name に変更します

結果のデータセットには first_nameage の 2 行しか含まれず、21歳以下の人は除外されます。それが最後に df が含む内容で、それを return するか、それにさらに変換を適用することができます。これらの変換については、次のセクションで詳しく説明します。

Foundry での PySpark の書き方

Foundry で PySpark を書くためのツールは 2 つあります:Code RepositoriesCode Workbook

Code Repositories では、ほとんどの関数を使用するためには、.py ドキュメントの最初に以下のインポート文を宣言する必要があります:

Copied!
1 2 # pyspark.sqlからfunctionsをFという名前でインポート from pyspark.sql import functions as F

Code Workbookでは、すでに含まれているグローバルなインポートであるため、追加の設定なしでほとんどの関数を使用できます。

この参考資料は完全なものではなく、一般的なパターンとベストプラクティスについてのガイダンスを提供することに焦点を当てています。 pySpark SQL関数の完全なリストについては、公式の Apache Spark documentation を参照してください。