AIP orchestrators

The transforms-aip library simplifies the integration of language model APIs into PySpark workflows. The library enables users to create requests for completions and embedding models using the data within a Spark DataFrame. Features of the transforms-aip library include:

  • Rate limit management: Handles both request and token based rate limiting.
  • Distributed workload: Shares tasks equally across executors in the Spark job, maximizing the benefits of Spark.
  • Optimized performance: Ensure the maximum speed for requests as permitted by the current rate limits.
  • Error handling: If a request fails due to any error, the system will capture information about the failure and continue with the execution in order to avoid failing the entire build.

These features are available with minimal configuration required from the user.

Installing the transforms-aip library

To begin using the transforms-aip library, you'll need to install the following dependencies in your transforms repository in this order:

  1. palantir_models>=0.933.0
  2. transforms-aip>=0.441.0

Usage

This section contains a completions example, embedding example, vision example and an example showing how to calculate token count.

For any use of the transforms-aip library, we strongly recommend using the KUBERNETES_NO_EXECUTORS profile for optimum speed and cost efficiency. Review our documentation on configuring profiles for more information.

Completions example

For a given dataset for which we want to create completions, we simply need to supply the text column to the library. This can be seen in the example below, using the text column question:

idquestion
1What is the capital of Canada?
2Which country has the largest population?
3Name the longest river in South America.
4How many states are there in the United States?
5What is the name of the largest ocean on Earth?

Below is a full code snippet for running completions; note the comments for the steps when columns get added:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 from pyspark.sql import functions as F from transforms.api import transform, Input, Output, configure from transforms.aip.orchestrators import ( CompletionOrchestrator, CompletionModelProperties, ) from palantir_models.transforms import OpenAiGptChatLanguageModelInput RATE_LIMIT_PER_MIN = 100 TOKEN_LIMIT_PER_MIN = 50000 @configure(profile=["KUBERNETES_NO_EXECUTORS"]) @transform( output=Output("output_dataset"), questions=Input("input_dataset"), # You must specify which model to use and import it into the repository chat_model=OpenAiGptChatLanguageModelInput( "ri.language-model-service..language-model.gpt-35_azure" ), ) def compute(output, questions, chat_model, ctx): base_prompt = "Answer this question: " # Sample 500 questions sample_questions = questions.dataframe().limit(500) # Build prompts # Add a question_prompt column to pass to the orchestrator questions_with_prompt = sample_questions.withColumn( "question_prompt", F.concat(F.lit(base_prompt), F.col("question")) ) # Create Orchestrator completions = CompletionOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, chat_model, # OpenAI compatible properties can be passed in model_properties=CompletionModelProperties(temperature=0.6), ) # Creates llm_answer column with the response # Creates _completion_error-llm_answer column with any issues encountered with the response answered = completions.withColumn( ctx, questions_with_prompt, "question_prompt", "llm_answer" ) # Save results output.write_dataframe(answered)

This creates an answer column with the name you supply (llm_answer) and an error column (_completion_error-llm_answer) to indicate if any of the requests failed. The error column will always take the form (_completion_error-<result_column_name>)

For example the output of this transform would be:

idquestionquestion_promptllm_answer_completion_error-llm_answer
1What is the capital of Canada?Answer this question: What is the capital of Canada?Ottawanull
2Which country has the largest population?Answer this question: Which country has the largest population?Chinanull
3Name the longest river in South America.Answer this question: Name the longest river in South America.Amazon Rivernull
4How many states are there in the United States?Answer this question: How many states are there in the United States?50null
5What is the name of the largest ocean on Earth?Answer this question: What is the name of the largest ocean on Earth?Pacific Oceannull

Additional prompting strategies

In addition to simple single-column prompts, you can also use more complicated prompting strategies, such as prompts that pass multiple columns, a combination of strings and columns, or images.

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from transforms.aip.prompt import ( StringPromptComponent, ImagePromptComponent, MultimediaPromptComponent, ChatMessageRole, ) # Create a prompt that passes two columns, one for a system prompt and one for a user prompt prompt = [ StringPromptComponent(col="system_prompt_col", user=ChatMessageRole.SYSTEM), StringPromptComponent(col="prompt_col"), ] # Create a prompt that uses a string literal for the system prompt, and a column for the user prompt string_literal_prompt = [ StringPromptComponent(text="prompt string literal", user=ChatMessageRole.SYSTEM), StringPromptComponent(col="prompt_col"), ] # Create a multimedia prompt that passes an image definition for a vision query multimedia_prompt = [ MultimediaPromptComponent(["column_name"], ChatMessageRole.SYSTEM), MultimediaPromptComponent( [ "column_name2", ImagePromptComponent(mediasetInput, "mediaItemRid_column"), ] ), ]

Vision completions example

The orchestrator can also handle prompts that include images loaded from media sets, including token-based rate limiting associated with image sizes.

Below is a full code snippet for running a vision model:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 from pyspark.sql import functions as F from transforms.api import transform, Input, Output, configure from transforms.mediasets import MediaSetInput from transforms.aip.orchestrators import ( CompletionOrchestrator, CompletionModelProperties, ) from transforms.aip.prompt import MultimediaPromptComponent, ImagePromptComponent from palantir_models.transforms import OpenAiGptChatWithVisionLanguageModelInput from language_model_service_api.languagemodelservice_api import ( ChatMessageRole, ) RATE_LIMIT_PER_MIN = 60 TOKEN_LIMIT_PER_MIN = 50000 @configure(profile=["KUBERNETES_NO_EXECUTORS"]) @transform( output=Output("output_dataset"), pngs=MediaSetInput("input_media_set"), model=OpenAiGptChatWithVisionLanguageModelInput( "ri.language-model-service..language-model.gpt-4-vision_azure" ), ) def compute(ctx, output, pngs, model): # 1. get mediaItemRids for the desired mediaset df = pngs.list_media_items_by_path(ctx) # 2. add a system prompt column to the df df = df.withColumn( "system_prompt", F.lit("Briefly describe the following image:") ) # 3. define the completions orchestrator completions = CompletionOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, model, model_properties=CompletionModelProperties(max_tokens=4096), ) # 4. call the model with a prompt answered = completions.withColumn( ctx, df, [ MultimediaPromptComponent(["system_prompt"], ChatMessageRole.SYSTEM), MultimediaPromptComponent([ImagePromptComponent(pngs, "mediaItemRid")]), ], "llm_answer", ) # 5. save results output.write_dataframe(answered)

This creates an answer column with the name you provided (llm_answer) and an error column (_completion_error-llm_answer) to indicate if any of the requests failed. The error column will always take the form _completion_error-<result_column_name>.

Embeddings example

For embeddings, the code structure to use the orchestrator is similar to the Completion Orchestrator. Seen below:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 from transforms.api import transform, Input, Output, configure from transforms.aip.orchestrators import EmbeddingOrchestrator from palantir_models.transforms import GenericEmbeddingModelInput RATE_LIMIT_PER_MIN = 100 TOKEN_LIMIT_PER_MIN = 50000 @configure(profile=["KUBERNETES_NO_EXECUTORS"]) @transform( output=Output("output_dataset"), questions=Input("input_dataset"), # Embedding model must be imported for use embedding_model=GenericEmbeddingModelInput( "ri.language-model-service..language-model.text-embedding-ada-002_azure" ), ) def compute(output, questions, embedding_model, ctx): # 1. take 500 questions sample_questions = questions.dataframe().limit(500) # 2. instantiate orchestrator embeddings = EmbeddingOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, embedding_model, ) # 3. run embeddings # Creates embedding in embedding_result column # Creates _embeddings_error-embedding_result column to record any issues questions_with_embeddings = embeddings.withColumn( ctx, sample_questions, "question", "embedding_result" ) # 4. save results output.write_dataframe(questions_with_embeddings)

As with the completion orchestrator, the embedding orchestrator creates an embedding response column (embedding_result) and error column (_embeddings_error-embedding_result) in the format (_embeddings_error-<result_column_name>):

idquestionembedding_result_embeddings_error-embedding_result
1What is the capital of Canada?[0.12, 0.54, ...]null
2Which country has the largest population?[-0.23, 0.87, ...]null
3Name the longest river in South America.[0.65, -0.48, ...]null
4How many states are there in the United States?[0.33, 0.92, ...]null
5What is the name of the largest ocean on Earth?[-0.11, 0.34, ...]null

Calculate Tokens

The library also exposes an easy to use module for understanding the token number of your inputs, as shown below:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 from palantir_models.transforms import GenericEmbeddingModelInput from pyspark.sql.functions import col, udf from pyspark.sql.types import IntegerType from transforms.aip.tokenizer import Tokenizer from transforms.api import Input, Output, configure, transform @configure(["KUBERNETES_NO_EXECUTORS_SMALL"]) @transform( output=Output("output_dataset"), questions=Input("input_dataset"), embedding_model=GenericEmbeddingModelInput( "ri.language-model-service..language-model.text-embedding-ada-002_azure" ), ) def compute(output, questions, embedding_model): # 1. take 500 questions sample_questions = questions.dataframe().limit(500) # 2. Get the appropriate tokenizer ada_tokenizer = Tokenizer.get_tokenizer(embedding_model) # 3. Make a UDF to tokenize rows @udf(returnType=IntegerType()) def calc_tokens(input_str: str) -> int: return ada_tokenizer.estimate_token_count(input_str) # 4. Calculate for each row with_tokens = sample_questions.withColumn( "num_tokens", calc_tokens(col("question")) ) # 5. save results output.write_dataframe(with_tokens)

This code creates a new column (num_tokens) with the number of tokens for the string in the question column. This number is calculated based on the encoding registered for that model (if an encoding exists), which maps fragments of words to tokens recognized by the language model.

There are configured tokenizers for all platform supported OpenAI models. For other models, or if a tokenizer can't be found, the system will default to a heuristic (word count divided by four). This will likely be inaccurate.

Debugging

For general information about the run, pass the argument verbose=True to the constructor of an orchestrator, as shown in the following example:

Copied!
1 2 3 4 5 6 completions = CompletionOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, chat_model, verbose=True, )

This adds metadata columns to the result to help understand the run. These columns include which partition the request was run on, the timestamp of the request, and the tokens used.