データ接続と統合概要データの期待値はじめに

注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

はじめに

このガイドでは、Pythonトランスフォームリポジトリでデータ期待値を設定する方法を説明します。データ期待値の概要については、このページを参照してください。

リポジトリの設定

コードリポジトリの左側にあるライブラリ検索パネルを開きます。transforms-expectationsを検索し、ライブラリタブ内の「ライブラリを追加」をクリックします。

その後、ユーザーのコードリポジトリはすべての依存関係を解決し、再度チェックを実行します。これには数分かかる場合がありますが、その後すぐにライブラリをトランスフォームで使用できるようになります。

トランスフォームの設定

expectationsCheck をユーザーのトランスフォームファイルにインポートします:

Copied!
1 2 3 4 5 # 日本語のコメントを追加します from transforms.api import transform_df, Input, Output, Check # 変換関数をインポート from transforms import expectations as E # 期待値関数をインポート

一部の共通スキーマと行の期待値については、typesもインポートしたい場合があります:

Copied!
1 2 # pyspark.sqlからtypesをTという名前でインポート from pyspark.sql import types as T

チェックの作成

単一のチェックの基本構造:

Copied!
1 2 3 # 'expectation'が満たされているかをチェックします。名前が一意であることを確認します。 # エラーが発生した場合の動作は'WARN'または'FAIL'です。 Check(expectation, '名前の一意性をチェック', on_error='警告/失敗')
  • 期待値 - 個々の期待値は、複数のサブ期待値から成る複合期待値(例:any/all演算子を使用する)も可能です
  • ユニークな名前のチェック - これはトランスフォーム内でユニークでなければならず(同じ名前を出力と入力で共有することはできません)、チェックをアプリ全体で識別します(例:Data Health、Buildsアプリケーション)
  • on_error - 期待値が満たされない場合のジョブの動作を定義します:
    • FAIL(デフォルト) - チェックが失敗するとジョブは中止されます
    • WARN - ジョブは続行し、警告が生成されData Healthによって処理されます

データセットにチェックを割り当てる

各チェックは、単一の入力または出力に渡す必要があります。単一のチェックを checks=check1 として渡すか、配列で複数のチェックを渡します: checks=[check1, check2, ...]

複数のチェック

複数のチェックを使用して、より読みやすい期待値の構造を作成し、各意味のあるチェックの動作を個別に制御します。

出力の単純な主キーチェックの例:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 # transforms.apiから必要なモジュールをインポートします from transforms.api import transform_df, Input, Output, Check # transformsからexpectationsモジュールをEとしてインポートします from transforms import expectations as E # transform_dfデコレータを使用してデータ変換関数を定義します @transform_df( # 出力データセットのパスと、チェック条件を設定します Output( "/Users/data/dataset", # 出力データセットのパス # チェック条件:'id'がプライマリキーであること # エラーが発生した場合は'FAIL'とします checks=Check(E.primary_key('id'), 'Primary Key', on_error='FAIL') ), # 入力データセットのパスを設定します input=Input("Users/data/input") ) # データ変換関数を定義します def my_compute_function(input): return input # このサンプルでは、入力をそのまま出力として返しています

複雑なチェック

複合的な期待値を使用して、より複雑なチェックを追加することもできます。例えば、行ageがタイプlongであり、指定された範囲内であることをチェックしましょう。複合期待値を定義し、それをトランスフォーム内の複数のチェックで使用し、エラー時の動作を異ならせることに注意してください。

チェックは、複合期待値で構成されている場合でも、全体として監視されます。複合期待値の特定の部分を監視(つまり、ウォッチして通知を受け取る)したい場合は、それをいくつかの異なるチェックに分割することをお勧めします。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 from transforms.api import transform_df, Input, Output, Check from transforms import expectations as E from pyspark.sql import types as T # 年齢は0から200の間であると有効とみなされます。 # このコメントを日本語に翻訳しています。 expect_valid_age = E.all( E.col('age').has_type(T.LongType()), E.col('age').gte(0), E.col('age').lt(200) ) @transform_df( Output( "/Users/data/dataset", checks=[ Check(E.primary_key('id'), 'Primary Key', on_error='FAIL'), Check(expect_valid_age, 'Valid age on output', on_error='FAIL') ] ), input=Input( "Users/data/input", checks=Check(expect_valid_age, 'Valid age on input', on_error='WARN') ) ) def my_compute_function(input): return input