Pipelines are workflows built by chaining multiple API calls together. The /extract and /split endpoints can accept a job_id returned by a parse call, so the same document never has to be parsed more than once. This enables workflows that were previously too high-latency or too expensive to run; a minimal chaining sketch follows the list below. Common pipelines look like this:
  • Parse → Split → Extract
  • Parse → Extract
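As a minimal sketch of the chaining mechanism, the snippet below parses a document once and then passes the resulting job_id to both /split and /extract. The document URL, split categories, and title schema are illustrative placeholders, and the snippet assumes /split's split_description takes name/description pairs.

import os

from reducto import Reducto

client = Reducto(api_key=os.environ.get("REDUCTO_API_KEY"))

# Parse once; the returned job_id lets later calls reuse this parse.
parse_response = client.parse.run(document_url="https://example.com/document.pdf")
job_id = parse_response.job_id

# /split reuses the parse output instead of re-parsing the document.
# The category names and descriptions here are placeholders.
split_response = client.split.run(
    document_url=f"jobid://{job_id}",
    split_description=[
        {"name": "invoice", "description": "Pages containing an invoice"},
        {"name": "contract", "description": "Pages containing a contract"},
    ],
)

# /extract can reuse the same parse output as well.
extract_response = client.extract.run(
    document_url=f"jobid://{job_id}",
    schema={"type": "object", "properties": {"title": {"type": "string"}}},
)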
Below is an example using /extract (the same pattern works with /extract_async), where we’d like to extract two different sets of information from the same set of documents.

Problem

Let’s say we have users uploading documents of two types: W2 forms and Passports. We’d like to extract a different set of information from each type of document:
  1. For the Passport: passport number, name, and date of birth.
  2. For the W2: total wages, calendar year, and employer name.
Without pipelining, we would either have to provide one schema containing fields for both document types, or make separate extraction calls for classification and for each document type, each of which would re-parse the same document. That means a lot of extra latency and being charged multiple times for the same underlying work, as the non-pipelined sketch below illustrates.
Pipeline Diagram
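For contrast, here is a hypothetical sketch of the non-pipelined approach: every extract call receives the raw document URL instead of a jobid:// reference, so each call pays for its own parse of the same file. The abbreviated schemas are placeholders.

import os

from reducto import Reducto

client = Reducto(api_key=os.environ.get("REDUCTO_API_KEY"))
document_url = "https://support.adp.com/adp_payroll/content/hybrid/PDF/W2_Interactive.pdf"

# Two calls, two parses of the same document; the pipelined version
# below parses only once and reuses the job_id.
classification = client.extract.run(
    document_url=document_url,
    schema={
        "type": "object",
        "properties": {
            "document_type": {"type": "string", "enum": ["W2", "Passport", "Other"]}
        },
    },
)
w2_fields = client.extract.run(
    document_url=document_url,
    schema={"type": "object", "properties": {"total_wages": {"type": "number"}}},
)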

Example code snippet

The code snippet below walks through the approach.
  1. Parse the document.
  2. Use the parse job_id to run a classification extraction that determines whether the document is a W-2 or a Passport.
  3. Use the same job_id to extract the fields for that document type.
import os

from reducto import Reducto

client = Reducto(api_key=os.environ.get('REDUCTO_API_KEY'), timeout=300)
document_url = "https://support.adp.com/adp_payroll/content/hybrid/PDF/W2_Interactive.pdf"

# Step 1: Parse
parse_response = client.parse.run(document_url=document_url)
job_id = parse_response.job_id

# Step 2: Classify document type
classification = client.extract.run(
    document_url=f"jobid://{job_id}",
    schema={
        "type": "object",
        "properties": {
            "document_type": {"type": "string", "enum": ["W2", "Passport", "Other"]}
        },
        "required": ["document_type"],
    },
)
print(classification.result)

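# The extract result is returned as a list; the document type is in the first entry.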
document_type = classification.result[0]["document_type"]

# Step 3: Choose schema based on classification
if document_type == "W2":
    schema = {
        "type": "object",
        "properties": {
            "total_wages": {"type": "number"},
            "calendar_year": {"type": "integer"},
            "employer_name": {"type": "string"}
        },
        "required": ["total_wages", "calendar_year", "employer_name"]
    }
elif document_type == "Passport":
    schema = {
        "type": "object",
        "properties": {
            "passport_number": {"type": "string"},
            "name": {"type": "string"},
            "date_of_birth": {"type": "string"},
        },
        "required": ["passport_number", "name", "date_of_birth"],
    }
else:
    raise ValueError(f"Unsupported document type: {document_type}")

# Step 4: Extract structured fields
extract_response = client.extract.run(
    document_url=f"jobid://{job_id}",
    schema=schema,
)

print(extract_response.result)
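Because users keep uploading new documents, the same three steps can be wrapped in a small helper and run once per upload. The sketch below reuses the client from above and assumes the W2 and Passport schemas defined earlier are collected into a dict keyed by document type; the function name and the schemas dict are hypothetical.

# Hypothetical helper wrapping the parse -> classify -> extract pipeline.
# `schemas` maps a document type to its extraction schema, e.g.
# {"W2": <W2 schema above>, "Passport": <Passport schema above>}.
def extract_document_fields(document_url: str, schemas: dict) -> dict:
    # Parse once and reuse the job_id for both extract calls.
    job_id = client.parse.run(document_url=document_url).job_id

    # Classify using the already-parsed document.
    classification = client.extract.run(
        document_url=f"jobid://{job_id}",
        schema={
            "type": "object",
            "properties": {
                "document_type": {"type": "string", "enum": list(schemas) + ["Other"]}
            },
            "required": ["document_type"],
        },
    )
    document_type = classification.result[0]["document_type"]

    if document_type not in schemas:
        raise ValueError(f"Unsupported document type: {document_type}")

    # Extract the type-specific fields from the same parse.
    extract_response = client.extract.run(
        document_url=f"jobid://{job_id}",
        schema=schemas[document_type],
    )
    return extract_response.result[0]

With this wrapper, each uploaded document costs one parse plus two extract calls, regardless of its type.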