注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

カスタム計算モジュールクライアントの作成 (上級)

なぜクライアントが必要なのか?

サポートされている言語 (Python、Java、または TypeScript) で計算モジュールを作成し、利用可能な SDK を使用する場合、クライアントロジックを手動で作成する必要はありません。

しかし、SDK を使用していない場合や、現在サポートされていない言語で計算モジュールを構築する必要がある場合は、以下に説明するプロセスを使用して自分で計算モジュールクライアントを作成する必要があります。

カスタムクライアントの責任

計算モジュールクライアントは、計算モジュール内のロジックの実行を管理し、3 つの主要な責任を負います。

計算モジュールの関数スキーマを投稿する (オプション)

クライアントのメイン実行サイクルを開始する前に、計算モジュール関数のスキーマを公開することをお勧めします。これにより、計算モジュールのスキーマが Foundry の他の部分に公開されます。代わりに、計算モジュールの Functions タブでこの関数スキーマを手動で定義することもできます。

詳細については、自動関数スキーマ推論に関するドキュメントを確認してください。

新しいジョブをポーリングする

クライアントは、処理する必要がある新しいジョブを継続的に内部計算モジュールサービスにポーリングします。ジョブが存在する場合、クライアントはそのジョブに対応する関数を見つけ、その関数を関連するペイロードで呼び出します。

計算モジュールの結果 (出力) を投稿する

関数が完了し、結果をクライアントに返したら、クライアントはその出力を計算モジュールサービスに報告する責任があります。

以下は、計算モジュールクライアントの実行ライフサイクルを簡単に示した視覚的な表現です。

計算モジュールクライアントの実行サイクル。

API リファレンス

内部計算モジュールサービスに最初に HTTP リクエストを送信しようとする際に 'connection refused' エラーが表示される場合があります。これは起動時の予想される動作であり、短いスリープ期間の後に再試行することで修正できます。

GET ジョブ

変数

MODULE_AUTH_TOKEN string

  • 詳細については、環境変数に関するドキュメントを確認してください。

DEFAULT_CA_PATH string

  • 詳細については、環境変数に関するドキュメントを確認してください。

GET_JOB_URI string

  • 詳細については、環境変数に関するドキュメントを確認してください。

予想されるステータスコード

  • 200: 処理すべき新しいジョブが存在します。
  • 204: 処理すべき新しいジョブは存在しません。
curl --header "Module-Auth-Token: $MODULE_AUTH_TOKEN" \  # 認証トークンをヘッダーに設定
    --cacert $DEFAULT_CA_PATH \  # 証明書ファイルのパスを指定
    --request GET \  # GETリクエストを送信
    $GET_JOB_URI  # リクエストを送信するURI

レスポンスパラメーター

jobId string

  • 処理されるジョブの一意の識別子。

queryType string

  • 呼び出される関数の名前。

query JSON object

  • 関数に渡されるペイロード。

temporaryCredentialsAuthToken string

  • Foundry データサイドカーで使用される一時トークン。

authHeader string

  • Foundry 内の他のサービスを呼び出すために使用できる Foundry 認証トークン。
  • 特定のモードでのみ利用可能。
Copied!
1 2 3 4 5 6 7 8 9 10 11 { "computeModuleJobV1": { "jobId": "9a2a1e94-41d3-47d7-807f-db2f4c547b9c", // ジョブの一意の識別子 "queryType": "multiply", // クエリの種類、ここでは「掛け算」 "query": { "n": 4.0, // 掛け算に使用する数値 }, "temporaryCredentialsAuthToken": "token-data", // 一時的な認証トークン "authHeader": "auth-header" // 認証ヘッダー } }

POST 結果

変数

result_data octet-stream

  • 計算モジュール関数から返された結果。

jobId string

  • 対応する GET ジョブリクエストから提供された jobId

MODULE_AUTH_TOKEN string

  • 詳細については、環境変数ドキュメントを参照してください。

DEFAULT_CA_PATH string

  • 詳細については、環境変数ドキュメントを参照してください。

POST_RESULT_URI string

  • 詳細については、環境変数ドキュメントを参照してください。

期待されるステータスコード

  • 204: リクエストは受け入れられました。

レスポンスパラメーター

なし

curl --header "Content-Type: application/octet-stream" \  # コンテンツタイプをバイナリデータに設定
    --header "Module-Auth-Token: $MODULE_AUTH_TOKEN" \  # モジュール認証トークンをヘッダーに追加
    --cacert $DEFAULT_CA_PATH \  # CA証明書のパスを指定
    --request POST \  # HTTP POSTリクエストを指定
    --data $result_data \  # 送信するデータを指定
    $POST_RESULT_URI/$jobId  # リクエストの送信先URIとジョブIDを指定

POST 関数スキーマ

変数

schema_data JSON array

  • JSON 形式の計算モジュール関数のスキーマ。

MODULE_AUTH_TOKEN string

  • 詳細については、環境変数のドキュメントを参照してください。

DEFAULT_CA_PATH string

  • 詳細については、環境変数のドキュメントを参照してください。

POST_SCHEMA_URI string

  • 詳細については、環境変数のドキュメントを参照してください。

期待されるステータスコード

204: リクエストが受け付けられました。

レスポンスパラメーター

なし

curl --header "Content-Type: application/json" \
    --header "Module-Auth-Token: $MODULE_AUTH_TOKEN" \
    --cacert $DEFAULT_CA_PATH \
    --request POST \
    --data $schema_data \
    $POST_SCHEMA_URI

# コメント:
# - `--header "Content-Type: application/json"`: リクエストのコンテンツタイプをJSONに設定
# - `--header "Module-Auth-Token: $MODULE_AUTH_TOKEN"`: 認証トークンをヘッダーに追加
# - `--cacert $DEFAULT_CA_PATH`: 使用するCA証明書のパスを指定
# - `--request POST`: POSTリクエストを指定
# - `--data $schema_data`: POSTリクエストのデータを指定
# - `$POST_SCHEMA_URI`: リクエストを送信するURI

Python の例

app.py

import json
import logging as log
import os
import socket
import time

import requests

log.basicConfig(level=log.INFO)

certPath = os.environ["DEFAULT_CA_PATH"]

with open(os.environ["MODULE_AUTH_TOKEN"], "r") as f:
    moduleAuthToken = f.read()

ip = socket.gethostbyname(socket.gethostname())

getJobUri = f"https://{ip}:8945/interactive-module/api/internal-query/job"
postResultUri = f"https://{ip}:8945/interactive-module/api/internal-query/results"
postSchemaUri = f"https://{ip}:8945/interactive-module/api/internal-query/schemas"

SCHEMAS = [
    {
        "functionName": "multiply",
        "inputs": [
            {
                "name": "n",
                "dataType": {"float": {}, "type": "float"},
                "required": True,
                "constraints": [],
            },
        ],
        "output": {
            "single": {
                "dataType": {
                    "float": {},
                    "type": "float",
                }
            },
            "type": "single",
        },
    },
    {
        "functionName": "divide",
        "inputs": [
            {
                "name": "n",
                "dataType": {"float": {}, "type": "float"},
                "required": True,
                "constraints": [],
            },
        ],
        "output": {
            "single": {
                "dataType": {
                    "float": {},
                    "type": "float",
                }
            },
            "type": "single",
        },
    },
]


# コンピュートモジュールサービスからジョブを取得する関数
# ステータスコードが200の場合のみジョブが存在する
# ステータスコードが204の場合は再度試行する
# このエンドポイントはロングポーリングが有効であり、遅延なく呼び出すことができる
def getJobBlocking():
    while True:
        response = requests.get(getJobUri, headers={"Module-Auth-Token": moduleAuthToken}, verify=certPath)
        if response.status_code == 200:
            return response.json()
        elif response.status_code == 204:
            log.info("No job found, trying again!")


# クエリの種類に基づいてクエリを処理する関数
def get_result(query_type, query):
    if query_type == "multiply":
        return float(query["n"]) * 2
    elif query_type == "divide":
        return float(query["n"]) / 2
    else:
        log.info(f"Unknown query type: {query_type}")


# コンピュートモジュールサービスにジョブの結果を投稿する関数
# 受信したすべてのジョブには結果を投稿する必要があり、さもなくば新しいジョブがこのワーカーにルーティングされない
def postResult(jobId, result):
    response = requests.post(
        f"{postResultUri}/{jobId}",
        data=json.dumps(result).encode("utf-8"),
        headers={"Module-Auth-Token": moduleAuthToken, "Content-Type": "application/octet-stream"},
        verify=certPath,
    )
    if response.status_code != 204:
        log.info(f"Failed to post result: {response.status_code}")


# コンピュートモジュールの関数のスキーマをコンピュートモジュールサービスに投稿する関数
# これは1回のみ呼び出す必要があるため、メインループに入る前に呼び出す
def postSchema():
    num_tries = 0
    success = False
    while not success and num_tries < 5:
        try:
            response = requests.post(
                postSchemaUri,
                json=SCHEMAS,
                headers={"Module-Auth-Token": moduleAuthToken, "Content-Type": "application/json"},
                verify=certPath,
            )
            success = True
            log.info(f"POST schema status: {response.status_code}")
            log.info(f"POST schema text: {response.text} reason: {response.reason}")
        except Exception as e:
            log.error(f"Exception occurred posting schema: {e}")
            time.sleep(2**num_tries)
            num_tries += 1


postSchema()

# 永遠に試行する
while True:
    try:
        job = getJobBlocking()
        v1 = job["computeModuleJobV1"]
        job_id = v1["jobId"]
        query_type = v1["queryType"]
        query = v1["query"]
        result = get_result(query_type, query)
        postResult(job_id, result)
    except Exception as e:
        log.info(f"Something failed {str(e)}")
        time.sleep(1)