注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

外部トランスフォーム

現時点では、外部トランスフォームはPythonトランスフォームを通じてのみサポートされています。

ユーザーはコードリポジトリのPythonトランスフォームを使用して、コード内で直接外部システムに接続することができます。例えば、外部システムとの対話に複雑なロジックが必要な場合や、第三者が提供するソフトウェア開発キット(SDK)を使用したい場合などに便利です。我々は、REST APIを使用してスケジュールされた同期やエクスポートを行うために、外部トランスフォームを使用することをお勧めします。

続ける前に、必ずPythonトランスフォームコードリポジトリ を作成し、以下の指示に従って設定してください。

コードリポジトリのプレビューヘルパーを使用して、外部システムに接続するトランスフォームをプレビューすることはできません。

コードリポジトリに外部接続の設定を許可する

このセクションでは、外部トランスフォームを使用するための設定をカバーしています。

外部トランスフォームは、SECURE モードで動作するリポジトリでのみ使用できます。他のセキュリティモードは、Palantirの代表者と話すことでのみ利用可能です。

外部システムとの対話を有効にする

ユーザーが作成したコードリポジトリ内で、Information Security Officer の役割を持つFoundryユーザーは、リポジトリSettingsタブに移動し、次にRepository > External systemsに移動して、Allow access to external systems from this repositoryのオプションをオンにします。

Information Security OfficerはFoundryのデフォルトの役割であり、ユーザーはControl PanelEnrollment permissionsの下でInformation Security Officerの役割を付与することができます。

Enable external system interaction

データセット入力を有効にする

Information Security Officer の役割を持つFoundryユーザーは、外部トランスフォームでの入力の使用をオプションで許可することができます。

データセットが外部システムと通信可能なトランスフォームの入力として使用される場合、そのデータセットの任意のデータがFoundryを離れる可能性があります。Information Security Officer の役割を持つFoundryユーザーは、このリポジトリで外部トランスフォームの入力として許可されるべき任意のデータのセキュリティマーキングと組織マーキングのセットを選択する必要があります。

例えば、ユーザーはPalantir組織でSensitiveマーキングのあるデータセットを持っているかもしれません。このデータセットを外部トランスフォームで使用するには、情報セキュリティオフィサーはコードリポジトリ設定のステップ3、Configure use of Foundry inputs with external systemsで、SensitiveマーキングとPalantir組織の両方を追加する必要があります。

Enable external system interaction with inputs

transforms-external-systemsライブラリを追加する

コードリポジトリで外部システムとの対話が有効になったら、ユーザーはコードインターフェースの左側にあるLibraries sidebar tabからtransforms-external-systemsライブラリを追加する必要があります。

Add library in the Libraries tab to left side panel.

送信と資格情報の設定

ライブラリを追加した後、ユーザーはコードインターフェースの左側に新しいEgress and Credentialsタブにアクセスできます。

このタブでは、ユーザーのコードから呼び出される任意のエンドポイントに必要なネットワーク送信ポリシーを設定し、リポジトリが使用する資格情報(必要な場合)を追加できます。すべてのエンドポイントが資格情報を必要とするわけではないことに注意してください。

The Egress and Credentials tab in Code Editor

ネットワーク送信ポリシーと資格情報の使用

送信ポリシーを追加する

Foundryは、ユーザーが作成したコードから開始されたすべての接続を、外部の宛先に到達する際にネットワーク送信ポリシーにバインドする必要があります。ネットワーク送信ポリシーは、ユーザーが作成したコードから開始された許可された接続がFoundryの外部の宛先に到達することを可能にします。

ユーザーのエンロールメントで自己サービスの送信が有効になっている場合は、Control Panelで送信ポリシーを作成および管理できます。接続が新しいネットワークルートの追加を必要とする場合は、送信ポリシーの詳細を持って情報セキュリティオフィサーまたはPalantirの代表者に連絡してください。

Foundryのインスタンスに自己サービスの送信が有効になっていない場合は、エンドポイントを内部的に有効にし、ポリシーを設定するためにPalantirの代表者に連絡してください。

外部トランスフォーム(グローバルまたは非グローバル)の送信ポリシーを使用する前に、ユーザーはコードリポジトリのEgress and CredentialsパネルにあるEgressタブからそれらをインポートする必要があります。Egressタブは、ユーザーが権限を持つすべてのポリシーを公開し、それらを3つのカテゴリに分類します:

  • Imported: 現在のコードリポジトリのトランスフォームから使用する準備ができたネットワーク送信ポリシー。
  • Importable: ネットワーク送信ポリシーにプロジェクト参照を追加する必要があります。
  • Access request required: ユーザーはポリシーにViewerの権限はありますが、Importerの権限はありません。

このタブ内で、希望のネットワーク送信ポリシーを検索します。ポリシーがImportableとマークされている場合は、それを選択し、Import egress policy to projectを選択します。

希望のネットワーク送信ポリシーがAccess request requiredとマークされている場合は、Information Security Officerの役割を持つFoundryユーザーが、Control PanelのNetwork egressページでユーザーにImporterの役割を付与する必要があります。Importerの役割を付与するには、情報セキュリティオフィサーはまず希望のネットワークポリシーを選択し、次にActions > Manage Sharingを選択してユーザーをポリシーのImporterとして追加する必要があります。

資格情報を追加する

ユーザーの外部システムがアクセスを得るために認証された資格情報(ユーザーネームとパスワード)を必要とする場合、ユーザーはEgress and CredentialsサイドパネルのCredentialsタブを使用して資格情報のセットを追加し、それをコードで使用することができます。

ソースエンドポイントに許可リストプロセスが存在する場合は、FoundryのIPアドレスを許可リストに登録する必要があります。必要なIPアドレスについては、Palantirの代表者に連絡してください。

証明書のパス

ユーザーの外部システムがSSL/TLS証明書を必要とする場合は、ユーザーは証明書を資格情報に保存し、その資格情報を外部トランスフォームのロジックで使用することができます。

外部トランスフォームのロジックを書く

ユーザーは、Pythonデータトランスフォームを外部システムに書く準備ができました。外部トランスフォームを書く前に、Pythonデータトランスフォームの書き方に関する基本的な指示を確認することをおすすめします。

transforms.external.systemsパッケージから、必要なuse_external_systemsデコレータとEgressPolicyCredential(必要な場合)の入力をインポートします。transforms-apiから、トランスフォームデコレータとOutputをインポートします。ユーザーの外部ソースが資格情報を必要としない場合、ユーザーはロジックにCredentialの入力を追加する必要はありません。

ユーザーはuse_external_systemsデコレータを使用して、外部トランスフォームのロジックを正常に書く必要があります。

次の例は、2つのデコレータの使用方法を示しています:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 # 必要なライブラリをインポートします from transforms.api import transform, Output from transforms.external.systems import use_external_systems, EgressPolicy, Credential # 外部システムを使用するためのデコレータを設定します。 # EgressPolicyとCredentialは、リソース識別子 (RID) を引数に取ります。 @use_external_systems( egress=EgressPolicy('<policy RID>'), # データの出口ポリシーを指定します。 creds=Credential('<credential RID>') # クレデンシャルを指定します。 ) # 変換の出力を設定するデコレータを設定します。 @transform( output=Output('/path/to/output/dataset') # 出力データセットのパスを指定します。 ) # メインの計算関数を定義します。 def compute(egress, creds, ...): # ...

次に、APIに接続するためのシンプルなトランスフォームを設定することができます:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from transforms.api import transform, Output from transforms.external.systems import EgressPolicy, use_external_systems, Credential import requests # 外部システムを使用するためのデコレータ # ここでは、アクセスポリシーや資格情報のRID(リソースID)を指定しています @use_external_systems( egress=EgressPolicy('<policy RID>'), creds=Credential('<credential RID>') ) # トランスフォームデコレータで、出力先のパスを指定 @transform( output=Output('/path/to/output/dataset') ) def compute(egress, output, creds): # 資格情報からユーザ名とパスワードを取得 username = creds.get('username') password = creds.get('password') # 指定されたAPIからデータを取得 response = requests.get('https://<API URL>', auth=(username, password), timeout=10).text # 取得したデータを指定した出力先に書き出し with output.filesystem().open('response.json', 'w') as f: f.write(response)

以下の例のように、単一のトランスフォームで複数のエグレスポリシーや資格情報を指定することもできます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from transforms.api import transform, Output from transforms.external.systems import use_external_systems, EgressPolicy, Credential # 外部システムとの接続設定 @use_external_systems( egress1=EgressPolicy('<first policy RID>'), # 第一のポリシーRID egress2=EgressPolicy('<second policy RID>'), # 第二のポリシーRID creds1=Credential('<first credential RID>'), # 第一の資格情報RID creds2=Credential('<second credential RID>') # 第二の資格情報RID ) # トランスフォーム関数の設定 @transform( output=Output('/path/to/output/dataset') # 出力データセットへのパス ) def compute(egress1, egress2, creds1, creds2, ...): # ここで処理を記述 # ...

Foundry データセットの入力を追加する

場合によっては、Foundry の入力データを処理する外部トランスフォームを作成することが有用かもしれません。たとえば、表形式のデータセットの各行に対して追加のメタデータを集めるために API をクエリすることが必要かもしれません。または、Foundry のデータを外部のソフトウェアシステムにミラーリングする必要があるワークフローがあるかもしれません。

このようなケースは、安全な Foundry データを未知のセキュリティ保証とデータ起源の切断された別のシステムにエクスポートする可能性を開くため、輸出規制のワークフローと見なされます。トランスフォームの開発者がエクスポートワークフローのセキュリティを評価し、Foundry を離れるデータが外部システムで正しく保護されていることを確認するのは、トランスフォームの開発者の責任です。Foundry はガバナンスコントロールを提供して、開発者がセキュリティの意図を明確にエンコードできるようにし、情報セキュリティオフィサーが外部システムとやり取りするワークフローの範囲と意図を監査できるようにします。

輸出規制を適用するには、ExportControlをユーザーの外部トランスフォームに適用します。ExportControlは、エクスポートを目的としたセキュリティマーキングと組織IDのリストを受け入れます。その後、上流データに追加のセキュリティマーキングがマークされている場合、トランスフォームジョブは失敗することが保証されます。コードリポジトリでサポートされるエクスポート可能なマーキングのセットを拡張するには、情報セキュリティオフィサーがリポジトリの設定を調整する必要があります。設定 > リポジトリタブに移動します。次に、外部システムセクションで、ステップ 3 の下にこのリポジトリで Foundry の入力を外部システムと使用することを許可するを選択します。

セキュリティマーキングと組織は両方ともマーキングとして実装され、エクスポートコントロールの設定に一緒にリストされるべきです。マーキングが欠けているというエラーメッセージは、セキュリティマーキングまたは組織を指している可能性があります。

例として、Foundry データセットで指定された画像 URL をダウンロードするためのエクスポート外部トランスフォームを使用することができます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 from transforms.api import transform, Input, Output from transforms.external.systems import use_external_systems, ExportControl, EgressPolicy from pyspark.sql.functions import udf import shutil # 外部システムの使用設定 @use_external_systems( export_control=ExportControl(markings=['<marking ID>']), egress=EgressPolicy(<policy RID>), ) # 入力データセットと出力データセットのパス設定 @transform( images_output=Output('/path/to/output/dataset'), image_urls=Input('/path/to/input/dataset'), ) def compute(export_control, egress, images_output, image_urls): # 画像をダウンロードするためのUDF関数 @udf def download(name, url): response = requests.get(url, stream=True) with images_output.filesystem().open(f'{name}.jpg', mode='wb') as out_file: shutil.copyfileobj(response.raw, out_file) return True # 画像のURLから画像をダウンロードし、結果をデータフレームに追加 image_urls.dataframe().withColumn('downloaded', download(col('Name'), col('Url'))).collect()

Foundry メディアセット出力の追加 [ベータ]

メディアセット出力は開発のベータ段階にあり、ユーザーのエンロールメントでは利用できないかもしれません。

画像、オーディオ、PDFなどのメディアファイルを取り込むために外部トランスフォームを使用する場合、ファイルをメディアセットに書き出すことができます。既存のメディアセットにメディアアイテムを書き込むには、プログラム的にアイテムを指定したメディアセット出力に置きます:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 from transforms.api import transform from transforms.mediasets import MediaSetOutput from transforms.external.systems import use_external_systems import requests import tempfile # 外部システムを使用するデコレータ @use_external_systems() # 変換関数のデコレータ @transform( media_set_output=MediaSetOutput('/path/to/output/media/set') ) def compute(media_set_output): # APIから画像データを取得 response = requests.get('https://<API URL>') fname = 'my_image.png' # 一時ファイルを利用して画像データを処理 with tempfile.NamedTemporaryFile() as tmp: tmp.write(response.content) tmp.flush() # 一時ファイルを読み込んでメディアセットに追加 with open(tmp.name, 'rb') as tmp_read: media_set_output.put_media_item(tmp_read, path=fname)

外部システムへの接続方法をさらに

APIサービスとのやり取り

外部システム内に存在する場合、APIサービスをプログラム的にアクセスし利用することができます。例として、外部変換から Foundry データセットに地図画像を出力するために Mapbox Static Images API を使用することができます:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 # 外部システムとの連携を可能にするためのモジュールをインポートします from transforms.external.systems import EgressPolicy, use_external_systems, Credential # データ変換に必要なモジュールをインポートします from transforms.api import transform, Output # Mapboxという地図サービスのクライアントをインポートします from mapbox import Static # 外部システムとの連携を定義します @use_external_systems( # 外部システムへの出口ポリシーを設定します egress=EgressPolicy('<policy RID>'), # Mapboxの認証情報を設定します mapbox_creds=Credential('<credential RID>') ) # データ変換を定義します @transform( # 出力先を指定します output=Output('/Users/username/datasets/example_mapbox'), ) def compute(output, egress, mapbox_creds): # Mapboxへのアクセストークンを取得します mapbox_access_token = mapbox_creds.get('mapbox-token') # Mapboxクライアントを初期化します client = Static(access_token=mapbox_access_token) # ロンドンの衛星画像を取得します london = client.image('mapbox.satellite', lat=51.5072, lon=0.0, z=12) # ロンドンの画像取得結果をログに出力します with output.filesystem().open('london.log', 'w') as f: f.write(f'レスポンスコード: {london.status_code}') # ロンドンの画像データを一時的に保存します with open('london.png', 'wb') as f: f.flush() # ロンドンの画像データを出力先に保存します with output.filesystem().open('london.png', 'wb') as f: f.write(london.content)

データセットファイルとの対話

通常のトランスフォームと同様に、プログラムでファイルを開いたり、ストリーム処理したり、対話することができます。非構造化ファイルの読み書きについての詳細を学びましょう。

インクリメンタル処理

通常のトランスフォームと同様に、外部トランスフォームをインクリメンタルに実行することができます。

外部インクリメンタルトランスフォームでは、実行間の状態を維持することがよく必要とされます。通常のインクリメンタルトランスフォームでは、入力ファイルが追跡され、最新のトランザクションと比較されます。外部インクリメンタルトランスフォームの場合、インクリメンタル状態を明示的に別の方法で保存する必要があります。1つの方法は、最新のインクリメンタルトランスフォームの状態を保持する状態ファイルを保存することです。以下の例に示すように:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 from transforms.api import transform, Input, Output, configure, incremental from pyspark.sql import Row from pyspark.sql import functions as F from pyspark.sql import types as T import logging import time import json log = logging.getLogger(__name__) @incremental() @configure(profile=["KUBERNETES_NO_EXECUTORS"]) @use_external_systems( egress=EgressPolicy('<policy RID>'), creds=Credential('<credential RID>') ) @transform( out=Output("<output_dataset>"), ) def compute(ctx, out, egress, creds): # 出力データセットのファイルシステムを取得して、ファイルを読み書きできるようにする: out_fs = out.filesystem() state_filename = "_state.json" # "_"で始まるファイル名は、データセットプレビューで非表示にするためのものです。 state = {"last_seen" : 0} # 任意の開始状態 # 出力データセットから状態を取得しようとします: try: with out_fs.open(state_filename, mode='r') as state_file: data = json.load(state_file) # 取得した状態を検証する: state = data logging.info(f"state file found, continuing from : {data}") except Exception as e: logging.warn("state file not found, starting over from default state") # ここで、API呼び出しを行ったり、カスタム処理を作成したり、データフレームを生成して保存したり、 # 出力データセットに直接ファイルを保存するロジックを記述します。 # 例として、次のようにデータフレームを生成して出力データセットに保存します(API呼び出しでデータを取得したかのように): out.write_dataframe(get_dataframe(ctx)) # 次のイテレーションの状態を更新する: state["last_seen"] = 1 + state["last_seen"] # 出力データセットに新しい状態を保存する: with out_fs.open(state_filename, "w") as state_file: json.dump(state, state_file) # この関数は、例としてデータフレームを生成するものです。本番環境ではこのステップは関係ありません: def get_dataframe(ctx): # データフレームのスキーマを定義する: schema = T.StructType([ T.StructField("name", T.StringType(), True), T.StructField("age", T.IntegerType(), True), T.StructField("city", T.StringType(), True) ]) # 行のリストを作成する: data = [("Alice", 25, "New York"), ("Bob", 30, "San Francisco"), ("Charlie", 35, "London")] # PySparkデータフレームを作成する: df = ctx.spark_session.createDataFrame(data, schema=schema)