Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ jobs:
# latest Postgres version:
- go-version: "1.25"
postgres-version: 18

# Run with MySQL enabled on the latest Go + Postgres to exercise the
# MySQL driver in CI. MySQL tests are opt-in via RIVER_MYSQL_TESTS_ENABLED.
- go-version: "1.26"
postgres-version: 18
mysql-enabled: true
fail-fast: false
timeout-minutes: 5

Expand Down Expand Up @@ -128,7 +134,17 @@ jobs:
- name: Set up database
run: psql -c "CREATE DATABASE river_test" $ADMIN_DATABASE_URL

# MySQL is pre-installed on GitHub-hosted Ubuntu runners. Start the
# service and configure passwordless root access for MySQL-enabled runs.
- name: Start MySQL
if: matrix.mysql-enabled
run: |
sudo systemctl start mysql
mysql -u root -proot -e "ALTER USER 'root'@'localhost' IDENTIFIED BY ''; FLUSH PRIVILEGES;"

- name: Test
env:
RIVER_MYSQL_TESTS_ENABLED: ${{ matrix.mysql-enabled && 'true' || '' }}
run: make test/race

cli:
Expand Down
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ generate/migrations: ## Sync changes of pgxv5 migrations to database/sql
.PHONY: generate/sqlc
generate/sqlc: ## Generate sqlc
cd riverdriver/riverdatabasesql/internal/dbsqlc && sqlc generate
cd riverdriver/rivermysql/internal/dbsqlc && sqlc generate
cd riverdriver/riverpgxv5/internal/dbsqlc && sqlc generate
cd riverdriver/riversqlite/internal/dbsqlc && sqlc generate

Expand Down Expand Up @@ -101,5 +102,6 @@ verify/migrations: ## Verify synced migrations
.PHONY: verify/sqlc
verify/sqlc: ## Verify generated sqlc
cd riverdriver/riverdatabasesql/internal/dbsqlc && sqlc diff
cd riverdriver/rivermysql/internal/dbsqlc && sqlc diff
cd riverdriver/riverpgxv5/internal/dbsqlc && sqlc diff
cd riverdriver/riversqlite/internal/dbsqlc && sqlc diff
15 changes: 9 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2281,9 +2281,12 @@ type JobListResult struct {
LastCursor *JobListCursor
}

const databaseNameSQLite = "sqlite"
const (
databaseNameMySQL = "mysql"
databaseNameSQLite = "sqlite"
)

var errJobListParamsMetadataNotSupportedSQLite = errors.New("JobListParams.Metadata is not supported on SQLite")
var errJobListParamsMetadataNotSupported = errors.New("JobListParams.Metadata is not supported on MySQL or SQLite")

// JobList returns a paginated list of jobs matching the provided filters. The
// provided context is used for the underlying Postgres query and can be used to
Expand All @@ -2304,8 +2307,8 @@ func (c *Client[TTx]) JobList(ctx context.Context, params *JobListParams) (*JobL
}
params.schema = c.config.Schema

if c.driver.DatabaseName() == databaseNameSQLite && params.metadataCalled {
return nil, errJobListParamsMetadataNotSupportedSQLite
if (c.driver.DatabaseName() == databaseNameMySQL || c.driver.DatabaseName() == databaseNameSQLite) && params.metadataCalled {
return nil, errJobListParamsMetadataNotSupported
}

dbParams, err := params.toDBParams()
Expand Down Expand Up @@ -2345,8 +2348,8 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara
}
params.schema = c.config.Schema

if c.driver.DatabaseName() == databaseNameSQLite && params.metadataCalled {
return nil, errJobListParamsMetadataNotSupportedSQLite
if (c.driver.DatabaseName() == databaseNameMySQL || c.driver.DatabaseName() == databaseNameSQLite) && params.metadataCalled {
return nil, errJobListParamsMetadataNotSupported
}

dbParams, err := params.toDBParams()
Expand Down
3 changes: 3 additions & 0 deletions go.work
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use (
./riverdriver/riverdatabasesql
./riverdriver/riverdrivertest
./riverdriver/riverpgxv5
./riverdriver/rivermysql
./riverdriver/riversqlite
./rivershared
./rivertype
)

replace github.com/riverqueue/river/riverdriver/rivermysql v0.35.0 => ./riverdriver/rivermysql
8 changes: 4 additions & 4 deletions job_list_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,10 @@ func (p *JobListParams) Kinds(kinds ...string) *JobListParams {
//
// https://www.postgresql.org/docs/current/functions-json.html
//
// This function isn't supported in SQLite due to SQLite not having an
// equivalent operator to use, so there's no efficient way to implement it. We
// recommend the use of Where using a condition with a comparison on the `->>`
// operator instead.
// This function isn't supported in MySQL or SQLite due to neither having a
// direct equivalent to Postgres's `@>` operator. We recommend the use of
// [JobListParams.Where] with a database-specific JSON comparison instead (e.g.
// `JSON_EXTRACT` for MySQL, `->>` for SQLite).
func (p *JobListParams) Metadata(json string) *JobListParams {
paramsCopy := p.copy()
paramsCopy.metadataCalled = true
Expand Down
3 changes: 2 additions & 1 deletion riverdbtest/riverdbtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,8 @@ type TestTxOpts struct {
// run using TestSchema. This is meant for environments where parallelism
// doesn't work as well, like SQLite, which will emit "busy" errors when
// multiple clients try to share a schema, even when they're in separate
// transactions.
// transactions. Also applies to MySQL, where InnoDB deadlocks are common
// when multiple transactions are sharing a database.
DisableSchemaSharing bool

// IsTestTxHelper should be set to true for if TestTx is being called from
Expand Down
7 changes: 7 additions & 0 deletions riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ type Driver[TTx any] interface {
// API is not stable. DO NOT USE.
PoolSet(dbPool any) error

// SafeIdentifier returns a safely quoted identifier (e.g. a table or
// schema name) for use in SQL queries. Each driver quotes using its
// native syntax: double quotes for Postgres/SQLite, backticks for MySQL.
//
// API is not stable. DO NOT USE.
SafeIdentifier(ident string) string

// SQLFragmentColumnIn generates an SQL fragment to be included as a
// predicate in a `WHERE` query for the existence of a set of values in a
// column like `id IN (...)`. The actual implementation depends on support
Expand Down
5 changes: 3 additions & 2 deletions riverdriver/riverdatabasesql/river_database_sql_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ func New(dbPool *sql.DB) *Driver {

const argPlaceholder = "$"

func (d *Driver) ArgPlaceholder() string { return argPlaceholder }
func (d *Driver) DatabaseName() string { return "postgres" }
func (d *Driver) ArgPlaceholder() string { return argPlaceholder }
func (d *Driver) DatabaseName() string { return "postgres" }
func (d *Driver) SafeIdentifier(ident string) string { return dbutil.SafeIdentifier(ident) }

func (d *Driver) GetExecutor() riverdriver.Executor {
return &Executor{d.dbPool, templateReplaceWrapper{d.dbPool, &d.replacer}, d}
Expand Down
41 changes: 33 additions & 8 deletions riverdriver/riverdrivertest/driver_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/stdlib"
"github.com/lib/pq"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/riverqueue/river/riverdbtest"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverdatabasesql"
"github.com/riverqueue/river/riverdriver/rivermysql"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/riverdriver/riversqlite"
"github.com/riverqueue/river/rivershared/riversharedtest"
Expand Down Expand Up @@ -109,6 +111,26 @@ func TestClientWithDriverRiverLibSQL(t *testing.T) {
)
}

func TestClientWithDriverRiverMySQL(t *testing.T) {
t.Parallel()

riversharedtest.SkipIfMySQLNotEnabled(t)

var (
ctx = context.Background()
dbPool = riversharedtest.DBPoolMySQL(ctx, t)
driver = rivermysql.New(dbPool)
)

ExerciseClient(ctx, t,
func(ctx context.Context, t *testing.T) (riverdriver.Driver[*sql.Tx], string) {
t.Helper()

return driver, riverdbtest.TestSchema(ctx, t, driver, nil)
},
)
}

func TestClientWithDriverRiverSQLiteModernC(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -519,9 +541,9 @@ func ExerciseClient[TTx any](ctx context.Context, t *testing.T,
})

listRes, err := client.JobList(ctx, river.NewJobListParams().Metadata(`{"foo":"bar"}`))
if bundle.driver.DatabaseName() == databaseNameSQLite {
t.Logf("Ignoring unsupported JobListResult.Metadata on SQLite")
require.EqualError(t, err, "JobListParams.Metadata is not supported on SQLite")
if bundle.driver.DatabaseName() == databaseNameMySQL || bundle.driver.DatabaseName() == databaseNameSQLite {
t.Logf("Ignoring unsupported JobListResult.Metadata on %s", bundle.driver.DatabaseName())
require.EqualError(t, err, "JobListParams.Metadata is not supported on MySQL or SQLite")
return
}
require.NoError(t, err)
Expand Down Expand Up @@ -583,9 +605,9 @@ func ExerciseClient[TTx any](ctx context.Context, t *testing.T,
})

listRes, err := client.JobListTx(ctx, tx, river.NewJobListParams().Metadata(`{"foo":"bar"}`))
if bundle.driver.DatabaseName() == databaseNameSQLite {
t.Logf("Ignoring unsupported JobListTxResult.Metadata on SQLite")
require.EqualError(t, err, "JobListParams.Metadata is not supported on SQLite")
if bundle.driver.DatabaseName() == databaseNameMySQL || bundle.driver.DatabaseName() == databaseNameSQLite {
t.Logf("Ignoring unsupported JobListTxResult.Metadata on %s", bundle.driver.DatabaseName())
require.EqualError(t, err, "JobListParams.Metadata is not supported on MySQL or SQLite")
return
}
require.NoError(t, err)
Expand All @@ -607,9 +629,12 @@ func ExerciseClient[TTx any](ctx context.Context, t *testing.T,

listParams := river.NewJobListParams()

if bundle.driver.DatabaseName() == databaseNameSQLite {
switch bundle.driver.DatabaseName() {
case databaseNameSQLite:
listParams = listParams.Where("metadata ->> @json_path = @json_val", river.NamedArgs{"json_path": "$.foo", "json_val": "bar"})
} else {
case databaseNameMySQL:
listParams = listParams.Where("JSON_UNQUOTE(JSON_EXTRACT(metadata, @json_path)) = @json_val", river.NamedArgs{"json_path": "$.foo", "json_val": "bar"})
default:
// "bar" is quoted in this branch because `jsonb_path_query_first` needs to be compared to a JSON value
listParams = listParams.Where("jsonb_path_query_first(metadata, @json_path) = @json_val", river.NamedArgs{"json_path": "$.foo", "json_val": `"bar"`})
}
Expand Down
39 changes: 39 additions & 0 deletions riverdriver/riverdrivertest/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverdatabasesql"
"github.com/riverqueue/river/riverdriver/riverdrivertest"
"github.com/riverqueue/river/riverdriver/rivermysql"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/riverdriver/riversqlite"
"github.com/riverqueue/river/rivershared/riversharedtest"
Expand Down Expand Up @@ -203,6 +205,43 @@ func TestDriverRiverLiteLibSQL(t *testing.T) { //nolint:dupl
})
}

func TestDriverRiverMySQL(t *testing.T) {
t.Parallel()

riversharedtest.SkipIfMySQLNotEnabled(t)

var (
ctx = context.Background()
dbPool = riversharedtest.DBPoolMySQL(ctx, t)
driver = rivermysql.New(dbPool)
)

riverdrivertest.Exercise(ctx, t,
func(ctx context.Context, t *testing.T, opts *riverdbtest.TestSchemaOpts) (riverdriver.Driver[*sql.Tx], string) {
t.Helper()

return driver, riverdbtest.TestSchema(ctx, t, driver, opts)
},
func(ctx context.Context, t *testing.T) (riverdriver.Executor, riverdriver.Driver[*sql.Tx]) {
t.Helper()

tx, schema := riverdbtest.TestTx(ctx, t, driver, &riverdbtest.TestTxOpts{
// Disable schema sharing to reduce InnoDB deadlocks from
// parallel tests contending on the same database.
DisableSchemaSharing: true,
})

// MySQL has no search_path equivalent, so USE the test schema
// database so that unqualified queries resolve correctly.
if schema != "" {
_, err := tx.ExecContext(ctx, "USE "+schema)
require.NoError(t, err)
}

return driver.UnwrapExecutor(tx), driver
})
}

func TestDriverRiverSQLiteModernC(t *testing.T) { //nolint:dupl
t.Parallel()

Expand Down
13 changes: 9 additions & 4 deletions riverdriver/riverdrivertest/executor_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,13 @@ func exerciseExecutorTx[TTx any](ctx context.Context, t *testing.T,

exec := setup(ctx, t)

require.NoError(t, exec.Exec(ctx, "SELECT $1 || $2", "foo", "bar"))
_, driver := executorWithTx(ctx, t)
switch driver.DatabaseName() {
case databaseNameMySQL:
require.NoError(t, exec.Exec(ctx, "SELECT CONCAT(?, ?)", "foo", "bar"))
default:
require.NoError(t, exec.Exec(ctx, "SELECT $1 || $2", "foo", "bar"))
}
})
})

Expand All @@ -166,9 +172,8 @@ func exerciseExecutorTx[TTx any](ctx context.Context, t *testing.T,

{
driver, _ := driverWithSchema(ctx, t, nil)
if driver.DatabaseName() == databaseNameSQLite {
t.Logf("Skipping PGAdvisoryXactLock test for SQLite")
return
if driver.DatabaseName() == databaseNameSQLite || driver.DatabaseName() == databaseNameMySQL {
t.Skipf("Skipping PGAdvisoryXactLock test for %s", driver.DatabaseName())
}
}

Expand Down
3 changes: 3 additions & 0 deletions riverdriver/riverdrivertest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ require (
github.com/lib/pq v1.12.3
github.com/riverqueue/river v0.35.0
github.com/riverqueue/river/riverdriver v0.35.0
github.com/go-sql-driver/mysql v1.9.3
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.35.0
github.com/riverqueue/river/riverdriver/rivermysql v0.35.0
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.35.0
github.com/riverqueue/river/riverdriver/riversqlite v0.35.0
github.com/riverqueue/river/rivershared v0.35.0
Expand All @@ -27,6 +29,7 @@ require (
require (
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/coder/websocket v1.8.12 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions riverdriver/riverdrivertest/go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo=
github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-sql-driver/mysql v1.9.3 h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo=
github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
Expand Down
4 changes: 2 additions & 2 deletions riverdriver/riverdrivertest/job_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,8 @@ func exerciseJobDelete[TTx any](ctx context.Context, t *testing.T, executorWithT
// since we only expect to need `queues_excluded` on SQLite (and not
// `queues_included` for the foreseeable future), I've just set
// SQLite to not support `queues_included` for the time being.
if bundle.driver.DatabaseName() == databaseNameSQLite {
t.Logf("Skipping JobDeleteBefore with QueuesIncluded test for SQLite")
if bundle.driver.DatabaseName() == databaseNameSQLite || bundle.driver.DatabaseName() == databaseNameMySQL {
t.Skipf("Skipping JobDeleteBefore with QueuesIncluded test for %s", bundle.driver.DatabaseName())
return
}

Expand Down
4 changes: 2 additions & 2 deletions riverdriver/riverdrivertest/job_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ func exerciseJobInsert[TTx any](ctx context.Context, t *testing.T,
_, err := exec.JobInsertFull(ctx, params)
require.Error(t, err)
// two separate error messages here for Postgres and SQLite
require.Regexp(t, `(CHECK constraint failed: finalized_or_finalized_at_null|violates check constraint "finalized_or_finalized_at_null")`, err.Error())
require.Regexp(t, `(CHECK constraint failed: finalized_or_finalized_at_null|violates check constraint "finalized_or_finalized_at_null"|Check constraint 'finalized_or_finalized_at_null' is violated)`, err.Error())
})

t.Run(fmt.Sprintf("CanSetState%sWithFinalizedAt", capitalizeJobState(state)), func(t *testing.T) {
Expand Down Expand Up @@ -749,7 +749,7 @@ func exerciseJobInsert[TTx any](ctx context.Context, t *testing.T,
}))
require.Error(t, err)
// two separate error messages here for Postgres and SQLite
require.Regexp(t, `(CHECK constraint failed: finalized_or_finalized_at_null|violates check constraint "finalized_or_finalized_at_null")`, err.Error())
require.Regexp(t, `(CHECK constraint failed: finalized_or_finalized_at_null|violates check constraint "finalized_or_finalized_at_null"|Check constraint 'finalized_or_finalized_at_null' is violated)`, err.Error())
})
}
})
Expand Down
Loading
Loading