Documentation index: fetch the complete documentation index at https://docs.reducto.ai/llms.txt.
Use this file to discover all available pages before exploring further.
The AsyncReducto client provides the same interface as the sync client, but allows concurrent document processing. Use it when you need to process multiple documents simultaneously.
Basic Usage
import asyncio
from pathlib import Path

from reducto import AsyncReducto


async def main():
    """Upload one document and print the content of every parsed chunk."""
    # Construction is identical to the synchronous Reducto client.
    client = AsyncReducto()
    upload = await client.upload(file=Path("document.pdf"))
    result = await client.parse.run(input=upload.file_id)
    for chunk in result.result.chunks:
        print(chunk.content)


asyncio.run(main())
The async client mirrors the sync client exactly. Every method is available; just add await.
Concurrent Processing
The real value of async is processing multiple documents simultaneously with asyncio.gather:
Parse Multiple Documents
import asyncio
from pathlib import Path

from reducto import AsyncReducto


async def process_batch():
    """Upload three documents concurrently, then parse them concurrently."""
    client = AsyncReducto()
    files = ["doc1.pdf", "doc2.pdf", "doc3.pdf"]

    # Fan out the uploads so they all run at the same time.
    uploads = await asyncio.gather(
        *(client.upload(file=Path(name)) for name in files)
    )

    # Parse every uploaded document concurrently as well.
    parsed = await asyncio.gather(
        *(client.parse.run(input=item.file_id) for item in uploads)
    )
    return parsed


results = asyncio.run(process_batch())
async def extract_batch():
    """Run structured extraction over several invoices concurrently."""
    client = AsyncReducto()
    files = ["invoice1.pdf", "invoice2.pdf", "invoice3.pdf"]

    # JSON schema describing the fields to pull out of each invoice.
    schema = {
        "type": "object",
        "properties": {
            "invoice_number": {"type": "string"},
            "total": {"type": "number"},
        },
    }

    uploads = await asyncio.gather(
        *(client.upload(file=Path(name)) for name in files)
    )
    extracted = await asyncio.gather(
        *(
            client.extract.run(
                input=item.file_id,
                instructions={"schema": schema},
            )
            for item in uploads
        )
    )
    return extracted
Split Multiple Documents
async def split_batch():
    """Split several reports into named sections concurrently."""
    client = AsyncReducto()
    files = ["report1.pdf", "report2.pdf", "report3.pdf"]

    # The same section descriptions are applied to every document.
    split_desc = [
        {"name": "Summary", "description": "Executive summary"},
        {"name": "Details", "description": "Detailed content"},
    ]

    uploads = await asyncio.gather(
        *(client.upload(file=Path(name)) for name in files)
    )
    split_results = await asyncio.gather(
        *(
            client.split.run(
                input=item.file_id,
                split_description=split_desc,
            )
            for item in uploads
        )
    )
    return split_results
async def fill_forms_batch():
    """Fill the name field of several forms concurrently."""
    client = AsyncReducto()
    forms = [
        (Path("form1.pdf"), "Fill name with 'Alice'"),
        (Path("form2.pdf"), "Fill name with 'Bob'"),
        (Path("form3.pdf"), "Fill name with 'Charlie'"),
    ]

    async def fill_form(file_path, instructions):
        # Each pair uploads then edits sequentially; the pairs themselves
        # run concurrently via gather below.
        upload = await client.upload(file=file_path)
        return await client.edit.run(
            document_url=upload.file_id,
            edit_instructions=instructions,
        )

    return await asyncio.gather(
        *(fill_form(path, text) for path, text in forms)
    )
Run Pipeline on Multiple Documents
async def pipeline_batch():
    """Run a saved pipeline against several documents concurrently."""
    client = AsyncReducto()
    files = ["doc1.pdf", "doc2.pdf", "doc3.pdf"]
    pipeline_id = "your_pipeline_id"  # placeholder: substitute a real pipeline ID

    uploads = await asyncio.gather(
        *(client.upload(file=Path(name)) for name in files)
    )
    outputs = await asyncio.gather(
        *(
            client.pipeline.run(
                input=item.file_id,
                pipeline_id=pipeline_id,
            )
            for item in uploads
        )
    )
    return outputs
Rate Limiting
Control concurrency with semaphores to avoid overwhelming the API:
import asyncio
from pathlib import Path  # bug fix: Path was used below without being imported

from reducto import AsyncReducto


async def process_with_rate_limit(files: list[str], max_concurrent: int = 5):
    """Parse *files* concurrently with at most *max_concurrent* in flight.

    Args:
        files: Paths of the documents to upload and parse.
        max_concurrent: Upper bound on simultaneous upload+parse pairs.

    Returns:
        The parse results, in the same order as *files*.
    """
    client = AsyncReducto()
    # The semaphore caps how many upload+parse pairs run at once, so the
    # API is never hit with more than max_concurrent requests in flight.
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_one(file_path):
        async with semaphore:
            upload = await client.upload(file=Path(file_path))
            return await client.parse.run(input=upload.file_id)

    return await asyncio.gather(*(process_one(name) for name in files))


# Process 100 files, max 5 at a time.
# NOTE: file_list is a placeholder — supply your own list of paths.
results = asyncio.run(process_with_rate_limit(file_list, max_concurrent=5))
Context Manager
Use the context manager to ensure proper cleanup:
async def main():
    """Parse a document, closing the client automatically on exit."""
    # The async context manager guarantees the underlying HTTP resources
    # are released even if an exception is raised inside the block.
    async with AsyncReducto() as client:
        upload = await client.upload(file=Path("document.pdf"))
        return await client.parse.run(input=upload.file_id)


result = asyncio.run(main())
Error Handling
Error handling works the same as the sync client:
import reducto


async def safe_process():
    """Parse a document, reporting each Reducto error category separately."""
    client = AsyncReducto()
    try:
        upload = await client.upload(file=Path("document.pdf"))
        result = await client.parse.run(input=upload.file_id)
    except reducto.APIConnectionError as e:
        print(f"Connection failed: {e}")
    except reducto.RateLimitError as e:
        print(f"Rate limited: {e}")
    except reducto.APIStatusError as e:
        # Catch-all for other non-2xx responses; status_code pinpoints the cause.
        print(f"API error: {e.status_code}")
    else:
        return result
For batch processing, use return_exceptions=True to handle failures gracefully:
async def batch_with_error_handling(files):
    """Parse *files* concurrently, logging failures instead of aborting."""
    client = AsyncReducto()

    async def process_one(file_path):
        upload = await client.upload(file=Path(file_path))
        return await client.parse.run(input=upload.file_id)

    # return_exceptions=True keeps one failure from cancelling the batch:
    # failed items come back as Exception instances in the results list.
    results = await asyncio.gather(
        *(process_one(name) for name in files),
        return_exceptions=True,
    )

    for file_path, outcome in zip(files, results):
        if isinstance(outcome, Exception):
            print(f"Failed: {file_path} - {outcome}")
        else:
            print(f"Success: {file_path} - {outcome.usage.num_pages} pages")
    return results
Advanced Features
Raw Response Access
async def get_raw_response():
    """Inspect raw response headers before decoding the parsed result."""
    from pathlib import Path  # local import so the snippet is self-contained

    client = AsyncReducto()
    # Bug fix: the original example referenced `upload` without ever
    # creating it, which would raise NameError. Upload the document first.
    upload = await client.upload(file=Path("document.pdf"))
    response = await client.parse.with_raw_response.run(input=upload.file_id)
    print(response.headers)
    # .parse() decodes the raw HTTP response into the typed result object.
    return response.parse()
Streaming Response
async def stream_response():
    """Stream the parse response line by line as it arrives."""
    from pathlib import Path  # local import so the snippet is self-contained

    client = AsyncReducto()
    # Bug fix: the original example referenced `upload` without ever
    # creating it, which would raise NameError. Upload the document first.
    upload = await client.upload(file=Path("document.pdf"))
    async with client.parse.with_streaming_response.run(
        input=upload.file_id
    ) as response:
        async for line in response.iter_lines():
            print(line)
When to Use Async
Use async when processing 10 or more documents, building web services, or integrating with async frameworks such as FastAPI or aiohttp.
Use sync when processing single documents, writing simple scripts, or when async would add unnecessary complexity.
Next Steps