注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

Copied!
1# transforms.external.systemsからexternal_systems, Sourceをインポートします 2from transforms.external.systems import external_systems, Source

ユーザーは external_systems デコレータを使用して、トランスフォームに含めるべきソースを指定する必要があります:

Copied!
1# 外部システムとの連携を行うためのデコレータ 2@external_systems( 3 # pokeSourceは、外部システムとの通信に使用する情報源を指定します 4 # ここでは、"ri.magritte..source.e301d738-b532-431a-8bda-fa211228bba6"という情報源を指定しています 5 pokeSource=Source("ri.magritte..source.e301d738-b532-431a-8bda-fa211228bba6") 6)
Copied!
1# pokeSourceからHTTPS接続を取得し、そのURLをpokeUrlに設定します。 2pokeUrl = pokeSource.get_https_connection().url
Copied!
1pokeUrl = pokeSource.get_https_connection().url 2pokeClient = pokeSource.get_https_connection().get_client() 3 4# pokeClientはPythonの `requests` ライブラリから事前設定されたSessionオブジェクトです。 5# GETリクエストの例: 6response = pokeClient.get(pokeUrl + "/api/v2/pokemon/" + name, timeout=10)

エージェントプロキシを使用してオンプレミスシステムに接続する場合、必要なエージェントプロキシの設定が自動的に行われるため、ビルトインクライアントを 必ず 使用しなければなりません。

エージェントプロキシはベータ機能であり、ユーザーのエンロールメントによっては利用できない場合があります。

例:PokeAPIからデータをインポートする

以下の例は、APIが返すすべてのポケモンを一度に100ずつ取得し、すべてのポケモンの名前をデータセットに出力する完全なトランスフォームを示しています。

Copied!
1from transforms.api import transform_df, Output 2from transforms.external.systems import external_systems, Source 3from pyspark.sql import Row 4import json 5import logging 6 7logger = logging.getLogger(__name__) 8 9# 外部システムとの連携を設定 10@external_systems( 11 # リポジトリにインポートされたソースを指定 12 pokeSource=Source("ri.magritte..source.e301d738-b532-431a-8bda-fa211228bba6") 13) 14@transform_df( 15 # この変換では入力を使用せず、出力データセットのみを指定 16 Output("/path/to/output/dataset") 17) 18def compute(pokeSource, ctx): 19 poke = pokeSource.get_https_connection().get_client() 20 pokeUrl = pokeSource.get_https_connection().url 21 22 data = [] 23 24 startUrl = pokeUrl + "/api/v2/pokemon?limit=100&offset=0" 25 while startUrl is not None: 26 # これ以上ページがないまでループする 27 logger.info("PokeAPIからデータを取得しました:" + startUrl) 28 29 # PokeAPIソースの組み込みHTTPSクライアントを使用して、ページごとに最大100のポケモンを取得 30 response = poke.get(startUrl) 31 32 responseJson = json.loads(response.text) 33 for pokemon in responseJson["results"]: 34 data.append(Row(name=pokemon["name"])) 35 startUrl = responseJson["next"] 36 37 # 外部システムから取得し、解析したデータを出力データセットに書き込む 38 return ctx.spark_session.createDataFrame(data)

外部トランスフォームで Foundry 入力を使用する

外部トランスフォームでは、Foundry の入力データを使用することがよくあります。たとえば、表形式のデータセットの各行に対して追加のメタデータを収集するために API をクエリする可能性があります。または、Foundry のデータを外部のソフトウェアシステムにエクスポートする必要があるワークフローがあるかもしれません。

このようなケースは、安全な Foundry データを未知のセキュリティ保証および切断されたデータフローを持つ別のシステムにエクスポートする可能性があるため、輸出規制 ワークフローと見なされます。ソース接続を設定するとき、ソース所有者は Foundry からのデータがエクスポートできるかどうかを指定し、セキュリティマーキングと組織のセットを提供する必要があります。Foundry は、開発者がセキュリティ意図を明確にコード化できるように、そして情報セキュリティオフィサーが外部システムとやり取りするワークフローの範囲と意図を監査できるように、ガバナンス制御を提供します。

ソースの輸出規制を設定する

輸出は セキュリティマーキング を使用して制御されます。ソースを設定するとき、輸出設定は外部システムにエクスポートするのに安全なセキュリティマーキングと組織を指定するために使用されます。これは、データ接続アプリケーションでソースに移動し、次に 接続設定 > エクスポート設定 タブに移動することで行われます。次に、このソースへのエクスポートを有効にする オプションをオンにし、エクスポートする可能性があるマーキングと組織のセットを選択する必要があります。

これを行うには、エクスポートは Foundry 内のデータのマーキングを削除することと同等と見なされるため、関連するデータと組織のマーキングを削除する権限が必要です。

以下の操作を許可するために、このソースへのエクスポートを有効にする の設定をオンにする必要があります:

  • このソースをインポートする Python トランスフォームコードへの入力としてデータセット、メディアセット、ストリームを使用する。
  • Python トランスフォームでこのソースに登録された仮想テーブルを使用する。

以下は、Palantir 組織からのデータを、追加のセキュリティマーキングなしで PokeAPI にエクスポートできるようにする PokeAPI ソースのエクスポート設定の例です:

PokeAPI ソースのエクスポート設定を表示するデータ接続設定。このソースへのエクスポートを有効にするがオンになっています

実際にはこのシステムにデータをエクスポートしない場合でも、同じ計算ジョブに Foundry データ入力を許可し、このシステムへの開放接続を意味するため、このソースへのエクスポートを有効にする はオンにしなければなりません。これは、データがエクスポートされる可能性があることを意味します。

例:PokeAPI からのデータと並行して Foundry インポートを使用する

この例では、ポケモンの名前の入力データセットから始め、PokeAPI を使用して、各ポケモンの高さと重さを含む豊かなデータセットを出力します。また、レスポンスのステータスコードに基づく基本的なエラーハンドリングも示されています。

Copied!
1from transforms.api import transform_df, Output, Input 2from transforms.external.systems import external_systems, Source 3from pyspark.sql import Row 4import json 5import logging 6 7logger = logging.getLogger(__name__) 8 9 10@external_systems( 11 # 外部システムからの情報源 12 pokeSource=Source("ri.magritte..source.e301d738-b532-431a-8bda-fa211228bba6") 13) 14@transform_df( 15 # PokeAPIから取得した豊富なポケモンデータの出力データセット 16 Output("/path/to/output/dataset"), 17 18 # ポケモン名の入力データセット 19 pokemonList=Input("/path/to/input/dataset") 20) 21def compute(pokeSource, pokemonList, ctx): 22 poke = pokeSource.get_https_connection().get_client() 23 pokeUrl = pokeSource.get_https_connection().url 24 25 def make_request_and_enrich(row: Row): 26 name = row["name"] 27 response = poke.get(pokeUrl + "/api/v2/pokemon/" + name, timeout=10) 28 29 if response.status_code == 200: 30 data = json.loads(response.text) 31 height = data["height"] 32 weight = data["weight"] 33 else: 34 # ステータスコードが200以外の場合、リクエストが失敗したと警告をログに出力します。 35 logger.warn(f"{name}のリクエストがステータスコード{response.status_code}で失敗しました。") 36 height = None 37 weight = None 38 39 return Row(name=name, height=height, weight=weight) 40 41 # ポケモンリストの各行に対してリクエストを作成し、データを補完します。 42 return pokemonList.rdd.map(make_request_and_enrich).toDF()