注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

はじめに

Tip

以下の手順では、簡単なPythonデータトランスフォームを説明します。データトランスフォームを始めたばかりの場合は、Pipeline Builder または Code Repositories 用のバッチパイプラインチュートリアルを先に進めることを検討してください。

このチュートリアルでは、最近発見された隕石のスプレッドシートを分析準備が整ったデータセットに変換する方法について説明します。

データセットについて

このチュートリアルでは、NASAのオープンデータポータル ↗ からのデータを使用します。このサンプルデータセットを使用して、ユーザーのCode Repositoryで手順を追うことができます。

Download meteorite_landings

このデータセットには、地球上で発見された隕石に関するデータが含まれています。データを扱いやすくするために、データはクリーニングされています。

データセットには、各隕石の名前、質量、分類、およびその他の識別情報、発見された年、および発見場所の座標が含まれています。データをFoundryにアップロードする前に、CSVを開いてデータを確認することをお勧めします。

Pythonコードリポジトリのセットアップ

Pythonコードリポジトリを作成して始めましょう。

  1. プロジェクトに移動し、+ New > Code repository を選択します。
  2. Repository type セクションで、Data Transforms を選択します。
  3. Language template として Python を選択します。
  4. Initialize repository を選択します。

ローカルPythonリポジトリの使用

または、以下の手順でローカルPythonリポジトリをCode Repositoriesにコピーすることもできます。

  1. 上記の手順に従って、新しいPythonコードリポジトリを作成します。
  2. ローカルリポジトリで、以前のGit originを削除します(たとえば、GitHubからクローンした場合): git remote remove origin
  3. コードリポジトリのGitリモートURLを追加します: git remote add origin <repository_url>

GitHubインターフェースの右上隅にコードリポジトリのURLが表示されます。緑色の Clone ボタンを選択し、GitリモートURLをコピーします。

これを確認するには、git remote -v を実行してコードリポジトリのURLを返します。

  1. 現在の master ブランチ(または選択した別のブランチ)をCode Repositoriesからローカルブランチにマージします: git merge master

refusing to merge unrelated histories というエラーが発生した場合は、次のコマンドを実行します: git merge master --allow-unrelated-histories。これにより、以前のリモートGitHubリポジトリに関連付けられた現在のGit履歴が削除されます。

このマージにより、Code Repositoriesでコミットや変更を行うために必要なファイルがローカルリポジトリに取り込まれます。

  1. 新しいブランチを作成し、名前を付けます(たとえば、testbranch): git checkout -b testbranch
  2. 変更を行い、ブランチにコミットします。
  3. git push を実行し、新しいブランチがCode Repositoriesインターフェースに表示されることを確認します。チェックが成功したことを確認します。

Code Repositoriesでのローカル開発についてさらに学びましょう。

Pythonデータトランスフォームの作成

Transforms Pythonリポジトリに移動します。デフォルトの examples.py ファイルには、開始するためのサンプルコードが含まれています。 まず、src/myproject/datasets に新しいファイルを作成し、それを meteor_analysis.py と命名して分析を整理します。必要な関数やクラスをインポートすることを忘れないでください。meteor_landings データセットを入力として受け取り、meteor_landings_cleaned を出力として作成するトランスフォームを定義します。

Copied!
1 2 3 4 5 6 7 8 9 10 11 from transforms.api import transform_df, Input, Output from pyspark.sql import functions as F @transform_df( # 出力データセットのパスをここに置き換えてください Output("/Users/jsmith/meteorite_landings_cleaned"), # 入力データセットのパスをここに置き換えてください meteorite_landings=Input("/Users/jsmith/meteorite_landings"), ) def clean(meteorite_landings): <データ変換ロジックをここに記述してください>

たとえば、1950年以降に発生した「Valid」な隕石に入力データセットをフィルター処理する場合、データトランスフォーメーションロジックを更新して nametypeyear で隕石をフィルター処理します。

Copied!
1 2 3 4 5 6 def clean(meteorite_landings): return meteorite_landings.filter( meteorite_landings.nametype == 'Valid' # nametypeが'Valid'であるものをフィルタリング ).filter( meteorite_landings.year >= 1950 # 年が1950年以降のものをフィルタリング )

出力データセットを構築する

結果のデータセットを構築するには、変更をコミットし、右上隅の Build を選択します。Code Repositories でデータセットを構築する方法の詳細については、Create a simple batch pipeline チュートリアルを参照してください。

データ変換の追加

Pythonトランスフォーム を使用すると、1 つの Python ファイルで複数の出力データセットを作成できます。

たとえば、隕石のタイプに対して特に大きかった隕石のみをさらにフィルター処理したいとします。そのためには、以下の手順が必要です。

  1. 各隕石タイプの平均質量を見つける
  2. 各隕石の質量をその隕石タイプの平均質量と比較する

まず、meteor_analysis.py に各隕石タイプの平均質量を見つけるデータ変換を追加します。この変換は meteor_landings データセットを入力として取り、meteorite_stats を出力として作成します。

Copied!
1 2 3 4 5 6 7 8 9 @transform_df( # 出力データセット名は一意でなければなりません Output("/Users/jsmith/meteorite_stats"), meteorite_landings=Input("/Users/jsmith/meteorite_landings"), ) def stats(meteorite_landings): return meteorite_landings.groupBy("class").agg( F.mean("mass").alias("avg_mass_per_class") # 各クラスごとの平均質量を計算 )

次に、各隕石の質量をその隕石タイプの平均質量と比較するデータトランスフォームを作成します。このトランスフォームに必要な情報は、これまでのチュートリアルで作成した meteorite_landings テーブルと meteorite_stats テーブルに分散しています。これら 2 つのデータセットを結合し、平均質量を超える質量を持つ隕石を見つけるためにフィルター処理します。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 # このデータ変換は2つの入力データセットに基づいています @transform_df( Output("/Users/jsmith/meteorite_enriched"), # 出力データセットのパス meteorite_landings=Input("/Users/jsmith/meteorite_landings"), # 隕石着陸データセットの入力パス meteorite_stats=Input("/Users/jsmith/meteorite_stats") # 隕石統計データセットの入力パス ) def enriched(meteorite_landings, meteorite_stats): # 隕石着陸データセットと隕石統計データセットを"クラス"列で結合 enriched_together = meteorite_landings.join( meteorite_stats, "class" ) # 各行の質量がクラス平均質量を超えるかどうかを示す新しい列'greater_mass'を追加 greater_mass = enriched_together.withColumn( 'greater_mass', (enriched_together.mass > enriched_together.avg_mass_per_class) ) # 'greater_mass'列がTrueの行のみをフィルタリングして返す return greater_mass.filter("greater_mass")

Contour で meteorite_enriched データセットをさらに分析できます。

データトランスフォームを複数の入力に適用する

これまで、平均よりも大きな質量を持つすべての隕石タイプを含むデータセットを作成してきました。たとえば、各隕石タイプの個別のデータセットを作成したい場合、Transforms Python を使用して、for ループを用いて各隕石タイプに同じデータトランスフォームを適用できます。異なる入力に同じデータトランスフォームを適用する方法については、Transform generation セクションを参照してください。

src/myproject/datasets に新しいファイルを作成し、meteor_class.py と名付けてください。なお、meteor_analysis.py ファイルにデータトランスフォームコードを書き続けることもできますが、このチュートリアルではデータトランスフォームロジックを分離するために新しいファイルを使用します。

各隕石タイプの個別のデータセットを作成するために、meteorite_enriched データセットをクラスごとにフィルター処理します。分析したい各隕石タイプに同じデータトランスフォームロジックを適用する transform_generator 関数を定義します:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from transforms.api import transform_df, Input, Output def transform_generator(sources): transforms = [] for source in sources: @transform_df( # 各隕石タイプに対して異なる出力データセットを作成します Output('/Users/jsmith/meteorite_{source}'.format(source=source)), my_input=Input('/Users/jsmith/meteorite_enriched') ) # "source=source" は、この関数のスコープ内で source 変数の値をキャプチャします def filter_by_source(my_input, source=source): return my_input.filter(my_input["class"] == source) transforms.append(filter_by_source) return transforms # 上記のデータ変換ロジックを提供された3つの隕石タイプに適用します TRANSFORMS = transform_generator(["L6", "H5", "H4"])

これは、クラスによって隕石データセットをフィルター処理するトランスフォーメーションを作成します。関数のスコープ内でsourceパラメーターを取得するために、filter_by_source 関数にsource=sourceを渡す必要があることに注意してください。

Tip

meteor_analysis.pyファイルで作成された初期データトランスフォーメーションでは、プロジェクトのPipelineにトランスフォームを追加するための追加の設定は不要でした。これは、デフォルトのPythonプロジェクト構造が自動登録を使用してdatasetsフォルダー内のすべてのトランスフォームオブジェクトを発見するためです。

自動登録を使用してこの最終的なトランスフォーメーションもプロジェクトのPipelineに追加するには、生成されたトランスフォームをリストとして変数に追加する必要があります。上記の例では、変数TRANSFORMSを使用しました。自動登録とトランスフォームジェネレーターについての詳細は、Transforms PythonドキュメントのTransforms generationセクションを参照してください。