Concurrent Processing
Process multiple documents simultaneously with goroutines:
Parse Multiple Documents
package main

import (
    "context"
    "fmt"
    "io"
    "os"
    "sync"

    reducto "github.com/reductoai/reducto-go-sdk"
    "github.com/reductoai/reducto-go-sdk/option"
    "github.com/reductoai/reducto-go-sdk/shared"
)

func main() {
    client := reducto.NewClient(option.WithAPIKey(os.Getenv("REDUCTO_API_KEY")))

    files := []string{"doc1.pdf", "doc2.pdf", "doc3.pdf"}

    var wg sync.WaitGroup
    // Pre-sized slices: each goroutine writes only to its own index,
    // so no mutex is needed.
    results := make([]*shared.ParseResponse, len(files))
    errors := make([]error, len(files))

    for i, filePath := range files {
        wg.Add(1)
        go func(idx int, path string) {
            defer wg.Done()

            file, err := os.Open(path)
            if err != nil {
                errors[idx] = err
                return
            }
            defer file.Close()

            upload, err := client.Upload(context.Background(), reducto.UploadParams{
                File: reducto.F[io.Reader](file),
            })
            if err != nil {
                errors[idx] = err
                return
            }

            result, err := client.Parse.Run(context.Background(), reducto.ParseRunParams{
                ParseConfig: reducto.ParseConfigParam{
                    DocumentURL: reducto.F[reducto.ParseConfigDocumentURLUnionParam](
                        shared.UnionString(upload.FileID),
                    ),
                },
            })
            results[idx] = result
            errors[idx] = err
        }(i, filePath)
    }

    wg.Wait()

    for i, result := range results {
        if errors[i] != nil {
            fmt.Printf("%s: Error - %v\n", files[i], errors[i])
        } else {
            fmt.Printf("%s: %d pages\n", files[i], result.Usage.NumPages)
        }
    }
}
Rate Limiting
Control concurrency with a semaphore pattern:
func processWithLimit(client *reducto.Client, files []string, maxConcurrent int) {
    sem := make(chan struct{}, maxConcurrent)
    var wg sync.WaitGroup

    for _, filePath := range files {
        wg.Add(1)
        sem <- struct{}{} // Acquire a slot before starting the goroutine
        go func(path string) {
            defer wg.Done()
            defer func() { <-sem }() // Release the slot

            file, err := os.Open(path)
            if err != nil {
                fmt.Printf("%s: Error - %v\n", path, err)
                return
            }
            defer file.Close()

            upload, err := client.Upload(context.Background(), reducto.UploadParams{
                File: reducto.F[io.Reader](file),
            })
            if err != nil {
                fmt.Printf("%s: Error - %v\n", path, err)
                return
            }

            result, err := client.Parse.Run(context.Background(), reducto.ParseRunParams{
                ParseConfig: reducto.ParseConfigParam{
                    DocumentURL: reducto.F[reducto.ParseConfigDocumentURLUnionParam](
                        shared.UnionString(upload.FileID),
                    ),
                },
            })
            if err != nil {
                fmt.Printf("%s: Error - %v\n", path, err)
                return
            }
            fmt.Printf("%s: %d pages\n", path, result.Usage.NumPages)
        }(filePath)
    }
    wg.Wait()
}

// Process 100 files, max 5 at a time
processWithLimit(client, fileList, 5)
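The same limit can also be expressed without a hand-rolled semaphore using golang.org/x/sync/errgroup. This is a sketch, not part of the SDK: it assumes the x/sync module is added to your project, and processFile is a hypothetical helper wrapping the Upload and Parse.Run calls above.

import "golang.org/x/sync/errgroup"

func processWithErrgroup(client *reducto.Client, files []string, maxConcurrent int) error {
    var g errgroup.Group
    g.SetLimit(maxConcurrent) // at most maxConcurrent goroutines run at once

    for _, filePath := range files {
        path := filePath
        g.Go(func() error {
            // processFile is a hypothetical helper wrapping the
            // Upload + Parse.Run sequence shown above.
            return processFile(client, path)
        })
    }
    // Wait blocks until all goroutines finish and returns the
    // first non-nil error from any of them.
    return g.Wait()
}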
Async Jobs (RunJob)
For documents that take longer to process, use the RunJob methods to avoid timeouts:
Start Async Job
// Start job without waiting for completion
job, err := client.Parse.RunJob(context.Background(), reducto.ParseRunJobParams{
    DocumentURL: reducto.F[reducto.ParseRunJobParamsDocumentURLUnion](
        shared.UnionString(upload.FileID),
    ),
})
if err != nil {
    return err
}
fmt.Printf("Job started: %s\n", job.JobID)
Poll for Completion
import "time"
func waitForJob(ctx context.Context, client *reducto.Client, jobID string) (*reducto.JobGetResponse, error) {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-ticker.C:
job, err := client.Job.Get(ctx, jobID)
if err != nil {
return nil, err
}
switch job.Status {
case "Completed":
return job, nil
case "Failed":
return nil, fmt.Errorf("job failed: %s", job.Reason)
default:
fmt.Printf("Status: %s, Progress: %.0f%%\n", job.Status, job.Progress)
}
}
}
}
// Usage
job, _ := client.Parse.RunJob(context.Background(), params)
result, err := waitForJob(context.Background(), client, job.JobID)
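For jobs that run for many minutes, a fixed two-second interval polls more often than needed. Here is a sketch of the same loop with an exponentially growing delay, reusing the status handling from waitForJob above:

func waitForJobBackoff(ctx context.Context, client *reducto.Client, jobID string) (*reducto.JobGetResponse, error) {
    delay := 2 * time.Second
    const maxDelay = 30 * time.Second

    for {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-time.After(delay):
        }

        job, err := client.Job.Get(ctx, jobID)
        if err != nil {
            return nil, err
        }
        switch job.Status {
        case "Completed":
            return job, nil
        case "Failed":
            return nil, fmt.Errorf("job failed: %s", job.Reason)
        }

        // Back off between polls, up to a 30-second ceiling.
        delay *= 2
        if delay > maxDelay {
            delay = maxDelay
        }
    }
}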
All Async Methods
Each endpoint has a RunJob variant:
// Parse
parseJob, _ := client.Parse.RunJob(context.Background(), reducto.ParseRunJobParams{
    DocumentURL: reducto.F[reducto.ParseRunJobParamsDocumentURLUnion](
        shared.UnionString(upload.FileID),
    ),
})

// Extract
extractJob, _ := client.Extract.RunJob(context.Background(), reducto.ExtractRunJobParams{
    DocumentURL: reducto.F[reducto.ExtractRunJobParamsDocumentURLUnion](
        shared.UnionString(upload.FileID),
    ),
    Schema: reducto.F[interface{}](schema),
})

// Split
splitJob, _ := client.Split.RunJob(context.Background(), reducto.SplitRunJobParams{
    DocumentURL: reducto.F[reducto.SplitRunJobParamsDocumentURLUnion](
        shared.UnionString(upload.FileID),
    ),
    SplitDescription: reducto.F(splitDesc),
})
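Because the jobs run server-side, you can start all three and then wait on them together. A sketch reusing the waitForJob helper above, assuming Job.Get reports status the same way for every job type:

var wg sync.WaitGroup
for _, id := range []string{parseJob.JobID, extractJob.JobID, splitJob.JobID} {
    wg.Add(1)
    go func(jobID string) {
        defer wg.Done()
        if _, err := waitForJob(context.Background(), client, jobID); err != nil {
            fmt.Printf("%s: %v\n", jobID, err)
        }
    }(id)
}
wg.Wait()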
Context Cancellation
Use context for timeouts and cancellation:
// Set timeout for entire operation
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

result, err := client.Parse.Run(ctx, params)
if err != nil {
    if ctx.Err() == context.DeadlineExceeded {
        fmt.Println("Operation timed out")
    }
    return err
}
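Timeouts are not the only use for context. Here is a sketch that also cancels in-flight requests when the process receives Ctrl-C, using the standard library's signal.NotifyContext:

import (
    "context"
    "os"
    "os/signal"
    "time"
)

// Cancel on interrupt (Ctrl-C) as well as after a deadline.
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()

ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()

// Any request made with ctx is aborted on either signal or timeout.
result, err := client.Parse.Run(ctx, params)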
Error Handling
Handle errors in concurrent operations:
type result struct {
    file   string
    result *shared.ParseResponse
    err    error
}

func batchProcess(client *reducto.Client, files []string) []result {
    results := make(chan result, len(files))
    var wg sync.WaitGroup

    for _, file := range files {
        wg.Add(1)
        go func(path string) {
            defer wg.Done()
            // ... process file ...
            results <- result{file: path, result: res, err: err}
        }(file)
    }

    // Close the channel once every worker has reported its result,
    // so the range loop below terminates.
    go func() {
        wg.Wait()
        close(results)
    }()

    var output []result
    for r := range results {
        output = append(output, r)
        if r.err != nil {
            fmt.Printf("Failed: %s - %v\n", r.file, r.err)
        } else {
            fmt.Printf("Success: %s - %d pages\n", r.file, r.result.Usage.NumPages)
        }
    }
    return output
}
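To branch on API failures versus local I/O failures inside a worker, you can unwrap the SDK's typed error with errors.As. A sketch, assuming the reducto.Error type described in the error handling guide:

import "errors"

// Inside a worker, after a failed Upload or Parse.Run call.
// Assumes the SDK exposes a *reducto.Error API error type.
var apierr *reducto.Error
if errors.As(err, &apierr) {
    fmt.Printf("API error, status %d\n", apierr.StatusCode)
}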
When to Use Async Jobs
Use RunJob When
Processing large documents (50+ pages), running long extract operations, or integrating with webhooks.
Use Run When
Processing small documents, needing immediate results, or running simple scripts where waiting is acceptable.
Next Steps
- See job management for the complete job API
- Check error handling for the error reference