注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

Transforms クラス

クラス説明
CheckExpectation をラップして、Data Health に登録できるようにします。
FileStatusFoundryFS ファイルの詳細をキャプチャする collections.namedtuple です。
FileSystem(foundry_fs[, read_only])データセットファイルの読み書き用のファイルシステムオブジェクト。
IncrementalTransformContext(ctx, is_incremental)インクリメンタル計算用の追加機能を持つ TransformContext。
IncrementalTransformInput(tinput[, prev_txrid])インクリメンタル計算用の追加機能を持つ TransformInput。
IncrementalTransformOutput(toutput[, …])インクリメンタル計算用の追加機能を持つ TransformOutput。
Input(alias)トランスフォーム入力の仕様。
Output(alias[, sever_permissions])トランスフォーム出力の仕様。
Pipeline()Transform オブジェクトのコレクションをグループ化するオブジェクト。
Transform(compute_func[, inputs, outputs, ...])計算の単一ステップを説明する呼び出し可能オブジェクト。
TransformContext(foundry_connector[, parameters])トランスフォームの計算関数にオプションで注入できるコンテキストオブジェクト。
TransformInput(rid, branch, txrange, …)実行時に Transform オブジェクトに渡される入力オブジェクト。
TransformOutput(rid, branch, txrid, …)実行時に Transform オブジェクトに渡される出力オブジェクト。

Check

class transforms.api.Check

Expectation をラップして、Data Health に登録できるようにします。

  • expectation
    • Expectation – 評価する期待値。
  • name
    • str – チェックの名前。時間の経過とともに安定した識別子として使用されます。
  • is_incremental
    • bool – トランスフォームがインクリメンタルに実行されている場合。
  • on_error
    • (str, Optional) – 期待値が満たされない場合のアクション。現在は 'WARN', 'FAIL'。
  • description
    • (str, Optional) – チェックの説明。

FileStatus

class transforms.api.FileStatus

FoundryFS ファイルの詳細をキャプチャする collections.namedtuple です。

FileStatus(path, size, modified) の新しいインスタンスを作成

  • count(value) → integer -- 値の出現回数を返す
  • index(value[, start[, stop]]) → integer -- 値の最初のインデックスを返す
    • 値が存在しない場合は ValueError を発生させます
  • modified
    • フィールド番号 2 のエイリアス
  • path
    • フィールド番号 0 のエイリアス
  • size
    • フィールド番号 1 のエイリアス

FileSystem

class transforms.api.FileSystem(foundry_fs, read_only=False)

データセットファイルの読み書き用のファイルシステムオブジェクト。

  • files(glob=None, regex='.*', show_hidden=False, packing_heuristic=None)
    • このデータセット内でアクセス可能なパスを含む DataFrame を作成します。
    • DataFrame は、ファイルサイズでパーティション化され、各パーティションは spark.files.maxPartitionBytes バイト以下のファイルパスの合計サイズを持ちます。また、spark.files.maxPartitionBytes より大きい単一のファイルによってパーティション化されます。ファイルのサイズは、ディスク上のファイルサイズに spark.files.openCostInBytes を加えたものとして計算されます。
    • パラメーター
      • glob (str, optional) – Unix のファイルマッチングパターン。globstar をサポートしています。ファイル(例:pdf)を再帰的に検索するには、**/*.pdf を使用してください。
      • regex (str, optional) – ファイル名と照合する正規表現パターン。
      • show_hidden (bool, optional) – . または _ で始まる隠しファイルを含めます。
      • packing_heuristic (str, optional) – Spark パーティションにファイルをビンパッキングするためのヒューリスティックを指定します。選択肢は ffd(First Fit Decreasing)または wfd(Worst Fit Decreasing)です。wfd は分布が均等でない傾向がありますが、速度がはるかに速いため、ファイル数が非常に多いデータセットには wfd が推奨されます。ヒューリスティックが指定されていない場合、自動的に選択されます。
    • 戻り値
      • (path, size, modified) の DataFrame
    • 戻り値の型
      • pyspark.sql.DataFrame
  • ls(glob=None, regex='.*', show_hidden=False)
    • すべてのディレクトリを再帰的に検索し、指定されたパターンに一致するすべてのファイルを、データセットのルートディレクトリからリストします。
    • パラメーター
      • glob (str, optional) – Unix のファイルマッチングパターン。globstar をサポートしています。ファイル(例:pdf)を再帰的に検索するには、**/*.pdf を使用してください。
      • regex (str, optional) – ファイル名と照合する正規表現パターン。
      • show_hidden (bool, optional) – . または _ で始まる隠しファイルを含めます。
    • 生成
      • FileStatus - 論理パス、ファイルサイズ(バイト)、変更タイムスタンプ(1970 年 1 月 1 日 UTC からの ms)。
  • open(_path, mode='r', kwargs)
    • 指定されたモードで FoundryFS ファイルを開きます。kwargs はキーワード引数です。
    • パラメーター
      • path (str – データセット内のファイルの論理パス。
      • kwargsio.open() に渡される残りのキーワード引数
      • show_hidden (bool, optional) – . または _ で始まる隠しファイルを含めます。
    • 戻り値
      • ストリームに接続された Python のファイルライクオブジェクト。
    • 戻り値の型
      • File

IncrementalTransformContext

class transforms.api.IncrementalTransformContext(ctx, is_incremental)

インクリメンタル計算用の追加機能を持つ TransformContext です。

  • auth_header
    • str – トランスフォームを実行するために使用される認証ヘッダー。
  • fallback_branches
    • List[str] – トランスフォームの実行時に設定されたフォールバックブランチ。
  • is_incremental
    • bool – トランスフォームがインクリメンタルに実行されている場合。
  • parameters
    • dict of (str, any) – トランスフォームパラメーター。
  • spark_session
    • pyspark.sql.SparkSession – トランスフォームの実行に使用される Spark セッション。

IncrementalTransformInput

クラス transforms.api.IncrementalTransformInput(tinput, prev_txrid=None)

増分計算のために追加機能を持つTransformInput

  • dataframe(mode='added')
    • 指定された読み込みモードのためのpyspark.sql.DataFrameを返します。
    • currentpreviousadded モードのみがサポートされています。
    • パラメーター
      • mode (str, optional) – 読み込みモードで、currentpreviousaddedmodified、_removed_のいずれかです。デフォルトは added
    • 戻り値
      • データセットのデータフレーム。
    • 戻り値の型
  • filesystem(mode='added')
    • 指定された読み込みモードのために FoundryFS から読み込むための_FileSystem_オブジェクトを構築します。
    • currentpreviousadded モードのみがサポートされています。
    • パラメーター
      • mode (str, optional) – 読み込みモードで、currentpreviousaddedmodified、_removed_のいずれかです。デフォルトは added
    • 戻り値
      • 指定されたビューのファイルシステムオブジェクト。
    • 戻り値の型
  • pandas()
    • pandas.DataFrame: 入力データセットの全体ビューを含むPandasデータフレーム。
  • branch
    • str – 入力データセットのブランチ。
  • path
    • str – 入力データセットのプロジェクトパス。
  • rid
    • str – データセットのリソース識別子。

IncrementalTransformOutput

クラス transforms.api.IncrementalTransformOutput(toutput, prev_txrid=None, mode='replace')

増分計算のために追加機能を持つTransformOutput

  • abort()
    • トランザクションを中止し、データを書き込まずにジョブを正常に完了させます。詳細は Python Abort を参照してください。
  • dataframe(mode='current', schema=None)
    • 指定された読み込みモードのための pyspark.sql.DataFrameを返します。
    • パラメーター
      • mode (str, optional) – 読み込みモードで、currentpreviousaddedmodified、_removed_のいずれかです。デフォルトは current
      • schema (pyspark.types.StructType, optional) - 空のデータフレームを構築するときに使用するPySparkスキーマ。読み込みモード「previous」を使用する場合に必要です。
    • 戻り値
      • データセットのデータフレーム。
    • 戻り値の型
    • 例外
      • ValueError - モード「previous」を使用しているときにスキーマが渡されない場合
  • filesystem(mode='current')
    • FoundryFS への書き込みのための_FileSystem_オブジェクトを構築します。
    • パラメーター
      • mode (str, optional) – 読み込みモードで、追加、現行、前のいずれかです。デフォルトは現行です。現在のファイルシステムのみが書き込み可能です。
    • 例外
  • pandas(mode='current')
    • pandas.DataFrame: 指定された読み込みモードのためのPandasデータフレーム。
  • set_mode(mode)
    • データセットの書き込みモードを変更します。
    • パラメーター
      • mode (str) – 書き込みモードで、'replace'または'modify'のいずれかです。修正モードでは、出力に書き込まれたものがデータセットに追加されます。置換モードでは、出力に書き込まれたものがデータセットを置換します。

データが書き込まれた後は書き込みモードを変更することはできません。

  • write_dataframe(df, partition_cols=None, bucket_cols=None, bucket_count=None, sort_by=None, output_format=None, options=None)
    • 指定された DataFrame を出力データセットに書き込みます。
    • パラメーター
      • df (_pyspark.sql.DataFrame) – 書き込むPySparkデータフレーム。
      • partition_cols (List[str], optional) - データを書き込むときに使用する列のパーティション化。
      • bucket_cols (List[str], optional) – データをバケットに分ける列。bucket_countが指定されている場合は必須です。
      • bucket_count (int, optional) – バケットの数。bucket_colsが指定されている場合は必須です。
      • sort_by (List[str], optional) – バケット化されたデータをソートする列。
      • output_format (str, optional) – 出力ファイルの形式、デフォルトは'parquet'。
      • options (dict, optional) – org.apache.spark.sql.DataFrameWriter#option(String, String)に渡す追加オプション。
  • write_pandas(pandas_df)
    • 指定された pandas.DataFrame を出力データセットに書き込みます。
    • パラメーター
  • branch
    • str – データセットのブランチ。
  • path
    • str – データセットのプロジェクトパス。
  • rid
    • str – データセットのリソース識別子。

Input

クラス transforms.api.Input(alias, branch, stop_propagating, stop_requiring, checks)

トランスフォーム入力の仕様。

  • パラメーター
    • alias (str, optional) – データセットのridまたはデータセットの絶対プロジェクトパス。指定しない場合、パラメーターは未結合です。
    • branch (str, optional): 入力データセットを解決するブランチ名。指定しない場合、ビルド時に解決されます。
    • stop_propagating (マーキング, optional): この入力から伝播を停止するセキュリティマーキング。マーキング継承されたマーキングを削除するのドキュメンテーションを参照してください。
    • stop_requiring (OrgMarkings, optional): この入力で必要とするOrgマーキング。
    • checks (List[Check], Check, optional): 1つまたは複数の:class:Checkオブジェクト。
    • failure_strategy (str, optional): 入力が更新に失敗した場合の戦略。continueまたはfailのいずれかでなければなりません。指定しない場合、デフォルトはfailです。

Output

クラス transforms.api.Output(alias=None, sever_permissions=False, checks=None)

トランスフォーム出力の仕様。

  • パラメーター
    • alias (str, optional) - データセットのridまたはデータセットの絶対プロジェクトパス。指定しない場合、パラメーターは未結合です。
    • sever_permissions (bool, optional) - trueの場合、データセットの許可をその入力の許可から切断します。パラメーターが未結合の場合は無視されます
    • checks (List[Check], Check, optional) - 1つまたは複数の :class:Checkオブジェクト。

Pipeline

クラス transforms.api.Pipeline

Transformオブジェクトの集合をグループ化するためのオブジェクト。

  • add_transforms(*transforms)
    • 指定されたTransformオブジェクトを Pipeline インスタンスに登録します。
    • パラメーター
      • transforms (Transform) – 登録する変換。
    • 例外
      • ValueError – 複数の Transform オブジェクトが同じ Outputエイリアスに書き込む場合。
  • discover_transforms(*modules)
    • モジュールレベルの変換をすべて登録し、モジュールを再帰的に検索してインポートします。
    • このメソッドは、指定されたモジュールの ___path___から始まり、見つけた各モジュールをインポートし、Transformのインスタンス(transformsデコレータによって構築された)である任意の属性をパイプラインに登録します。
    • パラメーター
      • modules (module) – 検索を開始するモジュール。
Copied!
1 2 3 4 5 6 7 8 # myprojectというモジュールをインポートする >>> import myproject # Pipelineクラスのインスタンスを作成する >>> p = Pipeline() # myproject内で定義されたデータ変換を探し出す >>> p.discover_transforms(myproject)

各モジュールが見つかるとインポートされます。モジュールレベルでのコードの実行は避けてください。

  • transforms
    • List[Transform] – パイプラインに登録されている変換のリスト。

Transform

class transforms.api.Transform(compute_func, inputs=None, outputs=None, profile=None)

計算の1ステップを記述する呼び出し可能なオブジェクト。

Transformは、いくつかの Input 仕様、いくつかの Output 仕様、および1つの計算関数で構成されています。

慣用的には、提供されているデコレーターを使ってTransformオブジェクトを構築します:transform()transform_df() 、および transform_pandas()

注:元の計算関数は、Transformの __call__ メソッドを介して公開されます。

  • パラメーター

    • compute_func (Callable) – ラップする計算関数。
    • inputs (Dict[str, Input]) - 入力名を Input 仕様にマッピングする辞書。
    • outputs (Dict[str, Output]) - 入力名を Output 仕様にマッピングする辞書。
    • profile (str, optional) – ランタイムで使用するTransformプロファイルの名前。
  • compute(ctx=None, _kwargs_)**

    • コンテキストと一連の入力と出力で変換を計算します。
    • パラメーター
      • ctx (TransformContext, optional) – 変換に要求された場合に変換に渡されるコンテキストオブジェクト。
      • kwargs (TransformInput or TransformOutput) - 入力名を Input 仕様にマッピングする辞書。kwarg はキーワード引数の省略形です。
      • outputs (Dict[str, Output]) - 計算関数に名前で渡す入力、出力、およびコンテキストオブジェクト。
    • 戻り値
      • 変換の実行後の出力オブジェクト。
    • 戻り値のタイプ
  • version

    • str – 変換の2つのバージョンをロジックの古さを考慮して比較するために使用される文字列。
    • 例えば、SQL変換はSQLクエリのハッシュを取得することができます。理想的には、SQLクエリは同等の意味を持つ変換に対して同じバージョンを生成する形式に変換されます。つまり、SQLクエリ select A, B from foo; は、SQL query select A, B from (select * from foo); と同じバージョンであるべきです。
    • バージョンが指定されていない場合、リポジトリのバージョンが使用されます。
    • Raises
      • ValueError – 計算関数のオブジェクトハッシュの計算に失敗した場合

TransformContext

class transforms.api.TransformContext(foundry_connector, parameters=None) コンテキストオブジェクトは、変換の計算関数にオプションで注入できます。

  • auth_header
    • str – 変換を実行するために使用される認証ヘッダー。この認証ヘッダーはスコープが限定されており、ジョブを実行するために必要な権限しか持っていません。API呼び出しには使用しないでください。
  • fallback_branches
    • List[str] – 変換の実行時に設定されているフォールバックブランチ。
  • parameters
    • dict of (str, any) – 変換パラメーター。
  • spark_session
    • pyspark.sql.SparkSession – 変換の実行に使用されるSparkセッション。

TransformInput

class transforms.api.TransformInput(rid, branch, txrange, dfreader, fsbuilder)

ランタイムでTransformオブジェクトに渡される入力オブジェクト。

  • dataframe()
  • filesystem()
    • FoundryFS から読み取るための FileSystem オブジェクトを構築します。
    • 戻り値
      • Foundryから読み取るための FileSystem オブジェクト。
    • 戻り値のタイプ
  • pandas()
    • pandas.DataFrame:入力データセットの完全なビューを含むPandasデータフレーム。
  • branch
    • str – 入力データセットのブランチ。
  • path
    • str – 入力データセットのプロジェクトパス。
  • rid
    • str – データセットのリソース識別子。
  • column_descriptions
    • Dict<str, str> – データセットの列の説明。
  • column_typeclasses
    • Dict<str, str> – データセットの列のタイプクラス。

TransformOutput

class transforms.api.TransformOutput(rid, branch, txrid, dfreader, dfwriter, fsbuilder)

ランタイムでTransformオブジェクトに渡される出力オブジェクト。

  • abort()
    • トランザクションを中止し、データを書き込まずにジョブを正常に完了させることができます。詳細については Python Abort を参照してください。
  • dataframe()
  • filesystem()
    • FoundryFS に書き込むための FileSystem オブジェクトを構築します。
    • 戻り値
      • Foundry に書き込むための FileSystem オブジェクト。
  • 戻り値のタイプ
  • pandas()
    • pandas.DataFrame:出力データセットの完全なビューを含むPandasデータフレーム。
  • set_mode(mode)
    • データセットの書き込みモードを変更します。
    • パラメーター
      • mode (str) – 書き込みモードで、'replace' または 'modify' のいずれか。modifyモードでは、出力に書き込まれたものがデータセットに追加されます。replaceモードでは、出力に書き込まれたものがデータセットと置き換えられます。
  • write_dataframe(df, partition_cols=None, bucket_cols=None, bucket_count=None, sort_by=None, output_format=None, options=None, column_descriptions=None, column_typeclasses=None)
    • 与えられた DataFrame を出力データセットに書き込みます。
    • パラメーター
      • df (pyspark.sql.DataFrame) – 書き込むPySparkデータフレーム。
      • partition_cols (List[str], optional) - データの書き込み時に使用する列パーティショニング。
      • bucket_cols (List[str], optional) - データをバケット化する列。bucket_count が指定されている場合は必須です。
      • bucket_count (int, optional) – バケット数。bucket_cols が指定されている場合は必須です。
      • sort_by (List[str], optional) - バケット化されたデータをソートする列。
      • output_format (str, optional) - 出力ファイル形式で、デフォルトは 'parquet' です。ファイル形式は、Spark の DataFrameWriter に基づいており、他のタイプには 'csv'、'json'、'orc'、'text' が含まれます。
      • options (dict, optional) - org.apache.spark.sql.DataFrameWriter#option(String, String) に渡す追加のオプション。
      • column_descriptions (Dict[str, str], optional) - 列名とその文字列説明のマップ。このマップは DataFrameの列と交差し、説明(最大800文字)が含まれている必要があります。
      • column_typeclasses (Dict[str, List[Dict[str, str]], optional) - 列名とその列のタイプクラスのマップ。リスト内の各タイプクラスは、_Dict[str, str]_で、有効なキーは "name" と "kind" のみです。これらのキーは、ユーザーが望む対応する文字列にマッピングされます。
  • write_pandas(pandas_df)
    • 与えられた pandas.DataFrame を出力データセットに書き込みます。
    • パラメーター
  • branch
    • str – データセットのブランチ。
  • path
    • str – データセットのプロジェクトパス。
  • rid
    • str – データセットのリソース識別子。