When you need to process many documents, batch processing lets you run multiple requests concurrently. This is faster than processing documents sequentially, and unlike webhooks it returns results directly to your code as soon as each request finishes.

When to use batch processing

Approach                     | Best for
Batch processing (this page) | Processing many documents, need results immediately
Webhooks                     | Fire-and-forget, long documents, notification when done
Sequential                   | Simple scripts, debugging, rate-limited scenarios
Use AsyncReducto with asyncio for the best performance. In the examples below, an asyncio.Semaphore caps concurrency so you don't overwhelm the API.

Processing URLs

If your documents are already hosted (S3, web server, etc.), process URLs directly:
import asyncio
from reducto import AsyncReducto

client = AsyncReducto()

async def batch_parse_urls(urls: list[str], max_concurrency: int = 50):
    """Parse multiple URLs concurrently."""
    sem = asyncio.Semaphore(max_concurrency)
    
    async def process(url: str):
        async with sem:
            result = await client.parse.run(input=url)
            return {"url": url, "pages": result.usage.num_pages}
    
    tasks = [process(url) for url in urls]
    return await asyncio.gather(*tasks)

# Usage
urls = [
    "https://example.com/doc1.pdf",
    "https://example.com/doc2.pdf",
    "https://example.com/doc3.pdf",
]
results = asyncio.run(batch_parse_urls(urls))

Processing local files

For local files, upload first then parse:
import asyncio
from pathlib import Path
from reducto import AsyncReducto

client = AsyncReducto()

async def batch_parse_files(files: list[Path], max_concurrency: int = 50):
    """Parse multiple local files concurrently."""
    sem = asyncio.Semaphore(max_concurrency)
    
    async def process(path: Path):
        async with sem:
            upload = await client.upload(file=path)
            result = await client.parse.run(input=upload)
            return {"file": path.name, "pages": result.usage.num_pages}
    
    tasks = [process(path) for path in files]
    return await asyncio.gather(*tasks)

# Usage
files = list(Path("documents").glob("*.pdf"))
results = asyncio.run(batch_parse_files(files))

for r in results:
    print(f"{r['file']}: {r['pages']} pages")

With a progress bar
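
The tqdm.asyncio module provides a gather wrapper that reports progress as tasks finish, so it slots in as a replacement for asyncio.gather: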

import asyncio
from pathlib import Path
from reducto import AsyncReducto
from tqdm.asyncio import tqdm

client = AsyncReducto()

async def batch_parse_with_progress(files: list[Path], max_concurrency: int = 50):
    sem = asyncio.Semaphore(max_concurrency)
    
    async def process(path: Path):
        async with sem:
            upload = await client.upload(file=path)
            result = await client.parse.run(input=upload)
            return {"file": path.name, "pages": result.usage.num_pages}
    
    tasks = [process(path) for path in files]
    return await tqdm.gather(*tasks, desc="Processing documents")

files = list(Path("documents").glob("*.pdf"))
results = asyncio.run(batch_parse_with_progress(files))

With error handling

Some documents may fail (corrupt files, unsupported formats). Handle errors gracefully to avoid losing all results:
import asyncio
from pathlib import Path
from reducto import AsyncReducto

client = AsyncReducto()

async def batch_parse_safe(files: list[Path], max_concurrency: int = 50):
    """Parse files with error handling."""
    sem = asyncio.Semaphore(max_concurrency)
    
    async def process(path: Path):
        async with sem:
            try:
                upload = await client.upload(file=path)
                result = await client.parse.run(input=upload)
                return {"file": path.name, "success": True, "pages": result.usage.num_pages}
            except Exception as e:
                return {"file": path.name, "success": False, "error": str(e)}
    
    tasks = [process(path) for path in files]
    results = await asyncio.gather(*tasks)
    
    successes = [r for r in results if r["success"]]
    failures = [r for r in results if not r["success"]]
    
    print(f"Processed {len(successes)} successfully, {len(failures)} failed")
    return results

files = list(Path("documents").glob("*.pdf"))
results = asyncio.run(batch_parse_safe(files))

Sync Python with threading

If you can’t use async, use ThreadPoolExecutor with the synchronous client:
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from reducto import Reducto

client = Reducto()

def batch_parse_sync(files: list[Path], max_workers: int = 10):
    """Parse files using thread pool."""
    
    def process(path: Path):
        upload = client.upload(file=path)
        result = client.parse.run(input=upload)
        return {"file": path.name, "pages": result.usage.num_pages}
    
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process, f): f for f in files}
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as e:
                results.append({"file": futures[future].name, "error": str(e)})
    
    return results

files = list(Path("documents").glob("*.pdf"))
results = batch_parse_sync(files)

Batch extraction

The same patterns work for extraction. Define your schema once and apply it to all documents:
import asyncio
from reducto import AsyncReducto

client = AsyncReducto()

INVOICE_SCHEMA = {
    "type": "object",
    "properties": {
        "invoice_number": {"type": "string", "description": "Invoice number"},
        "date": {"type": "string", "description": "Invoice date"},
        "total": {"type": "number", "description": "Total amount"}
    }
}

async def batch_extract(urls: list[str], schema: dict, max_concurrency: int = 20):
    sem = asyncio.Semaphore(max_concurrency)
    
    async def extract(url: str):
        async with sem:
            result = await client.extract.run(
                input=url,
                instructions={"schema": schema}
            )
            return {"url": url, "data": result.result}
    
    tasks = [extract(url) for url in urls]
    return await asyncio.gather(*tasks)

urls = ["https://example.com/invoice1.pdf", "https://example.com/invoice2.pdf"]
results = asyncio.run(batch_extract(urls, INVOICE_SCHEMA))

JavaScript / TypeScript

import Reducto from 'reductoai';
import fs from 'fs';
import { glob } from 'glob';

const client = new Reducto();

async function batchParse(files) {
  const results = await Promise.all(
    files.map(async (file) => {
      try {
        const upload = await client.upload({ file: fs.createReadStream(file) });
        const result = await client.parse.run({ input: upload });
        return { file, pages: result.usage.num_pages, success: true };
      } catch (error) {
        return { file, error: error.message, success: false };
      }
    })
  );
  return results;
}

const files = glob.sync('documents/*.pdf');
const results = await batchParse(files);
console.log(results);
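
Note that Promise.all here launches every request at once, with no concurrency cap. For large batches, consider processing the file list in chunks or using a limiter library such as p-limit to mirror the semaphore in the Python examples.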

Saving results

Save results as you process to avoid losing work:
import asyncio
import json
from pathlib import Path
from reducto import AsyncReducto

client = AsyncReducto()

async def batch_parse_and_save(files: list[Path], output_dir: Path, max_concurrency: int = 50):
    output_dir.mkdir(exist_ok=True)
    sem = asyncio.Semaphore(max_concurrency)
    
    async def process(path: Path):
        async with sem:
            upload = await client.upload(file=path)
            result = await client.parse.run(input=upload)
            
            # Save immediately
            output_path = output_dir / f"{path.stem}.json"
            output_path.write_text(result.model_dump_json(indent=2))
            
            return {"file": path.name, "output": str(output_path)}
    
    tasks = [process(path) for path in files]
    return await asyncio.gather(*tasks)

files = list(Path("documents").glob("*.pdf"))
results = asyncio.run(batch_parse_and_save(files, Path("output")))

Concurrency limits

Method               | Recommended concurrency
AsyncReducto         | 50-200 concurrent requests
ThreadPoolExecutor   | 10-50 workers
run_job() (webhooks) | Unlimited
Higher concurrency means faster processing but may hit rate limits. Start with lower values and increase as needed.
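
If you do hit rate limits, one common pattern is to wrap each request in a retry with exponential backoff. The sketch below is illustrative: parse_with_backoff is a hypothetical helper, and it assumes the SDK raises an exception when a request is rejected, so it catches broadly rather than naming a specific exception class.

import asyncio
import random
from reducto import AsyncReducto

client = AsyncReducto()

async def parse_with_backoff(url: str, max_attempts: int = 5):
    """Retry a single parse call with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            return await client.parse.run(input=url)
        except Exception:  # ideally catch the SDK's rate-limit error specifically
            if attempt == max_attempts - 1:
                # Out of attempts: surface the error to the caller
                raise
            # Wait 1s, 2s, 4s, ... plus jitter before retrying
            await asyncio.sleep(2 ** attempt + random.random())

You can drop a helper like this in place of the direct client.parse.run call inside the process coroutines shown above.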

What about cURL?

Batch processing requires programming constructs (loops, concurrency control, error handling) that aren’t practical in cURL. For single-document processing via cURL, see the API reference. For batch workflows without writing code, consider:

Best practices

  1. Use async when possible: AsyncReducto is more efficient than threading
  2. Handle errors gracefully: Don’t let one failure stop the entire batch
  3. Save incrementally: Write results to disk as they complete
  4. Monitor progress: Use tqdm or logging to track progress
  5. Set reasonable concurrency: Start low (20-50) and increase if stable
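
As a reference point, the sketch below combines points 2-4: errors are caught per file, each result is written to disk as soon as it is ready, and a progress bar advances as files complete. It is illustrative only and reuses the same client calls assumed throughout this page (client.upload, client.parse.run); batch_parse_robust is a hypothetical helper name.

import asyncio
from pathlib import Path
from reducto import AsyncReducto
from tqdm import tqdm

client = AsyncReducto()

async def batch_parse_robust(files: list[Path], output_dir: Path, max_concurrency: int = 20):
    """Parse files with per-file error handling, incremental saving, and progress reporting."""
    output_dir.mkdir(exist_ok=True)
    sem = asyncio.Semaphore(max_concurrency)

    async def process(path: Path):
        async with sem:
            try:
                upload = await client.upload(file=path)
                result = await client.parse.run(input=upload)
                # Save each result as soon as it's ready
                output_path = output_dir / f"{path.stem}.json"
                output_path.write_text(result.model_dump_json(indent=2))
                return {"file": path.name, "success": True}
            except Exception as e:
                return {"file": path.name, "success": False, "error": str(e)}

    results = []
    # as_completed yields tasks in finish order, so the bar advances as each file completes
    for coro in tqdm(asyncio.as_completed([process(p) for p in files]), total=len(files)):
        results.append(await coro)
    return results

files = list(Path("documents").glob("*.pdf"))
results = asyncio.run(batch_parse_robust(files, Path("output")))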
For very large batches or long-running documents, consider webhooks instead. They’re better suited for fire-and-forget processing where you don’t need immediate results.