The `transforms-aip` library simplifies the integration of language model APIs into PySpark workflows. The library enables users to create requests for completion and embedding models using the data within a Spark DataFrame. Features of the `transforms-aip` library include:

- Completion, embedding, and vision requests built directly from DataFrame columns
- Automatic request- and token-based rate limiting
- Error columns that record failed requests without failing the build
- Token count estimation for model inputs
- Optional verbose metadata about each run

These features are available with minimal configuration required from the user.
To begin using the `transforms-aip` library, you will need to install the following dependencies in your transforms repository, in this order:
1. `palantir_models>=0.933.0`
2. `transforms-aip>=0.441.0`
This section contains a completions example, an embedding example, a vision example, and an example showing how to calculate token counts.
For any use of the `transforms-aip` library, we strongly recommend using the `KUBERNETES_NO_EXECUTORS` profile for optimal speed and cost efficiency. Review our documentation on configuring profiles for more information.
For a given dataset for which we want to create completions, we simply need to supply the text column to the library. This can be seen in the example below, which uses the text column `question`:
| id | question |
|---|---|
| 1 | What is the capital of Canada? |
| 2 | Which country has the largest population? |
| 3 | Name the longest river in South America. |
| 4 | How many states are there in the United States? |
| 5 | What is the name of the largest ocean on Earth? |
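If you want to experiment outside of a transform, the sample input above can be reproduced as a small DataFrame. This is a hypothetical local setup, assuming `spark` is an active `SparkSession`:

```python
# Hypothetical local setup: recreate the sample input shown above.
# Assumes `spark` is an active SparkSession.
questions_df = spark.createDataFrame(
    [
        (1, "What is the capital of Canada?"),
        (2, "Which country has the largest population?"),
        (3, "Name the longest river in South America."),
        (4, "How many states are there in the United States?"),
        (5, "What is the name of the largest ocean on Earth?"),
    ],
    ["id", "question"],
)
```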
Below is a full code snippet for running completions; note the comments marking the steps where columns are added:
```python
from pyspark.sql import functions as F
from transforms.api import transform, Input, Output, configure
from transforms.aip.orchestrators import (
    CompletionOrchestrator,
    CompletionModelProperties,
)
from palantir_models.transforms import OpenAiGptChatLanguageModelInput

RATE_LIMIT_PER_MIN = 100
TOKEN_LIMIT_PER_MIN = 50000


@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@transform(
    output=Output("output_dataset"),
    questions=Input("input_dataset"),
    # You must specify which model to use and import it into the repository
    chat_model=OpenAiGptChatLanguageModelInput(
        "ri.language-model-service..language-model.gpt-35_azure"
    ),
)
def compute(output, questions, chat_model, ctx):
    base_prompt = "Answer this question: "

    # Sample 500 questions
    sample_questions = questions.dataframe().limit(500)

    # Build prompts
    # Add a question_prompt column to pass to the orchestrator
    questions_with_prompt = sample_questions.withColumn(
        "question_prompt", F.concat(F.lit(base_prompt), F.col("question"))
    )

    # Create the orchestrator
    completions = CompletionOrchestrator(
        RATE_LIMIT_PER_MIN,
        TOKEN_LIMIT_PER_MIN,
        chat_model,
        # OpenAI-compatible properties can be passed in
        model_properties=CompletionModelProperties(temperature=0.6),
    )

    # Creates the llm_answer column with the response
    # Creates the _completion_error-llm_answer column with any issues
    # encountered with the response
    answered = completions.withColumn(
        ctx, questions_with_prompt, "question_prompt", "llm_answer"
    )

    # Save results
    output.write_dataframe(answered)
```
This creates an answer column with the name you supply (`llm_answer`) and an error column (`_completion_error-llm_answer`) that indicates whether any of the requests failed. The error column always takes the form `_completion_error-<result_column_name>`.
For example, the output of this transform would be:
| id | question | question_prompt | llm_answer | _completion_error-llm_answer |
|---|---|---|---|---|
| 1 | What is the capital of Canada? | Answer this question: What is the capital of Canada? | Ottawa | null |
| 2 | Which country has the largest population? | Answer this question: Which country has the largest population? | China | null |
| 3 | Name the longest river in South America. | Answer this question: Name the longest river in South America. | Amazon River | null |
| 4 | How many states are there in the United States? | Answer this question: How many states are there in the United States? | 50 | null |
| 5 | What is the name of the largest ocean on Earth? | Answer this question: What is the name of the largest ocean on Earth? | Pacific Ocean | null |
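Because failed requests surface as non-null values in the error column rather than failing the build, you may want to split them out for inspection. Below is a minimal sketch using standard PySpark, with column names taken from the example above:

```python
from pyspark.sql import functions as F

# Rows where the request failed: the error column is non-null
failed = answered.filter(F.col("_completion_error-llm_answer").isNotNull())

# Rows with a usable answer
succeeded = answered.filter(F.col("_completion_error-llm_answer").isNull())
```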
In addition to simple single-column prompts, you can also use more complicated prompting strategies, such as prompts that pass multiple columns, a combination of strings and columns, or images.
```python
from transforms.aip.prompt import (
    StringPromptComponent,
    ImagePromptComponent,
    MultimediaPromptComponent,
    ChatMessageRole,
)

# Create a prompt that passes two columns, one for a system prompt
# and one for a user prompt
prompt = [
    StringPromptComponent(col="system_prompt_col", user=ChatMessageRole.SYSTEM),
    StringPromptComponent(col="prompt_col"),
]

# Create a prompt that uses a string literal for the system prompt,
# and a column for the user prompt
string_literal_prompt = [
    StringPromptComponent(text="prompt string literal", user=ChatMessageRole.SYSTEM),
    StringPromptComponent(col="prompt_col"),
]

# Create a multimedia prompt that passes an image definition for a vision query
# (`mediasetInput` refers to a MediaSetInput available in your transform)
multimedia_prompt = [
    MultimediaPromptComponent(["column_name"], ChatMessageRole.SYSTEM),
    MultimediaPromptComponent(
        [
            "column_name2",
            ImagePromptComponent(mediasetInput, "mediaItemRid_column"),
        ]
    ),
]
```
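A prompt list like these is passed to the orchestrator's `withColumn` call in place of a single column name, as the vision example below demonstrates.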
The orchestrator can also handle prompts that include images loaded from media sets, with token-based rate limiting that accounts for image sizes.
Below is a full code snippet for running a vision model:
```python
from pyspark.sql import functions as F
from transforms.api import transform, Input, Output, configure
from transforms.mediasets import MediaSetInput
from transforms.aip.orchestrators import (
    CompletionOrchestrator,
    CompletionModelProperties,
)
from transforms.aip.prompt import MultimediaPromptComponent, ImagePromptComponent
from palantir_models.transforms import OpenAiGptChatWithVisionLanguageModelInput
from language_model_service_api.languagemodelservice_api import (
    ChatMessageRole,
)

RATE_LIMIT_PER_MIN = 60
TOKEN_LIMIT_PER_MIN = 50000


@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@transform(
    output=Output("output_dataset"),
    pngs=MediaSetInput("input_media_set"),
    model=OpenAiGptChatWithVisionLanguageModelInput(
        "ri.language-model-service..language-model.gpt-4-vision_azure"
    ),
)
def compute(ctx, output, pngs, model):
    # 1. Get mediaItemRids for the desired media set
    df = pngs.list_media_items_by_path(ctx)

    # 2. Add a system prompt column to the DataFrame
    df = df.withColumn(
        "system_prompt", F.lit("Briefly describe the following image:")
    )

    # 3. Define the completions orchestrator
    completions = CompletionOrchestrator(
        RATE_LIMIT_PER_MIN,
        TOKEN_LIMIT_PER_MIN,
        model,
        model_properties=CompletionModelProperties(max_tokens=4096),
    )

    # 4. Call the model with a prompt
    answered = completions.withColumn(
        ctx,
        df,
        [
            MultimediaPromptComponent(["system_prompt"], ChatMessageRole.SYSTEM),
            MultimediaPromptComponent([ImagePromptComponent(pngs, "mediaItemRid")]),
        ],
        "llm_answer",
    )

    # 5. Save results
    output.write_dataframe(answered)
```
This creates an answer column with the name you provided (`llm_answer`) and an error column (`_completion_error-llm_answer`) that indicates whether any of the requests failed. The error column always takes the form `_completion_error-<result_column_name>`.
For embeddings, the code structure is similar to that used with the completion orchestrator, as shown below:
```python
from transforms.api import transform, Input, Output, configure
from transforms.aip.orchestrators import EmbeddingOrchestrator
from palantir_models.transforms import GenericEmbeddingModelInput

RATE_LIMIT_PER_MIN = 100
TOKEN_LIMIT_PER_MIN = 50000


@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@transform(
    output=Output("output_dataset"),
    questions=Input("input_dataset"),
    # Embedding model must be imported for use
    embedding_model=GenericEmbeddingModelInput(
        "ri.language-model-service..language-model.text-embedding-ada-002_azure"
    ),
)
def compute(output, questions, embedding_model, ctx):
    # 1. Take 500 questions
    sample_questions = questions.dataframe().limit(500)

    # 2. Instantiate the orchestrator
    embeddings = EmbeddingOrchestrator(
        RATE_LIMIT_PER_MIN,
        TOKEN_LIMIT_PER_MIN,
        embedding_model,
    )

    # 3. Run embeddings
    # Creates the embedding in the embedding_result column
    # Creates the _embeddings_error-embedding_result column to record any issues
    questions_with_embeddings = embeddings.withColumn(
        ctx, sample_questions, "question", "embedding_result"
    )

    # 4. Save results
    output.write_dataframe(questions_with_embeddings)
```
As with the completion orchestrator, the embedding orchestrator creates a response column (`embedding_result`) and an error column (`_embeddings_error-embedding_result`), which follows the format `_embeddings_error-<result_column_name>`:
| id | question | embedding_result | _embeddings_error-embedding_result |
|---|---|---|---|
| 1 | What is the capital of Canada? | [0.12, 0.54, ...] | null |
| 2 | Which country has the largest population? | [-0.23, 0.87, ...] | null |
| 3 | Name the longest river in South America. | [0.65, -0.48, ...] | null |
| 4 | How many states are there in the United States? | [0.33, 0.92, ...] | null |
| 5 | What is the name of the largest ocean on Earth? | [-0.11, 0.34, ...] | null |
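A common next step is similarity search over the embedding column. The sketch below ranks rows against a fixed query vector using a plain Python UDF; `query_vector` is a hypothetical, pre-computed query embedding, and the embedding column is assumed to be an array of doubles:

```python
import math

from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType

query_vector = [0.12, 0.54, 0.33]  # hypothetical pre-computed query embedding


@udf(returnType=DoubleType())
def cosine_similarity(embedding):
    # Compare each row's embedding against the fixed query vector
    if embedding is None:
        return None
    dot = sum(a * b for a, b in zip(embedding, query_vector))
    norms = math.sqrt(sum(a * a for a in embedding)) * math.sqrt(
        sum(b * b for b in query_vector)
    )
    return dot / norms if norms else None


ranked = questions_with_embeddings.withColumn(
    "similarity", cosine_similarity(col("embedding_result"))
).orderBy(col("similarity").desc())
```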
The library also exposes an easy-to-use module for estimating the number of tokens in your inputs, as shown below:
```python
from palantir_models.transforms import GenericEmbeddingModelInput
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from transforms.aip.tokenizer import Tokenizer
from transforms.api import Input, Output, configure, transform


@configure(["KUBERNETES_NO_EXECUTORS_SMALL"])
@transform(
    output=Output("output_dataset"),
    questions=Input("input_dataset"),
    embedding_model=GenericEmbeddingModelInput(
        "ri.language-model-service..language-model.text-embedding-ada-002_azure"
    ),
)
def compute(output, questions, embedding_model):
    # 1. Take 500 questions
    sample_questions = questions.dataframe().limit(500)

    # 2. Get the appropriate tokenizer
    ada_tokenizer = Tokenizer.get_tokenizer(embedding_model)

    # 3. Make a UDF to tokenize rows
    @udf(returnType=IntegerType())
    def calc_tokens(input_str: str) -> int:
        return ada_tokenizer.estimate_token_count(input_str)

    # 4. Calculate the token count for each row
    with_tokens = sample_questions.withColumn(
        "num_tokens", calc_tokens(col("question"))
    )

    # 5. Save results
    output.write_dataframe(with_tokens)
```
This code creates a new column (`num_tokens`) containing the number of tokens for the string in the `question` column. This number is calculated from the encoding registered for that model (if an encoding exists), which maps fragments of words to tokens recognized by the language model.

Tokenizers are configured for all platform-supported OpenAI models. For other models, or if a tokenizer cannot be found, the system falls back to a heuristic (word count divided by four), which is likely to be inaccurate.
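Estimated token counts can be used to filter out rows that would exceed a model's context window before any requests are sent. Below is a minimal sketch building on the `with_tokens` DataFrame above; the limit value is an assumption, so substitute the documented limit for your model:

```python
from pyspark.sql.functions import col

# Assumed context limit; check the documented limit for your model
MAX_INPUT_TOKENS = 8191

# Keep only rows that fit within the model's context window
within_limit = with_tokens.filter(col("num_tokens") <= MAX_INPUT_TOKENS)
oversized = with_tokens.filter(col("num_tokens") > MAX_INPUT_TOKENS)
```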
For general information about the run, pass the argument `verbose=True` to the constructor of an orchestrator, as shown in the following example:
```python
completions = CompletionOrchestrator(
    RATE_LIMIT_PER_MIN,
    TOKEN_LIMIT_PER_MIN,
    chat_model,
    verbose=True,
)
```
This adds metadata columns to the result to help understand the run. These columns include which partition the request was run on, the timestamp of the request, and the tokens used.
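The exact names of these metadata columns are easiest to discover by inspecting the output schema. This is plain PySpark rather than a `transforms-aip` API; `answered` refers to the result DataFrame from the earlier completions example:

```python
# Inspect the result schema to see the verbose metadata columns
# added alongside the answer and error columns
answered.printSchema()
```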