注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

AIP オーケストレーター

非推奨の警告

このライブラリはもはや積極的に保守・サポートされていません。現在、代替品が開発中です。その間、データセット全体に LLM を適用する別の方法として Pipeline Builder のUse LLM nodeを参照してください。

transforms-aip ライブラリは、PySpark ワークフローに言語モデル API を統合することを簡素化します。このライブラリを使用すると、Spark DataFrame 内のデータを使用して補完モデルおよび埋め込みモデルのリクエストを作成できます。transforms-aip ライブラリの機能は以下のとおりです。

  • レート制限管理: リクエストおよびトークンベースのレート制限を処理します。
  • 分散ワークロード: Spark ジョブ内のエグゼキュータ間でタスクを均等に共有し、Spark の利点を最大限に活用します。
  • 最適化されたパフォーマンス: 現在のレート制限によって許可される最大速度でリクエストを実行します。
  • エラーハンドリング: リクエストがエラーで失敗した場合、システムは失敗に関する情報をキャプチャし、全体のビルドを失敗させないように実行を続行します。

これらの機能は、ユーザーによる最小限の構成で利用できます。

transforms-aip ライブラリのインストール

transforms-aip ライブラリを使用し始めるには、以下の依存関係を次の順序でユーザーのトランスフォームリポジトリにインストールする必要があります。

  1. palantir_models>=0.933.0
  2. transforms-aip>=0.441.0

使用法

このセクションには、completions の例embedding の例vision の例および トークン数を計算する方法の例が含まれています。

transforms-aip ライブラリの使用に際しては、最適な速度とコスト効率のために KUBERNETES_NO_EXECUTORS プロファイルを使用することを強くお勧めします。プロファイルの構成に関するドキュメントを参照してください。

Completions の例

補完を作成したいデータセットに対しては、単にテキスト列をライブラリに提供するだけです。以下の例では、question というテキスト列を使用しています。

idquestion
1What is the capital of Canada?
2Which country has the largest population?
3Name the longest river in South America.
4How many states are there in the United States?
5What is the name of the largest ocean on Earth?

以下は completions を実行するための完全なコードスニペットです。列が追加されるステップのコメントに注意してください。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 from pyspark.sql import functions as F from transforms.api import transform, Input, Output, configure from transforms.aip.orchestrators import ( CompletionOrchestrator, CompletionModelProperties, ) from palantir_models.transforms import OpenAiGptChatLanguageModelInput # レートリミットとトークンリミットの設定 RATE_LIMIT_PER_MIN = 100 TOKEN_LIMIT_PER_MIN = 50000 # Kubernetesでの実行設定 @configure(profile=["KUBERNETES_NO_EXECUTORS"]) @transform( output=Output("output_dataset"), # 出力データセットの設定 questions=Input("input_dataset"), # 入力データセットの設定 # 使用するモデルの指定とリポジトリへのインポート chat_model=OpenAiGptChatLanguageModelInput( "ri.language-model-service..language-model.gpt-35_azure" ), ) def compute(output, questions, chat_model, ctx): base_prompt = "Answer this question: " # ベースとなるプロンプトの設定 # 500の質問をサンプリング sample_questions = questions.dataframe().limit(500) # プロンプトの作成 # orchestratorに渡すためのquestion_prompt列を追加 questions_with_prompt = sample_questions.withColumn( "question_prompt", F.concat(F.lit(base_prompt), F.col("question")) ) # Orchestratorの作成 completions = CompletionOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, chat_model, # OpenAI互換のプロパティを渡すことが可能 model_properties=CompletionModelProperties(temperature=0.6), ) # llm_answer列に回答を作成 # _completion_error-llm_answer列に回答作成に関する問題を記録 answered = completions.withColumn( ctx, questions_with_prompt, "question_prompt", "llm_answer" ) # 結果の保存 output.write_dataframe(answered)

これにより、指定した名前(llm_answer)の回答列と、リクエストのいずれかが失敗したことを示すエラー列(_completion_error-llm_answer)が作成されます。エラー列は常に(_completion_error-<result_column_name>)の形式を取ります。

たとえば、このトランスフォームの出力は次のようになります:

id質問question_promptllm_answer_completion_error-llm_answer
1カナダの首都は何ですか?この質問に答えてください:カナダの首都は何ですか?オタワnull
2最も人口が多い国はどこですか?この質問に答えてください:最も人口が多い国はどこですか?中国null
3南アメリカで最も長い川の名前は何ですか?この質問に答えてください:南アメリカで最も長い川の名前は何ですか?アマゾン川null
4アメリカにはいくつの州がありますか?この質問に答えてください:アメリカにはいくつの州がありますか?50null
5地球上で最も大きな海の名前は何ですか?この質問に答えてください:地球上で最も大きな海の名前は何ですか?太平洋null

追加のプロンプト戦略

単純な単一列プロンプトに加えて、複数の列を渡すプロンプト、文字列と列の組み合わせ、または画像など、より複雑なプロンプト戦略も使用できます。

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from transforms.aip.prompt import ( StringPromptComponent, ImagePromptComponent, MultimediaPromptComponent, ChatMessageRole, ) # システムプロンプトとユーザープロンプトの2つの列を渡すプロンプトを作成します prompt = [ StringPromptComponent(col="system_prompt_col", user=ChatMessageRole.SYSTEM), # システムプロンプト StringPromptComponent(col="prompt_col"), # ユーザープロンプト ] # システムプロンプトには文字列リテラルを、ユーザープロンプトには列を使用するプロンプトを作成します string_literal_prompt = [ StringPromptComponent(text="prompt string literal", user=ChatMessageRole.SYSTEM), # システムプロンプト StringPromptComponent(col="prompt_col"), # ユーザープロンプト ] # ビジョンクエリの画像定義を渡すマルチメディアプロンプトを作成します multimedia_prompt = [ MultimediaPromptComponent(["column_name"], ChatMessageRole.SYSTEM), # システムプロンプト MultimediaPromptComponent( [ "column_name2", ImagePromptComponent(mediasetInput, "mediaItemRid_column"), # ユーザープロンプト ] ), ]

ビジョンモデルの例

オーケストレータは、イメージサイズに関連したトークンベースのレート制限を含む、メディアセットからロードされたイメージを含むプロンプトも処理できます。

以下は、ビジョンモデルを実行するための完全なコードスニペットです:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 from pyspark.sql import functions as F from transforms.api import transform, Input, Output, configure from transforms.mediasets import MediaSetInput from transforms.aip.orchestrators import ( CompletionOrchestrator, CompletionModelProperties, ) from transforms.aip.prompt import MultimediaPromptComponent, ImagePromptComponent from palantir_models.transforms import OpenAiGptChatWithVisionLanguageModelInput from language_model_service_api.languagemodelservice_api import ( ChatMessageRole, ) RATE_LIMIT_PER_MIN = 60 # 1分あたりのリクエストレート制限 TOKEN_LIMIT_PER_MIN = 50000 # 1分あたりのトークン制限 # KUBERNETES_NO_EXECUTORSのプロファイルで設定 @configure(profile=["KUBERNETES_NO_EXECUTORS"]) # Transform関数を定義 @transform( output=Output("output_dataset"), # 出力データセットを指定 pngs=MediaSetInput("input_media_set"), # 入力メディアセットを指定 model=OpenAiGptChatWithVisionLanguageModelInput( # 使用するモデルを指定 "ri.language-model-service..language-model.gpt-4-vision_azure" ), ) def compute(ctx, output, pngs, model): # 1. 望ましいメディアセットのmediaItemRidsを取得 df = pngs.list_media_items_by_path(ctx) # 2. dfにシステムプロンプトの列を追加 df = df.withColumn( "system_prompt", F.lit("Briefly describe the following image:") # "次の画像を簡単に説明してください:"というプロンプトを追加 ) # 3. 完成品のオーケストレータを定義 completions = CompletionOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, model, model_properties=CompletionModelProperties(max_tokens=4096), # 最大トークン数を4096に設定 ) # 4. プロンプトとともにモデルを呼び出す answered = completions.withColumn( ctx, df, [ MultimediaPromptComponent(["system_prompt"], ChatMessageRole.SYSTEM), # システムメッセージとしてプロンプトを追加 MultimediaPromptComponent([ImagePromptComponent(pngs, "mediaItemRid")]), # 画像プロンプトを追加 ], "llm_answer", # 回答を"llm_answer"として保存 ) # 5. 結果を保存 output.write_dataframe(answered)

これにより、指定した名前の回答列(llm_answer)と、リクエストが失敗した場合に示すエラー列(_completion_error-llm_answer)が作成されます。エラー列は常に _completion_error-<result_column_name> の形式を取ります。

埋め込みの例

埋め込みについては、オーケストレーターを使用するためのコード構造は、Completion Orchestratorと同様です。以下に示します:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 from transforms.api import transform, Input, Output, configure from transforms.aip.orchestrators import EmbeddingOrchestrator from palantir_models.transforms import GenericEmbeddingModelInput RATE_LIMIT_PER_MIN = 100 TOKEN_LIMIT_PER_MIN = 50000 # KUBERNETES_NO_EXECUTORSプロフィールを使用するように設定 @configure(profile=["KUBERNETES_NO_EXECUTORS"]) @transform( output=Output("output_dataset"), # 出力データセットの設定 questions=Input("input_dataset"), # 入力データセットの設定 # 使用するためにEmbedding modelをインポートする必要がある embedding_model=GenericEmbeddingModelInput( "ri.language-model-service..language-model.text-embedding-ada-002_azure" ), ) def compute(output, questions, embedding_model, ctx): # 1. 500問の質問を取る sample_questions = questions.dataframe().limit(500) # 2. オーケストレータをインスタンス化 embeddings = EmbeddingOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, embedding_model, ) # 3. 埋め込みを実行 # embedding_result列に埋め込みを作成 # _embeddings_error-embedding_result列を作成して、問題を記録 questions_with_embeddings = embeddings.withColumn( ctx, sample_questions, "question", "embedding_result" ) # 4. 結果を保存 output.write_dataframe(questions_with_embeddings)

完了オーケストレータと同様に、埋め込みオーケストレータは埋め込み応答列(embedding_result)とエラー列(_embeddings_error-embedding_result)を作成します。フォーマットは(_embeddings_error-<result_column_name>)です:

id質問embedding_result_embeddings_error-embedding_result
1カナダの首都は何ですか?[0.12, 0.54, ...]null
2人口が最も多い国はどこですか?[-0.23, 0.87, ...]null
3南アメリカで最も長い川の名前は何ですか?[0.65, -0.48, ...]null
4アメリカ合衆国には何州ありますか?[0.33, 0.92, ...]null
5地球上で最も大きな海の名前は何ですか?[-0.11, 0.34, ...]null

トークンの計算

ライブラリはまた、以下に示すように、ユーザーの入力のトークン数を理解するための簡単に使用できるモジュールも提供します:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 from palantir_models.transforms import GenericEmbeddingModelInput from pyspark.sql.functions import col, udf from pyspark.sql.types import IntegerType from transforms.aip.tokenizer import Tokenizer from transforms.api import Input, Output, configure, transform @configure(["KUBERNETES_NO_EXECUTORS_SMALL"]) @transform( output=Output("output_dataset"), # 出力データセットを設定 questions=Input("input_dataset"), # 入力データセットを設定 embedding_model=GenericEmbeddingModelInput( "ri.language-model-service..language-model.text-embedding-ada-002_azure" ), # 使用する埋め込みモデルを指定 ) def compute(output, questions, embedding_model): # 1. 500個の質問を取得 sample_questions = questions.dataframe().limit(500) # 2. 適切なトークナイザーを取得 ada_tokenizer = Tokenizer.get_tokenizer(embedding_model) # 3. 行をトークナイズするUDFを作成 @udf(returnType=IntegerType()) def calc_tokens(input_str: str) -> int: return ada_tokenizer.estimate_token_count(input_str) # 4. 各行について計算 with_tokens = sample_questions.withColumn( "num_tokens", calc_tokens(col("question")) ) # 5. 結果を保存 output.write_dataframe(with_tokens)

このコードは、question 列内の文字列に対するトークンの数を持つ新しい列 (num_tokens) を作成します。この数は、そのモデルに登録されているエンコーディングに基づいて計算されます(エンコーディングが存在する場合)。これは、言語モデルが認識するトークンに単語の断片をマッピングします。

すべてのプラットフォーム対応の OpenAI モデルには設定済みのトークナイザーがあります。他のモデル、またはトークナイザーが見つからない場合、システムはヒューリスティック(単語数を四で割ったもの)にデフォルト設定します。これはおそらく不正確になる可能性があります。

デバッグ

ランの一般的な情報については、以下の例に示すように、オーケストレータのコンストラクタに引数 verbose=True を渡します:

Copied!
1 2 3 4 5 6 7 8 9 10 11 # CompletionOrchestratorは補完の実行を管理するクラスです。 # RATE_LIMIT_PER_MINは1分あたりの最大リクエスト数を定義します。 # TOKEN_LIMIT_PER_MINは1分あたりの最大トークン数を定義します。 # chat_modelは使用するチャットモデルを定義します。 # verboseがTrueの場合、詳細なログが出力されます。 completions = CompletionOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, chat_model, verbose=True, )

これにより、実行の理解を助けるためのメタデータ列が結果に追加されます。これらの列には、リクエストが実行されたパーティション、リクエストのタイムスタンプ、使用されたトークンが含まれます。