注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
このページで説明されているPythonリポジトリのユニットテストは、バッチパイプラインにのみ適用可能で、ストリーミングパイプラインではサポートされていません。
Pythonリポジトリでは、チェックの一部としてテストを実行するオプションがあります。これらのテストは、人気のあるPythonテストフレームワーク、PyTest を使用して実行されます。
すべてのCIチェックには、他のタスクの中に、condaPackRunが含まれています。
condaPackRunは環境のインストールを担当します。各作成物は適切なチャネルから取得され、Condaはこれらの作成物を使用して環境を構築します。このタスクには3つのステージがあります:
環境仕様は次のビルドのキャッシュとして、以下の隠しファイルに保存されます:
キャッシュは7日間保存されます。meta.yaml ファイルに何らかの変更があった場合は再キャッシュされます。
このタスクは、リポジトリに追加されるパッケージの数に大きく依存します。パッケージを多く追加すればするほど、タスクの実行速度は遅くなります。
PEP8 / PyLintスタイルチェックは、ユーザーのPythonプロジェクトのbuild.gradle
ファイルでcom.palantir.conda.pep8
およびcom.palantir.conda.pylint
Gradleプラグインを適用することで有効化できます。transformsリポジトリでは、これはPythonサブプロジェクトに存在します。ライブラリリポジトリでは、これはルートフォルダーに存在します。
transformsのbuild.gradle
は次のようになります:
Copied!1 2 3 4 5 6 7 8 9 10
// 'com.palantir.transforms.lang.python'プラグインを適用します apply plugin: 'com.palantir.transforms.lang.python' // 'com.palantir.transforms.lang.python-defaults'プラグインを適用します apply plugin: 'com.palantir.transforms.lang.python-defaults' // pep8 lintingプラグインを適用します // Apply the pep8 linting plugin apply plugin: 'com.palantir.conda.pep8' // 'com.palantir.conda.pylint'プラグインを適用します apply plugin: 'com.palantir.conda.pylint'
PyLint は、ユーザーの Python プロジェクト内の src/.pylintrc
で設定できます。
例えば、特定のメッセージを無効にすることができます:
[MESSAGES CONTROL]
disable =
missing-module-docstring, # モジュールドキュメンテーション文字列がない
missing-function-docstring # 関数ドキュメンテーション文字列がない
Foundryで動作が保証されている PyLint の設定はすべてではありません。src/.pylintrc
の機能がチェックに表示されない場合は、その機能がサポートされていないことを示します。
Spark アンチパターン リンターは、Python プロジェクトの build.gradle
ファイルで com.palantir.transforms.lang.antipattern-linter
Gradle プラグインを適用することで有効にできます。
Copied!1 2
// アンチパターンリンターを適用する apply plugin: 'com.palantir.transforms.lang.antipattern-linter'
Spark アンチパターンプラグインは、正確性の問題、Spark のパフォーマンス低下、およびセキュリティへの影響など、Spark でよくあるアンチパターンの使用に対して警告します。
テストは、Python プロジェクトの build.gradle
ファイルで com.palantir.transforms.lang.pytest-defaults
Gradle プラグインを適用することで有効にできます。変換リポジトリの場合、これは Python サブプロジェクトに存在します。ライブラリリポジトリの場合、これはルートフォルダーに存在します。
変換の build.gradle
は次のようになります:
Copied!1 2 3 4 5 6
apply plugin: 'com.palantir.transforms.lang.python' apply plugin: 'com.palantir.transforms.lang.python-defaults' // Python言語用のプラグインを適用する apply plugin: 'com.palantir.transforms.lang.pytest-defaults' // テスト用のプラグインを適用する
そして、ライブラリの build.gradle
は次のようになります:
Copied!1 2 3 4 5 6 7 8 9 10 11 12
apply plugin: 'com.palantir.transforms.lang.python-library' apply plugin: 'com.palantir.transforms.lang.python-library-defaults' // テストプラグインを適用 apply plugin: 'com.palantir.transforms.lang.pytest-defaults' pythonLibrary { publishChannelName = 'libs' } // タグ付きリリースのみ公開する(最後のgitタグからのコミットがゼロ) condaLibraryPublish.onlyIf { versionDetails().commitDistance == 0 }
meta.yamlで定義されたランタイム要件は、ユーザーのテストで利用可能です。conda test セクションで追加の要件を指定することもできます。
完全なドキュメントは https://docs.pytest.org で見つけることができます。
PyTest は、test_
で始まるか、_test.py
で終わる任意の Python ファイル内のテストを検索します。プロジェクトの src
ディレクトリの下にある test
パッケージにすべてのテストを配置することをお勧めします。テストは test_
プレフィックスで名前が付けられた Python 関数であり、アサーションは Python の assert
ステートメントを使用して行われます。PyTest は、Python の組み込みの unittest
モジュールを使用して記述されたテストも実行します。
例えば、transforms-python/src/test/test_increment.py
にある簡単なテストは次のようになります:
Copied!1 2 3 4 5 6 7 8 9
# numという引数を1つ増やす関数を定義します def increment(num): return num + 1 # increment関数のテストを行うための関数を定義します # increment関数に3を引数として渡すと、結果は4になるべきです # しかし、ここでは5と等しいという間違ったアサーションが設定されています def test_increment(): assert increment(3) == 5
このテストを実行すると、次のようなメッセージと共にチェックが失敗します:
============================= テストセッションの開始 =============================
1つのアイテムを収集
test_increment.py F [100%]
================================== 失敗 ===================================
_______________________________ test_increment ________________________________
def test_increment():
> assert increment(3) == 5
E assert 4 == 5
E + ここで 4 = increment(3)
test_increment.py:5: AssertionError
========================== 1つの失敗が0.08秒で発生 ===========================
PyTest フィクスチャは、同じ名前のパラメーターを追加するだけでテスト関数に値を注入することを可能にする強力な機能です。
この機能は、ユーザーのテスト関数で使用するための spark_session
フィクスチャを提供するために使用されます。例えば:
Copied!1 2 3 4 5
def test_dataframe(spark_session): # Sparkセッションからデータフレームを作成 df = spark_session.createDataFrame([['a', 1], ['b', 2]], ['letter', 'number']) # データフレームのスキーマ(構造)が正しいことを確認 assert df.schema.names == ['letter', 'number']
CSVファイルはCode Repositoryに保存され、データ変換のテスト入力として使用できます。
以下のセクションでは、次のデータ変換がtransforms-python/src/myproject/datasets/
で作成されていると仮定した例を示します。
find_aircraft.py
Copied!1 2 3 4 5 6 7 8 9 10 11
from pyspark.sql import functions as F from transforms.api import transform_df, Input, Output # 関数定義 @transform_df( Output("<output_dataset_rid>"), aircraft_df=Input("<input_dataset_rid>"), ) def compute(aircraft_df): # シート数が300より大きく、運用ステータスが "Yes" の航空機データをフィルタリング return aircraft_df.filter((F.col("number_of_seats") > F.lit(300)) & (F.col("operating_status") == F.lit("Yes")))
次の2つのCSVファイルとその内容をフォルダーに入れてください:
transforms-python/src/test/resources/
:
aircraft_mock.csv
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# tail_number: 尾部番号 # serial_number: シリアル番号 # manufacture_year: 製造年 # manufacturer: 製造元 # model: モデル # number_of_seats: 座席数 # capacity_in_pounds: ポンドでのキャパシティ # operating_status: 運用状況 # aircraft_status: 航空機の状態 # acquisition_date: 取得日 # model_type: モデルタイプ tail_number,serial_number,manufacture_year,manufacturer,model,number_of_seats,capacity_in_pounds,operating_status,aircraft_status,acquisition_date,model_type AAA1,20809,1990,Manufacturer_1,M1-100,1,3500,Yes,Owned,13/8/90,208 BBB1,46970,2013,Manufacturer_2,M2-300,310,108500,No,Owned,10/15/14,777 CCC1,44662,2013,Manufacturer_2,M2-300,310,108500,Yes,Owned,6/23/13,777 DDD1,58340,2014,Manufacturer_3,M3-200,294,100000,Yes,Leased,11/21/13,330 EEE1,58600,2013,Manufacturer_2,M2-300,300,47200,Yes,Leased,12/2/13,777
expected_filtered_aircraft.csv
Copied!1 2 3 4
// 日本語のコメントを追加します tail_number,serial_number,manufacture_year,manufacturer,model,number_of_seats,capacity_in_pounds,operating_status,aircraft_status,acquisition_date,model_type // しっぽの番号, 製造番号, 製造年, メーカー, モデル, 座席数, ポンドでの容量, 運行状況, 航空機の状態, 取得日, モデルタイプ CCC1,44662,2013,Manufacturer_2,M2-300,310,108500,Yes,Owned,6/23/13,777
以下のテストは transforms-python/src/test/
のパスで記述することができます:
test_find_aircraft.py
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
import os from pathlib import Path from myproject.datasets.find_aircraft import compute # テストリソースのディレクトリへのパスを設定 TEST_RESOURCES_DIRECTORY_PATH = Path(os.path.dirname(__file__)).joinpath('resources') def test_find_aircrafts(spark_session): # aircraft_mock.csvを読み込み、DataFrameを作成 aircraft_mock_df = spark_session.read.csv( str(TEST_RESOURCES_DIRECTORY_PATH.joinpath('aircraft_mock.csv')), inferSchema=True, # スキーマを推論 header=True # ヘッダーが存在する ) # expected_filtered_aircraft.csvを読み込み、期待される結果のDataFrameを作成 expected_filtered_aircraft_df = spark_session.read.csv( str(TEST_RESOURCES_DIRECTORY_PATH.joinpath('expected_filtered_aircraft.csv')), inferSchema=True, # スキーマを推論 header=True # ヘッダーが存在する ) # compute関数を使用して、結果DataFrameを作成 result_df = compute(aircraft_mock_df) # 結果DataFrameのカラムと期待される結果のDataFrameのカラムが一致することを確認 assert result_df.columns == expected_filtered_aircraft_df.columns # 結果DataFrameの行数と期待される結果のDataFrameの行数が一致することを確認 assert result_df.count() == expected_filtered_aircraft_df.count() # 結果DataFrameに期待される結果のDataFrameに存在しない行がないことを確認 assert result_df.exceptAll(expected_filtered_aircraft_df).count() == 0 # 期待される結果のDataFrameに結果DataFrameに存在しない行がないことを確認 assert expected_filtered_aircraft_df.exceptAll(result_df).count() == 0
最終的なリポジトリ構造は、次の画像のようになります。
テストは transforms-python/src/test/test_find_aircraft.py
にあります。
入力と期待される出力の CSV リソースは transforms-python/src/test/resources
にあります。
変換関数が transform_df
ではなく transform()
でデコレートされると、変換関数は結果のデータフレームを返さず、代わりに関数に送られた Output
オブジェクトのいずれかを使用して、結果をデータセットにマテリアライズします。ロジックをテストするために、マテリアライズするために送られた値をインターセプトするために、Output
引数のモックを使用する必要があります。
上記のデータ変換が transform()
デコレータを使用するように変更した場合:
find_aircraft_transform_decorator.py
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# pyspark.sqlからfunctionsをFという名前でインポートします from pyspark.sql import functions as F # transforms.apiからtransform、Input、Outputをインポートします from transforms.api import transform, Input, Output # transformデコレータを使用して、結果の出力と航空機の入力を指定します @transform( results_output=Output("<output_dataset_rid>"), # 結果を出力するデータセットのRIDを指定します aircraft_input=Input("<input_dataset_rid>"), # 入力として使用する航空機のデータセットのRIDを指定します ) def compute(results_output, aircraft_input): # 計算を行う関数を定義します # aircraft_inputからデータフレームを取得します aircraft_df = aircraft_input.dataframe() # "number_of_seats"が300以上、かつ"operating_status"が"Yes"である行だけをフィルタリングしたデータフレームを作成します results_df = aircraft_df.filter((F.col("number_of_seats") > F.lit(300)) & (F.col("operating_status") == F.lit("Yes"))) # results_outputに結果のデータフレームを書き込みます results_output.write_dataframe(results_df)
検証のテスト中に、変換関数は現在 Input()
を aircraft_input
引数として期待しており、result_df
の値を results_output
に送信するためにはこれをインターセプトする必要があります。
MagicMock(外部)を使用して、両方のインスタンスに必要なラッパーを作成できます。
test_find_aircraft_transform_decorator.py
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
import os from pathlib import Path from unittest.mock import MagicMock from myproject.datasets.find_aircraft_transform_decorator import compute from transforms.api import Input TEST_RESOURCES_DIRECTORY_PATH = Path(os.path.dirname(__file__)).joinpath('resources') def test_find_aircrafts(spark_session): # 入力データを読み込み aircraft_mock_df = spark_session.read.csv( str(TEST_RESOURCES_DIRECTORY_PATH.joinpath('aircraft_mock.csv')), inferSchema=True, header=True ) # 期待される出力データを読み込み expected_filtered_aircraft_df = spark_session.read.csv( str(TEST_RESOURCES_DIRECTORY_PATH.joinpath('expected_filtered_aircraft.csv')), inferSchema=True, header=True ) # 出力用のモックオブジェクトを作成 results_output_mock = MagicMock() # 入力用のラッパーを作成し、データセットを設定 aircraft_mock_input = Input() aircraft_mock_input.dataframe = MagicMock(return_value=aircraft_mock_df) # モック出力オブジェクトで変換を実行 compute( results_output=results_output_mock, aircraft_input=aircraft_mock_input ) # モックオブジェクトに対してwrite_dataframeが呼ばれた引数を取得し、 # 書き込まれるデータフレームを抽出 args, kwargs = results_output_mock.write_dataframe.call_args result_df = args[0] # 結果のデータフレームと期待されるデータフレームが一致することを確認 assert result_df.columns == expected_filtered_aircraft_df.columns assert result_df.count() == expected_filtered_aircraft_df.count() assert result_df.exceptAll(expected_filtered_aircraft_df).count() == 0 assert expected_filtered_aircraft_df.exceptAll(result_df).count() == 0
設定された全てのテストの出力は Checks
タブに表示され、各テストごとに別々の出力が表示されます。デフォルトでは、テスト結果は PASSED、FAILED、または SKIPPED のステータスとともに折りたたまれて表示されます。各テストを展開する(または全てのテストを展開する)と、テスト出力とStdOutおよびStdErrのログが表示されます。
PyTest coverage を使用して、ユーザーのリポジトリに対するカバレッジを計算し、最小パーセンテージを強制することができます。
リポジトリの meta.yml
に以下を追加してください:
Copied!1 2 3
test: requires: - pytest-cov # pytest-covは、pytestでテストを行う際に、テストカバレッジ(テストがどれだけのコードをカバーしているか)を計測するためのライブラリです。
/transforms-python/src/pytest.ini
ファイルを作成し、以下の内容を記載してください:
Copied!1 2 3 4 5 6 7
[pytest] addopts = --cov=<<パッケージ名, 例えば myproject>> --cov-report term --cov-fail-under=100 # [pytest]はpytestの設定をするセクションです。 # addoptsはpytestに追加のオプションを付けるための設定です。 # --covはカバレッジを計測するパッケージを指定します。ここでは<<パッケージ名, 例えば myproject>>を指定します。 # --cov-report termはカバレッジレポートをターミナルに出力することを指定します。 # --cov-fail-under=100はカバレッジが100%に達しない場合にテストを失敗させるようにします。
カバレッジによるチェックの失敗が必要な範囲はカスタマイズ可能です。--cov-fail-under
引数にパーセンテージを選択してください。
指定された量よりもカバレッジが低いテストの実行は、以下の出力で失敗します。
デフォルトでは、PyTest はテストを逐次実行します。トランスフォームの build.gradle
を調整して、numProcesses
の値を設定することで、複数の CPU にテストを送信してテスト実行を高速化することができます。その値は使用するプロセスの数を反映させてください。
Copied!1 2 3 4 5 6 7 8 9 10 11 12
// 'com.palantir.transforms.lang.python'プラグインを適用します apply plugin: 'com.palantir.transforms.lang.python' // 'com.palantir.transforms.lang.python-defaults'プラグインを適用します apply plugin: 'com.palantir.transforms.lang.python-defaults' // テストプラグインを適用します apply plugin: 'com.palantir.transforms.lang.pytest-defaults' tasks.pytest { // プロセス数を "3" に設定します numProcesses "3" }
テストの並列化は、pytest-xdist テストプラグインを使用して実行されます。
テストを並列化すると、保留中のテストが利用可能なワーカーに送信されますが、順序は保証されません。他の先行テストによる変更を予期しているグローバル/共有状態を必要とするテストは、それに応じて調整されるべきです。
:transforms-python:pytest
タスクが実行されることが確認できるはずです。test_
に基づいて検出されます。これは PyTest の標準規則です。.collect()
を呼び出すことができます。