注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
このガイドでは、Pythonトランスフォームリポジトリでデータ期待値を設定する方法を説明します。データ期待値の概要については、このページを参照してください。
コードリポジトリの左側にあるライブラリ検索パネルを開きます。transforms-expectations
を検索し、ライブラリタブ内の「ライブラリを追加」をクリックします。
その後、ユーザーのコードリポジトリはすべての依存関係を解決し、再度チェックを実行します。これには数分かかる場合がありますが、その後すぐにライブラリをトランスフォームで使用できるようになります。
expectations
と Check
をユーザーのトランスフォームファイルにインポートします:
Copied!1 2 3 4 5
# 日本語のコメントを追加します from transforms.api import transform_df, Input, Output, Check # 変換関数をインポート from transforms import expectations as E # 期待値関数をインポート
一部の共通スキーマと行の期待値については、types
もインポートしたい場合があります:
Copied!1 2
# pyspark.sqlからtypesをTという名前でインポート from pyspark.sql import types as T
単一のチェックの基本構造:
Copied!1 2 3
# 'expectation'が満たされているかをチェックします。名前が一意であることを確認します。 # エラーが発生した場合の動作は'WARN'または'FAIL'です。 Check(expectation, '名前の一意性をチェック', on_error='警告/失敗')
FAIL
(デフォルト) - チェックが失敗するとジョブは中止されますWARN
- ジョブは続行し、警告が生成されData Healthによって処理されますデータセットにチェックを割り当てる
各チェックは、単一の入力または出力に渡す必要があります。単一のチェックを checks=check1
として渡すか、配列で複数のチェックを渡します: checks=[check1, check2, ...]
複数のチェックを使用して、より読みやすい期待値の構造を作成し、各意味のあるチェックの動作を個別に制御します。
出力の単純な主キーチェックの例:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
# transforms.apiから必要なモジュールをインポートします from transforms.api import transform_df, Input, Output, Check # transformsからexpectationsモジュールをEとしてインポートします from transforms import expectations as E # transform_dfデコレータを使用してデータ変換関数を定義します @transform_df( # 出力データセットのパスと、チェック条件を設定します Output( "/Users/data/dataset", # 出力データセットのパス # チェック条件:'id'がプライマリキーであること # エラーが発生した場合は'FAIL'とします checks=Check(E.primary_key('id'), 'Primary Key', on_error='FAIL') ), # 入力データセットのパスを設定します input=Input("Users/data/input") ) # データ変換関数を定義します def my_compute_function(input): return input # このサンプルでは、入力をそのまま出力として返しています
複合的な期待値を使用して、より複雑なチェックを追加することもできます。例えば、行age
がタイプlong
であり、指定された範囲内であることをチェックしましょう。複合期待値を定義し、それをトランスフォーム内の複数のチェックで使用し、エラー時の動作を異ならせることに注意してください。
チェックは、複合期待値で構成されている場合でも、全体として監視されます。複合期待値の特定の部分を監視(つまり、ウォッチして通知を受け取る)したい場合は、それをいくつかの異なるチェックに分割することをお勧めします。
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
from transforms.api import transform_df, Input, Output, Check from transforms import expectations as E from pyspark.sql import types as T # 年齢は0から200の間であると有効とみなされます。 # このコメントを日本語に翻訳しています。 expect_valid_age = E.all( E.col('age').has_type(T.LongType()), E.col('age').gte(0), E.col('age').lt(200) ) @transform_df( Output( "/Users/data/dataset", checks=[ Check(E.primary_key('id'), 'Primary Key', on_error='FAIL'), Check(expect_valid_age, 'Valid age on output', on_error='FAIL') ] ), input=Input( "Users/data/input", checks=Check(expect_valid_age, 'Valid age on input', on_error='WARN') ) ) def my_compute_function(input): return input