Here, implement "resumable" jobs, which are jobs that can checkpoint
their progress so that in case they have to stop early, they're picked
up from a point that lets them skip work that's already been done. This
is especially useful for long-running jobs that are at risk of being
interrupted by something like a deploy.
Here's roughly the shape of the API, with the same normal `Work`
function that all jobs implement, and with a series of `ResumableStep`
calls within, each of which takes a name for the step and a function
representing it:
```go
func (w *ResumableWorker) Work(ctx context.Context, job *river.Job[ResumableArgs]) error {
	river.ResumableStep(ctx, "step1", func(ctx context.Context) error {
		fmt.Println("Step 1")
		return nil
	})

	river.ResumableStep(ctx, "step2", func(ctx context.Context) error {
		fmt.Println("Step 2")
		return nil
	})

	river.ResumableStep(ctx, "step3", func(ctx context.Context) error {
		fmt.Println("Step 3")
		return nil
	})

	return nil
}
```
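To make the skip-on-resume behavior concrete, here is a toy model of it, written without River. The `checkpoint` type, its `step` method, and `runJob` are hypothetical names for illustration only and are not how the PR implements it. The sketch assumes completed steps are tracked by name and that an interrupted step is retried on the next attempt.

```go
package main

import (
	"errors"
	"fmt"
)

// checkpoint is a hypothetical stand-in for state persisted between attempts.
type checkpoint struct {
	completed map[string]bool
}

// step runs fn unless name was recorded as completed by an earlier attempt.
func (c *checkpoint) step(name string, fn func() error) error {
	if c.completed[name] {
		return nil // already done; skip the work
	}
	if err := fn(); err != nil {
		return err
	}
	c.completed[name] = true
	return nil
}

// runJob models one attempt of a three-step job and returns the steps that
// actually executed. failAt names a step that fails, simulating an interruption.
func runJob(c *checkpoint, failAt string) ([]string, error) {
	var ran []string
	for _, name := range []string{"step1", "step2", "step3"} {
		err := c.step(name, func() error {
			if name == failAt {
				return errors.New("interrupted at " + name)
			}
			ran = append(ran, name)
			return nil
		})
		if err != nil {
			return ran, err
		}
	}
	return ran, nil
}

func main() {
	c := &checkpoint{completed: map[string]bool{}}

	ran, err := runJob(c, "step2")
	fmt.Println(ran, err) // [step1] interrupted at step2

	ran, err = runJob(c, "")
	fmt.Println(ran, err) // [step2 step3] <nil>
}
```

The second attempt never re-runs `step1`, which is the property the step API is meant to provide.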
We also provide a cursor API for more granularity. This lets a step set
an arbitrary cursor value periodically as it's doing something like
looping over records in a set:
```go
river.ResumableStepCursor(ctx, "process_ids", func(ctx context.Context, cursor ResumableCursor) error {
	for _, id := range job.Args.IDs {
		if id <= cursor.LastProcessedID {
			continue
		}

		fmt.Printf("Processed %d\n", id)

		if err := river.ResumableSetCursor(ctx, ResumableCursor{LastProcessedID: id}); err != nil {
			return err
		}
	}

	return nil
})
```
The function is `ResumableStepCursor[TCursor any]` where `TCursor` can
be defined arbitrarily by the user. This could be a simple scalar value
representing an ID, or a more complex `struct` value containing multiple
IDs, enabling nested loops that set inner and outer IDs at the same time.
`ResumableStep` and `ResumableStepCursor` steps can be freely
intermingled, and multiple `ResumableStepCursor` steps with different
cursor types are supported. Cursors must be JSON-marshalable because
they're stored in a job's metadata.
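As an illustration of a struct cursor driving nested loops, here is a standalone sketch that does not call River. `nestedCursor`, `process`, and `roundTrip` are hypothetical names; the sketch assumes IDs are positive and iterated in ascending order, so the zero-value cursor means "nothing processed yet". The JSON round trip stands in for the cursor being stored in and reloaded from job metadata.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// nestedCursor is a hypothetical cursor tracking position in two nested loops.
type nestedCursor struct {
	OuterID int `json:"outer_id"`
	InnerID int `json:"inner_id"`
}

// process visits every (outer, inner) pair after the start cursor and stops
// once limit pairs are done, simulating an interruption. It returns the pairs
// it processed and the cursor pointing at the last one.
func process(outer, inner []int, start nestedCursor, limit int) ([][2]int, nestedCursor) {
	cur := start
	var done [][2]int
	for _, o := range outer {
		if o < cur.OuterID {
			continue // whole outer iteration finished in an earlier attempt
		}
		for _, i := range inner {
			if o == cur.OuterID && i <= cur.InnerID {
				continue // inner item finished in an earlier attempt
			}
			if len(done) == limit {
				return done, cur
			}
			done = append(done, [2]int{o, i})
			cur = nestedCursor{OuterID: o, InnerID: i}
		}
	}
	return done, cur
}

// roundTrip encodes and decodes the cursor as JSON, as storage would.
func roundTrip(c nestedCursor) (nestedCursor, error) {
	data, err := json.Marshal(c)
	if err != nil {
		return nestedCursor{}, err
	}
	var out nestedCursor
	err = json.Unmarshal(data, &out)
	return out, err
}

func main() {
	outer, inner := []int{1, 2}, []int{1, 2, 3}

	done, cur := process(outer, inner, nestedCursor{}, 4)
	fmt.Println(done, cur) // [[1 1] [1 2] [1 3] [2 1]] {2 1}

	restored, err := roundTrip(cur)
	if err != nil {
		panic(err)
	}

	done, cur = process(outer, inner, restored, 10)
	fmt.Println(done, cur) // [[2 2] [2 3]] {2 3}
}
```

Setting both IDs in one cursor value means a resumed attempt can skip finished outer iterations entirely and pick up partway through the inner loop.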
Lastly, we provide `ResumableSetStepTx` and `ResumableSetStepCursorTx`
for cases where a transactional guarantee is necessary. Normally, the
resumable step and cursor are persisted as a job is being completed, but
there's a chance that this never happens in case of sudden failure.
`ResumableSetStepTx` (and its cursor version) is available to durably
persist a step at the cost of an extra database operation, similar to how
`JobCompleteTx` does the same for job completion.
One neat aspect of the implementation here is that I was able to make it
entirely middleware-only. So all the resumable job logic goes in an
internal `resumableMiddleware` that's included in all clients by
default. This is kind of nice because it keeps its code highly modular
and will hopefully act as a template for future features.
```go
		return nil, err
	}

	updatedJob, err := client.Driver().UnwrapExecutor(tx).JobUpdate(ctx, &riverdriver.JobUpdateParams{
```
@bgentry Is it okay by you to use a normal `JobUpdate` here? Or would you be looking for a specialized `JobSetMetadataIfRunning` or something similar?
If only updating metadata, not touching other attrs (like status), and making an immediate inline update, it should be safe.
I was definitely hoping this could be implemented without access to driver level APIs. Client.JobUpdateParams only has Output right now and we didn't add any sort of generalized metadata merging pattern, though I think we talked about that at some point?
Ah yeah, right on. Keep in mind this driver call is only needed in this special transactional path.
Most of the time, step/cursor are updated as part of the standardized metadata update that happens when a job is completed in the executor. I suspect this is the API that almost everyone should use.
However, we do also provide this transactional variant so you can set your step/cursor in the same tx as another operation you have going, very similar to JobCompleteTx in concept. I suspect this could be handy in long loops where each iteration is a heavy operation and you want to have some protection in case of a panic or process death somewhere down the line.
(See the three example tests for uses of each version of this -- one is a "normal" resumable job, one is a resumable job with cursor, and one is a resumable job with transactional checkpointing.)