Documentation Index
Fetch the complete documentation index at: https://docs.reducto.ai/llms.txt
Use this file to discover all available pages before exploring further.
When you need to process many documents, batch processing lets you run multiple requests concurrently. This is faster than processing documents sequentially and more suitable for immediate results than webhooks.
When to use batch processing
| Approach | Best for |
|---|
| Batch processing (this page) | Processing many documents, need results immediately |
| Webhooks | Fire-and-forget, long documents, notification when done |
| Sequential | Simple scripts, debugging, rate-limited scenarios |
Async Python (recommended)
Use AsyncReducto with asyncio for the best performance. The semaphore controls concurrency to avoid overwhelming the API.
Processing URLs
If your documents are already hosted (S3, web server, etc.), process URLs directly:
import asyncio
from reducto import AsyncReducto
client = AsyncReducto()
async def batch_parse_urls(urls: list[str], max_concurrency: int = 50):
"""Parse multiple URLs concurrently."""
sem = asyncio.Semaphore(max_concurrency)
async def process(url: str):
async with sem:
result = await client.parse.run(input=url)
return {"url": url, "pages": result.usage.num_pages}
tasks = [process(url) for url in urls]
return await asyncio.gather(*tasks)
# Usage
urls = [
"https://example.com/doc1.pdf",
"https://example.com/doc2.pdf",
"https://example.com/doc3.pdf",
]
results = asyncio.run(batch_parse_urls(urls))
Processing local files
For local files, upload first then parse:
import asyncio
from pathlib import Path
from reducto import AsyncReducto
client = AsyncReducto()
async def batch_parse_files(files: list[Path], max_concurrency: int = 50):
"""Parse multiple local files concurrently."""
sem = asyncio.Semaphore(max_concurrency)
async def process(path: Path):
async with sem:
upload = await client.upload(file=path)
result = await client.parse.run(input=upload)
return {"file": path.name, "pages": result.usage.num_pages}
tasks = [process(path) for path in files]
return await asyncio.gather(*tasks)
# Usage
files = list(Path("documents").glob("*.pdf"))
results = asyncio.run(batch_parse_files(files))
for r in results:
print(f"{r['file']}: {r['pages']} pages")
With progress bar
import asyncio
from pathlib import Path
from reducto import AsyncReducto
from tqdm.asyncio import tqdm
client = AsyncReducto()
async def batch_parse_with_progress(files: list[Path], max_concurrency: int = 50):
sem = asyncio.Semaphore(max_concurrency)
async def process(path: Path):
async with sem:
upload = await client.upload(file=path)
result = await client.parse.run(input=upload)
return {"file": path.name, "pages": result.usage.num_pages}
tasks = [process(path) for path in files]
return await tqdm.gather(*tasks, desc="Processing documents")
files = list(Path("documents").glob("*.pdf"))
results = asyncio.run(batch_parse_with_progress(files))
With error handling
Some documents may fail (corrupt files, unsupported formats). Handle errors gracefully to avoid losing all results:
import asyncio
from pathlib import Path
from reducto import AsyncReducto
client = AsyncReducto()
async def batch_parse_safe(files: list[Path], max_concurrency: int = 50):
"""Parse files with error handling."""
sem = asyncio.Semaphore(max_concurrency)
async def process(path: Path):
async with sem:
try:
upload = await client.upload(file=path)
result = await client.parse.run(input=upload)
return {"file": path.name, "success": True, "pages": result.usage.num_pages}
except Exception as e:
return {"file": path.name, "success": False, "error": str(e)}
tasks = [process(path) for path in files]
results = await asyncio.gather(*tasks)
successes = [r for r in results if r["success"]]
failures = [r for r in results if not r["success"]]
print(f"Processed {len(successes)} successfully, {len(failures)} failed")
return results
files = list(Path("documents").glob("*.pdf"))
results = asyncio.run(batch_parse_safe(files))
Sync Python with threading
If you can’t use async, use ThreadPoolExecutor with the synchronous client:
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from reducto import Reducto
client = Reducto()
def batch_parse_sync(files: list[Path], max_workers: int = 10):
"""Parse files using thread pool."""
def process(path: Path):
upload = client.upload(file=path)
result = client.parse.run(input=upload)
return {"file": path.name, "pages": result.usage.num_pages}
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(process, f): f for f in files}
for future in as_completed(futures):
try:
results.append(future.result())
except Exception as e:
results.append({"file": futures[future].name, "error": str(e)})
return results
files = list(Path("documents").glob("*.pdf"))
results = batch_parse_sync(files)
The same patterns work for extraction. Define your schema once and apply it to all documents:
import asyncio
from reducto import AsyncReducto
client = AsyncReducto()
INVOICE_SCHEMA = {
"type": "object",
"properties": {
"invoice_number": {"type": "string", "description": "Invoice number"},
"date": {"type": "string", "description": "Invoice date"},
"total": {"type": "number", "description": "Total amount"}
}
}
async def batch_extract(urls: list[str], schema: dict, max_concurrency: int = 20):
sem = asyncio.Semaphore(max_concurrency)
async def extract(url: str):
async with sem:
result = await client.extract.run(
input=url,
instructions={"schema": schema}
)
return {"url": url, "data": result.result}
tasks = [extract(url) for url in urls]
return await asyncio.gather(*tasks)
urls = ["https://example.com/invoice1.pdf", "https://example.com/invoice2.pdf"]
results = asyncio.run(batch_extract(urls, INVOICE_SCHEMA))
JavaScript / TypeScript
import Reducto from 'reductoai';
import fs from 'fs';
import { glob } from 'glob';
const client = new Reducto();
async function batchParse(files) {
const results = await Promise.all(
files.map(async (file) => {
try {
const upload = await client.upload({ file: fs.createReadStream(file) });
const result = await client.parse.run({ input: upload });
return { file, pages: result.usage.num_pages, success: true };
} catch (error) {
return { file, error: error.message, success: false };
}
})
);
return results;
}
const files = glob.sync('documents/*.pdf');
const results = await batchParse(files);
console.log(results);
Saving results
Save results as you process to avoid losing work:
import asyncio
import json
from pathlib import Path
from reducto import AsyncReducto
client = AsyncReducto()
async def batch_parse_and_save(files: list[Path], output_dir: Path, max_concurrency: int = 50):
output_dir.mkdir(exist_ok=True)
sem = asyncio.Semaphore(max_concurrency)
async def process(path: Path):
async with sem:
upload = await client.upload(file=path)
result = await client.parse.run(input=upload)
# Save immediately
output_path = output_dir / f"{path.stem}.json"
output_path.write_text(result.model_dump_json(indent=2))
return {"file": path.name, "output": str(output_path)}
tasks = [process(path) for path in files]
return await asyncio.gather(*tasks)
files = list(Path("documents").glob("*.pdf"))
results = asyncio.run(batch_parse_and_save(files, Path("output")))
Concurrency limits
| Method | Recommended concurrency |
|---|
AsyncReducto | 50-200 concurrent requests |
ThreadPoolExecutor | 10-50 workers |
run_job() (webhooks) | Unlimited |
Higher concurrency means faster processing but may hit rate limits. Start with lower values and increase as needed.
What about cURL?
Batch processing requires programming constructs (loops, concurrency control, error handling) that aren’t practical in cURL. For single-document processing via cURL, see the API reference.
For batch workflows without writing code, consider:
Best practices
- Use async when possible:
AsyncReducto is more efficient than threading
- Handle errors gracefully: Don’t let one failure stop the entire batch
- Save incrementally: Write results to disk as they complete
- Monitor progress: Use
tqdm or logging to track progress
- Set reasonable concurrency: Start low (20-50) and increase if stable
For very large batches or long-running documents, consider webhooks instead. They’re better suited for fire-and-forget processing where you don’t need immediate results.