Implement resumable jobs #1226

Open
brandur wants to merge 1 commit into master from brandur-resumable-jobs

@brandur brandur commented Apr 23, 2026

This implements "resumable" jobs: jobs that can checkpoint their
progress so that if they have to stop early, they're picked up from a
point that lets them skip work that's already been done. This is
especially useful for long-running jobs that are at risk of being
interrupted by something like a deploy.

Here's roughly the shape of the API, with the same normal Work
function that all jobs implement, and with a series of ResumableStep
calls within, each of which takes a name for the step and a function
representing it:

func (w *ResumableWorker) Work(ctx context.Context, job *river.Job[ResumableArgs]) error {
    river.ResumableStep(ctx, "step1", func(ctx context.Context) error {
        fmt.Println("Step 1")
        return nil
    })

    river.ResumableStep(ctx, "step2", func(ctx context.Context) error {
        fmt.Println("Step 2")
        return nil
    })

    river.ResumableStep(ctx, "step3", func(ctx context.Context) error {
        fmt.Println("Step 3")
        return nil
    })

    return nil
}
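
To make the skipping behavior concrete, here is a minimal in-memory sketch. It is not River's implementation: the `checkpoint` type, `runStep`, and `runJob` are hypothetical names, and it assumes completed steps are tracked by name.

```go
package main

import "fmt"

// checkpoint is a hypothetical stand-in for the progress that would be
// persisted in a job's metadata. Here it only records completed step names.
type checkpoint struct {
	done map[string]bool
}

// runStep runs fn unless the named step already completed on an earlier
// attempt, and records the step as done when fn succeeds.
func (c *checkpoint) runStep(name string, fn func() error) error {
	if c.done[name] {
		return nil // already done on a previous attempt; skip
	}
	if err := fn(); err != nil {
		return err
	}
	c.done[name] = true
	return nil
}

// runJob runs three steps in order and returns the names of the steps that
// actually executed. A step whose name equals failAt simulates an interruption.
func runJob(c *checkpoint, failAt string) ([]string, error) {
	var ran []string
	for _, name := range []string{"step1", "step2", "step3"} {
		err := c.runStep(name, func() error {
			if name == failAt {
				return fmt.Errorf("interrupted during %s", name)
			}
			ran = append(ran, name)
			return nil
		})
		if err != nil {
			return ran, err
		}
	}
	return ran, nil
}

func main() {
	c := &checkpoint{done: map[string]bool{}}

	ran, err := runJob(c, "step2")
	fmt.Println(ran, err) // [step1] interrupted during step2

	ran, err = runJob(c, "")
	fmt.Println(ran, err) // [step2 step3] <nil>
}
```

The second attempt skips step1 because it was recorded as done before the simulated interruption.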

We also provide a cursor API for more granularity. This lets a step set
an arbitrary cursor value periodically as it's doing something like
looping over records in a set:

river.ResumableStepCursor(ctx, "process_ids", func(ctx context.Context, cursor ResumableCursor) error {
    for _, id := range job.Args.IDs {
        if id <= cursor.LastProcessedID {
            continue
        }

        fmt.Printf("Processed %d\n", id)

        if err := river.ResumableSetCursor(ctx, ResumableCursor{LastProcessedID: id}); err != nil {
            return err
        }
    }

    return nil
})
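
The resume logic in that loop can be sketched without River at all. This is an illustration only: `processFrom` and its `limit` parameter (used to simulate an interruption) are hypothetical. It also assumes IDs are iterated in ascending order, which the `id <= cursor.LastProcessedID` check relies on.

```go
package main

import "fmt"

// processFrom processes every id greater than lastProcessed, stopping after
// limit items to simulate an interruption. It returns the ids processed in
// this attempt and the updated cursor value. ids must be sorted ascending.
func processFrom(ids []int, lastProcessed, limit int) ([]int, int) {
	var processed []int
	for _, id := range ids {
		if id <= lastProcessed {
			continue // handled on a previous attempt
		}
		if len(processed) == limit {
			break // simulated interruption
		}
		processed = append(processed, id)
		lastProcessed = id // equivalent of setting the cursor
	}
	return processed, lastProcessed
}

func main() {
	ids := []int{1, 2, 3, 4, 5}

	first, cursor := processFrom(ids, 0, 2)
	fmt.Println(first, cursor) // [1 2] 2

	second, cursor := processFrom(ids, cursor, len(ids))
	fmt.Println(second, cursor) // [3 4 5] 5
}
```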

The function is ResumableStepCursor[TCursor any] where TCursor can
be defined arbitrarily by the user. This could be a simple scalar value
representing an ID, or a more complex struct value containing multiple
IDs, enabling nested loops that set inner and outer IDs at the same time.

ResumableStep and ResumableStepCursor steps can be freely
intermingled, and multiple ResumableStepCursor steps with different
cursor types are supported. Cursors must be JSON-marshalable because
they're stored in a job's metadata.
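
As an example of a struct cursor for nested loops, the sketch below round-trips one through `encoding/json`. The `nestedCursor` type, its field names, and the `roundTrip` helper are hypothetical; the only point is that every field needs to survive marshaling and unmarshaling.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// nestedCursor is a hypothetical cursor that tracks an outer and an inner
// loop position at the same time.
type nestedCursor struct {
	LastAccountID int64 `json:"last_account_id"`
	LastInvoiceID int64 `json:"last_invoice_id"`
}

// roundTrip marshals a cursor to JSON and back, the way a value stored in
// job metadata would need to be encoded and later decoded.
func roundTrip(c nestedCursor) (string, nestedCursor, error) {
	data, err := json.Marshal(c)
	if err != nil {
		return "", nestedCursor{}, err
	}

	var decoded nestedCursor
	if err := json.Unmarshal(data, &decoded); err != nil {
		return "", nestedCursor{}, err
	}
	return string(data), decoded, nil
}

func main() {
	encoded, decoded, err := roundTrip(nestedCursor{LastAccountID: 7, LastInvoiceID: 42})
	if err != nil {
		panic(err)
	}
	fmt.Println(encoded)               // {"last_account_id":7,"last_invoice_id":42}
	fmt.Println(decoded.LastInvoiceID) // 42
}
```

Unexported fields, channels, and functions would not survive this round trip, so they don't belong in a cursor type.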

Lastly, we provide ResumableSetStepTx and ResumableSetStepCursorTx
for cases where a transaction guarantee is necessary. Normally, the
resumable step and cursor are persisted as the job is being completed,
but there's a chance that never happens in case of sudden failure.
ResumableSetStepTx (and its cursor version) durably persists a step at
the cost of an extra database operation, similar to how JobCompleteTx
does the same for job completion.

One neat aspect of the implementation here is that I was able to make it
entirely middleware-only. All the resumable job logic goes in an
internal resumableMiddleware that's included in all clients by
default. This is nice because it keeps the code highly modular, and it
will hopefully act as a template for future features.

@brandur brandur force-pushed the brandur-resumable-jobs branch from a526d93 to 010be75 Compare April 23, 2026 06:21
@brandur brandur requested a review from bgentry April 23, 2026 07:21
Comment thread resumable_step_tx.go
return nil, err
}

updatedJob, err := client.Driver().UnwrapExecutor(tx).JobUpdate(ctx, &riverdriver.JobUpdateParams{
Contributor Author

@bgentry Is it okay by you to use a normal JobUpdate here? Or would you be looking for a specialized JobSetMetadataIfRunning or something similar?

Contributor

If only updating metadata, not touching other attrs (like status), and making an immediate inline update, it should be safe.

I was definitely hoping this could be implemented without access to driver level APIs. Client.JobUpdateParams only has Output right now and we didn't add any sort of generalized metadata merging pattern, though I think we talked about that at some point?

Contributor Author

Ah yeah, right on. Keep in mind this driver call is only needed in this special transactional path.

Most of the time, step/cursor are updated as part of the standardized metadata update that happens when a job is completed in the executor. I suspect this is the API that almost everyone should use.

However, we do also provide this transactional variant so you can set your step/cursor in the same tx as another operation you have going, very similar to JobCompleteTx in concept. I suspect this could be handy in long loops where each iteration is a heavy operation and you want to have some protection in case of a panic or process death somewhere down the line.

(See the three example tests for uses of each version of this -- one is a "normal" resumable job, one is a resumable job with cursor, and one is a resumable job with transactional checkpointing.)
