Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.reducto.ai/llms.txt

Use this file to discover all available pages before exploring further.

The Go SDK supports concurrent document processing using goroutines and async job management with RunJob methods.

Concurrent Processing

Process multiple documents simultaneously with goroutines:

Parse Multiple Documents

package main

import (
    "context"
    "fmt"
    "io"
    "os"
    "sync"

    reducto "github.com/reductoai/reducto-go-sdk"
    "github.com/reductoai/reducto-go-sdk/option"
    "github.com/reductoai/reducto-go-sdk/shared"
)

// main parses three documents concurrently: each file is uploaded and
// parsed in its own goroutine, and per-index slots collect the outcome.
func main() {
    client := reducto.NewClient(option.WithAPIKey(os.Getenv("REDUCTO_API_KEY")))
    files := []string{"doc1.pdf", "doc2.pdf", "doc3.pdf"}

    var wg sync.WaitGroup
    // Each goroutine writes only its own index, so no mutex is needed.
    results := make([]*shared.ParseResponse, len(files))
    // Named errs (not "errors") to avoid shadowing the stdlib package name.
    errs := make([]error, len(files))

    for i, filePath := range files {
        wg.Add(1)
        // Pass i and filePath as arguments so each goroutine gets its own
        // copy (required for correctness before Go 1.22 loop semantics).
        go func(idx int, path string) {
            defer wg.Done()

            file, err := os.Open(path)
            if err != nil {
                errs[idx] = err
                return
            }
            defer file.Close()

            // Upload first; Parse then references the file by FileID.
            upload, err := client.Upload(context.Background(), reducto.UploadParams{
                File: reducto.F[io.Reader](file),
            })
            if err != nil {
                errs[idx] = err
                return
            }

            result, err := client.Parse.Run(context.Background(), reducto.ParseRunParams{
                ParseConfig: reducto.ParseConfigParam{
                    DocumentURL: reducto.F[reducto.ParseConfigDocumentURLUnionParam](
                        shared.UnionString(upload.FileID),
                    ),
                },
            })
            results[idx] = result
            errs[idx] = err
        }(i, filePath)
    }

    wg.Wait()

    // Report outcomes in input order; results[i] is only read when
    // errs[i] is nil.
    for i, result := range results {
        if errs[i] != nil {
            fmt.Printf("%s: Error - %v\n", files[i], errs[i])
        } else {
            fmt.Printf("%s: %d pages\n", files[i], result.Usage.NumPages)
        }
    }
}

Rate Limiting

Control concurrency with a semaphore pattern:
// processWithLimit parses files with at most maxConcurrent uploads/parses
// in flight at once, using a buffered channel as a counting semaphore.
// Per-file failures are reported and do not stop the remaining files.
func processWithLimit(client *reducto.Client, files []string, maxConcurrent int) {
    sem := make(chan struct{}, maxConcurrent)
    var wg sync.WaitGroup

    for _, filePath := range files {
        wg.Add(1)
        sem <- struct{}{} // Acquire: blocks while maxConcurrent goroutines are running

        go func(path string) {
            defer wg.Done()
            defer func() { <-sem }() // Release the slot when done

            // Check every error: the original discarded these, which
            // caused a nil-pointer panic on upload.FileID when Upload failed.
            file, err := os.Open(path)
            if err != nil {
                fmt.Printf("%s: Error - %v\n", path, err)
                return
            }
            defer file.Close()

            upload, err := client.Upload(context.Background(), reducto.UploadParams{
                File: reducto.F[io.Reader](file),
            })
            if err != nil {
                fmt.Printf("%s: Error - %v\n", path, err)
                return
            }

            result, err := client.Parse.Run(context.Background(), reducto.ParseRunParams{
                ParseConfig: reducto.ParseConfigParam{
                    DocumentURL: reducto.F[reducto.ParseConfigDocumentURLUnionParam](
                        shared.UnionString(upload.FileID),
                    ),
                },
            })
            if err != nil {
                fmt.Printf("%s: Error - %v\n", path, err)
                return
            }
            fmt.Printf("%s: %d pages\n", path, result.Usage.NumPages)
        }(filePath)
    }

    wg.Wait()
}

// Process 100 files with at most 5 concurrent requests at a time.
processWithLimit(client, fileList, 5)

Async Jobs (RunJob)

For documents that take longer to process, use RunJob methods to avoid timeouts:

Start Async Job

// Start the job without waiting for completion; the server processes it
// asynchronously and the returned JobID is used to poll for the result.
job, err := client.Parse.RunJob(context.Background(), reducto.ParseRunJobParams{
    DocumentURL: reducto.F[reducto.ParseRunJobParamsDocumentURLUnion](
        shared.UnionString(upload.FileID), // assumes a prior Upload produced `upload`
    ),
})
if err != nil {
    return err
}
fmt.Printf("Job started: %s\n", job.JobID)

Poll for Completion

import "time"

// waitForJob polls the job identified by jobID every two seconds until it
// reaches a terminal state or ctx is cancelled.
//
// It returns the finished job on completion; it returns an error when the
// job fails, a poll request errors, or the context is done.
func waitForJob(ctx context.Context, client *reducto.Client, jobID string) (*reducto.JobGetResponse, error) {
    ticker := time.NewTicker(2 * time.Second)
    defer ticker.Stop()

    for {
        // Wait for the next tick, honoring cancellation while we wait.
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-ticker.C:
        }

        j, err := client.Job.Get(ctx, jobID)
        if err != nil {
            return nil, err
        }

        if j.Status == "Completed" {
            return j, nil
        }
        if j.Status == "Failed" {
            return nil, fmt.Errorf("job failed: %s", j.Reason)
        }
        // Still in progress: report and keep polling.
        fmt.Printf("Status: %s, Progress: %.0f%%\n", j.Status, j.Progress)
    }
}

// Usage: start the job, then block until it reaches a terminal state.
job, _ := client.Parse.RunJob(context.Background(), params)
result, err := waitForJob(context.Background(), client, job.JobID)

All Async Methods

Each endpoint has a RunJob variant:
// Parse: start an asynchronous parse of a previously uploaded file.
// NOTE(review): errors are discarded with _ for brevity only; handle them
// in production code.
parseJob, _ := client.Parse.RunJob(context.Background(), reducto.ParseRunJobParams{
    DocumentURL: reducto.F[reducto.ParseRunJobParamsDocumentURLUnion](
        shared.UnionString(upload.FileID),
    ),
})

// Extract: structured extraction against the given schema.
extractJob, _ := client.Extract.RunJob(context.Background(), reducto.ExtractRunJobParams{
    DocumentURL: reducto.F[reducto.ExtractRunJobParamsDocumentURLUnion](
        shared.UnionString(upload.FileID),
    ),
    Schema: reducto.F[interface{}](schema),
})

// Split: partition the document per the split description.
splitJob, _ := client.Split.RunJob(context.Background(), reducto.SplitRunJobParams{
    DocumentURL: reducto.F[reducto.SplitRunJobParamsDocumentURLUnion](
        shared.UnionString(upload.FileID),
    ),
    SplitDescription: reducto.F(splitDesc),
})

Context Cancellation

Use context for timeouts and cancellation:
// Set a deadline for the entire operation; cancel must always be called
// to release the context's resources.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

result, err := client.Parse.Run(ctx, params)
if err != nil {
    // ctx.Err() reports DeadlineExceeded once the timeout has fired.
    if ctx.Err() == context.DeadlineExceeded {
        fmt.Println("Operation timed out")
    }
    return err
}

Error Handling

Handle errors in concurrent operations:
// result pairs one input file with the outcome of processing it, so a
// single channel can carry both successes and failures.
type result struct {
    file   string                // source file path
    result *shared.ParseResponse // parse output; only read when err is nil
    err    error                 // processing error, if any
}

// batchProcess parses each file in its own goroutine and funnels every
// outcome — success or failure — through a single channel, returning one
// result per input file.
func batchProcess(client *reducto.Client, files []string) []result {
    // Buffered to len(files) so no sender ever blocks, even if the
    // collection loop below falls behind.
    results := make(chan result, len(files))
    var wg sync.WaitGroup

    for _, file := range files {
        wg.Add(1)
        go func(path string) {
            defer wg.Done()
            // Declare the outputs the placeholder below is expected to
            // fill in (the original used res/err without declaring them).
            var res *shared.ParseResponse
            var err error
            // ... process file, assigning res and err ...
            results <- result{file: path, result: res, err: err}
        }(file)
    }

    // Close the channel once all workers finish so the range below ends.
    go func() {
        wg.Wait()
        close(results)
    }()

    var output []result
    for r := range results {
        output = append(output, r)
        if r.err != nil {
            fmt.Printf("Failed: %s - %v\n", r.file, r.err)
        } else {
            fmt.Printf("Success: %s - %d pages\n", r.file, r.result.Usage.NumPages)
        }
    }
    return output
}

When to Use Async Jobs

Use RunJob When

Processing large documents (50+ pages), running long extract operations, or integrating with webhooks.

Use Run When

Processing small documents, needing results immediately, or writing simple scripts where blocking until completion is acceptable.

Next Steps