Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Added "resumable jobs", which can be broken down into multiple steps. Each step is persisted after it finishes, which lets a job skip work that's already been done when it runs again. This is particularly useful for long-running jobs that may experience a cancellation (like in the event of a deploy) during the span of their run. [PR #1226](https://github.com/riverqueue/river/pull/1226).

## [0.35.0] - 2026-04-18

### Changed
Expand Down
7 changes: 6 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,8 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
// the more abstract config.Middleware for middleware are set, but not both,
// so in practice we never append all three of these to each other.
{
middleware := config.Middleware
middleware := defaultMiddleware()
middleware = append(middleware, config.Middleware...)
for _, jobInsertMiddleware := range config.JobInsertMiddleware {
middleware = append(middleware, jobInsertMiddleware)
}
Expand Down Expand Up @@ -1002,6 +1003,10 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
return client, nil
}

// defaultMiddleware returns the built-in middleware that NewClient places
// ahead of any middleware supplied through the client's configuration. It
// currently consists of only the resumable middleware.
func defaultMiddleware() []rivertype.Middleware {
	resumable := &resumableMiddleware{}

	return []rivertype.Middleware{resumable}
}

// Start starts the client's job fetching and working loops. Once this is called,
// the client will run in a background goroutine until stopped. All jobs are
// run with a context inheriting from the provided context, but with a timeout
Expand Down
114 changes: 111 additions & 3 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,62 @@ func (w *periodicJobWorker) Work(ctx context.Context, job *Job[periodicJobArgs])
return nil
}

// resumableClientTestArgs are the (empty) job args used by the resumable job
// client tests.
type resumableClientTestArgs struct{}

// Kind returns the job kind string for resumableClientTestArgs.
func (resumableClientTestArgs) Kind() string {
	return "resumable_client_test"
}

// resumableClientTestWorker is a test worker for resumableClientTestArgs jobs.
// Its Work method runs a fixed sequence of resumable steps and records each
// invocation so that tests can assert on what ran.
type resumableClientTestWorker struct {
WorkerDefaults[resumableClientTestArgs]

// calls is the ordered log of step and item invocations appended by Work.
calls []string
// callsMu guards calls.
callsMu sync.Mutex
// failedOnce is flipped to true the first time Work's cursor step reaches
// its intentional failure point, so the failure only happens once.
failedOnce atomic.Bool
}

// Calls returns a copy of the invocations recorded so far. The copy is taken
// while holding the mutex, so the caller can read the result without locking.
func (w *resumableClientTestWorker) Calls() []string {
	w.callsMu.Lock()
	snapshot := append([]string(nil), w.calls...)
	w.callsMu.Unlock()

	return snapshot
}

// Work runs three resumable steps in order and records every invocation:
//
//   - "step1" records itself and succeeds.
//   - "step2" is a cursor step. It records the cursor it was entered with, then
//     walks item IDs from cursor+1 through 2, recording each item and storing
//     it as the new cursor. The first time an item has been checkpointed (across
//     the lifetime of the worker) it returns the error "retry me".
//   - "step3" records itself and succeeds.
func (w *resumableClientTestWorker) Work(ctx context.Context, job *Job[resumableClientTestArgs]) error {
	// record appends an entry to the shared call log under the mutex.
	record := func(entry string) {
		w.callsMu.Lock()
		w.calls = append(w.calls, entry)
		w.callsMu.Unlock()
	}

	ResumableStep(ctx, "step1", func(ctx context.Context) error {
		record("step1")
		return nil
	})

	ResumableStepCursor(ctx, "step2", func(ctx context.Context, cursor int) error {
		record("step2:" + strconv.Itoa(cursor))

		for next := cursor + 1; next <= 2; next++ {
			record("item:" + strconv.Itoa(next))

			// Checkpoint progress before possibly failing so that the stored
			// cursor already reflects this item.
			if err := ResumableSetCursor(ctx, next); err != nil {
				return err
			}

			// Swap reports the previous value, so only the very first pass
			// through here sees false and fails.
			if alreadyFailed := w.failedOnce.Swap(true); !alreadyFailed {
				return errors.New("retry me")
			}
		}

		return nil
	})

	ResumableStep(ctx, "step3", func(ctx context.Context) error {
		record("step3")
		return nil
	})

	return nil
}

func makeAwaitWorker[T JobArgs](startedCh chan<- int64, doneCh chan struct{}) Worker[T] {
return WorkFunc(func(ctx context.Context, job *Job[T]) error {
client := ClientFromContext[pgx.Tx](ctx)
Expand Down Expand Up @@ -6936,6 +6992,58 @@ func Test_Client_JobCompletion(t *testing.T) {
require.Nil(t, reloadedJob.FinalizedAt)
})

// Exercises a resumable job end to end: the first attempt fails partway
// through step2, and the retried attempt is expected to continue from the
// persisted step and cursor instead of starting over.
t.Run("ResumableJobRetriesAndResumes", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, "")
config.RetryPolicy = &retrypolicytest.RetryPolicyNoJitter{}

worker := &resumableClientTestWorker{}
AddWorker(config.Workers, worker)

client, bundle := setup(t, config)

insertRes, err := client.Insert(ctx, resumableClientTestArgs{}, nil)
require.NoError(t, err)

// Wait for the first attempt to fail after step2 checkpoints cursor
// progress and intentionally returns "retry me", leaving the job queued
// for retry.
eventFailed := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan)
require.Equal(t, EventKindJobFailed, eventFailed.Kind)
require.Equal(t, insertRes.Job.ID, eventFailed.Job.ID)

// Either state is accepted for the failed job. The metadata is decoded into
// a generic map, so the JSON number stored as the step2 cursor comes back as
// a float64. The step key is expected to be "step1" (presumably the last
// step that fully completed) alongside a step2 cursor of 1.
var retryableMetadata map[string]any
require.Contains(t, []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateRetryable}, eventFailed.Job.State)
require.NoError(t, json.Unmarshal(eventFailed.Job.Metadata, &retryableMetadata))
require.Equal(t, "step1", retryableMetadata["river:resumable_step"])
require.Equal(t, map[string]any{"step2": float64(1)}, retryableMetadata["river:resumable_cursor"])

// Wait for the retried attempt to resume and then complete successfully.
eventCompleted := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan)
require.Equal(t, EventKindJobCompleted, eventCompleted.Kind)
require.Equal(t, insertRes.Job.ID, eventCompleted.Job.ID)

// The completed job should carry exactly one error, from the single failed
// attempt.
reloadedJob, err := client.JobGet(ctx, insertRes.Job.ID)
require.NoError(t, err)
require.Equal(t, rivertype.JobStateCompleted, reloadedJob.State)
require.Len(t, reloadedJob.Errors, 1)

// The resumable metadata on the completed job is expected to be the same as
// what was observed after the failed attempt.
var metadata map[string]any
require.NoError(t, json.Unmarshal(reloadedJob.Metadata, &metadata))
require.Equal(t, "step1", metadata["river:resumable_step"])
require.Equal(t, map[string]any{"step2": float64(1)}, metadata["river:resumable_cursor"])

// step1 appears only once, so it is not expected to run again on the retry.
// step2 is entered twice: first with cursor 0, then with the cursor of 1
// that was stored before the intentional failure.
require.Equal(t, []string{
"step1",
"step2:0",
"item:1",
"step2:1",
"item:2",
"step3",
}, worker.Calls())
})

t.Run("JobThatReturnsJobCancelErrorIsImmediatelyCancelled", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -7602,7 +7710,7 @@ func Test_NewClient_Validations(t *testing.T) {
},
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert), 1)
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1)
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 2)
},
},
{
Expand All @@ -7613,7 +7721,7 @@ func Test_NewClient_Validations(t *testing.T) {
},
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert), 2)
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 2)
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 3)
},
},
{
Expand All @@ -7625,7 +7733,7 @@ func Test_NewClient_Validations(t *testing.T) {
},
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindJobInsert), 1)
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1)
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 2)
},
},
{
Expand Down
105 changes: 105 additions & 0 deletions example_resumable_cursor_job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package river_test

import (
"context"
"fmt"
"log/slog"
"os"

"github.com/jackc/pgx/v5/pgxpool"

"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdbtest"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/util/slogutil"
"github.com/riverqueue/river/rivershared/util/testutil"
)

// ResumableCursorArgs are the arguments for a job that processes a list of IDs
// and checkpoints its progress with a resumable cursor.
type ResumableCursorArgs struct {
	// IDs are the identifiers for the job to process.
	IDs []int `json:"ids"`
}

// Kind returns the job kind string for ResumableCursorArgs.
func (ResumableCursorArgs) Kind() string {
	return "resumable_cursor"
}

// ResumableCursor is the cursor value stored by ResumableCursorWorker to track
// how far it has progressed through a job's IDs.
type ResumableCursor struct {
// LastProcessedID is the most recent ID that was processed. IDs less than or
// equal to it are skipped by the worker.
LastProcessedID int `json:"last_processed_id"`
}

// ResumableCursorWorker works ResumableCursorArgs jobs, processing each ID in
// a single resumable cursor step.
type ResumableCursorWorker struct {
river.WorkerDefaults[ResumableCursorArgs]
}

// Work processes the job's IDs inside one resumable cursor step named
// "process_ids". IDs at or below the LastProcessedID of the cursor the step was
// entered with are skipped. For every other ID a line is printed and the cursor
// is advanced to that ID with river.ResumableSetCursor.
func (w *ResumableCursorWorker) Work(ctx context.Context, job *river.Job[ResumableCursorArgs]) error {
	processIDs := func(ctx context.Context, cursor ResumableCursor) error {
		for _, id := range job.Args.IDs {
			// Skip anything that the incoming cursor says is already done.
			if id <= cursor.LastProcessedID {
				continue
			}

			fmt.Printf("Processed %d\n", id)

			checkpoint := ResumableCursor{LastProcessedID: id}
			if err := river.ResumableSetCursor(ctx, checkpoint); err != nil {
				return err
			}
		}

		return nil
	}

	river.ResumableStepCursor(ctx, "process_ids", processIDs)

	return nil
}

// Example_resumableCursor demonstrates the use of a resumable cursor step, a
// step that can store arbitrary JSON state to resume a partially completed loop.
//
// The example inserts one job carrying the IDs 1, 2, and 3, waits for it to
// complete, and expects one "Processed" line for each ID.
func Example_resumableCursor() { //nolint:dupl
ctx := context.Background()

dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
if err != nil {
panic(err)
}
defer dbPool.Close()

// Register the worker that handles ResumableCursorArgs jobs.
workers := river.NewWorkers()
river.AddWorker(workers, &ResumableCursorWorker{})

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})),
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test
TestOnly: true, // suitable only for use in tests; remove for live environments
Workers: workers,
})
if err != nil {
panic(err)
}

// Out of example scope, but used to wait until a job is worked. The
// subscription is set up before the client is started and before the job is
// inserted.
subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
defer subscribeCancel()

if err := riverClient.Start(ctx); err != nil {
panic(err)
}

// Insert a single job whose IDs the worker will process one at a time.
if _, err = riverClient.Insert(ctx, ResumableCursorArgs{
IDs: []int{1, 2, 3},
}, nil); err != nil {
panic(err)
}

// Wait for jobs to complete. Only needed for purposes of the example test.
riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)

if err := riverClient.Stop(ctx); err != nil {
panic(err)
}

// Output:
// Processed 1
// Processed 2
// Processed 3
}
96 changes: 96 additions & 0 deletions example_resumable_job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package river_test

import (
"context"
"fmt"
"log/slog"
"os"

"github.com/jackc/pgx/v5/pgxpool"

"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdbtest"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/util/slogutil"
"github.com/riverqueue/river/rivershared/util/testutil"
)

// ResumableArgs are the (empty) arguments for the resumable job example.
type ResumableArgs struct{}

// Kind returns the job kind string for ResumableArgs.
func (ResumableArgs) Kind() string {
	return "resumable"
}

// ResumableWorker works ResumableArgs jobs as a sequence of three resumable
// steps.
type ResumableWorker struct {
river.WorkerDefaults[ResumableArgs]
}

// Work runs three resumable steps named "step1", "step2", and "step3" in that
// order. Each step prints a line identifying itself and succeeds.
func (w *ResumableWorker) Work(ctx context.Context, job *river.Job[ResumableArgs]) error {
	// announce builds a step function that prints the given label and returns
	// no error.
	announce := func(label string) func(ctx context.Context) error {
		return func(ctx context.Context) error {
			fmt.Println(label)
			return nil
		}
	}

	river.ResumableStep(ctx, "step1", announce("Step 1"))
	river.ResumableStep(ctx, "step2", announce("Step 2"))
	river.ResumableStep(ctx, "step3", announce("Step 3"))

	return nil
}

// Example_resumable demonstrates the use of a "resumable job", a job that has
// multiple steps, and which can be resumed after each one.
//
// The example inserts one job, waits for it to complete, and expects one line
// of output from each of the worker's three steps.
func Example_resumable() { //nolint:dupl
ctx := context.Background()

dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
if err != nil {
panic(err)
}
defer dbPool.Close()

// Register the worker that handles ResumableArgs jobs.
workers := river.NewWorkers()
river.AddWorker(workers, &ResumableWorker{})

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})),
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test
TestOnly: true, // suitable only for use in tests; remove for live environments
Workers: workers,
})
if err != nil {
panic(err)
}

// Out of example scope, but used to wait until a job is worked. The
// subscription is set up before the client is started and before the job is
// inserted.
subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
defer subscribeCancel()

if err := riverClient.Start(ctx); err != nil {
panic(err)
}

if _, err = riverClient.Insert(ctx, ResumableArgs{}, nil); err != nil {
panic(err)
}

// Wait for jobs to complete. Only needed for purposes of the example test.
riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)

if err := riverClient.Stop(ctx); err != nil {
panic(err)
}

// Output:
// Step 1
// Step 2
// Step 3
}
Loading
Loading