注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
Compute Modules 機能はベータ版であり、ユーザーのエンロールメントで利用できない場合があります。
このドキュメントは、Compute Modules アプリケーション内でプラットフォーム上でも効率的な開発者体験のために閲覧できます。
Compute Modules を始めるには、好みの開発環境を使用できます。数分で Compute Module を作成してデプロイし、Foundry でテストできるようになります。
Foundry では、フォルダーを選択し、+ New > Compute Module を選択し、ダイアログの手順に従って、空の compute-module を利用した関数またはパイプラインで始めます。次のステップは実行モードに応じて以下のドキュメントに従うか、よりシームレスな体験のために、ユーザーの Compute Module 内の Documentation タブを選択して、プラットフォーム内でのガイダンスに従ってください。
以下のセクションでは、オープンソースの Python ライブラリ ↗ を使用します。独自のクライアントを作成したり、SDK がサポートしていない別の言語で Compute Module を実装したい場合は、カスタム Compute Module クライアント の実装に関するドキュメントを参照してください。
前提条件:
Dockerfile
というファイルを作成します。Dockerfile
にコピーして貼り付けます:requirements.txt
という新しいファイルを作成します。このファイルはPythonアプリケーションの依存関係を指定します。以下をファイルにコピー&ペーストしてください。foundry-compute-modules
src
という名前の新しいサブディレクトリを作成します。ここに Python アプリケーションを保存します。src
ディレクトリの中に app.py
というファイルを作成します。app.py
内に次のコードをコピーして貼り付けてください。Copied!1 2 3 4 5 6 7 8 9 10 11
from compute_modules.annotations import function @function def add(context, event): # event 辞書から 'x' と 'y' を取り出し、それらを足し合わせた結果を文字列として返す関数 return str(event['x'] + event['y']) @function def hello(context, event): # event 辞書から 'name' を取り出し、それに 'Hello' を付け加えて返す関数 return 'Hello' + event['name']
タイプ推論を追加し、関数レジストリに計算モジュール関数を自動的に登録する方法を学びます。
計算モジュール関数を使用する際、関数は常に 2 個のパラメーターを受け取ります: イベントオブジェクトとコンテキストオブジェクトです。
コンテキストオブジェクト: メタデータと資格情報を含む Python の dict
オブジェクトパラメーターで、関数が必要とする可能性があるものです。たとえば、ユーザートークン、ソースの資格情報、およびその他の必要なデータが含まれます。たとえば、関数がオントロジーオブジェクトを取得するために OSDK を呼び出す必要がある場合、コンテキストオブジェクトにはユーザーがそのオントロジーオブジェクトにアクセスするために必要なトークンが含まれます。
イベントオブジェクト: 関数が処理するデータを含む Python の dict
オブジェクトパラメーターです。関数に渡されるすべてのパラメーター、たとえば加算関数の x
と y
、および hello
関数の name
などが含まれます。
イベント/戻りオブジェクトに静的型付けを使用する場合、ライブラリはペイロード/結果をその静的型付けオブジェクトに変換します。詳細については、自動関数スキーマ推論に関するドキュメントを確認してください。
関数の結果は JSON ブロブとして配線されるため、関数が JSON にシリアル化できることを確認してください。
ここで、コードを Foundry に公開し、Docker イメージを保存するために製作物リポジトリを使用します。
計算モジュールは、コンテナ化された環境でデータパイプラインの入力と出力の間のコネクタとして動作できます。この例では、入力と出力としてストリーミングデータセットを持つシンプルなユースケースを構築し、入力データを倍増する関数を定義し、出力データセットに書き込みます。動作するデータパイプラインをシミュレートするために仮想データを使用します。
Dockerfile
というファイルを作成します。Dockerfile
にコピーして貼り付けます。requirements.txt
という名前の新しいファイルを作成します。このファイルにPythonアプリケーションの依存関係を保存します。たとえば:requests == 2.31.0
src
という名前の新しいサブディレクトリを作成します。ここに Python アプリケーションを配置します。src
ディレクトリ内に app.py
という名前のファイルを作成します。app.py
にインポートします:import os # OS関連の操作を行うためのモジュール
import json # JSONデータの操作を行うためのモジュール
import time # 時間関連の操作を行うためのモジュール
import requests # HTTPリクエストを送るためのモジュール
app.py
内で、インプットとアウトプットアクセスのためのベアラートークンを取得します。with open(os.environ['BUILD2_TOKEN']) as f:
bearer_token = f.read()
# 環境変数 'BUILD2_TOKEN' からファイルパスを取得し、そのファイルを開く
# ファイルの内容を読み取り、bearer_token という変数に格納する
app.py
内で入力情報と出力情報を取得します:import os
import json
# 環境変数 'RESOURCE_ALIAS_MAP' からファイルパスを取得し、ファイルを開く
with open(os.environ['RESOURCE_ALIAS_MAP']) as f:
# ファイルの内容を JSON としてロード
resource_alias_map = json.load(f)
# JSON から入力情報と出力情報を取得
input_info = resource_alias_map['identifier you put in the config'] # コンフィグに設定した識別子
output_info = resource_alias_map['identifier you put in the config'] # コンフィグに設定した識別子
# 入力リソースの識別子とブランチを取得、ブランチが設定されていない場合は 'master' をデフォルトにする
input_rid = input_info['rid']
input_branch = input_info['branch'] or "master"
# 出力リソースの識別子とブランチを取得、ブランチが設定されていない場合は 'master' をデフォルトにする
output_rid = output_info['rid']
output_branch = output_info['branch'] or "master"
app.py
内で、入力と出力を操作し、計算を実行します。たとえば:FOUNDRY_URL = "yourenrollment.palantirfoundry.com"
def get_stream_latest_records():
# 入力ストリームから最新のレコードを取得するためのURLを構築
url = f"https://{FOUNDRY_URL}/stream-proxy/api/streams/{input_rid}/branches/{input_branch}/records"
# リクエストを送信してレスポンスを取得
response = requests.get(url, headers={"Authorization": f"Bearer {bearer_token}"})
# レスポンスをJSON形式で返す
return response.json()
def process_record(record):
# 入力ストリームのスキーマが 'x': Integer と仮定
x = record['value']['x']
# 出力ストリームのスキーマが 'twice_x': Integer と仮定
return {'twice_x': x * 2}
def put_record_to_stream(record):
# 出力ストリームにレコードを送信するためのURLを構築
url = f"https://{FOUNDRY_URL}/stream-proxy/api/streams/{output_rid}/branches/{output_branch}/jsonRecord"
# レコードをPOSTリクエストで送信
requests.post(url, json=record, headers={"Authorization": f"Bearer {bearer_token}"})
app.py
内でコードを自律タスクとして実行します。たとえば:while True:
records = get_stream_latest_records()
# records を処理する関数 process_record を使って各レコードを処理します
processed_records = list(map(process_record, records))
# 処理済みのレコードをストリームに追加します
[put_record_to_stream(record) for record in processed_records]
# 60秒間待機します
time.sleep(60)
これで、出力データセットにライブでストリーミングされる結果を確認できます。
入力と出力を操作するために、ベアラートークンと入力/出力情報を提供します。
次に、入力と出力を操作し、計算を実行するコードを書きます。以下のコードスニペットは、2つのストリームデータセットをパイプライン処理する簡単な例です。
stream-proxy
サービスを呼び出すことで、ベアラートークンと入力情報を使用して入力ストリームデータセットの最新レコードを読み取ります。これで、Foundryにコードを公開するために製作物リポジトリを使用できます。これはDockerイメージを保存するために使用されます。