After validating your extraction strategy in AIP Document Intelligence, you can deploy it as a Python transform to run batch extraction across all pages of all documents in a media set. The deployed template produces the same results as the corresponding configuration in AIP Document Intelligence.
Once the template is created in Code Repositories, you can find the transform logic under the `@transform.using` decorator in the `src/myproject/document_extraction/my_extraction.py` file.

The template uses lightweight transforms for optimal performance. Legacy versions used Spark-based transforms, which are much slower due to Spark overhead. We recommend migrating to lightweight transforms if you have not already done so.
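Putting the file paths referenced in this section together, the generated project is laid out roughly as follows (the `prompts.py` module only appears for generative AI configurations, as described below):

```
src/myproject/document_extraction/
├── my_extraction.py   # the generated transform, decorated with @transform.using
└── prompts.py         # prompt constants (generative AI configurations only)
```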
Preview mode is not yet supported in this template. Errors are expected when using preview, but the actual build will work correctly.
By default, the document extraction transform is non-incremental, meaning that all documents are processed on every run. You can configure the transform to run incrementally by uncommenting the @incremental(...) decorator line. For an incremental transform, when new documents are added to the input media set, re-running the transform will only process the new documents and append results to the output dataset.
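For example, with the raw text template shown later in this section, enabling incremental processing is just a matter of uncommenting that line (for the generative AI templates, the commented decorator also includes `snapshot_inputs=["extractor"]`):

```python
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput


@incremental(v2_semantics=True)  # uncommented: subsequent runs only process newly added documents
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),  # placeholder RIDs, as in the templates below
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
)
def extract(media_input, output):
    ...  # extraction logic unchanged from the generated template
```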
For generative AI configurations, the template inherits the prompt you specified in AIP Document Intelligence. You can view the prompts in src/myproject/document_extraction/prompts.py.
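The module defines the prompt constants that the generated transforms import. A schematic example only; the real file contains the prompt text inherited from your Document Intelligence configuration:

```python
# src/myproject/document_extraction/prompts.py
# Illustrative contents only: the actual prompt is whatever you configured
# in Document Intelligence. Layout-aware configurations define SYSTEM_PROMPT instead.
USER_PROMPT = "Extract the content of this page as Markdown, preserving tables and headings."
```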
We do not recommend editing prompts directly in the template, as this causes discrepancies between Document Intelligence results and batch job results. Instead, make prompt adjustments in Document Intelligence, verify the results there, then redeploy to create a new template.
| Transform input type | Customizable prompt |
|---|---|
| VisionLLMDocumentsExtractorInput | User prompt only (system prompt is fixed) |
| VisionLLMLayoutDocumentsExtractorInput (layout-aware extraction) | System prompt only (user prompt is fixed) |
For layout-aware extraction configurations, the user prompt must remain fixed because it contains a special JSON schema that preserves layout structure information. Modifying this prompt will significantly reduce extraction success rates.
For documents that require image transformations before extraction, such as those with rotated text content, apply the necessary transformations to the media set before running this transform.
For layout-aware generative AI configurations, the vision LLM must generate valid JSON adhering to a specific schema. If the response is invalid JSON or does not follow the schema, the extraction fails with an ERROR_RESPONSE_JSON_PARSING error.
In practice, approximately 5% of extractions may fail even with top-tier models. Failed rows still contain valid `layoutInfo` with extraction results from the layout model only.
To re-run extraction on failed rows:
- Pass the `filter_on_media_items` argument with a list of media item IDs to process only those specific items (see the sketch below).
- If the transform is incremental, remove the `@incremental` decorator so these rows are re-processed instead of being detected as already finished.
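A minimal sketch of a filtered re-run, assuming `filter_on_media_items` is passed to `create_extraction` alongside the arguments shown in the reference templates below; the media item IDs are placeholders:

```python
extracted_data = extractor.create_extraction(
    media_input,
    with_ocr=True,
    prompt=USER_PROMPT,
    thread_number=THREAD_NUMBER,
    # Assumed usage: restrict this run to the media items that previously failed.
    filter_on_media_items=[
        "ri.mio.main.media-item.aaa",
        "ri.mio.main.media-item.bbb",
    ],
)
```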
The `THREAD_NUMBER` parameter controls the number of concurrent threads; each thread extracts data from one document page at a time. Higher values result in faster job completion.

| Setting | Value | Notes |
|---|---|---|
| Default | 20 | Conservative setting suitable for most environments |
| Maximum tested | 300 | Achievable in development environments with abundant Vision LLM capacity |
Setting the THREAD_NUMBER value too high in capacity-restricted environments will cause rate limit errors. The retry loop then consumes significant capacity, affecting other jobs using the same model. You should monitor usage when adjusting this parameter.
The following examples show the transform code generated for each extraction configuration. These are for reference only. You should create the transform using the deploy tool in Document Intelligence instead of manually writing transform code for document extraction.
Extracts text by reading document metadata. Only available for electronically generated PDFs.
```python
import polars as pl
from concurrent.futures import ThreadPoolExecutor
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from transforms.mediasets.utils._constants import MEDIA_ITEM_RID, MEDIA_REFERENCE, PATH

THREAD_NUMBER = 20


# @incremental(v2_semantics=True)  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
)
def extract(media_input, output):
    """
    Extracts content from pdf documents with raw text extraction
    """
    media_refs = pl.from_pandas(
        media_input.list_media_items_by_path_with_media_reference().pandas(),
        schema_overrides={MEDIA_ITEM_RID: pl.String, MEDIA_REFERENCE: pl.String, PATH: pl.String},
    )

    def process_batch(batch_df: pl.DataFrame) -> pl.DataFrame:
        def create_page_tasks(row):
            media_item_rid = row[MEDIA_ITEM_RID]
            metadata = media_input.get_media_item_metadata(media_item_rid).document
            if metadata is None:
                raise ValueError(f"Media item {media_item_rid} is not a document")
            if metadata.pages is None:
                raise ValueError(f"Media item {media_item_rid} has no page count")
            return [(row, page_num) for page_num in range(metadata.pages)]

        def process_single_page(task):
            row, page_num = task
            media_item_rid = row[MEDIA_ITEM_RID]
            media_reference = row[MEDIA_REFERENCE]
            extraction_result = media_input.transform_document_to_text_raw(
                media_item_rid, page_num
            ).read().decode("utf-8")
            return {
                "media_item_rid": media_item_rid,
                "media_reference": media_reference,
                "page_num": page_num,
                "extraction_result": extraction_result,
            }

        all_tasks = []
        for row in batch_df.iter_rows(named=True):
            all_tasks.extend(create_page_tasks(row))

        with ThreadPoolExecutor(max_workers=THREAD_NUMBER) as executor:
            results = list(executor.map(process_single_page, all_tasks))

        return pl.DataFrame(results)

    extracted_data = media_refs.lazy().map_batches(
        process_batch,
        schema={
            "media_item_rid": pl.String,
            "media_reference": pl.String,
            "page_num": pl.Int64,
            "extraction_result": pl.String,
        },
        streamable=True,
    )

    output.write_dataframe(extracted_data)
```
Uses traditional Optical Character Recognition (OCR) to extract text without preserving layout information.
```python
import polars as pl
from concurrent.futures import ThreadPoolExecutor
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from transforms.mediasets.utils._constants import MEDIA_ITEM_RID, MEDIA_REFERENCE, PATH

THREAD_NUMBER = 20


# @incremental(v2_semantics=True)  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
)
def extract(media_input, output):
    """
    Extracts content from pdf documents with OCR text extraction
    """
    media_refs = pl.from_pandas(
        media_input.list_media_items_by_path_with_media_reference().pandas(),
        schema_overrides={MEDIA_ITEM_RID: pl.String, MEDIA_REFERENCE: pl.String, PATH: pl.String},
    )

    def process_batch(batch_df: pl.DataFrame) -> pl.DataFrame:
        def create_page_tasks(row):
            media_item_rid = row[MEDIA_ITEM_RID]
            metadata = media_input.get_media_item_metadata(media_item_rid).document
            if metadata is None:
                raise ValueError(f"Media item {media_item_rid} is not a document")
            if metadata.pages is None:
                raise ValueError(f"Media item {media_item_rid} has no page count")
            return [(row, page_num) for page_num in range(metadata.pages)]

        def process_single_page(task):
            row, page_num = task
            media_item_rid = row[MEDIA_ITEM_RID]
            media_reference = row[MEDIA_REFERENCE]
            extraction_result = media_input.transform_document_to_text_ocr_output_text(
                media_item_rid, page_num
            ).read().decode("utf-8")
            return {
                "media_item_rid": media_item_rid,
                "media_reference": media_reference,
                "page_num": page_num,
                "extraction_result": extraction_result,
            }

        all_tasks = []
        for row in batch_df.iter_rows(named=True):
            all_tasks.extend(create_page_tasks(row))

        with ThreadPoolExecutor(max_workers=THREAD_NUMBER) as executor:
            results = list(executor.map(process_single_page, all_tasks))

        return pl.DataFrame(results)

    extracted_data = media_refs.lazy().map_batches(
        process_batch,
        schema={
            "media_item_rid": pl.String,
            "media_reference": pl.String,
            "page_num": pl.Int64,
            "extraction_result": pl.String,
        },
        streamable=True,
    )

    output.write_dataframe(extracted_data)
```
Uses advanced OCR with bounding boxes to preserve document layout and structure.
```python
import polars as pl
from concurrent.futures import ThreadPoolExecutor
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from transforms.mediasets.utils._constants import MEDIA_ITEM_RID, MEDIA_REFERENCE, PATH

THREAD_NUMBER = 20


# @incremental(v2_semantics=True)  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
)
def extract(media_input, output):
    """
    Extracts content from pdf documents with layout-aware OCR extraction
    """
    media_refs = pl.from_pandas(
        media_input.list_media_items_by_path_with_media_reference().pandas(),
        schema_overrides={MEDIA_ITEM_RID: pl.String, MEDIA_REFERENCE: pl.String, PATH: pl.String},
    )

    def process_batch(batch_df: pl.DataFrame) -> pl.DataFrame:
        def create_page_tasks(row):
            media_item_rid = row[MEDIA_ITEM_RID]
            metadata = media_input.get_media_item_metadata(media_item_rid).document
            if metadata is None:
                raise ValueError(f"Media item {media_item_rid} is not a document")
            if metadata.pages is None:
                raise ValueError(f"Media item {media_item_rid} has no page count")
            return [(row, page_num) for page_num in range(metadata.pages)]

        def process_single_page(task):
            row, page_num = task
            media_item_rid = row[MEDIA_ITEM_RID]
            media_reference = row[MEDIA_REFERENCE]
            extraction_result = media_input.transform_media_item(media_item_rid, str(page_num), {
                "type": "documentToText",
                "documentToText": {
                    "operation": {
                        "type": "extractLayoutAwareContent",
                        "extractLayoutAwareContent": {
                            "parameters": {
                                "languages": ["ENG"]
                            }
                        }
                    }
                }
            })
            extraction_result = str(extraction_result.json())
            return {
                "media_item_rid": media_item_rid,
                "media_reference": media_reference,
                "page_num": page_num,
                "extraction_result": extraction_result,
            }

        all_tasks = []
        for row in batch_df.iter_rows(named=True):
            all_tasks.extend(create_page_tasks(row))

        with ThreadPoolExecutor(max_workers=THREAD_NUMBER) as executor:
            results = list(executor.map(process_single_page, all_tasks))

        return pl.DataFrame(results)

    extracted_data = media_refs.lazy().map_batches(
        process_batch,
        schema={
            "media_item_rid": pl.String,
            "media_reference": pl.String,
            "page_num": pl.Int64,
            "extraction_result": pl.String,
        },
        streamable=True,
    )

    output.write_dataframe(extracted_data)
```
Uses a vision language model to extract content as Markdown without preprocessing.
```python
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from aip_workflows.document_intelligence.transforms import VisionLLMDocumentsExtractorInput

from .prompts import USER_PROMPT

THREAD_NUMBER = 20


# @incremental(v2_semantics=True, snapshot_inputs=["extractor"])  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
    extractor=VisionLLMDocumentsExtractorInput(
        "ri.language-model-service..language-model.anthropic-claude-xxx-sonnet"
    ),
)
def extract(media_input, output, extractor):
    """
    Extracts content from pdf documents as markdown.
    """
    extracted_data = extractor.create_extraction(
        media_input, with_ocr=False, prompt=USER_PROMPT, thread_number=THREAD_NUMBER
    )
    output.write_dataframe(extracted_data)
```
Uses a vision language model with OCR preprocessing for improved extraction on complex documents.
```python
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from aip_workflows.document_intelligence.transforms import VisionLLMDocumentsExtractorInput

from .prompts import USER_PROMPT

THREAD_NUMBER = 20


# @incremental(v2_semantics=True, snapshot_inputs=["extractor"])  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
    extractor=VisionLLMDocumentsExtractorInput(
        "ri.language-model-service..language-model.anthropic-claude-xxx-sonnet"
    ),
)
def extract(media_input, output, extractor):
    """
    Extracts content from pdf documents as markdown.
    """
    extracted_data = extractor.create_extraction(
        media_input, with_ocr=True, prompt=USER_PROMPT, thread_number=THREAD_NUMBER
    )
    output.write_dataframe(extracted_data)
```
Uses a vision language model with layout-aware OCR preprocessing, returning layout information alongside extracted content.
```python
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from aip_workflows.document_intelligence.transforms import VisionLLMLayoutDocumentsExtractorInput

from .prompts import SYSTEM_PROMPT

THREAD_NUMBER = 20


# @incremental(v2_semantics=True, snapshot_inputs=["extractor"])  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
    extractor=VisionLLMLayoutDocumentsExtractorInput(
        "ri.language-model-service..language-model.anthropic-claude-xxx-sonnet"
    ),
)
def extract(media_input, output, extractor):
    """
    Extracts content from pdf documents as markdown.
    """
    extracted_data = extractor.create_extraction(
        media_input,
        include_layout_info="no_overlay",
        system_prompt=SYSTEM_PROMPT,
        thread_number=THREAD_NUMBER,
    )
    output.write_dataframe(extracted_data)
```
Uses a vision language model with layout-aware OCR preprocessing and table cropping for improved table extraction accuracy.
```python
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from aip_workflows.document_intelligence.transforms import VisionLLMLayoutDocumentsExtractorInput

from .prompts import SYSTEM_PROMPT

THREAD_NUMBER = 20


# @incremental(v2_semantics=True, snapshot_inputs=["extractor"])  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
    extractor=VisionLLMLayoutDocumentsExtractorInput(
        "ri.language-model-service..language-model.anthropic-claude-xxx-sonnet"
    ),
)
def extract(media_input, output, extractor):
    """
    Extracts content from pdf documents as markdown.
    """
    extracted_data = extractor.create_extraction(
        media_input,
        include_layout_info="crop_tables",
        system_prompt=SYSTEM_PROMPT,
        thread_number=THREAD_NUMBER,
    )
    output.write_dataframe(extracted_data)
```