From bfe5ba0e119fb866e65060d3f042e626e532ae22 Mon Sep 17 00:00:00 2001 From: Brandur Date: Tue, 21 Apr 2026 17:41:27 -0500 Subject: [PATCH] Convert SQLite JSON columns to JSONB Alright, so this one's been some cleanup that I've been intending for a while. Previously, sqlc made using JSONB in SQLite completely unusable because it sent back the binary format which you're explicitly not supposed to parse yourself. I ended up fixing this like a year ago in [1], but getting a sqlc release out the door took so long that I ended up forgetting about it. Here, modify our SQLite definitions so that JSON becomes JSONB, and add migration to convert existing installation values to the same. Luckily, this doesn't require a table rewrite because both JSON and JSONB are SQLite BLOB types. I didn't want to add a migration version just for SQLite, so to keep everything in sync I also added a version 7 for Postgres that's just a no-op with a comment. [1] https://github.com/sqlc-dev/sqlc/pull/3968 --- CHANGELOG.md | 21 ++++ riverdriver/river_driver_interface.go | 2 +- .../main/007_sqlite_json_to_jsonb.down.sql | 1 + .../main/007_sqlite_json_to_jsonb.up.sql | 2 + riverdriver/riverdrivertest/migration.go | 2 + .../main/007_sqlite_json_to_jsonb.down.sql | 1 + .../main/007_sqlite_json_to_jsonb.up.sql | 2 + .../internal/dbsqlc/river_client.sql | 2 +- .../internal/dbsqlc/river_client_queue.sql | 2 +- .../riversqlite/internal/dbsqlc/river_job.sql | 42 ++++---- .../internal/dbsqlc/river_job.sql.go | 96 +++++++++---------- .../internal/dbsqlc/river_queue.sql | 6 +- .../internal/dbsqlc/river_queue.sql.go | 18 ++-- .../riversqlite/internal/dbsqlc/sqlc.yaml | 7 ++ .../main/007_sqlite_json_to_jsonb.down.sql | 60 ++++++++++++ .../main/007_sqlite_json_to_jsonb.up.sql | 63 ++++++++++++ .../riversqlite/river_sqlite_driver.go | 4 +- 17 files changed, 245 insertions(+), 86 deletions(-) create mode 100644 riverdriver/riverdatabasesql/migration/main/007_sqlite_json_to_jsonb.down.sql create mode 100644 
riverdriver/riverdatabasesql/migration/main/007_sqlite_json_to_jsonb.up.sql create mode 100644 riverdriver/riverpgxv5/migration/main/007_sqlite_json_to_jsonb.down.sql create mode 100644 riverdriver/riverpgxv5/migration/main/007_sqlite_json_to_jsonb.up.sql create mode 100644 riverdriver/riversqlite/migration/main/007_sqlite_json_to_jsonb.down.sql create mode 100644 riverdriver/riversqlite/migration/main/007_sqlite_json_to_jsonb.up.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index 159020b0..3223cd13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +⚠️ Version 0.36.0 contains a new database migration, version 7, but running it is **only necessary if you're using SQLite**. It converts all uses of `json` to `jsonb`. If using Postgres, version 7 is a no-op. You can run it now or wait for another migration in the future and run the two together. + +See [documentation on running River migrations](https://riverqueue.com/docs/migrations). If migrating with the CLI, make sure to update it to its latest version: + +```shell +go install github.com/riverqueue/river/cmd/river@latest +river migrate-up --database-url "$DATABASE_URL" +``` + +If not using River's internal migration system, the raw SQL can alternatively be dumped with: + +```shell +go install github.com/riverqueue/river/cmd/river@latest +river migrate-get --database-url sqlite:// --version 7 --up > river7.up.sql +river migrate-get --database-url sqlite:// --version 7 --down > river7.down.sql +``` + +### Changed + +- Convert SQLite JSON columns to JSONB (including migration). [PR #1224](https://github.com/riverqueue/river/pull/1224). 
+ ## [0.35.0] - 2026-04-18 ### Changed diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 8727a6e3..1bd50eaa 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -883,7 +883,7 @@ func MigrationLineMainTruncateTables(version int) []string { return []string{"river_job", "river_leader"} case 4: return []string{"river_job", "river_leader", "river_queue"} - case 0, 5, 6: + case 0, 5, 6, 7: return []string{"river_job", "river_leader", "river_queue", "river_client", "river_client_queue"} } diff --git a/riverdriver/riverdatabasesql/migration/main/007_sqlite_json_to_jsonb.down.sql b/riverdriver/riverdatabasesql/migration/main/007_sqlite_json_to_jsonb.down.sql new file mode 100644 index 00000000..245e5460 --- /dev/null +++ b/riverdriver/riverdatabasesql/migration/main/007_sqlite_json_to_jsonb.down.sql @@ -0,0 +1 @@ +-- No-op migration to keep version numbers in sync across all drivers. diff --git a/riverdriver/riverdatabasesql/migration/main/007_sqlite_json_to_jsonb.up.sql b/riverdriver/riverdatabasesql/migration/main/007_sqlite_json_to_jsonb.up.sql new file mode 100644 index 00000000..f48a922c --- /dev/null +++ b/riverdriver/riverdatabasesql/migration/main/007_sqlite_json_to_jsonb.up.sql @@ -0,0 +1,2 @@ +-- No-op migration to keep version numbers in sync across all drivers. +-- The SQLite driver uses this version for a JSONB conversion. 
diff --git a/riverdriver/riverdrivertest/migration.go b/riverdriver/riverdrivertest/migration.go index ba87a2bf..65b940cd 100644 --- a/riverdriver/riverdrivertest/migration.go +++ b/riverdriver/riverdrivertest/migration.go @@ -70,6 +70,8 @@ func exerciseMigration[TTx any](ctx context.Context, t *testing.T, driver.GetMigrationTruncateTables(riverdriver.MigrationLineMain, 5)) require.Equal(t, []string{"river_job", "river_leader", "river_queue", "river_client", "river_client_queue"}, driver.GetMigrationTruncateTables(riverdriver.MigrationLineMain, 6)) + require.Equal(t, []string{"river_job", "river_leader", "river_queue", "river_client", "river_client_queue"}, + driver.GetMigrationTruncateTables(riverdriver.MigrationLineMain, 7)) require.Equal(t, []string{"river_job", "river_leader", "river_queue", "river_client", "river_client_queue"}, driver.GetMigrationTruncateTables(riverdriver.MigrationLineMain, 0)) }) diff --git a/riverdriver/riverpgxv5/migration/main/007_sqlite_json_to_jsonb.down.sql b/riverdriver/riverpgxv5/migration/main/007_sqlite_json_to_jsonb.down.sql new file mode 100644 index 00000000..245e5460 --- /dev/null +++ b/riverdriver/riverpgxv5/migration/main/007_sqlite_json_to_jsonb.down.sql @@ -0,0 +1 @@ +-- No-op migration to keep version numbers in sync across all drivers. diff --git a/riverdriver/riverpgxv5/migration/main/007_sqlite_json_to_jsonb.up.sql b/riverdriver/riverpgxv5/migration/main/007_sqlite_json_to_jsonb.up.sql new file mode 100644 index 00000000..f48a922c --- /dev/null +++ b/riverdriver/riverpgxv5/migration/main/007_sqlite_json_to_jsonb.up.sql @@ -0,0 +1,2 @@ +-- No-op migration to keep version numbers in sync across all drivers. +-- The SQLite driver uses this version for a JSONB conversion. 
diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_client.sql b/riverdriver/riversqlite/internal/dbsqlc/river_client.sql index e09cce0c..9266824d 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_client.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_client.sql @@ -1,7 +1,7 @@ CREATE TABLE river_client ( id text PRIMARY KEY NOT NULL, created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, - metadata blob NOT NULL DEFAULT (json('{}')), + metadata jsonb NOT NULL DEFAULT (jsonb('{}')), paused_at timestamp, updated_at timestamp NOT NULL, CONSTRAINT name_length CHECK (length(id) > 0 AND length(id) < 128) diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_client_queue.sql b/riverdriver/riversqlite/internal/dbsqlc/river_client_queue.sql index 55b9a51a..46aa4269 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_client_queue.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_client_queue.sql @@ -3,7 +3,7 @@ CREATE TABLE river_client_queue ( name text NOT NULL, created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, max_workers integer NOT NULL DEFAULT 0, - metadata blob NOT NULL DEFAULT (json('{}')), + metadata jsonb NOT NULL DEFAULT (jsonb('{}')), num_jobs_completed integer NOT NULL DEFAULT 0, num_jobs_running integer NOT NULL DEFAULT 0, updated_at timestamp NOT NULL, diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql index dfb72dc3..7bf047d6 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql @@ -1,20 +1,20 @@ CREATE TABLE river_job ( id integer PRIMARY KEY, -- SQLite makes this autoincrementing automatically - args blob NOT NULL DEFAULT '{}', + args jsonb NOT NULL DEFAULT (jsonb('{}')), attempt integer NOT NULL DEFAULT 0, attempted_at timestamp, - attempted_by blob, -- JSON array of strings + attempted_by jsonb, -- JSON array of strings created_at timestamp NOT NULL DEFAULT 
CURRENT_TIMESTAMP, - errors blob, -- JSON array of error objects + errors jsonb, -- JSON array of error objects finalized_at timestamp, kind text NOT NULL, max_attempts integer NOT NULL, - metadata blob NOT NULL DEFAULT (json('{}')), + metadata jsonb NOT NULL DEFAULT (jsonb('{}')), priority integer NOT NULL DEFAULT 1, queue text NOT NULL DEFAULT 'default', state text NOT NULL DEFAULT 'available', scheduled_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, - tags blob NOT NULL DEFAULT (json('[]')), -- JSON array of strings + tags jsonb NOT NULL DEFAULT (jsonb('[]')), -- JSON array of strings unique_key blob, unique_states integer, CONSTRAINT finalized_or_finalized_at_null CHECK ( @@ -44,7 +44,7 @@ SET finalized_at = CASE WHEN state = 'running' THEN finalized_at ELSE coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')) END, -- Mark the job as cancelled by query so that the rescuer knows not to -- rescue it, even if it gets stuck in the running state: - metadata = json_set(metadata, '$.cancel_attempted_at', cast(@cancel_attempted_at AS text)) + metadata = jsonb_set(metadata, '$.cancel_attempted_at', cast(@cancel_attempted_at AS text)) WHERE id = @id AND state NOT IN ('cancelled', 'completed', 'discarded') AND finalized_at IS NULL @@ -219,12 +219,12 @@ INSERT INTO /* TEMPLATE: schema */river_job( coalesce(cast(sqlc.narg('created_at') AS text), datetime('now', 'subsec')), @kind, @max_attempts, - json(cast(@metadata AS blob)), + jsonb(@metadata), @priority, @queue, coalesce(cast(sqlc.narg('scheduled_at') AS text), datetime('now', 'subsec')), @state, - json(cast(@tags AS blob)), + jsonb(@tags), CASE WHEN length(cast(@unique_key AS blob)) = 0 THEN NULL ELSE @unique_key END, @unique_states ) @@ -265,12 +265,12 @@ INSERT INTO /* TEMPLATE: schema */river_job( coalesce(cast(sqlc.narg('created_at') AS text), datetime('now', 'subsec')), @kind, @max_attempts, - json(cast(@metadata AS blob)), + jsonb(@metadata), @priority, @queue, 
coalesce(cast(sqlc.narg('scheduled_at') AS text), datetime('now', 'subsec')), @state, - json(cast(@tags AS blob)), + jsonb(@tags), CASE WHEN length(cast(@unique_key AS blob)) = 0 THEN NULL ELSE @unique_key END, @unique_states ) @@ -313,18 +313,18 @@ INSERT INTO /* TEMPLATE: schema */river_job( @args, @attempt, cast(sqlc.narg('attempted_at') as text), - CASE WHEN length(cast(@attempted_by AS blob)) = 0 THEN NULL ELSE json(@attempted_by) END, + CASE WHEN length(cast(@attempted_by AS blob)) = 0 THEN NULL ELSE jsonb(@attempted_by) END, coalesce(cast(sqlc.narg('created_at') AS text), datetime('now', 'subsec')), CASE WHEN length(cast(@errors AS blob)) = 0 THEN NULL ELSE @errors END, cast(sqlc.narg('finalized_at') as text), @kind, @max_attempts, - json(cast(@metadata AS blob)), + jsonb(@metadata), @priority, @queue, coalesce(cast(sqlc.narg('scheduled_at') AS text), datetime('now', 'subsec')), @state, - json(cast(@tags AS blob)), + jsonb(@tags), CASE WHEN length(cast(@unique_key AS blob)) = 0 THEN NULL ELSE @unique_key END, @unique_states ) RETURNING *; @@ -358,10 +358,10 @@ LIMIT @max; -- name: JobRescue :exec UPDATE /* TEMPLATE: schema */river_job SET - errors = json_insert(coalesce(errors, json('[]')), '$[#]', json(cast(@error AS blob))), + errors = jsonb_insert(coalesce(errors, jsonb('[]')), '$[#]', jsonb(@error)), finalized_at = cast(sqlc.narg('finalized_at') as text), scheduled_at = @scheduled_at, - metadata = json_set( + metadata = jsonb_set( metadata, '$."river:rescue_count"', coalesce( @@ -444,7 +444,7 @@ RETURNING *; -- name: JobScheduleSetDiscarded :many UPDATE /* TEMPLATE: schema */river_job -SET metadata = json_patch(metadata, json('{"unique_key_conflict": "scheduler_discarded"}')), +SET metadata = jsonb_patch(metadata, jsonb('{"unique_key_conflict": "scheduler_discarded"}')), finalized_at = coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), state = 'discarded' WHERE id IN (sqlc.slice('id')) @@ -454,7 +454,7 @@ RETURNING *; -- for 
JobSetStateIfRunning to use when falling back to non-running jobs. -- name: JobSetMetadataIfNotRunning :one UPDATE /* TEMPLATE: schema */river_job -SET metadata = json_patch(metadata, json(cast(@metadata_updates AS blob))) +SET metadata = jsonb_patch(metadata, jsonb(@metadata_updates)) WHERE id = @id AND state != 'running' RETURNING *; @@ -472,7 +472,7 @@ SET THEN @attempt ELSE attempt END, errors = CASE WHEN cast(@errors_do_update AS boolean) - THEN json_insert(coalesce(errors, json('[]')), '$[#]', json(cast(@error AS blob))) + THEN jsonb_insert(coalesce(errors, jsonb('[]')), '$[#]', jsonb(@error)) ELSE errors END, finalized_at = CASE WHEN /* should_cancel */((@state = 'retryable' OR @state = 'scheduled') AND (metadata -> 'cancel_attempted_at') iS NOT NULL) THEN coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')) @@ -480,7 +480,7 @@ SET THEN @finalized_at ELSE finalized_at END, metadata = CASE WHEN cast(@metadata_do_merge AS boolean) - THEN json_patch(metadata, json(cast(@metadata_updates AS blob))) + THEN jsonb_patch(metadata, jsonb(@metadata_updates)) ELSE metadata END, scheduled_at = CASE WHEN /* NOT should_cancel */(cast(@state AS text) <> 'retryable' AND @state <> 'scheduled' OR (metadata -> 'cancel_attempted_at') IS NULL) AND cast(@scheduled_at_do_update AS boolean) THEN @scheduled_at @@ -495,7 +495,7 @@ RETURNING *; -- name: JobUpdate :one UPDATE /* TEMPLATE: schema */river_job SET - metadata = CASE WHEN cast(@metadata_do_merge AS boolean) THEN json_patch(metadata, json(cast(@metadata AS blob))) ELSE metadata END + metadata = CASE WHEN cast(@metadata_do_merge AS boolean) THEN jsonb_patch(metadata, jsonb(@metadata)) ELSE metadata END WHERE id = @id RETURNING *; @@ -510,7 +510,7 @@ SET errors = CASE WHEN cast(@errors_do_update AS boolean) THEN @errors ELSE errors END, finalized_at = CASE WHEN cast(@finalized_at_do_update AS boolean) THEN @finalized_at ELSE finalized_at END, max_attempts = CASE WHEN cast(@max_attempts_do_update AS boolean) THEN 
@max_attempts ELSE max_attempts END, - metadata = CASE WHEN cast(@metadata_do_update AS boolean) THEN json(cast(@metadata AS blob)) ELSE metadata END, + metadata = CASE WHEN cast(@metadata_do_update AS boolean) THEN jsonb(@metadata) ELSE metadata END, state = CASE WHEN cast(@state_do_update AS boolean) THEN @state ELSE state END WHERE id = @id RETURNING *; \ No newline at end of file diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go index 1963523d..b10b0042 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go @@ -21,11 +21,11 @@ SET finalized_at = CASE WHEN state = 'running' THEN finalized_at ELSE coalesce(cast(?1 AS text), datetime('now', 'subsec')) END, -- Mark the job as cancelled by query so that the rescuer knows not to -- rescue it, even if it gets stuck in the running state: - metadata = json_set(metadata, '$.cancel_attempted_at', cast(?2 AS text)) + metadata = jsonb_set(metadata, '$.cancel_attempted_at', cast(?2 AS text)) WHERE id = ?3 AND state NOT IN ('cancelled', 'completed', 'discarded') AND finalized_at IS NULL -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +RETURNING id, json(args), attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states ` type JobCancelParams struct { @@ -179,7 +179,7 @@ FROM /* TEMPLATE: schema */river_job WHERE id = ?1 -- Do not touch running jobs: AND river_job.state != 'running' -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +RETURNING id, json(args), 
attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states ` // Differs by necessity from other drivers because SQLite doesn't support @@ -282,7 +282,7 @@ WHERE id IN ( ORDER BY /* TEMPLATE_BEGIN: order_by_clause */ id /* TEMPLATE_END */ LIMIT ?1 ) -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +RETURNING id, json(args), attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states ` func (q *Queries) JobDeleteMany(ctx context.Context, db DBTX, max int64) ([]*RiverJob, error) { @@ -352,7 +352,7 @@ WHERE id IN ( id ASC LIMIT ?3 ) -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +RETURNING id, json(args), attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states ` type JobGetAvailableParams struct { @@ -407,7 +407,7 @@ func (q *Queries) JobGetAvailable(ctx context.Context, db DBTX, arg *JobGetAvail } const jobGetByID = `-- name: JobGetByID :one -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +SELECT id, json(args), attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states FROM /* TEMPLATE: schema */river_job WHERE 
id = ?1 LIMIT 1 @@ -440,7 +440,7 @@ func (q *Queries) JobGetByID(ctx context.Context, db DBTX, id int64) (*RiverJob, } const jobGetByIDMany = `-- name: JobGetByIDMany :many -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +SELECT id, json(args), attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states FROM /* TEMPLATE: schema */river_job WHERE id IN (/*SLICE:id*/?) ORDER BY id @@ -499,7 +499,7 @@ func (q *Queries) JobGetByIDMany(ctx context.Context, db DBTX, id []int64) ([]*R } const jobGetByKindMany = `-- name: JobGetByKindMany :many -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +SELECT id, json(args), attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states FROM /* TEMPLATE: schema */river_job WHERE kind IN (/*SLICE:kind*/?) 
ORDER BY id @@ -558,7 +558,7 @@ func (q *Queries) JobGetByKindMany(ctx context.Context, db DBTX, kind []string) } const jobGetStuck = `-- name: JobGetStuck :many -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +SELECT id, json(args), attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states FROM /* TEMPLATE: schema */river_job WHERE state = 'running' AND attempted_at < cast(?1 AS text) @@ -634,12 +634,12 @@ INSERT INTO /* TEMPLATE: schema */river_job( coalesce(cast(?3 AS text), datetime('now', 'subsec')), ?4, ?5, - json(cast(?6 AS blob)), + jsonb(?6), ?7, ?8, coalesce(cast(?9 AS text), datetime('now', 'subsec')), ?10, - json(cast(?11 AS blob)), + jsonb(?11), CASE WHEN length(cast(?12 AS blob)) = 0 THEN NULL ELSE ?12 END, ?13 ) @@ -659,7 +659,7 @@ ON CONFLICT (unique_key) END >= 1 -- Something needs to be updated for a row to be returned on a conflict. 
DO UPDATE SET kind = EXCLUDED.kind -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +RETURNING id, json(args), attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states ` type JobInsertFastParams struct { @@ -668,12 +668,12 @@ type JobInsertFastParams struct { CreatedAt *string Kind string MaxAttempts int64 - Metadata []byte + Metadata interface{} Priority int64 Queue string ScheduledAt *string State string - Tags []byte + Tags interface{} UniqueKey []byte UniqueStates *int64 } @@ -746,12 +746,12 @@ INSERT INTO /* TEMPLATE: schema */river_job( coalesce(cast(?2 AS text), datetime('now', 'subsec')), ?3, ?4, - json(cast(?5 AS blob)), + jsonb(?5), ?6, ?7, coalesce(cast(?8 AS text), datetime('now', 'subsec')), ?9, - json(cast(?10 AS blob)), + jsonb(?10), CASE WHEN length(cast(?11 AS blob)) = 0 THEN NULL ELSE ?11 END, ?12 ) @@ -777,12 +777,12 @@ type JobInsertFastNoReturningParams struct { CreatedAt *string Kind string MaxAttempts int64 - Metadata []byte + Metadata interface{} Priority int64 Queue string ScheduledAt *string State string - Tags []byte + Tags interface{} UniqueKey []byte UniqueStates *int64 } @@ -831,21 +831,21 @@ INSERT INTO /* TEMPLATE: schema */river_job( ?1, ?2, cast(?3 as text), - CASE WHEN length(cast(?4 AS blob)) = 0 THEN NULL ELSE json(?4) END, + CASE WHEN length(cast(?4 AS blob)) = 0 THEN NULL ELSE jsonb(?4) END, coalesce(cast(?5 AS text), datetime('now', 'subsec')), CASE WHEN length(cast(?6 AS blob)) = 0 THEN NULL ELSE ?6 END, cast(?7 as text), ?8, ?9, - json(cast(?10 AS blob)), + jsonb(?10), ?11, ?12, coalesce(cast(?13 AS text), datetime('now', 'subsec')), ?14, - json(cast(?15 AS blob)), + jsonb(?15), CASE WHEN length(cast(?16 AS blob)) = 0 THEN NULL ELSE ?16 END, ?17 -) 
RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +) RETURNING id, json(args), attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states ` type JobInsertFullParams struct { @@ -858,12 +858,12 @@ type JobInsertFullParams struct { FinalizedAt *string Kind string MaxAttempts int64 - Metadata []byte + Metadata interface{} Priority int64 Queue string ScheduledAt *string State string - Tags []byte + Tags interface{} UniqueKey []byte UniqueStates *int64 } @@ -966,7 +966,7 @@ func (q *Queries) JobKindList(ctx context.Context, db DBTX, arg *JobKindListPara } const jobList = `-- name: JobList :many -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +SELECT id, json(args), attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states FROM /* TEMPLATE: schema */river_job WHERE /* TEMPLATE_BEGIN: where_clause */ true /* TEMPLATE_END */ ORDER BY /* TEMPLATE_BEGIN: order_by_clause */ id /* TEMPLATE_END */ @@ -1018,10 +1018,10 @@ func (q *Queries) JobList(ctx context.Context, db DBTX, max int64) ([]*RiverJob, const jobRescue = `-- name: JobRescue :exec UPDATE /* TEMPLATE: schema */river_job SET - errors = json_insert(coalesce(errors, json('[]')), '$[#]', json(cast(?1 AS blob))), + errors = jsonb_insert(coalesce(errors, jsonb('[]')), '$[#]', jsonb(?1)), finalized_at = cast(?2 as text), scheduled_at = ?3, - metadata = json_set( + metadata = jsonb_set( metadata, '$."river:rescue_count"', coalesce( @@ -1037,7 +1037,7 @@ WHERE id = ?5 ` type 
JobRescueParams struct { - Error []byte + Error interface{} FinalizedAt *string ScheduledAt time.Time State string @@ -1084,7 +1084,7 @@ WHERE id = ?2 state <> 'available' OR scheduled_at > coalesce(cast(?1 AS text), datetime('now', 'subsec')) ) -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +RETURNING id, json(args), attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states ` type JobRetryParams struct { @@ -1128,7 +1128,7 @@ func (q *Queries) JobRetry(ctx context.Context, db DBTX, arg *JobRetryParams) (* } const jobScheduleGetCollision = `-- name: JobScheduleGetCollision :one -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +SELECT id, json(args), attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states FROM /* TEMPLATE: schema */river_job WHERE id <> ?1 AND unique_key = ?2 @@ -1178,7 +1178,7 @@ func (q *Queries) JobScheduleGetCollision(ctx context.Context, db DBTX, arg *Job } const jobScheduleGetEligible = `-- name: JobScheduleGetEligible :many -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +SELECT id, json(args), attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states FROM /* TEMPLATE: schema */river_job WHERE state IN ('retryable', 
'scheduled') @@ -1242,7 +1242,7 @@ UPDATE /* TEMPLATE: schema */river_job SET state = 'available' WHERE id IN (/*SLICE:id*/?) -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +RETURNING id, json(args), attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states ` func (q *Queries) JobScheduleSetAvailable(ctx context.Context, db DBTX, id []int64) ([]*RiverJob, error) { @@ -1299,11 +1299,11 @@ func (q *Queries) JobScheduleSetAvailable(ctx context.Context, db DBTX, id []int const jobScheduleSetDiscarded = `-- name: JobScheduleSetDiscarded :many UPDATE /* TEMPLATE: schema */river_job -SET metadata = json_patch(metadata, json('{"unique_key_conflict": "scheduler_discarded"}')), +SET metadata = jsonb_patch(metadata, jsonb('{"unique_key_conflict": "scheduler_discarded"}')), finalized_at = coalesce(cast(?1 AS text), datetime('now', 'subsec')), state = 'discarded' WHERE id IN (/*SLICE:id*/?) 
-RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +RETURNING id, json(args), attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states ` type JobScheduleSetDiscardedParams struct { @@ -1366,14 +1366,14 @@ func (q *Queries) JobScheduleSetDiscarded(ctx context.Context, db DBTX, arg *Job const jobSetMetadataIfNotRunning = `-- name: JobSetMetadataIfNotRunning :one UPDATE /* TEMPLATE: schema */river_job -SET metadata = json_patch(metadata, json(cast(?1 AS blob))) +SET metadata = jsonb_patch(metadata, jsonb(?1)) WHERE id = ?2 AND state != 'running' -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +RETURNING id, json(args), attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states ` type JobSetMetadataIfNotRunningParams struct { - MetadataUpdates []byte + MetadataUpdates interface{} ID int64 } @@ -1415,7 +1415,7 @@ SET THEN ?3 ELSE attempt END, errors = CASE WHEN cast(?4 AS boolean) - THEN json_insert(coalesce(errors, json('[]')), '$[#]', json(cast(?5 AS blob))) + THEN jsonb_insert(coalesce(errors, jsonb('[]')), '$[#]', jsonb(?5)) ELSE errors END, finalized_at = CASE WHEN /* should_cancel */((?1 = 'retryable' OR ?1 = 'scheduled') AND (metadata -> 'cancel_attempted_at') iS NOT NULL) THEN coalesce(cast(?6 AS text), datetime('now', 'subsec')) @@ -1423,7 +1423,7 @@ SET THEN ?8 ELSE finalized_at END, metadata = CASE WHEN cast(?9 AS boolean) - THEN json_patch(metadata, json(cast(?10 AS blob))) + THEN jsonb_patch(metadata, 
jsonb(?10)) ELSE metadata END, scheduled_at = CASE WHEN /* NOT should_cancel */(cast(?1 AS text) <> 'retryable' AND ?1 <> 'scheduled' OR (metadata -> 'cancel_attempted_at') IS NULL) AND cast(?11 AS boolean) THEN ?12 @@ -1433,7 +1433,7 @@ SET ELSE ?1 END WHERE id = ?13 AND state = 'running' -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +RETURNING id, json(args), attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states ` type JobSetStateIfRunningParams struct { @@ -1441,12 +1441,12 @@ type JobSetStateIfRunningParams struct { AttemptDoUpdate bool Attempt int64 ErrorsDoUpdate bool - Error []byte + Error interface{} Now *string FinalizedAtDoUpdate bool FinalizedAt *time.Time MetadataDoMerge bool - MetadataUpdates []byte + MetadataUpdates interface{} ScheduledAtDoUpdate bool ScheduledAt time.Time ID int64 @@ -1498,14 +1498,14 @@ func (q *Queries) JobSetStateIfRunning(ctx context.Context, db DBTX, arg *JobSet const jobUpdate = `-- name: JobUpdate :one UPDATE /* TEMPLATE: schema */river_job SET - metadata = CASE WHEN cast(?1 AS boolean) THEN json_patch(metadata, json(cast(?2 AS blob))) ELSE metadata END + metadata = CASE WHEN cast(?1 AS boolean) THEN jsonb_patch(metadata, jsonb(?2)) ELSE metadata END WHERE id = ?3 -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +RETURNING id, json(args), attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states ` type JobUpdateParams struct { MetadataDoMerge bool - Metadata []byte + 
Metadata interface{} ID int64 } @@ -1544,10 +1544,10 @@ SET errors = CASE WHEN cast(?7 AS boolean) THEN ?8 ELSE errors END, finalized_at = CASE WHEN cast(?9 AS boolean) THEN ?10 ELSE finalized_at END, max_attempts = CASE WHEN cast(?11 AS boolean) THEN ?12 ELSE max_attempts END, - metadata = CASE WHEN cast(?13 AS boolean) THEN json(cast(?14 AS blob)) ELSE metadata END, + metadata = CASE WHEN cast(?13 AS boolean) THEN jsonb(?14) ELSE metadata END, state = CASE WHEN cast(?15 AS boolean) THEN ?16 ELSE state END WHERE id = ?17 -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +RETURNING id, json(args), attempt, attempted_at, json(attempted_by), created_at, json(errors), finalized_at, kind, max_attempts, json(metadata), priority, queue, state, scheduled_at, json(tags), unique_key, unique_states ` type JobUpdateFullParams struct { @@ -1564,7 +1564,7 @@ type JobUpdateFullParams struct { MaxAttemptsDoUpdate bool MaxAttempts int64 MetadataDoUpdate bool - Metadata []byte + Metadata interface{} StateDoUpdate bool State string ID int64 diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql b/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql index 1aa7c5fe..90944127 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql @@ -1,7 +1,7 @@ CREATE TABLE river_queue ( name text PRIMARY KEY NOT NULL, created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, - metadata blob NOT NULL DEFAULT (json('{}')), + metadata jsonb NOT NULL DEFAULT (jsonb('{}')), paused_at timestamp, updated_at timestamp NOT NULL ); @@ -15,7 +15,7 @@ INSERT INTO /* TEMPLATE: schema */river_queue ( updated_at ) VALUES ( coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')), - json(cast(@metadata AS blob)), + jsonb(@metadata), @name, cast(sqlc.narg('paused_at') AS 
text), coalesce(cast(sqlc.narg('updated_at') AS text), cast(sqlc.narg('now') AS text), datetime('now', 'subsec')) @@ -73,7 +73,7 @@ WHERE CASE WHEN cast(@name AS text) = '*' THEN true ELSE name = @name END; -- name: QueueUpdate :one UPDATE /* TEMPLATE: schema */river_queue SET - metadata = CASE WHEN cast(@metadata_do_update AS boolean) THEN json(cast(@metadata AS blob)) ELSE metadata END, + metadata = CASE WHEN cast(@metadata_do_update AS boolean) THEN jsonb(@metadata) ELSE metadata END, updated_at = datetime('now', 'subsec') WHERE name = @name RETURNING *; diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql.go index d9edc714..fa52751c 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_queue.sql.go @@ -20,19 +20,19 @@ INSERT INTO /* TEMPLATE: schema */river_queue ( updated_at ) VALUES ( coalesce(cast(?1 AS text), datetime('now', 'subsec')), - json(cast(?2 AS blob)), + jsonb(?2), ?3, cast(?4 AS text), coalesce(cast(?5 AS text), cast(?1 AS text), datetime('now', 'subsec')) ) ON CONFLICT (name) DO UPDATE SET updated_at = EXCLUDED.updated_at -RETURNING name, created_at, metadata, paused_at, updated_at +RETURNING name, created_at, json(metadata), paused_at, updated_at ` type QueueCreateOrSetUpdatedAtParams struct { Now *string - Metadata []byte + Metadata interface{} Name string PausedAt *string UpdatedAt *string @@ -66,7 +66,7 @@ WHERE name IN ( ORDER BY name ASC LIMIT ?2 ) -RETURNING name, created_at, metadata, paused_at, updated_at +RETURNING name, created_at, json(metadata), paused_at, updated_at ` type QueueDeleteExpiredParams struct { @@ -104,7 +104,7 @@ func (q *Queries) QueueDeleteExpired(ctx context.Context, db DBTX, arg *QueueDel } const queueGet = `-- name: QueueGet :one -SELECT name, created_at, metadata, paused_at, updated_at +SELECT name, created_at, json(metadata), paused_at, updated_at FROM /* TEMPLATE: 
schema */river_queue WHERE name = ?1 ` @@ -123,7 +123,7 @@ func (q *Queries) QueueGet(ctx context.Context, db DBTX, name string) (*RiverQue } const queueList = `-- name: QueueList :many -SELECT name, created_at, metadata, paused_at, updated_at +SELECT name, created_at, json(metadata), paused_at, updated_at FROM /* TEMPLATE: schema */river_queue ORDER BY name ASC LIMIT ?1 @@ -257,15 +257,15 @@ func (q *Queries) QueueResume(ctx context.Context, db DBTX, arg *QueueResumePara const queueUpdate = `-- name: QueueUpdate :one UPDATE /* TEMPLATE: schema */river_queue SET - metadata = CASE WHEN cast(?1 AS boolean) THEN json(cast(?2 AS blob)) ELSE metadata END, + metadata = CASE WHEN cast(?1 AS boolean) THEN jsonb(?2) ELSE metadata END, updated_at = datetime('now', 'subsec') WHERE name = ?3 -RETURNING name, created_at, metadata, paused_at, updated_at +RETURNING name, created_at, json(metadata), paused_at, updated_at ` type QueueUpdateParams struct { MetadataDoUpdate bool - Metadata []byte + Metadata interface{} Name string } diff --git a/riverdriver/riversqlite/internal/dbsqlc/sqlc.yaml b/riverdriver/riversqlite/internal/dbsqlc/sqlc.yaml index dd86a267..60b11427 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/sqlc.yaml +++ b/riverdriver/riversqlite/internal/dbsqlc/sqlc.yaml @@ -32,3 +32,10 @@ sql: ttl: "TTL" overrides: + - db_type: "jsonb" + go_type: + type: "[]byte" + - db_type: "jsonb" + go_type: + type: "[]byte" + nullable: true diff --git a/riverdriver/riversqlite/migration/main/007_sqlite_json_to_jsonb.down.sql b/riverdriver/riversqlite/migration/main/007_sqlite_json_to_jsonb.down.sql new file mode 100644 index 00000000..8ed3d8ab --- /dev/null +++ b/riverdriver/riversqlite/migration/main/007_sqlite_json_to_jsonb.down.sql @@ -0,0 +1,60 @@ +-- +-- Convert JSONB binary columns back to JSON text format and restore json() +-- defaults. 
+-- + +-- +-- river_job +-- + +ALTER TABLE /* TEMPLATE: schema */river_job RENAME COLUMN args TO _args_old; +ALTER TABLE /* TEMPLATE: schema */river_job ADD COLUMN args blob NOT NULL DEFAULT '{}'; +UPDATE /* TEMPLATE: schema */river_job SET args = json(_args_old); +ALTER TABLE /* TEMPLATE: schema */river_job DROP COLUMN _args_old; + +ALTER TABLE /* TEMPLATE: schema */river_job RENAME COLUMN attempted_by TO _attempted_by_old; +ALTER TABLE /* TEMPLATE: schema */river_job ADD COLUMN attempted_by blob; +UPDATE /* TEMPLATE: schema */river_job SET attempted_by = json(_attempted_by_old) WHERE _attempted_by_old IS NOT NULL; +ALTER TABLE /* TEMPLATE: schema */river_job DROP COLUMN _attempted_by_old; + +ALTER TABLE /* TEMPLATE: schema */river_job RENAME COLUMN errors TO _errors_old; +ALTER TABLE /* TEMPLATE: schema */river_job ADD COLUMN errors blob; +UPDATE /* TEMPLATE: schema */river_job SET errors = json(_errors_old) WHERE _errors_old IS NOT NULL; +ALTER TABLE /* TEMPLATE: schema */river_job DROP COLUMN _errors_old; + +ALTER TABLE /* TEMPLATE: schema */river_job RENAME COLUMN metadata TO _metadata_old; +ALTER TABLE /* TEMPLATE: schema */river_job ADD COLUMN metadata blob NOT NULL DEFAULT (json('{}')); +UPDATE /* TEMPLATE: schema */river_job SET metadata = json(_metadata_old); +ALTER TABLE /* TEMPLATE: schema */river_job DROP COLUMN _metadata_old; + +ALTER TABLE /* TEMPLATE: schema */river_job RENAME COLUMN tags TO _tags_old; +ALTER TABLE /* TEMPLATE: schema */river_job ADD COLUMN tags blob NOT NULL DEFAULT (json('[]')); +UPDATE /* TEMPLATE: schema */river_job SET tags = json(_tags_old); +ALTER TABLE /* TEMPLATE: schema */river_job DROP COLUMN _tags_old; + +-- +-- river_queue +-- + +ALTER TABLE /* TEMPLATE: schema */river_queue RENAME COLUMN metadata TO _metadata_old; +ALTER TABLE /* TEMPLATE: schema */river_queue ADD COLUMN metadata blob NOT NULL DEFAULT (json('{}')); +UPDATE /* TEMPLATE: schema */river_queue SET metadata = json(_metadata_old); +ALTER TABLE /* TEMPLATE: 
schema */river_queue DROP COLUMN _metadata_old; + +-- +-- river_client +-- + +ALTER TABLE /* TEMPLATE: schema */river_client RENAME COLUMN metadata TO _metadata_old; +ALTER TABLE /* TEMPLATE: schema */river_client ADD COLUMN metadata blob NOT NULL DEFAULT (json('{}')); +UPDATE /* TEMPLATE: schema */river_client SET metadata = json(_metadata_old); +ALTER TABLE /* TEMPLATE: schema */river_client DROP COLUMN _metadata_old; + +-- +-- river_client_queue +-- + +ALTER TABLE /* TEMPLATE: schema */river_client_queue RENAME COLUMN metadata TO _metadata_old; +ALTER TABLE /* TEMPLATE: schema */river_client_queue ADD COLUMN metadata blob NOT NULL DEFAULT (json('{}')); +UPDATE /* TEMPLATE: schema */river_client_queue SET metadata = json(_metadata_old); +ALTER TABLE /* TEMPLATE: schema */river_client_queue DROP COLUMN _metadata_old; diff --git a/riverdriver/riversqlite/migration/main/007_sqlite_json_to_jsonb.up.sql b/riverdriver/riversqlite/migration/main/007_sqlite_json_to_jsonb.up.sql new file mode 100644 index 00000000..97e98211 --- /dev/null +++ b/riverdriver/riversqlite/migration/main/007_sqlite_json_to_jsonb.up.sql @@ -0,0 +1,63 @@ +-- +-- Convert JSON text columns to JSONB binary format for more efficient storage +-- and processing, and update column defaults from json() to jsonb(). +-- +-- Uses a rename-and-replace strategy to avoid full table rebuilds: rename the +-- old column, add a new one with the correct default, copy data, drop the old. 
+-- + +-- +-- river_job +-- + +ALTER TABLE /* TEMPLATE: schema */river_job RENAME COLUMN args TO _args_old; +ALTER TABLE /* TEMPLATE: schema */river_job ADD COLUMN args blob NOT NULL DEFAULT (jsonb('{}')); +UPDATE /* TEMPLATE: schema */river_job SET args = jsonb(_args_old); +ALTER TABLE /* TEMPLATE: schema */river_job DROP COLUMN _args_old; + +ALTER TABLE /* TEMPLATE: schema */river_job RENAME COLUMN attempted_by TO _attempted_by_old; +ALTER TABLE /* TEMPLATE: schema */river_job ADD COLUMN attempted_by blob; +UPDATE /* TEMPLATE: schema */river_job SET attempted_by = jsonb(_attempted_by_old) WHERE _attempted_by_old IS NOT NULL; +ALTER TABLE /* TEMPLATE: schema */river_job DROP COLUMN _attempted_by_old; + +ALTER TABLE /* TEMPLATE: schema */river_job RENAME COLUMN errors TO _errors_old; +ALTER TABLE /* TEMPLATE: schema */river_job ADD COLUMN errors blob; +UPDATE /* TEMPLATE: schema */river_job SET errors = jsonb(_errors_old) WHERE _errors_old IS NOT NULL; +ALTER TABLE /* TEMPLATE: schema */river_job DROP COLUMN _errors_old; + +ALTER TABLE /* TEMPLATE: schema */river_job RENAME COLUMN metadata TO _metadata_old; +ALTER TABLE /* TEMPLATE: schema */river_job ADD COLUMN metadata blob NOT NULL DEFAULT (jsonb('{}')); +UPDATE /* TEMPLATE: schema */river_job SET metadata = jsonb(_metadata_old); +ALTER TABLE /* TEMPLATE: schema */river_job DROP COLUMN _metadata_old; + +ALTER TABLE /* TEMPLATE: schema */river_job RENAME COLUMN tags TO _tags_old; +ALTER TABLE /* TEMPLATE: schema */river_job ADD COLUMN tags blob NOT NULL DEFAULT (jsonb('[]')); +UPDATE /* TEMPLATE: schema */river_job SET tags = jsonb(_tags_old); +ALTER TABLE /* TEMPLATE: schema */river_job DROP COLUMN _tags_old; + +-- +-- river_queue +-- + +ALTER TABLE /* TEMPLATE: schema */river_queue RENAME COLUMN metadata TO _metadata_old; +ALTER TABLE /* TEMPLATE: schema */river_queue ADD COLUMN metadata blob NOT NULL DEFAULT (jsonb('{}')); +UPDATE /* TEMPLATE: schema */river_queue SET metadata = jsonb(_metadata_old); +ALTER 
TABLE /* TEMPLATE: schema */river_queue DROP COLUMN _metadata_old; + +-- +-- river_client +-- + +ALTER TABLE /* TEMPLATE: schema */river_client RENAME COLUMN metadata TO _metadata_old; +ALTER TABLE /* TEMPLATE: schema */river_client ADD COLUMN metadata blob NOT NULL DEFAULT (jsonb('{}')); +UPDATE /* TEMPLATE: schema */river_client SET metadata = jsonb(_metadata_old); +ALTER TABLE /* TEMPLATE: schema */river_client DROP COLUMN _metadata_old; + +-- +-- river_client_queue +-- + +ALTER TABLE /* TEMPLATE: schema */river_client_queue RENAME COLUMN metadata TO _metadata_old; +ALTER TABLE /* TEMPLATE: schema */river_client_queue ADD COLUMN metadata blob NOT NULL DEFAULT (jsonb('{}')); +UPDATE /* TEMPLATE: schema */river_client_queue SET metadata = jsonb(_metadata_old); +ALTER TABLE /* TEMPLATE: schema */river_client_queue DROP COLUMN _metadata_old; diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index 132d0e46..9f14cba8 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -439,9 +439,9 @@ func (e *Executor) JobDeleteMany(ctx context.Context, params *riverdriver.JobDel // //nolint:gochecknoglobals var jobGetAvailableAttemptedBySQL = strings.TrimSpace(` - json_insert( + jsonb_insert( ( - SELECT json_group_array(value) + SELECT jsonb_group_array(value) FROM ( SELECT * FROM (