The Go SDK supports concurrent document processing with goroutines, and asynchronous job management via the RunJob methods.

Concurrent Processing

Process multiple documents simultaneously with goroutines:

Parse Multiple Documents

package main

import (
    "context"
    "fmt"
    "io"
    "os"
    "sync"

    reducto "github.com/reductoai/reducto-go-sdk"
    "github.com/reductoai/reducto-go-sdk/option"
    "github.com/reductoai/reducto-go-sdk/shared"
)

func main() {
    client := reducto.NewClient(option.WithAPIKey(os.Getenv("REDUCTO_API_KEY")))
    files := []string{"doc1.pdf", "doc2.pdf", "doc3.pdf"}

    var wg sync.WaitGroup
    // Pre-sized result slices: each goroutine writes only to its own index,
    // so no mutex is needed.
    results := make([]*shared.ParseResponse, len(files))
    errors := make([]error, len(files))

    for i, filePath := range files {
        wg.Add(1)
        go func(idx int, path string) {
            defer wg.Done()

            file, err := os.Open(path)
            if err != nil {
                errors[idx] = err
                return
            }
            defer file.Close()

            upload, err := client.Upload(context.Background(), reducto.UploadParams{
                File: reducto.F[io.Reader](file),
            })
            if err != nil {
                errors[idx] = err
                return
            }

            result, err := client.Parse.Run(context.Background(), reducto.ParseRunParams{
                ParseConfig: reducto.ParseConfigParam{
                    DocumentURL: reducto.F[reducto.ParseConfigDocumentURLUnionParam](
                        shared.UnionString(upload.FileID),
                    ),
                },
            })
            results[idx] = result
            errors[idx] = err
        }(i, filePath)
    }

    wg.Wait()

    for i, result := range results {
        if errors[i] != nil {
            fmt.Printf("%s: Error - %v\n", files[i], errors[i])
        } else {
            fmt.Printf("%s: %d pages\n", files[i], result.Usage.NumPages)
        }
    }
}
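
The upload-then-parse sequence above repeats in the examples that follow, so it helps to factor it into a small helper. This parseFile wrapper is a convenience function for this guide, not part of the SDK:

// parseFile uploads a local file and runs a synchronous parse on it.
func parseFile(ctx context.Context, client *reducto.Client, path string) (*shared.ParseResponse, error) {
    file, err := os.Open(path)
    if err != nil {
        return nil, err
    }
    defer file.Close()

    upload, err := client.Upload(ctx, reducto.UploadParams{
        File: reducto.F[io.Reader](file),
    })
    if err != nil {
        return nil, err
    }

    return client.Parse.Run(ctx, reducto.ParseRunParams{
        ParseConfig: reducto.ParseConfigParam{
            DocumentURL: reducto.F[reducto.ParseConfigDocumentURLUnionParam](
                shared.UnionString(upload.FileID),
            ),
        },
    })
}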

Rate Limiting

Control concurrency with a semaphore pattern:
func processWithLimit(client *reducto.Client, files []string, maxConcurrent int) {
    sem := make(chan struct{}, maxConcurrent)
    var wg sync.WaitGroup

    for _, filePath := range files {
        wg.Add(1)
        sem <- struct{}{} // Acquire

        go func(path string) {
            defer wg.Done()
            defer func() { <-sem }() // Release

            file, err := os.Open(path)
            if err != nil {
                fmt.Printf("%s: Error - %v\n", path, err)
                return
            }
            defer file.Close()

            upload, err := client.Upload(context.Background(), reducto.UploadParams{
                File: reducto.F[io.Reader](file),
            })
            if err != nil {
                fmt.Printf("%s: Error - %v\n", path, err)
                return
            }

            result, err := client.Parse.Run(context.Background(), reducto.ParseRunParams{
                ParseConfig: reducto.ParseConfigParam{
                    DocumentURL: reducto.F[reducto.ParseConfigDocumentURLUnionParam](
                        shared.UnionString(upload.FileID),
                    ),
                },
            })

            if err != nil {
                fmt.Printf("%s: Error - %v\n", path, err)
            } else {
                fmt.Printf("%s: %d pages\n", path, result.Usage.NumPages)
            }
        }(filePath)
    }

    wg.Wait()
}

// Process 100 files, max 5 at a time
processWithLimit(client, fileList, 5)
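
If you'd rather not manage the semaphore by hand, the golang.org/x/sync/errgroup package (a separate module, not part of the SDK) provides the same bounded concurrency plus error propagation. A minimal sketch using the parseFile helper from above:

import "golang.org/x/sync/errgroup"

func processWithErrgroup(client *reducto.Client, files []string, maxConcurrent int) error {
    g, ctx := errgroup.WithContext(context.Background())
    g.SetLimit(maxConcurrent) // g.Go blocks once maxConcurrent goroutines are active

    for _, filePath := range files {
        path := filePath // capture loop variable (required before Go 1.22)
        g.Go(func() error {
            _, err := parseFile(ctx, client, path)
            return err
        })
    }
    return g.Wait() // first non-nil error, if any; ctx is canceled on first failure
}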

Async Jobs (RunJob)

For documents that take longer to process, use RunJob methods to avoid timeouts:

Start Async Job

// Start job without waiting for completion
job, err := client.Parse.RunJob(context.Background(), reducto.ParseRunJobParams{
    DocumentURL: reducto.F[reducto.ParseRunJobParamsDocumentURLUnion](
        shared.UnionString(upload.FileID),
    ),
})
if err != nil {
    return err
}
fmt.Printf("Job started: %s\n", job.JobID)

Poll for Completion

import "time"

func waitForJob(ctx context.Context, client *reducto.Client, jobID string) (*reducto.JobGetResponse, error) {
    ticker := time.NewTicker(2 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-ticker.C:
            job, err := client.Job.Get(ctx, jobID)
            if err != nil {
                return nil, err
            }

            switch job.Status {
            case "Completed":
                return job, nil
            case "Failed":
                return nil, fmt.Errorf("job failed: %s", job.Reason)
            default:
                fmt.Printf("Status: %s, Progress: %.0f%%\n", job.Status, job.Progress)
            }
        }
    }
}

// Usage
job, err := client.Parse.RunJob(context.Background(), params)
if err != nil {
    return err
}
result, err := waitForJob(context.Background(), client, job.JobID)
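
A fixed two-second interval is fine for short jobs; for long-running documents you can poll less aggressively as the job ages. A variant with exponential backoff capped at 30 seconds, under the same assumptions as waitForJob above:

func waitForJobBackoff(ctx context.Context, client *reducto.Client, jobID string) (*reducto.JobGetResponse, error) {
    delay := 2 * time.Second
    for {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-time.After(delay):
        }

        job, err := client.Job.Get(ctx, jobID)
        if err != nil {
            return nil, err
        }

        switch job.Status {
        case "Completed":
            return job, nil
        case "Failed":
            return nil, fmt.Errorf("job failed: %s", job.Reason)
        }

        // Double the wait after each poll, capping at 30 seconds.
        delay *= 2
        if delay > 30*time.Second {
            delay = 30 * time.Second
        }
    }
}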

All Async Methods

Each endpoint has a RunJob variant:
// Parse
parseJob, _ := client.Parse.RunJob(context.Background(), reducto.ParseRunJobParams{
    DocumentURL: reducto.F[reducto.ParseRunJobParamsDocumentURLUnion](
        shared.UnionString(upload.FileID),
    ),
})

// Extract
extractJob, _ := client.Extract.RunJob(context.Background(), reducto.ExtractRunJobParams{
    DocumentURL: reducto.F[reducto.ExtractRunJobParamsDocumentURLUnion](
        shared.UnionString(upload.FileID),
    ),
    Schema: reducto.F[interface{}](schema),
})

// Split
splitJob, _ := client.Split.RunJob(context.Background(), reducto.SplitRunJobParams{
    DocumentURL: reducto.F[reducto.SplitRunJobParamsDocumentURLUnion](
        shared.UnionString(upload.FileID),
    ),
    SplitDescription: reducto.F(splitDesc),
})
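
Each call returns a job handle immediately; the JobID on any of them can be passed to the same waitForJob helper from the polling example:

for _, id := range []string{parseJob.JobID, extractJob.JobID, splitJob.JobID} {
    if _, err := waitForJob(context.Background(), client, id); err != nil {
        fmt.Printf("job %s failed: %v\n", id, err)
    }
}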

Context Cancellation

Use context for timeouts and cancellation:
// Set timeout for entire operation
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

result, err := client.Parse.Run(ctx, params)
if err != nil {
    if ctx.Err() == context.DeadlineExceeded {
        fmt.Println("Operation timed out")
    }
    return err
}
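
The same context can be shared across goroutines, so one cancellation stops every in-flight request. A sketch that aborts all remaining work when any document fails, again using the parseFile helper defined earlier:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var wg sync.WaitGroup
for _, filePath := range files {
    wg.Add(1)
    go func(path string) {
        defer wg.Done()
        if _, err := parseFile(ctx, client, path); err != nil {
            cancel() // abort every other in-flight request
        }
    }(filePath)
}
wg.Wait()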

Error Handling

Handle errors in concurrent operations:
type result struct {
    file   string
    result *shared.ParseResponse
    err    error
}

func batchProcess(client *reducto.Client, files []string) []result {
    results := make(chan result, len(files))
    var wg sync.WaitGroup

    for _, file := range files {
        wg.Add(1)
        go func(path string) {
            defer wg.Done()
            res, err := parseFile(context.Background(), client, path) // helper defined above
            results <- result{file: path, result: res, err: err}
        }(file)
    }

    go func() {
        wg.Wait()
        close(results)
    }()

    var output []result
    for r := range results {
        output = append(output, r)
        if r.err != nil {
            fmt.Printf("Failed: %s - %v\n", r.file, r.err)
        } else {
            fmt.Printf("Success: %s - %d pages\n", r.file, r.result.Usage.NumPages)
        }
    }
    return output
}
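
Usage is a single call, and the returned slice makes it easy to tally outcomes:

results := batchProcess(client, files)
failed := 0
for _, r := range results {
    if r.err != nil {
        failed++
    }
}
fmt.Printf("%d/%d documents processed successfully\n", len(results)-failed, len(results))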

When to Use Async Jobs

Use RunJob When

Processing large documents (50+ pages), running long extract operations, or integrating with webhooks.

Use Run When

Processing small documents, needing results immediately, or writing simple scripts where waiting for the response is acceptable.

Next Steps