The AsyncReducto client provides the same interface as the sync client, but allows concurrent document processing. Use it when you need to process multiple documents simultaneously.

Basic Usage

import asyncio
from pathlib import Path
from reducto import AsyncReducto

async def main():
    client = AsyncReducto()  # Same initialization as sync
    
    upload = await client.upload(file=Path("document.pdf"))
    result = await client.parse.run(input=upload.file_id)
    
    for chunk in result.result.chunks:
        print(chunk.content)

asyncio.run(main())
The async client mirrors the sync client exactly. Every method is available; just add await.
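
For reference, the same flow with the sync client (a sketch, assuming the sync client is exported as Reducto, the sync counterpart of AsyncReducto):
from pathlib import Path
from reducto import Reducto

client = Reducto()
upload = client.upload(file=Path("document.pdf"))
result = client.parse.run(input=upload.file_id)

for chunk in result.result.chunks:
    print(chunk.content)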

Concurrent Processing

The real value of async is processing multiple documents simultaneously with asyncio.gather:

Parse Multiple Documents

import asyncio
from pathlib import Path
from reducto import AsyncReducto

async def process_batch():
    client = AsyncReducto()
    files = ["doc1.pdf", "doc2.pdf", "doc3.pdf"]
    
    # Upload all files concurrently
    uploads = await asyncio.gather(*[
        client.upload(file=Path(f)) for f in files
    ])
    
    # Parse all documents concurrently
    results = await asyncio.gather(*[
        client.parse.run(input=upload.file_id) for upload in uploads
    ])
    
    return results

results = asyncio.run(process_batch())

Extract from Multiple Documents

async def extract_batch():
    client = AsyncReducto()
    files = ["invoice1.pdf", "invoice2.pdf", "invoice3.pdf"]
    
    schema = {
        "type": "object",
        "properties": {
            "invoice_number": {"type": "string"},
            "total": {"type": "number"}
        }
    }
    
    uploads = await asyncio.gather(*[
        client.upload(file=Path(f)) for f in files
    ])
    
    results = await asyncio.gather(*[
        client.extract.run(
            input=upload.file_id,
            instructions={"schema": schema}
        ) for upload in uploads
    ])
    
    return results

Split Multiple Documents

async def split_batch():
    client = AsyncReducto()
    files = ["report1.pdf", "report2.pdf", "report3.pdf"]
    
    split_desc = [
        {"name": "Summary", "description": "Executive summary"},
        {"name": "Details", "description": "Detailed content"}
    ]
    
    uploads = await asyncio.gather(*[
        client.upload(file=Path(f)) for f in files
    ])
    
    results = await asyncio.gather(*[
        client.split.run(
            input=upload.file_id,
            split_description=split_desc
        ) for upload in uploads
    ])
    
    return results

Fill Multiple Forms

async def fill_forms_batch():
    client = AsyncReducto()
    
    forms = [
        (Path("form1.pdf"), "Fill name with 'Alice'"),
        (Path("form2.pdf"), "Fill name with 'Bob'"),
        (Path("form3.pdf"), "Fill name with 'Charlie'"),
    ]
    
    async def fill_form(file_path, instructions):
        upload = await client.upload(file=file_path)
        return await client.edit.run(
            document_url=upload.file_id,
            edit_instructions=instructions
        )
    
    results = await asyncio.gather(*[
        fill_form(path, instr) for path, instr in forms
    ])
    
    return results

Run Pipeline on Multiple Documents

async def pipeline_batch():
    client = AsyncReducto()
    files = ["doc1.pdf", "doc2.pdf", "doc3.pdf"]
    pipeline_id = "your_pipeline_id"
    
    uploads = await asyncio.gather(*[
        client.upload(file=Path(f)) for f in files
    ])
    
    results = await asyncio.gather(*[
        client.pipeline.run(
            input=upload.file_id,
            pipeline_id=pipeline_id
        ) for upload in uploads
    ])
    
    return results

Rate Limiting

Control concurrency with semaphores to avoid overwhelming the API:
import asyncio
from pathlib import Path
from reducto import AsyncReducto

async def process_with_rate_limit(files: list[str], max_concurrent: int = 5):
    client = AsyncReducto()
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def process_one(file_path):
        async with semaphore:
            upload = await client.upload(file=Path(file_path))
            return await client.parse.run(input=upload.file_id)
    
    results = await asyncio.gather(*[
        process_one(f) for f in files
    ])
    
    return results

# Process 100 files, max 5 at a time
file_list = [f"doc{i}.pdf" for i in range(100)]  # placeholder file names
results = asyncio.run(process_with_rate_limit(file_list, max_concurrent=5))
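
If you want results as they complete rather than all at once, asyncio.as_completed pairs naturally with the same semaphore pattern (a sketch using only the standard library and the calls shown above):
import asyncio
from pathlib import Path
from reducto import AsyncReducto

async def process_as_completed(files: list[str], max_concurrent: int = 5):
    client = AsyncReducto()
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_one(file_path):
        async with semaphore:
            upload = await client.upload(file=Path(file_path))
            return file_path, await client.parse.run(input=upload.file_id)

    # as_completed yields each task as soon as it finishes,
    # so you can report progress without waiting for the whole batch
    for task in asyncio.as_completed([process_one(f) for f in files]):
        file_path, result = await task
        print(f"Finished: {file_path}")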

Context Manager

Use the context manager to ensure proper cleanup:
async def main():
    async with AsyncReducto() as client:
        upload = await client.upload(file=Path("document.pdf"))
        result = await client.parse.run(input=upload.file_id)
        return result

result = asyncio.run(main())

Error Handling

Error handling works the same as the sync client:
import reducto
from pathlib import Path
from reducto import AsyncReducto

async def safe_process():
    client = AsyncReducto()
    
    try:
        upload = await client.upload(file=Path("document.pdf"))
        result = await client.parse.run(input=upload.file_id)
        return result
    except reducto.APIConnectionError as e:
        print(f"Connection failed: {e}")
    except reducto.RateLimitError as e:
        print(f"Rate limited: {e}")
    except reducto.APIStatusError as e:
        print(f"API error: {e.status_code}")
For batch processing, use return_exceptions=True to handle failures gracefully:
async def batch_with_error_handling(files):
    client = AsyncReducto()
    
    async def process_one(file_path):
        upload = await client.upload(file=Path(file_path))
        return await client.parse.run(input=upload.file_id)
    
    results = await asyncio.gather(
        *[process_one(f) for f in files],
        return_exceptions=True  # Don't fail entire batch on one error
    )
    
    for file_path, result in zip(files, results):
        if isinstance(result, Exception):
            print(f"Failed: {file_path} - {result}")
        else:
            print(f"Success: {file_path} - {result.usage.num_pages} pages")
    
    return results
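
A common complement is retrying transient failures before giving up. The helper below is hypothetical (not part of the SDK); it builds only on the exception types shown above and backs off exponentially on rate limits:
import asyncio
from pathlib import Path

import reducto
from reducto import AsyncReducto

async def parse_with_retry(client: AsyncReducto, file_path: str, max_attempts: int = 3):
    for attempt in range(max_attempts):
        try:
            upload = await client.upload(file=Path(file_path))
            return await client.parse.run(input=upload.file_id)
        except reducto.RateLimitError:
            if attempt == max_attempts - 1:
                raise  # out of attempts, surface the error
            await asyncio.sleep(2 ** attempt)  # back off 1s, 2s, 4s, ...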

Advanced Features

Raw Response Access

async def get_raw_response():
    client = AsyncReducto()
    upload = await client.upload(file=Path("document.pdf"))
    response = await client.parse.with_raw_response.run(input=upload.file_id)
    print(response.headers)  # inspect the raw HTTP headers
    return response.parse()  # deserialize into the usual result object

Streaming Response

async def stream_response():
    client = AsyncReducto()
    upload = await client.upload(file=Path("document.pdf"))
    async with client.parse.with_streaming_response.run(input=upload.file_id) as response:
        async for line in response.iter_lines():
            print(line)

When to Use Async

Use Async When

Processing 10+ documents, building web services, or integrating with async frameworks (FastAPI, aiohttp).
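
As an illustration of the web-service case, a minimal FastAPI endpoint can share one AsyncReducto client across requests (a sketch; the route, field names, and PDF-only assumption are placeholders):
import tempfile
from pathlib import Path

from fastapi import FastAPI, UploadFile
from reducto import AsyncReducto

app = FastAPI()
client = AsyncReducto()  # one shared client for all requests

@app.post("/parse")
async def parse_document(file: UploadFile):
    # Spool the upload to a temp file so it can be passed as a Path,
    # matching the upload calls in the examples above
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
        tmp.write(await file.read())
    try:
        upload = await client.upload(file=Path(tmp.name))
        result = await client.parse.run(input=upload.file_id)
    finally:
        Path(tmp.name).unlink()  # clean up the temp file
    return {"num_chunks": len(result.result.chunks)}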

Use Sync When

Processing single documents, simple scripts, or when async adds unnecessary complexity.
