注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

はじめに

Tip

以下の手順は、シンプルなPythonデータ変換を説明しています。データ変換を始めたばかりの場合は、まずPipeline BuilderまたはCode Repositoriesのバッチパイプラインチュートリアルをご覧ください。

このチュートリアルでは、最近の隕石の発見に関するスプレッドシートを、分析のための使えるデータセットに変換する方法を、Transforms Pythonを用いて説明します。

データセットについて

このチュートリアルでは、NASAのオープンデータポータルからのデータを使用します。以下のサンプルデータセットを用いて、ご自身のCode Repositoryで進めることができます:

meteorite_landingsをダウンロード

このデータセットには、地球上で発見された隕石に関するデータが含まれています。なお、データは作業しやすいようにクリーンアップされています。

データセットには、各隕石の名前、質量、分類、その他の識別情報、発見年、発見場所の座標などが含まれています。Foundryにアップロードする前に、CSVを開いてデータを確認することが良い習慣です。

Pythonコードリポジトリの設定

まず、Pythonコードリポジトリを作成します。

  1. プロジェクトに移動し、+ New > Code repository を選択します。
  2. Repository type セクションで、Data Transforms を選択します。
  3. Language template として Python を選択します。
  4. Initialize repository を選択します。

ローカルのPythonリポジトリを使用する

または、以下の手順で、ローカルのPythonリポジトリをCode Repositoriesにコピーすることもできます:

  1. 上述のように新しいPythonコードリポジトリを作成します。
  2. ローカルのリポジトリで、以前のGit originを削除します(例えば、GitHubからクローンした場合):git remote remove origin
  3. コードリポジトリのGit remote URLを追加します:git remote add origin <repository_url>

GitHubインターフェースの右上角でコードリポジトリURLを見つけることができます。緑色のCloneボタンを選択し、Git remote URLをコピーします。

git remote -v を実行して、コードリポジトリURLが返されることを確認します。

  1. Code Repositoriesの現在の master ブランチ(または選択した他のブランチ)をローカルブランチにマージします:git merge master

refusing to merge unrelated histories というエラーが発生した場合、コマンド:git merge master --allow-unrelated-histories を実行します。これにより、以前のリモートGitHubリポジトリに関連する現在のGit履歴が削除されます。

このマージにより、Code Repositoriesでコミットや変更を行うために必要な重要なファイルがローカルリポジトリに持ち込まれます。

  1. 新しいブランチを作成し、名前を付けます(例えば、develop):git checkout develop
  2. 変更を行い、それらをブランチにコミットします。
  3. git pushを実行し、新しいブランチがCode Repositoriesインターフェースに表示されることを確認します。チェックが成功したことを確認します。

Code Repositoriesのローカル開発について詳しくはこちらをご覧ください。

Pythonデータ変換を書く

Transforms Pythonリポジトリに移動します。デフォルトのexamples.pyファイルには、始めるための例示コードが含まれています。 まず、src/myproject/datasetsに新しいファイルを作成し、それをmeteor_analysis.pyと命名して分析を整理します。必要な関数とクラスをインポートすることを忘れないでください。meteor_landingsデータセットを入力とする変換を定義し、その出力としてmeteor_landings_cleanedを作成します:

Copied!
1 2 3 4 5 6 7 8 9 10 11 from transforms.api import transform_df, Input, Output from pyspark.sql import functions as F @transform_df( # ここをあなたの出力データセットのパスに置き換えてください Output("/Users/jsmith/meteorite_landings_cleaned"), # ここをあなたの入力データセットのパスに置き換えてください meteorite_landings=Input("/Users/jsmith/meteorite_landings"), ) def clean(meteorite_landings): <あなたのデータ変換ロジック>

次に、1950年以降に発生した「有効な」メテオを入力データセットからフィルター処理することを想定します。メテオライトを「nametype」および「year」でフィルター処理するために、データ変換ロジックを更新します:

Copied!
1 2 3 4 5 6 7 def clean(meteorite_landings): # メテオライトの着陸データをフィルタリングする関数 return meteorite_landings.filter( meteorite_landings.nametype == 'Valid' # 名前のタイプが'Valid'のものだけフィルタリング ).filter( meteorite_landings.year >= 1950 # 1950年以降のものだけフィルタリング )

ユーザーの出力データセットの作成

結果のデータセットを作成するには、変更をコミットし、データセットをビルドするためにデータセットに移動します。Code Repositoriesでデータセットをビルドする方法の詳細は、シンプルなバッチパイプラインの作成のチュートリアルをご覧ください。

データ変換の追加

Pythonトランスフォームを使用すると、1つのPythonファイルで複数の出力データセットを作成できます。

たとえば、特にそのメテオライトタイプに大きな隕石だけをさらにフィルター処理したいとします。そのためには、次のことが必要です:

  1. 各メテオライトタイプの平均質量を見つける
  2. 各隕石の質量をその隕石タイプの平均質量と比較する

まず、各メテオライトタイプの平均質量を見つけるためのデータ変換をmeteor_analysis.pyに追加します。この変換は、ユーザーのmeteor_landingsデータセットを入力として取り、meteorite_statsをその出力として作成します:

Copied!
1 2 3 4 5 6 7 8 9 10 @transform_df( # 出力データセットの名前は一意でなければならない Output("/Users/jsmith/meteorite_stats"), meteorite_landings=Input("/Users/jsmith/meteorite_landings"), ) def stats(meteorite_landings): # メテオライトの着陸データをクラス別にグループ化し、各クラスの平均質量を計算 return meteorite_landings.groupBy("class").agg( F.mean("mass").alias("avg_mass_per_class") )

次に、各隕石の質量を、その隕石タイプの平均質量と比較するデータ変換を作成します。この変換に必要な情報は、このチュートリアルでこれまでに作成した meteorite_landingsmeteorite_stats テーブルに分散しています。2つのデータセットを結合し、平均質量よりも大きい隕石をフィルター処理する必要があります:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 # このデータ変換は、2つの入力データセットに基づいています @transform_df( Output("/Users/jsmith/meteorite_enriched"), # 隕石の着陸データ meteorite_landings=Input("/Users/jsmith/meteorite_landings"), # 隕石の統計データ meteorite_stats=Input("/Users/jsmith/meteorite_stats") ) def enriched(meteorite_landings, meteorite_stats): # 両方のデータセットを結合 enriched_together=meteorite_landings.join( meteorite_stats, "class" ) # それぞれの隕石の質量がそのクラスの平均質量よりも大きいかどうかを示す列を追加 greater_mass=enriched_together.withColumn( 'greater_mass', (enriched_together.mass > enriched_together.avg_mass_per_class) ) # 質量が平均質量より大きい隕石のみを返す return greater_mass.filter("greater_mass")

これで、結果として得られる meteorite_enriched データセットを Contour で詳しく調べることができます。

複数の入力にデータ変換を適用する

これまでのところ、ユーザーは平均以上の質量を持つすべてのタイプの隕石を含むデータセットを作成しました。それぞれの隕石のタイプに対して別々のデータセットを作成したい場合があります。 Transforms Python を使って、同じデータ変換をそれぞれの隕石のタイプに適用するために、for ループを使用できます。同じデータ変換を異なる入力に適用する方法の詳細については、Transform generation のセクションを参照してください。

src/myproject/datasets に新しいファイルを作成し、meteor_class.py という名前を付けます。meteor_analysis.py ファイルでデータ変換コードを書き続けることができますが、このチュートリアルでは、新しいファイルを使用してデータ変換ロジックを分離しています。

それぞれの隕石のタイプに対して別々のデータセットを作成するには、meteorite_enriched データセットをクラス別にフィルター処理します。ユーザーが分析したいそれぞれの隕石のタイプに対して、同じデータ変換ロジックを適用する transform_generator 関数を定義してください。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from transforms.api import transform_df, Input, Output def transform_generator(sources): transforms = [] for source in sources: @transform_df( # これは、各隕石タイプに対して異なる出力データセットを作成します Output('/Users/jsmith/meteorite_{source}'.format(source=source)), my_input=Input('/Users/jsmith/meteorite_enriched') ) # "source=source" は、この関数のスコープ内で source 変数の値を取得します def filter_by_source(my_input, source=source): return my_input.filter(my_input["class"] == source) transforms.append(filter_by_source) return transforms # 上記のデータ変換ロジックを、提供された 3 つの隕石タイプに適用します TRANSFORMS = transform_generator(["L6", "H5", "H4"])

これにより、メテオライトデータセットをクラス別にフィルター処理する変換が作成されます。関数のスコープ内で source パラメーターをキャプチャするために、 filter_by_source 関数に source=source を渡す必要があることに注意してください。

ヒント

meteor_analysis.py ファイルで作成した初期のデータ変換では、プロジェクトの Pipeline に Transform を追加するために追加の設定を行う必要はありませんでした。これは、デフォルトの Python プロジェクト構造が、datasetsフォルダー内のすべての Transform オブジェクトを自動的に検出する自動登録を使用しているためです。

この最終変換もプロジェクトの Pipeline に自動登録を使用して追加するには、生成された変換をリストとして変数に追加する必要があります。上記の例では、変数 TRANSFORMS を使用しました。自動登録と変換ジェネレータについての詳細は、Transforms Python のドキュメントの 変換の生成 のセクションを参照してください。