본 번역은 검증되지 않았습니다. AIP를 통해 영문원문으로부터 번역되었습니다.

AIP 오케스트레이터

transforms-aip 라이브러리는 언어 모델 API를 PySpark 워크플로에 통합하는 것을 단순화합니다. 이 라이브러리를 사용하면 사용자는 Spark DataFrame 내의 데이터를 사용하여 완성 및 임베딩 모델에 대한 요청을 생성할 수 있습니다. transforms-aip 라이브러리의 기능은 다음과 같습니다:

  • 요율 제한 관리: 요청 및 토큰 기반 요율 제한을 모두 처리합니다.
  • 분산된 작업량: Spark 작업의 실행자에게 작업을 균등하게 공유하여 Spark의 이점을 최대화합니다.
  • 최적화된 성능: 현재 요율 제한에서 허용하는 최대 속도로 요청을 보장합니다.
  • 오류 처리: 요청이 어떤 오류로 인해 실패하면 시스템이 실패에 대한 정보를 캡처하고 전체 빌드가 실패하는 것을 피하기 위해 실행을 계속합니다.

이러한 기능은 사용자가 최소한의 설정만으로 사용할 수 있습니다.

transforms-aip 라이브러리 설치하기

transforms-aip 라이브러리를 사용하기 시작하려면, 다음의 종속성을 transforms 저장소에 이 순서대로 설치해야 합니다:

  1. palantir_models>=0.933.0
  2. transforms-aip>=0.386.0

사용법

이 섹션에는 완성 예시임베딩 예시가 포함되어 있습니다.

완성 예시

완성을 생성하려는 주어진 데이터셋의 경우, 텍스트 열을 라이브러리에 공급하기만 하면 됩니다. 아래의 예에서는 question이라는 텍스트 열을 사용하였습니다:

idquestion
1What is the capital of Canada?
2Which country has the largest population?
3Name the longest river in South America.
4How many states are there in the United States?
5What is the name of the largest ocean on Earth?

아래는 완성을 실행하기 위한 전체 코드 스니펫입니다; 열이 추가될 때의 단계에 대한 주석에 주목하세요:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 from pyspark.sql import functions as F from transforms.api import transform, Input, Output, configure from transforms.aip.orchestrators import ( CompletionOrchestrator, CompletionModelProperties, ) from palantir_models.transforms import OpenAiGptChatLanguageModelInput RATE_LIMIT_PER_MIN = 100 TOKEN_LIMIT_PER_MIN = 50000 # 설정 @configure(["NUM_EXECUTORS_2"]) @transform( output=Output("output_dataset"), questions=Input("input_dataset"), # 사용할 모델을 지정하고 레포지토리에서 가져오기 chat_model=OpenAiGptChatLanguageModelInput( "ri.language-model-service..language-model.gpt-35_azure" ), ) def compute(output, questions, chat_model, ctx): base_prompt = "Answer this question: " # 500개 질문 샘플링 sample_questions = questions.dataframe().limit(500) # 프롬프트 구성 # 오케스트레이터에 전달할 question_prompt 열 추가 questions_with_prompt = sample_questions.withColumn( "question_prompt", F.concat(F.lit(base_prompt), F.col("question")) ) # 오케스트레이터 생성 completions = CompletionOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, chat_model, # OpenAI 호환 속성을 전달할 수 있음 model_properties=CompletionModelProperties(temperature=0.6), ) # llm_answer 열에 응답 생성 # _completion_error 열에 응답과 관련된 문제가 발생한 경우 생성 answered = completions.withColumn( ctx, questions_with_prompt, "question_prompt", "llm_answer" ) # .cache() if doing further operations # 결과 저장 output.write_dataframe(answered)

이렇게 하면 제공한 이름(llm_answer)을 가진 답변 열과 요청이 실패한 경우를 나타내는 오류 열(_completion_error)이 생성됩니다.

예를 들어 이 변환의 결과물은 다음과 같습니다:

idquestionquestion_promptllm_answer_completion_error
1What is the capital of Canada?이 질문에 답하세요: 캐나다의 수도는 어디입니까?오타와null
2Which country has the largest population?이 질문에 답하세요: 인구가 가장 많은 나라는 어디입니까?중국null
3Name the longest river in South America.이 질문에 답하세요: 남미에서 가장 긴 강의 이름은 무엇입니까?아마존 강null
4How many states are there in the United States?이 질문에 답하세요: 미국에는 총 몇 개의 주가 있습니까?50null
5What is the name of the largest ocean on Earth?이 질문에 답하세요: 지구에서 가장 큰 바다의 이름은 무엇입니까?태평양null

임베딩 예제

임베딩의 경우, 오케스트레이터를 사용하는 코드 구조는 Completion 오케스트레이터와 유사합니다. 아래와 같습니다:

Copied!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 from transforms.api import transform, Input, Output, configure from transforms.aip.orchestrators import EmbeddingOrchestrator from palantir_models.transforms import GenericEmbeddingModelInput # 매 분당 요청 수 제한 RATE_LIMIT_PER_MIN = 100 # 매 분당 토큰 수 제한 TOKEN_LIMIT_PER_MIN = 50000 @configure(["NUM_EXECUTORS_2"]) @transform( output=Output("output_dataset"), questions=Input("input_dataset"), # 임베딩 모델 사용을 위해 임포트해야 함 embedding_model=GenericEmbeddingModelInput( "ri.language-model-service..language-model.text-embedding-ada-002_azure" ), ) def compute(output, questions, embedding_model, ctx): # 1. 500개의 질문을 가져옵니다 sample_questions = questions.dataframe().limit(500) # 2. 오케스트레이터를 인스턴스화합니다 embeddings = EmbeddingOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, embedding_model, ) # 3. 임베딩을 실행합니다 # 임베딩 결과를 embedding_result 컬럼에 생성합니다 # 문제가 발생하면 _embeddings_error 컬럼에 기록합니다 questions_with_embeddings = embeddings.withColumn( ctx, sample_questions, "question", "embedding_result" ) # 4. 결과를 저장합니다 output.write_dataframe(questions_with_embeddings)

완료 오케스트레이터와 마찬가지로, 임베딩 오케스트레이터는 임베딩 응답 열(embedding_result)과 오류 열(_embeddings_error)을 생성합니다:

idquestionembedding_result_embeddings_error
1캐나다의 수도는 무엇입니까?[0.12, 0.54, ...]null
2인구가 가장 많은 나라는 어디입니까?[-0.23, 0.87, ...]null
3남아메리카에서 가장 긴 강의 이름을 말하십시오.[0.65, -0.48, ...]null
4미국에는 몇 개의 주가 있습니까?[0.33, 0.92, ...]null
5지구상에서 가장 큰 바다의 이름은 무엇입니까?[-0.11, 0.34, ...]null

디버깅

실행에 대한 일반적인 정보를 얻으려면, 다음 예제와 같이 오케스트레이터의 생성자에 verbose=True 인수를 전달하십시오:

Copied!
1 2 3 4 5 6 7 8 9 # CompletionOrchestrator 인스턴스를 생성합니다. # 이 인스턴스는 주어진 속도 제한(RATE_LIMIT_PER_MIN), 토큰 제한(TOKEN_LIMIT_PER_MIN), 채팅 모델(chat_model)을 사용하여 작동합니다. # verbose=True는 자세한 로깅을 활성화합니다. completions = CompletionOrchestrator( RATE_LIMIT_PER_MIN, TOKEN_LIMIT_PER_MIN, chat_model, verbose=True, )

이는 실행을 이해하는 데 도움이 되는 메타데이터 열을 결과에 추가합니다. 이러한 열에는 요청이 실행된 파티션, 요청의 타임스탬프 및 사용된 토큰이 포함됩니다.

일반적인 문제들

파이프라인 후반부에서 DataFrame에 추가 작업을 수행하는 경우, 오케스트레이터 호출 끝에 .cache()를 추가하는 것이 때때로 필요합니다.

예를 들면:

Copied!
1 2 3 4 5 6 # 'completions' 데이터프레임에 컬럼을 추가하고, 결과를 'result' 변수에 저장합니다. # ctx: 컨텍스트 변수 # questions_with_prompt: 질문과 프롬프트가 포함된 데이터프레임 # "question_prompt": 새로운 컬럼의 이름 # "llm_answer": 추가할 데이터의 컬럼 이름 result = completions.withColumn(ctx, questions_with_prompt, "question_prompt", "llm_answer").cache()

이 액션은 Spark Optimizer가 최적화의 일부로 호출을 실수로 제거하지 않도록 보장합니다. 응답이나 오류 없이 빈 행을 만나게 되면, 이는 종종 .cache()를 사용해야 하는 상황을 나타냅니다.