注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
以下の手順では、簡単なPythonデータトランスフォームを説明します。データトランスフォームを始めたばかりの場合は、Pipeline Builder または Code Repositories 用のバッチパイプラインチュートリアルを先に進めることを検討してください。
このチュートリアルでは、最近発見された隕石のスプレッドシートを分析準備が整ったデータセットに変換する方法について説明します。
このチュートリアルでは、NASAのオープンデータポータル ↗ からのデータを使用します。このサンプルデータセットを使用して、ユーザーのCode Repositoryで手順を追うことができます。
このデータセットには、地球上で発見された隕石に関するデータが含まれています。データを扱いやすくするために、データはクリーニングされています。
データセットには、各隕石の名前、質量、分類、およびその他の識別情報、発見された年、および発見場所の座標が含まれています。データをFoundryにアップロードする前に、CSVを開いてデータを確認することをお勧めします。
Pythonコードリポジトリを作成して始めましょう。
または、以下の手順でローカルPythonリポジトリをCode Repositoriesにコピーすることもできます。
git remote remove origin
git remote add origin <repository_url>
GitHubインターフェースの右上隅にコードリポジトリのURLが表示されます。緑色の Clone ボタンを選択し、GitリモートURLをコピーします。
これを確認するには、git remote -v
を実行してコードリポジトリのURLを返します。
master
ブランチ(または選択した別のブランチ)をCode Repositoriesからローカルブランチにマージします: git merge master
refusing to merge unrelated histories
というエラーが発生した場合は、次のコマンドを実行します: git merge master --allow-unrelated-histories
。これにより、以前のリモートGitHubリポジトリに関連付けられた現在のGit履歴が削除されます。
このマージにより、Code Repositoriesでコミットや変更を行うために必要なファイルがローカルリポジトリに取り込まれます。
testbranch
): git checkout -b testbranch
git push
を実行し、新しいブランチがCode Repositoriesインターフェースに表示されることを確認します。チェックが成功したことを確認します。Code Repositoriesでのローカル開発についてさらに学びましょう。
Transforms Pythonリポジトリに移動します。デフォルトの examples.py
ファイルには、開始するためのサンプルコードが含まれています。
まず、src/myproject/datasets
に新しいファイルを作成し、それを meteor_analysis.py
と命名して分析を整理します。必要な関数やクラスをインポートすることを忘れないでください。meteor_landings
データセットを入力として受け取り、meteor_landings_cleaned
を出力として作成するトランスフォームを定義します。
Copied!1 2 3 4 5 6 7 8 9 10 11
from transforms.api import transform_df, Input, Output from pyspark.sql import functions as F @transform_df( # 出力データセットのパスをここに置き換えてください Output("/Users/jsmith/meteorite_landings_cleaned"), # 入力データセットのパスをここに置き換えてください meteorite_landings=Input("/Users/jsmith/meteorite_landings"), ) def clean(meteorite_landings): <データ変換ロジックをここに記述してください>
たとえば、1950年以降に発生した「Valid」な隕石に入力データセットをフィルター処理する場合、データトランスフォーメーションロジックを更新して nametype
と year
で隕石をフィルター処理します。
Copied!1 2 3 4 5 6
def clean(meteorite_landings): return meteorite_landings.filter( meteorite_landings.nametype == 'Valid' # nametypeが'Valid'であるものをフィルタリング ).filter( meteorite_landings.year >= 1950 # 年が1950年以降のものをフィルタリング )
結果のデータセットを構築するには、変更をコミットし、右上隅の Build を選択します。Code Repositories でデータセットを構築する方法の詳細については、Create a simple batch pipeline チュートリアルを参照してください。
Pythonトランスフォーム を使用すると、1 つの Python ファイルで複数の出力データセットを作成できます。
たとえば、隕石のタイプに対して特に大きかった隕石のみをさらにフィルター処理したいとします。そのためには、以下の手順が必要です。
まず、meteor_analysis.py
に各隕石タイプの平均質量を見つけるデータ変換を追加します。この変換は meteor_landings
データセットを入力として取り、meteorite_stats
を出力として作成します。
Copied!1 2 3 4 5 6 7 8 9
@transform_df( # 出力データセット名は一意でなければなりません Output("/Users/jsmith/meteorite_stats"), meteorite_landings=Input("/Users/jsmith/meteorite_landings"), ) def stats(meteorite_landings): return meteorite_landings.groupBy("class").agg( F.mean("mass").alias("avg_mass_per_class") # 各クラスごとの平均質量を計算 )
次に、各隕石の質量をその隕石タイプの平均質量と比較するデータトランスフォームを作成します。このトランスフォームに必要な情報は、これまでのチュートリアルで作成した meteorite_landings
テーブルと meteorite_stats
テーブルに分散しています。これら 2 つのデータセットを結合し、平均質量を超える質量を持つ隕石を見つけるためにフィルター処理します。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# このデータ変換は2つの入力データセットに基づいています @transform_df( Output("/Users/jsmith/meteorite_enriched"), # 出力データセットのパス meteorite_landings=Input("/Users/jsmith/meteorite_landings"), # 隕石着陸データセットの入力パス meteorite_stats=Input("/Users/jsmith/meteorite_stats") # 隕石統計データセットの入力パス ) def enriched(meteorite_landings, meteorite_stats): # 隕石着陸データセットと隕石統計データセットを"クラス"列で結合 enriched_together = meteorite_landings.join( meteorite_stats, "class" ) # 各行の質量がクラス平均質量を超えるかどうかを示す新しい列'greater_mass'を追加 greater_mass = enriched_together.withColumn( 'greater_mass', (enriched_together.mass > enriched_together.avg_mass_per_class) ) # 'greater_mass'列がTrueの行のみをフィルタリングして返す return greater_mass.filter("greater_mass")
Contour で meteorite_enriched
データセットをさらに分析できます。
これまで、平均よりも大きな質量を持つすべての隕石タイプを含むデータセットを作成してきました。たとえば、各隕石タイプの個別のデータセットを作成したい場合、Transforms Python を使用して、for ループを用いて各隕石タイプに同じデータトランスフォームを適用できます。異なる入力に同じデータトランスフォームを適用する方法については、Transform generation セクションを参照してください。
src/myproject/datasets
に新しいファイルを作成し、meteor_class.py
と名付けてください。なお、meteor_analysis.py
ファイルにデータトランスフォームコードを書き続けることもできますが、このチュートリアルではデータトランスフォームロジックを分離するために新しいファイルを使用します。
各隕石タイプの個別のデータセットを作成するために、meteorite_enriched
データセットをクラスごとにフィルター処理します。分析したい各隕石タイプに同じデータトランスフォームロジックを適用する transform_generator
関数を定義します:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
from transforms.api import transform_df, Input, Output def transform_generator(sources): transforms = [] for source in sources: @transform_df( # 各隕石タイプに対して異なる出力データセットを作成します Output('/Users/jsmith/meteorite_{source}'.format(source=source)), my_input=Input('/Users/jsmith/meteorite_enriched') ) # "source=source" は、この関数のスコープ内で source 変数の値をキャプチャします def filter_by_source(my_input, source=source): return my_input.filter(my_input["class"] == source) transforms.append(filter_by_source) return transforms # 上記のデータ変換ロジックを提供された3つの隕石タイプに適用します TRANSFORMS = transform_generator(["L6", "H5", "H4"])
これは、クラスによって隕石データセットをフィルター処理するトランスフォーメーションを作成します。関数のスコープ内でsource
パラメーターを取得するために、filter_by_source
関数にsource=source
を渡す必要があることに注意してください。
meteor_analysis.py
ファイルで作成された初期データトランスフォーメーションでは、プロジェクトのPipelineにトランスフォームを追加するための追加の設定は不要でした。これは、デフォルトのPythonプロジェクト構造が自動登録を使用してdatasets
フォルダー内のすべてのトランスフォームオブジェクトを発見するためです。
自動登録を使用してこの最終的なトランスフォーメーションもプロジェクトのPipelineに追加するには、生成されたトランスフォームをリストとして変数に追加する必要があります。上記の例では、変数TRANSFORMS
を使用しました。自動登録とトランスフォームジェネレーターについての詳細は、Transforms PythonドキュメントのTransforms generationセクションを参照してください。