Documentation Index
Fetch the complete documentation index at: https://docs.reducto.ai/llms.txt
Use this file to discover all available pages before exploring further.
The Go SDK supports concurrent document processing using goroutines and async job management with RunJob methods.
Concurrent Processing
Process multiple documents simultaneously with goroutines:
Parse Multiple Documents
package main
import (
"context"
"fmt"
"io"
"os"
"sync"
reducto "github.com/reductoai/reducto-go-sdk"
"github.com/reductoai/reducto-go-sdk/option"
"github.com/reductoai/reducto-go-sdk/shared"
)
// main uploads three local PDFs concurrently and parses each one, collecting
// per-file results and errors by index so the printed output follows the
// order of the input list.
func main() {
	client := reducto.NewClient(option.WithAPIKey(os.Getenv("REDUCTO_API_KEY")))

	files := []string{"doc1.pdf", "doc2.pdf", "doc3.pdf"}

	// One slot per file; every goroutine writes only its own index, so the
	// slices need no mutex.
	parsed := make([]*shared.ParseResponse, len(files))
	errs := make([]error, len(files))

	var wg sync.WaitGroup
	for i, filePath := range files {
		wg.Add(1)
		// Loop variables are passed as arguments so each goroutine gets its
		// own copy (required for correctness before Go 1.22).
		go func(idx int, path string) {
			defer wg.Done()

			f, err := os.Open(path)
			if err != nil {
				errs[idx] = err
				return
			}
			defer f.Close()

			upload, err := client.Upload(context.Background(), reducto.UploadParams{
				File: reducto.F[io.Reader](f),
			})
			if err != nil {
				errs[idx] = err
				return
			}

			resp, err := client.Parse.Run(context.Background(), reducto.ParseRunParams{
				ParseConfig: reducto.ParseConfigParam{
					DocumentURL: reducto.F[reducto.ParseConfigDocumentURLUnionParam](
						shared.UnionString(upload.FileID),
					),
				},
			})
			parsed[idx] = resp
			errs[idx] = err
		}(i, filePath)
	}
	wg.Wait()

	// Report each file's outcome in input order.
	for i := range files {
		if errs[i] != nil {
			fmt.Printf("%s: Error - %v\n", files[i], errs[i])
		} else {
			fmt.Printf("%s: %d pages\n", files[i], parsed[i].Usage.NumPages)
		}
	}
}
Rate Limiting
Control concurrency with a semaphore pattern:
// processWithLimit parses every file in files while allowing at most
// maxConcurrent uploads/parses in flight. A buffered channel acts as a
// counting semaphore: a send acquires a slot, a receive releases it, and the
// acquire happens before the goroutine is spawned so the number of live
// goroutines is bounded as well.
//
// Fix over the previous version: os.Open and client.Upload errors were
// discarded with `_`, after which file.Close() (nil *os.File) or
// upload.FileID (nil response) would panic on failure. Errors are now
// reported per file and the file is skipped.
func processWithLimit(client *reducto.Client, files []string, maxConcurrent int) {
	sem := make(chan struct{}, maxConcurrent)
	var wg sync.WaitGroup
	for _, filePath := range files {
		wg.Add(1)
		sem <- struct{}{} // Acquire
		go func(path string) {
			defer wg.Done()
			defer func() { <-sem }() // Release

			file, err := os.Open(path)
			if err != nil {
				fmt.Printf("%s: Error - %v\n", path, err)
				return
			}
			defer file.Close()

			upload, err := client.Upload(context.Background(), reducto.UploadParams{
				File: reducto.F[io.Reader](file),
			})
			if err != nil {
				fmt.Printf("%s: Error - %v\n", path, err)
				return
			}

			result, err := client.Parse.Run(context.Background(), reducto.ParseRunParams{
				ParseConfig: reducto.ParseConfigParam{
					DocumentURL: reducto.F[reducto.ParseConfigDocumentURLUnionParam](
						shared.UnionString(upload.FileID),
					),
				},
			})
			if err != nil {
				fmt.Printf("%s: Error - %v\n", path, err)
				return
			}
			fmt.Printf("%s: %d pages\n", path, result.Usage.NumPages)
		}(filePath)
	}
	wg.Wait()
}
// Process 100 files, max 5 at a time — the semaphore keeps at most 5
// uploads/parses in flight at any moment.
processWithLimit(client, fileList, 5)
Async Jobs (RunJob)
For documents that take longer to process, use RunJob methods to avoid timeouts:
Start Async Job
// Start job without waiting for completion. RunJob returns as soon as the
// job is accepted, handing back a job ID; fetch the result later by polling
// (see waitForJob below) instead of holding the HTTP request open.
job, err := client.Parse.RunJob(context.Background(), reducto.ParseRunJobParams{
	DocumentURL: reducto.F[reducto.ParseRunJobParamsDocumentURLUnion](
		shared.UnionString(upload.FileID),
	),
})
if err != nil {
	return err
}
// Keep job.JobID — it is the handle for all later status lookups.
fmt.Printf("Job started: %s\n", job.JobID)
Poll for Completion
import "time"
// waitForJob polls the given job every 2 seconds until it completes, fails,
// or ctx is cancelled. It returns the finished job on "Completed", an error
// wrapping the job's reason on "Failed", and ctx.Err() on cancellation; any
// other status is logged and polling continues.
func waitForJob(ctx context.Context, client *reducto.Client, jobID string) (*reducto.JobGetResponse, error) {
	poll := time.NewTicker(2 * time.Second)
	defer poll.Stop()

	for {
		// Block until either the context ends or the next poll interval.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-poll.C:
		}

		job, err := client.Job.Get(ctx, jobID)
		if err != nil {
			return nil, err
		}

		switch job.Status {
		case "Completed":
			return job, nil
		case "Failed":
			return nil, fmt.Errorf("job failed: %s", job.Reason)
		}

		// Still in progress — report and loop for the next tick.
		fmt.Printf("Status: %s, Progress: %.0f%%\n", job.Status, job.Progress)
	}
}
// Usage: start the job, then block until it finishes.
// NOTE(review): the RunJob error is discarded here for brevity; real code
// should check it before reading job.JobID.
job, _ := client.Parse.RunJob(context.Background(), params)
result, err := waitForJob(context.Background(), client, job.JobID)
All Async Methods
Each endpoint has a RunJob variant:
// Parse — async variant of client.Parse.Run; returns a job handle immediately.
// NOTE(review): errors are discarded with _ in these snippets for brevity;
// real code should check them.
parseJob, _ := client.Parse.RunJob(context.Background(), reducto.ParseRunJobParams{
	DocumentURL: reducto.F[reducto.ParseRunJobParamsDocumentURLUnion](
		shared.UnionString(upload.FileID),
	),
})
// Extract — same pattern, plus the extraction schema describing the fields
// to pull from the document.
extractJob, _ := client.Extract.RunJob(context.Background(), reducto.ExtractRunJobParams{
	DocumentURL: reducto.F[reducto.ExtractRunJobParamsDocumentURLUnion](
		shared.UnionString(upload.FileID),
	),
	Schema: reducto.F[interface{}](schema),
})
// Split — same pattern, plus a description of how to partition the document.
splitJob, _ := client.Split.RunJob(context.Background(), reducto.SplitRunJobParams{
	DocumentURL: reducto.F[reducto.SplitRunJobParamsDocumentURLUnion](
		shared.UnionString(upload.FileID),
	),
	SplitDescription: reducto.F(splitDesc),
})
Context Cancellation
Use context for timeouts and cancellation:
// Set timeout for entire operation. cancel must always be called (deferred
// here) to free the timer's resources even when the call finishes early.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
result, err := client.Parse.Run(ctx, params)
if err != nil {
	// Distinguish a deadline expiry from other failures so the caller can
	// tell a slow document apart from a hard error.
	if ctx.Err() == context.DeadlineExceeded {
		fmt.Println("Operation timed out")
	}
	return err
}
Error Handling
Handle errors in concurrent operations:
// result pairs one input file with the outcome of processing it, so a single
// channel can carry both successes and failures.
type result struct {
	file   string                // path of the input file
	result *shared.ParseResponse // parse response; presumably nil when err is set — confirm in the fill-in
	err    error                 // nil on success
}

// batchProcess fans out one goroutine per file, streams every outcome
// through a channel, logs each as it arrives, and returns them all.
// Outcomes arrive in completion order, not input order.
func batchProcess(client *reducto.Client, files []string) []result {
	// Buffered to len(files) so no sender ever blocks, even if the consumer
	// loop below falls behind.
	results := make(chan result, len(files))
	var wg sync.WaitGroup
	for _, file := range files {
		wg.Add(1)
		go func(path string) {
			defer wg.Done()
			// ... process file ...
			// (placeholder: res and err come from the upload/parse calls
			// shown in the earlier examples)
			results <- result{file: path, result: res, err: err}
		}(file)
	}
	// Close the channel only after every worker has sent, so the range loop
	// below terminates; done in a goroutine so collection can start at once.
	go func() {
		wg.Wait()
		close(results)
	}()
	var output []result
	for r := range results {
		output = append(output, r)
		if r.err != nil {
			fmt.Printf("Failed: %s - %v\n", r.file, r.err)
		} else {
			fmt.Printf("Success: %s - %d pages\n", r.file, r.result.Usage.NumPages)
		}
	}
	return output
}
When to Use Async Jobs
Use RunJob When
Processing large documents (50+ pages), running long extract operations, or integrating with webhooks.
Use Run When
Processing small documents, need immediate results, or simple scripts where waiting is acceptable.
Next Steps