transforms-aip
라이브러리는 언어 모델 API를 PySpark 워크플로에 통합하는 것을 단순화합니다. 이 라이브러리를 사용하면 사용자는 Spark DataFrame 내의 데이터를 사용하여 완성 및 임베딩 모델에 대한 요청을 생성할 수 있습니다. transforms-aip
라이브러리의 기능은 다음과 같습니다:
이러한 기능은 사용자가 최소한의 설정만으로 사용할 수 있습니다.
transforms-aip
라이브러리 설치하기transforms-aip
라이브러리를 사용하기 시작하려면, 다음의 종속성을 transforms 저장소에 이 순서대로 설치해야 합니다:
palantir_models>=0.933.0
transforms-aip>=0.386.0
이 섹션에는 완성 예시와 임베딩 예시가 포함되어 있습니다.
완성을 생성하려는 주어진 데이터셋의 경우, 텍스트 열을 라이브러리에 공급하기만 하면 됩니다. 아래의 예에서는 question
이라는 텍스트 열을 사용하였습니다:
id | question |
---|---|
1 | What is the capital of Canada? |
2 | Which country has the largest population? |
3 | Name the longest river in South America. |
4 | How many states are there in the United States? |
5 | What is the name of the largest ocean on Earth? |
아래는 완성을 실행하기 위한 전체 코드 스니펫입니다; 열이 추가될 때의 단계에 대한 주석에 주목하세요:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
from pyspark.sql import functions as F from transforms.api import transform, Input, Output, configure from transforms.aip.orchestrators import ( CompletionOrchestrator, CompletionModelProperties, ) from palantir_models.transforms import OpenAiGptChatLanguageModelInput RATE_LIMIT_PER_MIN = 100 TOKEN_LIMIT_PER_MIN = 50000 # 설정 @configure(["NUM_EXECUTORS_2"]) @transform( output=Output("output_dataset"), questions=Input("input_dataset"), # 사용할 모델을 지정하고 레포지토리에서 가져오기 chat_model=OpenAiGptChatLanguageModelInput( "ri.language-model-service..language-model.gpt-35_azure" ), ) def compute(output, questions, chat_model, ctx): base_prompt = "Answer this question: " # 500개 질문 샘플링 sample_questions = questions.dataframe().limit(500) # 프롬프트 구성 # 오케스트레이터에 전달할 question_prompt 열 추가 questions_with_prompt = sample_questions.withColumn( "question_prompt", F.concat(F.lit(base_prompt), F.col("question")) ) # 오케스트레이터 생성 completions = CompletionOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, chat_model, # OpenAI 호환 속성을 전달할 수 있음 model_properties=CompletionModelProperties(temperature=0.6), ) # llm_answer 열에 응답 생성 # _completion_error 열에 응답과 관련된 문제가 발생한 경우 생성 answered = completions.withColumn( ctx, questions_with_prompt, "question_prompt", "llm_answer" ) # .cache() if doing further operations # 결과 저장 output.write_dataframe(answered)
이렇게 하면 제공한 이름(llm_answer
)을 가진 답변 열과 요청이 실패한 경우를 나타내는 오류 열(_completion_error
)이 생성됩니다.
예를 들어 이 변환의 결과물은 다음과 같습니다:
id | question | question_prompt | llm_answer | _completion_error |
---|---|---|---|---|
1 | What is the capital of Canada? | 이 질문에 답하세요: 캐나다의 수도는 어디입니까? | 오타와 | null |
2 | Which country has the largest population? | 이 질문에 답하세요: 인구가 가장 많은 나라는 어디입니까? | 중국 | null |
3 | Name the longest river in South America. | 이 질문에 답하세요: 남미에서 가장 긴 강의 이름은 무엇입니까? | 아마존 강 | null |
4 | How many states are there in the United States? | 이 질문에 답하세요: 미국에는 총 몇 개의 주가 있습니까? | 50 | null |
5 | What is the name of the largest ocean on Earth? | 이 질문에 답하세요: 지구에서 가장 큰 바다의 이름은 무엇입니까? | 태평양 | null |
임베딩의 경우, 오케스트레이터를 사용하는 코드 구조는 Completion 오케스트레이터와 유사합니다. 아래와 같습니다:
Copied!1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
from transforms.api import transform, Input, Output, configure from transforms.aip.orchestrators import EmbeddingOrchestrator from palantir_models.transforms import GenericEmbeddingModelInput # 매 분당 요청 수 제한 RATE_LIMIT_PER_MIN = 100 # 매 분당 토큰 수 제한 TOKEN_LIMIT_PER_MIN = 50000 @configure(["NUM_EXECUTORS_2"]) @transform( output=Output("output_dataset"), questions=Input("input_dataset"), # 임베딩 모델 사용을 위해 임포트해야 함 embedding_model=GenericEmbeddingModelInput( "ri.language-model-service..language-model.text-embedding-ada-002_azure" ), ) def compute(output, questions, embedding_model, ctx): # 1. 500개의 질문을 가져옵니다 sample_questions = questions.dataframe().limit(500) # 2. 오케스트레이터를 인스턴스화합니다 embeddings = EmbeddingOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, embedding_model, ) # 3. 임베딩을 실행합니다 # 임베딩 결과를 embedding_result 컬럼에 생성합니다 # 문제가 발생하면 _embeddings_error 컬럼에 기록합니다 questions_with_embeddings = embeddings.withColumn( ctx, sample_questions, "question", "embedding_result" ) # 4. 결과를 저장합니다 output.write_dataframe(questions_with_embeddings)
완료 오케스트레이터와 마찬가지로, 임베딩 오케스트레이터는 임베딩 응답 열(embedding_result
)과 오류 열(_embeddings_error
)을 생성합니다:
id | question | embedding_result | _embeddings_error |
---|---|---|---|
1 | 캐나다의 수도는 무엇입니까? | [0.12, 0.54, ...] | null |
2 | 인구가 가장 많은 나라는 어디입니까? | [-0.23, 0.87, ...] | null |
3 | 남아메리카에서 가장 긴 강의 이름을 말하십시오. | [0.65, -0.48, ...] | null |
4 | 미국에는 몇 개의 주가 있습니까? | [0.33, 0.92, ...] | null |
5 | 지구상에서 가장 큰 바다의 이름은 무엇입니까? | [-0.11, 0.34, ...] | null |
실행에 대한 일반적인 정보를 얻으려면, 다음 예제와 같이 오케스트레이터의 생성자에 verbose=True
인수를 전달하십시오:
Copied!1 2 3 4 5 6 7 8 9
# CompletionOrchestrator 인스턴스를 생성합니다. # 이 인스턴스는 주어진 속도 제한(RATE_LIMIT_PER_MIN), 토큰 제한(TOKEN_LIMIT_PER_MIN), 채팅 모델(chat_model)을 사용하여 작동합니다. # verbose=True는 자세한 로깅을 활성화합니다. completions = CompletionOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, chat_model, verbose=True, )
이는 실행을 이해하는 데 도움이 되는 메타데이터 열을 결과에 추가합니다. 이러한 열에는 요청이 실행된 파티션, 요청의 타임스탬프 및 사용된 토큰이 포함됩니다.
파이프라인 후반부에서 DataFrame에 추가 작업을 수행하는 경우, 오케스트레이터 호출 끝에 .cache()
를 추가하는 것이 때때로 필요합니다.
예를 들면:
Copied!1 2 3 4 5 6
# 'completions' 데이터프레임에 컬럼을 추가하고, 결과를 'result' 변수에 저장합니다. # ctx: 컨텍스트 변수 # questions_with_prompt: 질문과 프롬프트가 포함된 데이터프레임 # "question_prompt": 새로운 컬럼의 이름 # "llm_answer": 추가할 데이터의 컬럼 이름 result = completions.withColumn(ctx, questions_with_prompt, "question_prompt", "llm_answer").cache()
이 액션은 Spark Optimizer가 최적화의 일부로 호출을 실수로 제거하지 않도록 보장합니다. 응답이나 오류 없이 빈 행을 만나게 되면, 이는 종종 .cache()
를 사용해야 하는 상황을 나타냅니다.