データ統合Python基本的な変換ユニットテスト

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

ユニットテスト

このページで説明されているPythonリポジトリのユニットテストは、バッチパイプラインにのみ適用可能で、ストリーミングパイプラインではサポートされていません。

Pythonリポジトリでは、チェックの一部としてテストを実行するオプションがあります。これらのテストは、人気のあるPythonテストフレームワーク、PyTest を使用して実行されます。

CIタスク:condaPackRun

すべてのCIチェックには、他のタスクの中に、condaPackRunが含まれています。

リポジトリのCIチェックのビルド時間の概要

condaPackRunは環境のインストールを担当します。各作成物は適切なチャネルから取得され、Condaはこれらの作成物を使用して環境を構築します。このタスクには3つのステージがあります:

  1. 解決された環境内のすべてのパッケージをダウンロードして展開します。
  2. パッケージの内容を確認します。設定により、Condaはチェックサムを使用するか、ファイルサイズが正しいことを確認します。
  3. パッケージを環境にリンクします。

環境仕様は次のビルドのキャッシュとして、以下の隠しファイルに保存されます:

  • conda-version-run.linux-64.lock
  • conda-version-test.linux-64.lock

キャッシュは7日間保存されます。meta.yaml ファイルに何らかの変更があった場合は再キャッシュされます。

このタスクは、リポジトリに追加されるパッケージの数に大きく依存します。パッケージを多く追加すればするほど、タスクの実行速度は遅くなります。

スタイルチェックの有効化

PEP8 / PyLintスタイルチェックは、ユーザーのPythonプロジェクトのbuild.gradleファイルでcom.palantir.conda.pep8およびcom.palantir.conda.pylint Gradleプラグインを適用することで有効化できます。transformsリポジトリでは、これはPythonサブプロジェクトに存在します。ライブラリリポジトリでは、これはルートフォルダーに存在します。 transformsのbuild.gradleは次のようになります:

Copied!
1 2 3 4 5 6 7 8 9 10 // 'com.palantir.transforms.lang.python'プラグインを適用します apply plugin: 'com.palantir.transforms.lang.python' // 'com.palantir.transforms.lang.python-defaults'プラグインを適用します apply plugin: 'com.palantir.transforms.lang.python-defaults' // pep8 lintingプラグインを適用します // Apply the pep8 linting plugin apply plugin: 'com.palantir.conda.pep8' // 'com.palantir.conda.pylint'プラグインを適用します apply plugin: 'com.palantir.conda.pylint'

PyLint は、ユーザーの Python プロジェクト内の src/.pylintrc で設定できます。 例えば、特定のメッセージを無効にすることができます:

[MESSAGES CONTROL]
disable =
    missing-module-docstring,  # モジュールドキュメンテーション文字列がない
    missing-function-docstring  # 関数ドキュメンテーション文字列がない
PyLint の制限事項

Foundryで動作が保証されている PyLint の設定はすべてではありません。src/.pylintrcの機能がチェックに表示されない場合は、その機能がサポートされていないことを示します。

Spark アンチパターン プラグインの有効化

Spark アンチパターン リンターは、Python プロジェクトの build.gradle ファイルで com.palantir.transforms.lang.antipattern-linter Gradle プラグインを適用することで有効にできます。

Copied!
1 2 // アンチパターンリンターを適用する apply plugin: 'com.palantir.transforms.lang.antipattern-linter'

Spark アンチパターンプラグインは、正確性の問題、Spark のパフォーマンス低下、およびセキュリティへの影響など、Spark でよくあるアンチパターンの使用に対して警告します。

テストの有効化

テストは、Python プロジェクトの build.gradle ファイルで com.palantir.transforms.lang.pytest-defaults Gradle プラグインを適用することで有効にできます。変換リポジトリの場合、これは Python サブプロジェクトに存在します。ライブラリリポジトリの場合、これはルートフォルダーに存在します。 変換の build.gradle は次のようになります:

Copied!
1 2 3 4 5 6 apply plugin: 'com.palantir.transforms.lang.python' apply plugin: 'com.palantir.transforms.lang.python-defaults' // Python言語用のプラグインを適用する apply plugin: 'com.palantir.transforms.lang.pytest-defaults' // テスト用のプラグインを適用する

そして、ライブラリの build.gradle は次のようになります:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 apply plugin: 'com.palantir.transforms.lang.python-library' apply plugin: 'com.palantir.transforms.lang.python-library-defaults' // テストプラグインを適用 apply plugin: 'com.palantir.transforms.lang.pytest-defaults' pythonLibrary { publishChannelName = 'libs' } // タグ付きリリースのみ公開する(最後のgitタグからのコミットがゼロ) condaLibraryPublish.onlyIf { versionDetails().commitDistance == 0 }

meta.yamlで定義されたランタイム要件は、ユーザーのテストで利用可能です。conda test セクションで追加の要件を指定することもできます。

テストの作成

完全なドキュメントは https://docs.pytest.org で見つけることができます。

PyTest は、test_ で始まるか、_test.py で終わる任意の Python ファイル内のテストを検索します。プロジェクトの src ディレクトリの下にある test パッケージにすべてのテストを配置することをお勧めします。テストは test_ プレフィックスで名前が付けられた Python 関数であり、アサーションは Python の assert ステートメントを使用して行われます。PyTest は、Python の組み込みの unittest モジュールを使用して記述されたテストも実行します。 例えば、transforms-python/src/test/test_increment.py にある簡単なテストは次のようになります:

Copied!
1 2 3 4 5 6 7 8 9 # numという引数を1つ増やす関数を定義します def increment(num): return num + 1 # increment関数のテストを行うための関数を定義します # increment関数に3を引数として渡すと、結果は4になるべきです # しかし、ここでは5と等しいという間違ったアサーションが設定されています def test_increment(): assert increment(3) == 5

このテストを実行すると、次のようなメッセージと共にチェックが失敗します:

============================= テストセッションの開始 =============================
1つのアイテムを収集

test_increment.py F                                                       [100%]

================================== 失敗 ===================================
_______________________________ test_increment ________________________________

    def test_increment():
>       assert increment(3) == 5
E       assert 4 == 5
E        +  ここで 4 = increment(3)

test_increment.py:5: AssertionError
========================== 1つの失敗が0.08秒で発生 ===========================

PySpark でのテスト

PyTest フィクスチャは、同じ名前のパラメーターを追加するだけでテスト関数に値を注入することを可能にする強力な機能です。 この機能は、ユーザーのテスト関数で使用するための spark_session フィクスチャを提供するために使用されます。例えば:

Copied!
1 2 3 4 5 def test_dataframe(spark_session): # Sparkセッションからデータフレームを作成 df = spark_session.createDataFrame([['a', 1], ['b', 2]], ['letter', 'number']) # データフレームのスキーマ(構造)が正しいことを確認 assert df.schema.names == ['letter', 'number']

CSVからテスト入力を作成する

CSVファイルはCode Repositoryに保存され、データ変換のテスト入力として使用できます。

以下のセクションでは、次のデータ変換がtransforms-python/src/myproject/datasets/で作成されていると仮定した例を示します。

find_aircraft.py

Copied!
1 2 3 4 5 6 7 8 9 10 11 from pyspark.sql import functions as F from transforms.api import transform_df, Input, Output # 関数定義 @transform_df( Output("<output_dataset_rid>"), aircraft_df=Input("<input_dataset_rid>"), ) def compute(aircraft_df): # シート数が300より大きく、運用ステータスが "Yes" の航空機データをフィルタリング return aircraft_df.filter((F.col("number_of_seats") > F.lit(300)) & (F.col("operating_status") == F.lit("Yes")))

次の2つのCSVファイルとその内容をフォルダーに入れてください: transforms-python/src/test/resources/:

aircraft_mock.csv

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 # tail_number: 尾部番号 # serial_number: シリアル番号 # manufacture_year: 製造年 # manufacturer: 製造元 # model: モデル # number_of_seats: 座席数 # capacity_in_pounds: ポンドでのキャパシティ # operating_status: 運用状況 # aircraft_status: 航空機の状態 # acquisition_date: 取得日 # model_type: モデルタイプ tail_number,serial_number,manufacture_year,manufacturer,model,number_of_seats,capacity_in_pounds,operating_status,aircraft_status,acquisition_date,model_type AAA1,20809,1990,Manufacturer_1,M1-100,1,3500,Yes,Owned,13/8/90,208 BBB1,46970,2013,Manufacturer_2,M2-300,310,108500,No,Owned,10/15/14,777 CCC1,44662,2013,Manufacturer_2,M2-300,310,108500,Yes,Owned,6/23/13,777 DDD1,58340,2014,Manufacturer_3,M3-200,294,100000,Yes,Leased,11/21/13,330 EEE1,58600,2013,Manufacturer_2,M2-300,300,47200,Yes,Leased,12/2/13,777

expected_filtered_aircraft.csv

Copied!
1 2 3 4 // 日本語のコメントを追加します tail_number,serial_number,manufacture_year,manufacturer,model,number_of_seats,capacity_in_pounds,operating_status,aircraft_status,acquisition_date,model_type // しっぽの番号, 製造番号, 製造年, メーカー, モデル, 座席数, ポンドでの容量, 運行状況, 航空機の状態, 取得日, モデルタイプ CCC1,44662,2013,Manufacturer_2,M2-300,310,108500,Yes,Owned,6/23/13,777

以下のテストは transforms-python/src/test/ のパスで記述することができます:

test_find_aircraft.py

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import os from pathlib import Path from myproject.datasets.find_aircraft import compute # テストリソースのディレクトリへのパスを設定 TEST_RESOURCES_DIRECTORY_PATH = Path(os.path.dirname(__file__)).joinpath('resources') def test_find_aircrafts(spark_session): # aircraft_mock.csvを読み込み、DataFrameを作成 aircraft_mock_df = spark_session.read.csv( str(TEST_RESOURCES_DIRECTORY_PATH.joinpath('aircraft_mock.csv')), inferSchema=True, # スキーマを推論 header=True # ヘッダーが存在する ) # expected_filtered_aircraft.csvを読み込み、期待される結果のDataFrameを作成 expected_filtered_aircraft_df = spark_session.read.csv( str(TEST_RESOURCES_DIRECTORY_PATH.joinpath('expected_filtered_aircraft.csv')), inferSchema=True, # スキーマを推論 header=True # ヘッダーが存在する ) # compute関数を使用して、結果DataFrameを作成 result_df = compute(aircraft_mock_df) # 結果DataFrameのカラムと期待される結果のDataFrameのカラムが一致することを確認 assert result_df.columns == expected_filtered_aircraft_df.columns # 結果DataFrameの行数と期待される結果のDataFrameの行数が一致することを確認 assert result_df.count() == expected_filtered_aircraft_df.count() # 結果DataFrameに期待される結果のDataFrameに存在しない行がないことを確認 assert result_df.exceptAll(expected_filtered_aircraft_df).count() == 0 # 期待される結果のDataFrameに結果DataFrameに存在しない行がないことを確認 assert expected_filtered_aircraft_df.exceptAll(result_df).count() == 0

最終的なリポジトリ構造は、次の画像のようになります。 Unit test with example inputs

テストは transforms-python/src/test/test_find_aircraft.py にあります。 入力と期待される出力の CSV リソースは transforms-python/src/test/resources にあります。

transform()デコレータで書き込まれたデータフレームをインターセプトする

変換関数が transform_df ではなく transform() でデコレートされると、変換関数は結果のデータフレームを返さず、代わりに関数に送られた Output オブジェクトのいずれかを使用して、結果をデータセットにマテリアライズします。ロジックをテストするために、マテリアライズするために送られた値をインターセプトするために、Output 引数のモックを使用する必要があります。

上記のデータ変換が transform() デコレータを使用するように変更した場合:

find_aircraft_transform_decorator.py

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 # pyspark.sqlからfunctionsをFという名前でインポートします from pyspark.sql import functions as F # transforms.apiからtransform、Input、Outputをインポートします from transforms.api import transform, Input, Output # transformデコレータを使用して、結果の出力と航空機の入力を指定します @transform( results_output=Output("<output_dataset_rid>"), # 結果を出力するデータセットのRIDを指定します aircraft_input=Input("<input_dataset_rid>"), # 入力として使用する航空機のデータセットのRIDを指定します ) def compute(results_output, aircraft_input): # 計算を行う関数を定義します # aircraft_inputからデータフレームを取得します aircraft_df = aircraft_input.dataframe() # "number_of_seats"が300以上、かつ"operating_status"が"Yes"である行だけをフィルタリングしたデータフレームを作成します results_df = aircraft_df.filter((F.col("number_of_seats") > F.lit(300)) & (F.col("operating_status") == F.lit("Yes"))) # results_outputに結果のデータフレームを書き込みます results_output.write_dataframe(results_df)

検証のテスト中に、変換関数は現在 Input()aircraft_input 引数として期待しており、result_df の値を results_output に送信するためにはこれをインターセプトする必要があります。

MagicMock(外部)を使用して、両方のインスタンスに必要なラッパーを作成できます。

test_find_aircraft_transform_decorator.py

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import os from pathlib import Path from unittest.mock import MagicMock from myproject.datasets.find_aircraft_transform_decorator import compute from transforms.api import Input TEST_RESOURCES_DIRECTORY_PATH = Path(os.path.dirname(__file__)).joinpath('resources') def test_find_aircrafts(spark_session): # 入力データを読み込み aircraft_mock_df = spark_session.read.csv( str(TEST_RESOURCES_DIRECTORY_PATH.joinpath('aircraft_mock.csv')), inferSchema=True, header=True ) # 期待される出力データを読み込み expected_filtered_aircraft_df = spark_session.read.csv( str(TEST_RESOURCES_DIRECTORY_PATH.joinpath('expected_filtered_aircraft.csv')), inferSchema=True, header=True ) # 出力用のモックオブジェクトを作成 results_output_mock = MagicMock() # 入力用のラッパーを作成し、データセットを設定 aircraft_mock_input = Input() aircraft_mock_input.dataframe = MagicMock(return_value=aircraft_mock_df) # モック出力オブジェクトで変換を実行 compute( results_output=results_output_mock, aircraft_input=aircraft_mock_input ) # モックオブジェクトに対してwrite_dataframeが呼ばれた引数を取得し、 # 書き込まれるデータフレームを抽出 args, kwargs = results_output_mock.write_dataframe.call_args result_df = args[0] # 結果のデータフレームと期待されるデータフレームが一致することを確認 assert result_df.columns == expected_filtered_aircraft_df.columns assert result_df.count() == expected_filtered_aircraft_df.count() assert result_df.exceptAll(expected_filtered_aircraft_df).count() == 0 assert expected_filtered_aircraft_df.exceptAll(result_df).count() == 0

テスト結果の表示

設定された全てのテストの出力は Checks タブに表示され、各テストごとに別々の出力が表示されます。デフォルトでは、テスト結果は PASSED、FAILED、または SKIPPED のステータスとともに折りたたまれて表示されます。各テストを展開する(または全てのテストを展開する)と、テスト出力とStdOutおよびStdErrのログが表示されます。

checks-test

テストカバレッジ

PyTest coverage を使用して、ユーザーのリポジトリに対するカバレッジを計算し、最小パーセンテージを強制することができます。

リポジトリの meta.yml に以下を追加してください:

Copied!
1 2 3 test: requires: - pytest-cov # pytest-covは、pytestでテストを行う際に、テストカバレッジ(テストがどれだけのコードをカバーしているか)を計測するためのライブラリです。

/transforms-python/src/pytest.ini ファイルを作成し、以下の内容を記載してください:

Copied!
1 2 3 4 5 6 7 [pytest] addopts = --cov=<<パッケージ名, 例えば myproject>> --cov-report term --cov-fail-under=100 # [pytest]はpytestの設定をするセクションです。 # addoptsはpytestに追加のオプションを付けるための設定です。 # --covはカバレッジを計測するパッケージを指定します。ここでは<<パッケージ名, 例えば myproject>>を指定します。 # --cov-report termはカバレッジレポートをターミナルに出力することを指定します。 # --cov-fail-under=100はカバレッジが100%に達しない場合にテストを失敗させるようにします。

カバレッジによるチェックの失敗が必要な範囲はカスタマイズ可能です。--cov-fail-under 引数にパーセンテージを選択してください。

指定された量よりもカバレッジが低いテストの実行は、以下の出力で失敗します。

coverage-test

テストの並列化

デフォルトでは、PyTest はテストを逐次実行します。トランスフォームの build.gradle を調整して、numProcesses の値を設定することで、複数の CPU にテストを送信してテスト実行を高速化することができます。その値は使用するプロセスの数を反映させてください。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 // 'com.palantir.transforms.lang.python'プラグインを適用します apply plugin: 'com.palantir.transforms.lang.python' // 'com.palantir.transforms.lang.python-defaults'プラグインを適用します apply plugin: 'com.palantir.transforms.lang.python-defaults' // テストプラグインを適用します apply plugin: 'com.palantir.transforms.lang.pytest-defaults' tasks.pytest { // プロセス数を "3" に設定します numProcesses "3" }

テストの並列化は、pytest-xdist テストプラグインを使用して実行されます。

テストを並列化すると、保留中のテストが利用可能なワーカーに送信されますが、順序は保証されません。他の先行テストによる変更を予期しているグローバル/共有状態を必要とするテストは、それに応じて調整されるべきです。

ヒント

  1. これらのテストを有効にした後、コミットすると CI ログで :transforms-python:pytest タスクが実行されることが確認できるはずです。
  2. テストは、ファイルと関数名の両方の先頭にある test_ に基づいて検出されます。これは PyTest の標準規則です。
  3. サンプルレコードを素早く取得する方法として、Code Workbook コンソール でデータセットを開き、.collect() を呼び出すことができます。
  4. Python 形式のスキーマを取得するには、データセットプレビューを開き、 タブを開いて コピー をクリックし、PySpark スキーマをコピー を選択してください。