注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
サポートされている言語 (Python、Java、または TypeScript) で計算モジュールを作成し、利用可能な SDK を使用する場合、クライアントロジックを手動で作成する必要はありません。
しかし、SDK を使用していない場合や、現在サポートされていない言語で計算モジュールを構築する必要がある場合は、以下に説明するプロセスを使用して自分で計算モジュールクライアントを作成する必要があります。
計算モジュールクライアントは、計算モジュール内のロジックの実行を管理し、3 つの主要な責任を負います。
クライアントのメイン実行サイクルを開始する前に、計算モジュール関数のスキーマを公開することをお勧めします。これにより、計算モジュールのスキーマが Foundry の他の部分に公開されます。代わりに、計算モジュールの Functions タブでこの関数スキーマを手動で定義することもできます。
詳細については、自動関数スキーマ推論に関するドキュメントを確認してください。
クライアントは、処理する必要がある新しいジョブを継続的に内部計算モジュールサービスにポーリングします。ジョブが存在する場合、クライアントはそのジョブに対応する関数を見つけ、その関数を関連するペイロードで呼び出します。
関数が完了し、結果をクライアントに返したら、クライアントはその出力を計算モジュールサービスに報告する責任があります。
以下は、計算モジュールクライアントの実行ライフサイクルを簡単に示した視覚的な表現です。
内部計算モジュールサービスに最初に HTTP リクエストを送信しようとする際に 'connection refused' エラーが表示される場合があります。これは起動時の予想される動作であり、短いスリープ期間の後に再試行することで修正できます。
GET
ジョブMODULE_AUTH_TOKEN string
DEFAULT_CA_PATH string
GET_JOB_URI string
curl --header "Module-Auth-Token: $MODULE_AUTH_TOKEN" \ # 認証トークンをヘッダーに設定
--cacert $DEFAULT_CA_PATH \ # 証明書ファイルのパスを指定
--request GET \ # GETリクエストを送信
$GET_JOB_URI # リクエストを送信するURI
jobId string
queryType string
query JSON object
temporaryCredentialsAuthToken string
authHeader string
Copied!1 2 3 4 5 6 7 8 9 10 11
{ "computeModuleJobV1": { "jobId": "9a2a1e94-41d3-47d7-807f-db2f4c547b9c", // ジョブの一意の識別子 "queryType": "multiply", // クエリの種類、ここでは「掛け算」 "query": { "n": 4.0, // 掛け算に使用する数値 }, "temporaryCredentialsAuthToken": "token-data", // 一時的な認証トークン "authHeader": "auth-header" // 認証ヘッダー } }
POST
結果result_data octet-stream
jobId string
GET
ジョブリクエストから提供された jobId
。MODULE_AUTH_TOKEN string
DEFAULT_CA_PATH string
POST_RESULT_URI string
なし
curl --header "Content-Type: application/octet-stream" \ # コンテンツタイプをバイナリデータに設定
--header "Module-Auth-Token: $MODULE_AUTH_TOKEN" \ # モジュール認証トークンをヘッダーに追加
--cacert $DEFAULT_CA_PATH \ # CA証明書のパスを指定
--request POST \ # HTTP POSTリクエストを指定
--data $result_data \ # 送信するデータを指定
$POST_RESULT_URI/$jobId # リクエストの送信先URIとジョブIDを指定
POST
関数スキーマschema_data JSON array
MODULE_AUTH_TOKEN string
DEFAULT_CA_PATH string
POST_SCHEMA_URI string
204: リクエストが受け付けられました。
なし
curl --header "Content-Type: application/json" \
--header "Module-Auth-Token: $MODULE_AUTH_TOKEN" \
--cacert $DEFAULT_CA_PATH \
--request POST \
--data $schema_data \
$POST_SCHEMA_URI
# コメント:
# - `--header "Content-Type: application/json"`: リクエストのコンテンツタイプをJSONに設定
# - `--header "Module-Auth-Token: $MODULE_AUTH_TOKEN"`: 認証トークンをヘッダーに追加
# - `--cacert $DEFAULT_CA_PATH`: 使用するCA証明書のパスを指定
# - `--request POST`: POSTリクエストを指定
# - `--data $schema_data`: POSTリクエストのデータを指定
# - `$POST_SCHEMA_URI`: リクエストを送信するURI
app.py
import json
import logging as log
import os
import socket
import time
import requests
log.basicConfig(level=log.INFO)
certPath = os.environ["DEFAULT_CA_PATH"]
with open(os.environ["MODULE_AUTH_TOKEN"], "r") as f:
moduleAuthToken = f.read()
ip = socket.gethostbyname(socket.gethostname())
getJobUri = f"https://{ip}:8945/interactive-module/api/internal-query/job"
postResultUri = f"https://{ip}:8945/interactive-module/api/internal-query/results"
postSchemaUri = f"https://{ip}:8945/interactive-module/api/internal-query/schemas"
SCHEMAS = [
{
"functionName": "multiply",
"inputs": [
{
"name": "n",
"dataType": {"float": {}, "type": "float"},
"required": True,
"constraints": [],
},
],
"output": {
"single": {
"dataType": {
"float": {},
"type": "float",
}
},
"type": "single",
},
},
{
"functionName": "divide",
"inputs": [
{
"name": "n",
"dataType": {"float": {}, "type": "float"},
"required": True,
"constraints": [],
},
],
"output": {
"single": {
"dataType": {
"float": {},
"type": "float",
}
},
"type": "single",
},
},
]
# コンピュートモジュールサービスからジョブを取得する関数
# ステータスコードが200の場合のみジョブが存在する
# ステータスコードが204の場合は再度試行する
# このエンドポイントはロングポーリングが有効であり、遅延なく呼び出すことができる
def getJobBlocking():
while True:
response = requests.get(getJobUri, headers={"Module-Auth-Token": moduleAuthToken}, verify=certPath)
if response.status_code == 200:
return response.json()
elif response.status_code == 204:
log.info("No job found, trying again!")
# クエリの種類に基づいてクエリを処理する関数
def get_result(query_type, query):
if query_type == "multiply":
return float(query["n"]) * 2
elif query_type == "divide":
return float(query["n"]) / 2
else:
log.info(f"Unknown query type: {query_type}")
# コンピュートモジュールサービスにジョブの結果を投稿する関数
# 受信したすべてのジョブには結果を投稿する必要があり、さもなくば新しいジョブがこのワーカーにルーティングされない
def postResult(jobId, result):
response = requests.post(
f"{postResultUri}/{jobId}",
data=json.dumps(result).encode("utf-8"),
headers={"Module-Auth-Token": moduleAuthToken, "Content-Type": "application/octet-stream"},
verify=certPath,
)
if response.status_code != 204:
log.info(f"Failed to post result: {response.status_code}")
# コンピュートモジュールの関数のスキーマをコンピュートモジュールサービスに投稿する関数
# これは1回のみ呼び出す必要があるため、メインループに入る前に呼び出す
def postSchema():
num_tries = 0
success = False
while not success and num_tries < 5:
try:
response = requests.post(
postSchemaUri,
json=SCHEMAS,
headers={"Module-Auth-Token": moduleAuthToken, "Content-Type": "application/json"},
verify=certPath,
)
success = True
log.info(f"POST schema status: {response.status_code}")
log.info(f"POST schema text: {response.text} reason: {response.reason}")
except Exception as e:
log.error(f"Exception occurred posting schema: {e}")
time.sleep(2**num_tries)
num_tries += 1
postSchema()
# 永遠に試行する
while True:
try:
job = getJobBlocking()
v1 = job["computeModuleJobV1"]
job_id = v1["jobId"]
query_type = v1["queryType"]
query = v1["query"]
result = get_result(query_type, query)
postResult(job_id, result)
except Exception as e:
log.info(f"Something failed {str(e)}")
time.sleep(1)