注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
以下の手順は、シンプルなPythonデータ変換を説明しています。データ変換を始めたばかりの場合は、まずPipeline BuilderまたはCode Repositoriesのバッチパイプラインチュートリアルをご覧ください。
このチュートリアルでは、最近の隕石の発見に関するスプレッドシートを、分析のための使えるデータセットに変換する方法を、Transforms Pythonを用いて説明します。
このチュートリアルでは、NASAのオープンデータポータルからのデータを使用します。以下のサンプルデータセットを用いて、ご自身のCode Repositoryで進めることができます:
このデータセットには、地球上で発見された隕石に関するデータが含まれています。なお、データは作業しやすいようにクリーンアップされています。
データセットには、各隕石の名前、質量、分類、その他の識別情報、発見年、発見場所の座標などが含まれています。Foundryにアップロードする前に、CSVを開いてデータを確認することが良い習慣です。
まず、Pythonコードリポジトリを作成します。
または、以下の手順で、ローカルのPythonリポジトリをCode Repositoriesにコピーすることもできます:
git remote remove origin
git remote add origin <repository_url>
GitHubインターフェースの右上角でコードリポジトリURLを見つけることができます。緑色のCloneボタンを選択し、Git remote URLをコピーします。
git remote -v
を実行して、コードリポジトリURLが返されることを確認します。
master
ブランチ(または選択した他のブランチ)をローカルブランチにマージします:git merge master
refusing to merge unrelated histories
というエラーが発生した場合、コマンド:git merge master --allow-unrelated-histories
を実行します。これにより、以前のリモートGitHubリポジトリに関連する現在のGit履歴が削除されます。
このマージにより、Code Repositoriesでコミットや変更を行うために必要な重要なファイルがローカルリポジトリに持ち込まれます。
develop
):git checkout develop
。git push
を実行し、新しいブランチがCode Repositoriesインターフェースに表示されることを確認します。チェックが成功したことを確認します。Code Repositoriesのローカル開発について詳しくはこちらをご覧ください。
Transforms Pythonリポジトリに移動します。デフォルトのexamples.py
ファイルには、始めるための例示コードが含まれています。
まず、src/myproject/datasets
に新しいファイルを作成し、それをmeteor_analysis.py
と命名して分析を整理します。必要な関数とクラスをインポートすることを忘れないでください。meteor_landings
データセットを入力とする変換を定義し、その出力としてmeteor_landings_cleaned
を作成します:
Copied!1 2 3 4 5 6 7 8 9 10 11
from transforms.api import transform_df, Input, Output from pyspark.sql import functions as F @transform_df( # ここをあなたの出力データセットのパスに置き換えてください Output("/Users/jsmith/meteorite_landings_cleaned"), # ここをあなたの入力データセットのパスに置き換えてください meteorite_landings=Input("/Users/jsmith/meteorite_landings"), ) def clean(meteorite_landings): <あなたのデータ変換ロジック>
次に、1950年以降に発生した「有効な」メテオを入力データセットからフィルター処理することを想定します。メテオライトを「nametype」および「year」でフィルター処理するために、データ変換ロジックを更新します:
Copied!1 2 3 4 5 6 7
def clean(meteorite_landings): # メテオライトの着陸データをフィルタリングする関数 return meteorite_landings.filter( meteorite_landings.nametype == 'Valid' # 名前のタイプが'Valid'のものだけフィルタリング ).filter( meteorite_landings.year >= 1950 # 1950年以降のものだけフィルタリング )
結果のデータセットを作成するには、変更をコミットし、データセットをビルドするためにデータセットに移動します。Code Repositoriesでデータセットをビルドする方法の詳細は、シンプルなバッチパイプラインの作成のチュートリアルをご覧ください。
Pythonトランスフォームを使用すると、1つのPythonファイルで複数の出力データセットを作成できます。
たとえば、特にそのメテオライトタイプに大きな隕石だけをさらにフィルター処理したいとします。そのためには、次のことが必要です:
まず、各メテオライトタイプの平均質量を見つけるためのデータ変換をmeteor_analysis.py
に追加します。この変換は、ユーザーのmeteor_landings
データセットを入力として取り、meteorite_stats
をその出力として作成します:
Copied!1 2 3 4 5 6 7 8 9 10
@transform_df( # 出力データセットの名前は一意でなければならない Output("/Users/jsmith/meteorite_stats"), meteorite_landings=Input("/Users/jsmith/meteorite_landings"), ) def stats(meteorite_landings): # メテオライトの着陸データをクラス別にグループ化し、各クラスの平均質量を計算 return meteorite_landings.groupBy("class").agg( F.mean("mass").alias("avg_mass_per_class") )
次に、各隕石の質量を、その隕石タイプの平均質量と比較するデータ変換を作成します。この変換に必要な情報は、このチュートリアルでこれまでに作成した meteorite_landings
と meteorite_stats
テーブルに分散しています。2つのデータセットを結合し、平均質量よりも大きい隕石をフィルター処理する必要があります:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# このデータ変換は、2つの入力データセットに基づいています @transform_df( Output("/Users/jsmith/meteorite_enriched"), # 隕石の着陸データ meteorite_landings=Input("/Users/jsmith/meteorite_landings"), # 隕石の統計データ meteorite_stats=Input("/Users/jsmith/meteorite_stats") ) def enriched(meteorite_landings, meteorite_stats): # 両方のデータセットを結合 enriched_together=meteorite_landings.join( meteorite_stats, "class" ) # それぞれの隕石の質量がそのクラスの平均質量よりも大きいかどうかを示す列を追加 greater_mass=enriched_together.withColumn( 'greater_mass', (enriched_together.mass > enriched_together.avg_mass_per_class) ) # 質量が平均質量より大きい隕石のみを返す return greater_mass.filter("greater_mass")
これで、結果として得られる meteorite_enriched
データセットを Contour で詳しく調べることができます。
これまでのところ、ユーザーは平均以上の質量を持つすべてのタイプの隕石を含むデータセットを作成しました。それぞれの隕石のタイプに対して別々のデータセットを作成したい場合があります。 Transforms Python を使って、同じデータ変換をそれぞれの隕石のタイプに適用するために、for ループを使用できます。同じデータ変換を異なる入力に適用する方法の詳細については、Transform generation のセクションを参照してください。
src/myproject/datasets
に新しいファイルを作成し、meteor_class.py
という名前を付けます。meteor_analysis.py
ファイルでデータ変換コードを書き続けることができますが、このチュートリアルでは、新しいファイルを使用してデータ変換ロジックを分離しています。
それぞれの隕石のタイプに対して別々のデータセットを作成するには、meteorite_enriched
データセットをクラス別にフィルター処理します。ユーザーが分析したいそれぞれの隕石のタイプに対して、同じデータ変換ロジックを適用する transform_generator
関数を定義してください。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
from transforms.api import transform_df, Input, Output def transform_generator(sources): transforms = [] for source in sources: @transform_df( # これは、各隕石タイプに対して異なる出力データセットを作成します Output('/Users/jsmith/meteorite_{source}'.format(source=source)), my_input=Input('/Users/jsmith/meteorite_enriched') ) # "source=source" は、この関数のスコープ内で source 変数の値を取得します def filter_by_source(my_input, source=source): return my_input.filter(my_input["class"] == source) transforms.append(filter_by_source) return transforms # 上記のデータ変換ロジックを、提供された 3 つの隕石タイプに適用します TRANSFORMS = transform_generator(["L6", "H5", "H4"])
これにより、メテオライトデータセットをクラス別にフィルター処理する変換が作成されます。関数のスコープ内で source
パラメーターをキャプチャするために、 filter_by_source
関数に source=source
を渡す必要があることに注意してください。
meteor_analysis.py
ファイルで作成した初期のデータ変換では、プロジェクトの Pipeline に Transform を追加するために追加の設定を行う必要はありませんでした。これは、デフォルトの Python プロジェクト構造が、datasets
フォルダー内のすべての Transform オブジェクトを自動的に検出する自動登録を使用しているためです。
この最終変換もプロジェクトの Pipeline に自動登録を使用して追加するには、生成された変換をリストとして変数に追加する必要があります。上記の例では、変数 TRANSFORMS
を使用しました。自動登録と変換ジェネレータについての詳細は、Transforms Python のドキュメントの 変換の生成 のセクションを参照してください。