Deploy extraction strategies to Python transforms

After validating your extraction strategy in AIP Document Intelligence, you can deploy it as a Python transform to run batch extraction across all pages of all documents in a media set. The deployed template produces the same results as the corresponding configuration in AIP Document Intelligence.

Using the deployed template

Once the template is created in Code Repositories:

  1. Specify your output dataset in the @transform.using decorator in the src/myproject/document_extraction/my_extraction.py file.
  2. Trigger the build.

The template uses lightweight transforms for optimal performance. Legacy versions used Spark-based transforms, which are much slower due to Spark overhead. We recommend migrating to lightweight transforms if you have not already.

Preview mode is not yet supported for this template. Preview runs will show errors, but the actual build will run correctly.

Incremental processing

By default, the document extraction transform is non-incremental, meaning that all documents are processed on every run. You can configure the transform to run incrementally by uncommenting the @incremental(...) decorator line. For an incremental transform, when new documents are added to the input media set, re-running the transform will only process the new documents and append results to the output dataset.
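The incremental behavior can be sketched in plain Python, independent of the transforms API (the helper function and document IDs below are illustrative, not part of the template):

```python
def run_extraction(all_media_item_ids, processed_ids, extract_text):
    """Simulate incremental semantics: only documents not yet in
    processed_ids are extracted, and results are appended rather than
    the whole output being rewritten."""
    new_ids = [mid for mid in all_media_item_ids if mid not in processed_ids]
    appended_rows = [
        {"media_item_rid": mid, "extraction_result": extract_text(mid)}
        for mid in new_ids
    ]
    processed_ids.update(new_ids)
    return appended_rows

# The first run processes everything; the second run, after a new
# document lands in the media set, only processes that new document.
seen = set()
first = run_extraction(["doc-a", "doc-b"], seen, lambda mid: f"text of {mid}")
second = run_extraction(["doc-a", "doc-b", "doc-c"], seen, lambda mid: f"text of {mid}")
```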

Customizing the prompt

For generative AI configurations, the template inherits the prompt you specified in AIP Document Intelligence. You can view the prompts in src/myproject/document_extraction/prompts.py.

We do not recommend editing prompts directly in the template, as this causes discrepancies between Document Intelligence results and batch job results. Instead, make prompt adjustments in Document Intelligence, verify the results there, then redeploy to create a new template.

| Transform input type | Customizable prompt |
| --- | --- |
| VisionLLMDocumentsExtractorInput | User prompt only (system prompt is fixed) |
| VisionLLMLayoutDocumentsExtractorInput (layout-aware extraction) | System prompt only (user prompt is fixed) |

For layout-aware extraction configurations, the user prompt must remain fixed because it contains a special JSON schema that preserves layout structure information. Modifying this prompt will significantly reduce extraction success rates.
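This failure mode can be illustrated with a minimal sketch, assuming only that the model response must parse as JSON before the schema can be checked (the helper function is illustrative, not part of the template; only the error constant name comes from the documentation above):

```python
import json

ERROR_RESPONSE_JSON_PARSING = "ERROR_RESPONSE_JSON_PARSING"

def parse_llm_response(raw_response: str):
    """Return (result, error): the parsed JSON on success, or the
    JSON-parsing error marker when the model emitted invalid JSON."""
    try:
        return json.loads(raw_response), None
    except json.JSONDecodeError:
        return None, ERROR_RESPONSE_JSON_PARSING

# A well-formed response parses; conversational filler does not.
ok, ok_err = parse_llm_response('{"blocks": [{"id": 1, "text": "Title"}]}')
bad, bad_err = parse_llm_response("Sure! Here is the extraction: ...")
```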

Custom image preprocessing

For documents that require image transformations before extraction, such as pages with rotated text content, use the following approach:

  1. Create a separate transform pipeline to apply the image transformations.
  2. Save the processed results to a new media set.
  3. Use Document Intelligence on the processed media set for extraction.

Run on a subset of media items

For layout-aware generative AI configurations, the vision LLM must generate valid JSON adhering to a specific schema. If the response is invalid JSON or does not follow the schema, the extraction fails with an ERROR_RESPONSE_JSON_PARSING error.

In practice, approximately 5% of extractions may fail even with top-tier models. Failed rows still contain valid layoutInfo with extraction results produced by the layout model alone.

To re-run extraction on failed rows:

  1. Use the filter_on_media_items argument with a list of media item IDs to process only specific items.
  2. Remove the @incremental decorator so these rows are re-processed instead of being detected as finished.
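The first step above can be sketched as follows; the rows and the error field name are illustrative stand-ins for the real output dataset, used only to show how a filter_on_media_items list could be assembled:

```python
# Illustrative output rows mimicking the extraction dataset; the "error"
# field name is an assumption for this sketch.
output_rows = [
    {"media_item_rid": "item-1", "error": None},
    {"media_item_rid": "item-2", "error": "ERROR_RESPONSE_JSON_PARSING"},
    {"media_item_rid": "item-3", "error": "ERROR_RESPONSE_JSON_PARSING"},
]

# Collect the IDs of failed rows; this list would then be passed as the
# filter_on_media_items argument, with the @incremental decorator removed.
failed_ids = sorted({row["media_item_rid"] for row in output_rows if row["error"]})
```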

Improve runtime performance

The THREAD_NUMBER parameter controls the number of concurrent threads; each thread extracts data from one document page at a time. Higher values generally complete jobs faster, subject to available model capacity.

| Setting | Value | Notes |
| --- | --- | --- |
| Default | 20 | Conservative setting suitable for most environments |
| Maximum tested | 300 | Achievable in development environments with abundant Vision LLM capacity |

Setting the THREAD_NUMBER value too high in capacity-restricted environments will cause rate limit errors. The retry loop then consumes significant capacity, affecting other jobs using the same model. You should monitor usage when adjusting this parameter.
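The retry behavior mentioned above can be illustrated with a hedged sketch of exponential backoff; RateLimitError, the delay values, and the flaky call are all illustrative and not the template's actual retry implementation:

```python
import time

class RateLimitError(Exception):
    """Illustrative stand-in for a rate-limit error from the model service."""

def call_with_backoff(call, max_attempts=5, base_delay=0.01):
    """Retry a rate-limited call, doubling the wait between attempts.
    Each retry still consumes capacity, which is why oversized thread
    counts in capacity-restricted environments hurt neighboring jobs."""
    for attempt in range(max_attempts):
        try:
            return call()
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))

# A call that is rate-limited twice before succeeding on the third try.
attempts = {"n": 0}
def flaky_call():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RateLimitError()
    return "extracted text"

result = call_with_backoff(flaky_call)
```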

Find logs

To view build logs, select Telemetry on the build details page. To filter for document extraction logs, filter the message column for values starting with aip_workflows.

Row-level vs. document-level chunking

The extraction output contains one row per page. By default, DocumentChunker.create_chunks_per_document combines all pages from the same document into a single Markdown string before chunking.
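The per-document combination step can be sketched in plain Python (the page separator and row shapes are assumptions for illustration, not the DocumentChunker internals):

```python
from collections import defaultdict

def combine_pages(rows):
    """Group page-level rows by document ID and join page text in page
    order, mirroring the per-document mode described above. The two-newline
    separator is an assumption for this sketch."""
    pages = defaultdict(list)
    for row in rows:
        pages[row["media_item_rid"]].append((row["page_num"], row["extractionResult"]))
    return {
        doc_id: "\n\n".join(text for _, text in sorted(doc_pages))
        for doc_id, doc_pages in pages.items()
    }

# Pages arrive out of order but are combined in page order.
docs = combine_pages([
    {"media_item_rid": "doc-a", "page_num": 1, "extractionResult": "# Page two"},
    {"media_item_rid": "doc-a", "page_num": 0, "extractionResult": "# Page one"},
])
```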

To chunk each row independently without combining pages, use DocumentChunker.create_chunks_per_row instead:

```python
chunking_result = chunker.create_chunks_per_row(
    extraction_df,
    chunk_mode="markdown",  # "recursive" for plain text, "markdown" for markdown text
    content_column="extractionResult",
    id_column="media_item_rid",  # used as prefix for chunk_id
    chunk_size=8192,
    chunk_overlap=0,
    thread_number=20,
    strip_markdown=False,  # set True to remove ```markdown and ``` wrappers before chunking
)
```

Create embeddings without chunking

Chunking is recommended before creating embeddings because embedding models have context limits. To skip chunking while preserving the pipeline structure, set chunk_size to a very large value in create_chunks_per_row, such as sys.maxsize.
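Why this works can be shown with a minimal sketch of size-based splitting (the splitter below is illustrative, not the DocumentChunker implementation): when the chunk size exceeds any possible text length, the "chunking" step returns each input unchanged.

```python
import sys

def split_into_chunks(text, chunk_size):
    """Illustrative size-based splitter: cut text into consecutive
    slices of at most chunk_size characters."""
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

# With sys.maxsize as the chunk size, the whole text is a single chunk.
chunks = split_into_chunks("some extracted page content", sys.maxsize)
```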

Template examples

The following examples show the transform code generated for each extraction configuration. These are for reference only. You should create the transform using the deploy tool in Document Intelligence instead of manually writing transform code for document extraction.

Traditional extraction: Raw text

Extracts text by reading document metadata. Only available for electronically generated PDFs.

```python
import polars as pl
from concurrent.futures import ThreadPoolExecutor

from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from transforms.mediasets.utils._constants import MEDIA_ITEM_RID, MEDIA_REFERENCE, PATH

THREAD_NUMBER = 20


# @incremental(v2_semantics=True)  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
)
def extract(media_input, output):
    """
    Extracts content from pdf documents with raw text extraction
    """
    media_refs = pl.from_pandas(
        media_input.list_media_items_by_path_with_media_reference().pandas(),
        schema_overrides={MEDIA_ITEM_RID: pl.String, MEDIA_REFERENCE: pl.String, PATH: pl.String},
    )

    def process_batch(batch_df: pl.DataFrame) -> pl.DataFrame:
        def create_page_tasks(row):
            media_item_rid = row[MEDIA_ITEM_RID]
            metadata = media_input.get_media_item_metadata(media_item_rid).document
            if metadata is None:
                raise ValueError(f"Media item {media_item_rid} is not a document")
            if metadata.pages is None:
                raise ValueError(f"Media item {media_item_rid} has no page count")
            return [(row, page_num) for page_num in range(metadata.pages)]

        def process_single_page(task):
            row, page_num = task
            media_item_rid = row[MEDIA_ITEM_RID]
            media_reference = row[MEDIA_REFERENCE]
            extraction_result = media_input.transform_document_to_text_raw(
                media_item_rid, page_num
            ).read().decode("utf-8")
            return {
                "media_item_rid": media_item_rid,
                "media_reference": media_reference,
                "page_num": page_num,
                "extraction_result": extraction_result,
            }

        all_tasks = []
        for row in batch_df.iter_rows(named=True):
            all_tasks.extend(create_page_tasks(row))

        with ThreadPoolExecutor(max_workers=THREAD_NUMBER) as executor:
            results = list(executor.map(process_single_page, all_tasks))

        return pl.DataFrame(results)

    extracted_data = media_refs.lazy().map_batches(
        process_batch,
        schema={
            "media_item_rid": pl.String,
            "media_reference": pl.String,
            "page_num": pl.Int64,
            "extraction_result": pl.String,
        },
        streamable=True,
    )

    output.write_dataframe(extracted_data)
```

Traditional extraction: OCR

Uses traditional Optical Character Recognition (OCR) to extract text without preserving layout information.

```python
import polars as pl
from concurrent.futures import ThreadPoolExecutor

from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from transforms.mediasets.utils._constants import MEDIA_ITEM_RID, MEDIA_REFERENCE, PATH

THREAD_NUMBER = 20


# @incremental(v2_semantics=True)  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
)
def extract(media_input, output):
    """
    Extracts content from pdf documents with OCR text extraction
    """
    media_refs = pl.from_pandas(
        media_input.list_media_items_by_path_with_media_reference().pandas(),
        schema_overrides={MEDIA_ITEM_RID: pl.String, MEDIA_REFERENCE: pl.String, PATH: pl.String},
    )

    def process_batch(batch_df: pl.DataFrame) -> pl.DataFrame:
        def create_page_tasks(row):
            media_item_rid = row[MEDIA_ITEM_RID]
            metadata = media_input.get_media_item_metadata(media_item_rid).document
            if metadata is None:
                raise ValueError(f"Media item {media_item_rid} is not a document")
            if metadata.pages is None:
                raise ValueError(f"Media item {media_item_rid} has no page count")
            return [(row, page_num) for page_num in range(metadata.pages)]

        def process_single_page(task):
            row, page_num = task
            media_item_rid = row[MEDIA_ITEM_RID]
            media_reference = row[MEDIA_REFERENCE]
            extraction_result = media_input.transform_document_to_text_ocr_output_text(
                media_item_rid, page_num
            ).read().decode("utf-8")
            return {
                "media_item_rid": media_item_rid,
                "media_reference": media_reference,
                "page_num": page_num,
                "extraction_result": extraction_result,
            }

        all_tasks = []
        for row in batch_df.iter_rows(named=True):
            all_tasks.extend(create_page_tasks(row))

        with ThreadPoolExecutor(max_workers=THREAD_NUMBER) as executor:
            results = list(executor.map(process_single_page, all_tasks))

        return pl.DataFrame(results)

    extracted_data = media_refs.lazy().map_batches(
        process_batch,
        schema={
            "media_item_rid": pl.String,
            "media_reference": pl.String,
            "page_num": pl.Int64,
            "extraction_result": pl.String,
        },
        streamable=True,
    )

    output.write_dataframe(extracted_data)
```

Traditional extraction: Layout-aware OCR

Uses advanced OCR with bounding boxes to preserve document layout and structure.

```python
import polars as pl
from concurrent.futures import ThreadPoolExecutor

from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from transforms.mediasets.utils._constants import MEDIA_ITEM_RID, MEDIA_REFERENCE, PATH

THREAD_NUMBER = 20


# @incremental(v2_semantics=True)  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
)
def extract(media_input, output):
    """
    Extracts content from pdf documents with layout-aware OCR extraction
    """
    media_refs = pl.from_pandas(
        media_input.list_media_items_by_path_with_media_reference().pandas(),
        schema_overrides={MEDIA_ITEM_RID: pl.String, MEDIA_REFERENCE: pl.String, PATH: pl.String},
    )

    def process_batch(batch_df: pl.DataFrame) -> pl.DataFrame:
        def create_page_tasks(row):
            media_item_rid = row[MEDIA_ITEM_RID]
            metadata = media_input.get_media_item_metadata(media_item_rid).document
            if metadata is None:
                raise ValueError(f"Media item {media_item_rid} is not a document")
            if metadata.pages is None:
                raise ValueError(f"Media item {media_item_rid} has no page count")
            return [(row, page_num) for page_num in range(metadata.pages)]

        def process_single_page(task):
            row, page_num = task
            media_item_rid = row[MEDIA_ITEM_RID]
            media_reference = row[MEDIA_REFERENCE]
            extraction_result = media_input.transform_media_item(media_item_rid, str(page_num), {
                "type": "documentToText",
                "documentToText": {
                    "operation": {
                        "type": "extractLayoutAwareContent",
                        "extractLayoutAwareContent": {
                            "parameters": {
                                "languages": ["ENG"]
                            }
                        }
                    }
                }
            })
            extraction_result = str(extraction_result.json())
            return {
                "media_item_rid": media_item_rid,
                "media_reference": media_reference,
                "page_num": page_num,
                "extraction_result": extraction_result,
            }

        all_tasks = []
        for row in batch_df.iter_rows(named=True):
            all_tasks.extend(create_page_tasks(row))

        with ThreadPoolExecutor(max_workers=THREAD_NUMBER) as executor:
            results = list(executor.map(process_single_page, all_tasks))

        return pl.DataFrame(results)

    extracted_data = media_refs.lazy().map_batches(
        process_batch,
        schema={
            "media_item_rid": pl.String,
            "media_reference": pl.String,
            "page_num": pl.Int64,
            "extraction_result": pl.String,
        },
        streamable=True,
    )

    output.write_dataframe(extracted_data)
```

Generative AI extraction: Basic

Uses a vision language model to extract content as Markdown without preprocessing.

```python
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput

from aip_workflows.document_intelligence.transforms import VisionLLMDocumentsExtractorInput

from .prompts import USER_PROMPT

THREAD_NUMBER = 20


# @incremental(v2_semantics=True, snapshot_inputs=["extractor"])  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
    extractor=VisionLLMDocumentsExtractorInput(
        "ri.language-model-service..language-model.anthropic-claude-xxx-sonnet"
    ),
)
def extract(media_input, output, extractor):
    """
    Extracts content from pdf documents as markdown.
    """
    extracted_data = extractor.create_extraction(
        media_input,
        with_ocr=False,
        prompt=USER_PROMPT,
        thread_number=THREAD_NUMBER,
    )
    output.write_dataframe(extracted_data)
```

Generative AI extraction: With OCR preprocessing

Uses a vision language model with OCR preprocessing for improved extraction on complex documents.

```python
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput

from aip_workflows.document_intelligence.transforms import VisionLLMDocumentsExtractorInput

from .prompts import USER_PROMPT

THREAD_NUMBER = 20


# @incremental(v2_semantics=True, snapshot_inputs=["extractor"])  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
    extractor=VisionLLMDocumentsExtractorInput(
        "ri.language-model-service..language-model.anthropic-claude-xxx-sonnet"
    ),
)
def extract(media_input, output, extractor):
    """
    Extracts content from pdf documents as markdown.
    """
    extracted_data = extractor.create_extraction(
        media_input,
        with_ocr=True,
        prompt=USER_PROMPT,
        thread_number=THREAD_NUMBER,
    )
    output.write_dataframe(extracted_data)
```

Generative AI extraction: Layout-aware

Uses a vision language model with layout-aware OCR preprocessing, returning layout information alongside extracted content.

```python
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput

from aip_workflows.document_intelligence.transforms import VisionLLMLayoutDocumentsExtractorInput

from .prompts import SYSTEM_PROMPT

THREAD_NUMBER = 20


# @incremental(v2_semantics=True, snapshot_inputs=["extractor"])  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
    extractor=VisionLLMLayoutDocumentsExtractorInput(
        "ri.language-model-service..language-model.anthropic-claude-xxx-sonnet"
    ),
)
def extract(media_input, output, extractor):
    """
    Extracts content from pdf documents as markdown.
    """
    extracted_data = extractor.create_extraction(
        media_input,
        include_layout_info="no_overlay",
        system_prompt=SYSTEM_PROMPT,
        thread_number=THREAD_NUMBER,
    )
    output.write_dataframe(extracted_data)
```

Generative AI extraction: Layout-aware with table cropping

Uses a vision language model with layout-aware OCR preprocessing and table cropping for improved table extraction accuracy.

```python
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput

from aip_workflows.document_intelligence.transforms import VisionLLMLayoutDocumentsExtractorInput

from .prompts import SYSTEM_PROMPT

THREAD_NUMBER = 20


# @incremental(v2_semantics=True, snapshot_inputs=["extractor"])  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
    extractor=VisionLLMLayoutDocumentsExtractorInput(
        "ri.language-model-service..language-model.anthropic-claude-xxx-sonnet"
    ),
)
def extract(media_input, output, extractor):
    """
    Extracts content from pdf documents as markdown.
    """
    extracted_data = extractor.create_extraction(
        media_input,
        include_layout_info="crop_tables",
        system_prompt=SYSTEM_PROMPT,
        thread_number=THREAD_NUMBER,
    )
    output.write_dataframe(extracted_data)
```

Chunk extracted text and generate embeddings

If embedding is not needed, remove the embedder from the transform decorator and the embedding_result line.

```python
from transforms.api import Input, Output, incremental, transform

from aip_workflows.document_intelligence.transforms import DocumentChunker, DocumentEmbedderInput

THREAD_NUMBER = 20


# @incremental(v2_semantics=True, snapshot_inputs=["embedder"])  # uncomment this line if incremental is needed
@transform.using(
    extraction_input=Input("ri.foundry.main.dataset.abc"),  # typically the output dataset from the extraction transform
    output=Output("ri.foundry.main.dataset.xyz"),
    embedder=DocumentEmbedderInput("ri.language-model-service..language-model.text-embedding-3-large"),
)
def chunk_and_embed(extraction_input, output, embedder):
    extraction_df = extraction_input.polars(lazy=True)
    chunker = DocumentChunker()
    chunking_result = chunker.create_chunks_per_document(
        extraction_df,
        chunk_mode="markdown",  # "recursive" for raw text, "markdown" for markdown text
        content_column="extractionResult",  # content column name
        id_column="media_item_rid",  # id of the document, used to combine content (e.g. from different pages) of the single document
        page_column="page_num",  # page number column name
        chunk_size=8192,
        chunk_overlap=0,
        thread_number=THREAD_NUMBER,
        strip_markdown=True,  # when True, removes ```markdown prefix and ``` suffix from the content before chunking
    )
    embedding_result = embedder.create_embeddings(
        chunking_result,
        content_column="chunk_content",
        thread_number=THREAD_NUMBER,
    )
    output.write_dataframe(embedding_result)
```