From 0806d84a5dfc7b1606364ad50dd9c4e53a1ad7d6 Mon Sep 17 00:00:00 2001 From: Brandur Date: Sat, 11 Apr 2026 22:49:27 -0500 Subject: [PATCH] Add hook `HookQueueStateCount` + read middleware as hooks and vice versa Here, add a new hook called `HookQueueStateCount` which gets invoked to produce job queue count statistics. We do this by adding a new maintenance service which, like other maintenance services, runs only on the leader, so we only have one client performing counts at any given time. Furthermore, in order to not introduce a potential operational problem without opt-in from River users, the counts only run if a `HookQueueStateCount` hook/middleware has been added to the client. The reason we do all this is to implement a feature requested by one of our users: for `otelriver` in contrib to be able to emit queue count metrics, which seems like a pretty reasonable ask for the package to be able to do, and something that every River user would likely want access to in their ops charts. A slight oddity, but which I think is _probably_ okay, is that the new hook ideally stays a hook, but the existing `otelriver` middleware is a middleware. It'd be nice not to have to put `otelriver.Middleware` into both a client's `Hooks` and `Middleware` configuration, so we modify the client to allow for hooks that are middleware and middleware which are hooks. This lets `otelriver.Middleware` continue doing what it was already doing, but also to start producing new counts as a hook. 
--- CHANGELOG.md | 5 + client.go | 55 +++++- client_test.go | 69 +++++++ hook_defaults_funcs.go | 10 + internal/hooklookup/hook_lookup.go | 7 + internal/maintenance/queue_state_counter.go | 183 ++++++++++++++++++ .../maintenance/queue_state_counter_test.go | 161 +++++++++++++++ riverdriver/river_driver_interface.go | 5 +- .../internal/dbsqlc/river_job.sql.go | 60 ++---- .../river_database_sql_driver.go | 32 ++- riverdriver/riverdrivertest/job_read.go | 66 ++----- .../riverpgxv5/internal/dbsqlc/river_job.sql | 52 ++--- .../internal/dbsqlc/river_job.sql.go | 60 ++---- riverdriver/riverpgxv5/river_pgx_v5_driver.go | 32 ++- .../riversqlite/internal/dbsqlc/river_job.sql | 27 ++- .../internal/dbsqlc/river_job.sql.go | 35 ++-- .../riversqlite/river_sqlite_driver.go | 56 +++--- rivertype/river_type.go | 29 +++ 18 files changed, 692 insertions(+), 252 deletions(-) create mode 100644 internal/maintenance/queue_state_counter.go create mode 100644 internal/maintenance/queue_state_counter_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 159020b0..d2c03ff0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Added new `HookQueueStateCount` hook which is run by a River leader to generate queue count statistics. [PR #1203](https://github.com/riverqueue/river/pull/1203). +- Middleware that implements `rivertype.Hook` can be looked up as hooks even if passed into `Config.Middleware`. Similarly, hooks that implement `rivertype.Middleware` can be looked up as middleware even if passed into `Config.Hooks`. [PR #1203](https://github.com/riverqueue/river/pull/1203). 
+ ## [0.35.0] - 2026-04-18 ### Changed diff --git a/client.go b/client.go index b60addcd..dae727a1 100644 --- a/client.go +++ b/client.go @@ -9,6 +9,7 @@ import ( "log/slog" "os" "regexp" + "slices" "strings" "sync" "time" @@ -657,6 +658,7 @@ type clientTestSignals struct { periodicJobEnqueuer *maintenance.PeriodicJobEnqueuerTestSignals queueCleaner *maintenance.QueueCleanerTestSignals queueMaintainerLeader *maintenance.QueueMaintainerLeaderTestSignals + queueStateCounter *maintenance.QueueStateCounterTestSignals reindexer *maintenance.ReindexerTestSignals } @@ -679,6 +681,9 @@ func (ts *clientTestSignals) Init(tb testutil.TestingTB) { if ts.queueMaintainerLeader != nil { ts.queueMaintainerLeader.Init(tb) } + if ts.queueStateCounter != nil { + ts.queueStateCounter.Init(tb) + } if ts.reindexer != nil { ts.reindexer.Init(tb) } @@ -759,7 +764,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client config: config, driver: driver, hookLookupByJob: hooklookup.NewJobHookLookup(), - hookLookupGlobal: hooklookup.NewHookLookup(config.Hooks), + hookLookupGlobal: nil, // initialized below after cross-referencing with middleware producersByQueueName: make(map[string]*producer), testSignals: clientTestSignals{}, workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up @@ -780,9 +785,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client // the more abstract config.Middleware for middleware are set, but not both, // so in practice we never append all three of these to each other. 
{ - middleware := config.Middleware + middlewares := config.Middleware for _, jobInsertMiddleware := range config.JobInsertMiddleware { - middleware = append(middleware, jobInsertMiddleware) + middlewares = append(middlewares, jobInsertMiddleware) } outerLoop: for _, workerMiddleware := range config.WorkerMiddleware { @@ -798,16 +803,44 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client } } - middleware = append(middleware, workerMiddleware) + middlewares = append(middlewares, workerMiddleware) } - for _, middleware := range middleware { + for _, middleware := range middlewares { if withBaseService, ok := middleware.(baseservice.WithBaseService); ok { baseservice.Init(archetype, withBaseService) } } - client.middlewareLookupGlobal = middlewarelookup.NewMiddlewareLookup(middleware) + // Cross-reference hooks and middleware: any middleware that also + // implements Hook is added to hooks, and any hook that also implements + // Middleware is added to middleware. Deduplication prevents double + // registration when the same struct is passed to both Config.Hooks and + // Config.Middleware. + hooks := config.Hooks + + for _, middleware := range middlewares { + if hook, ok := middleware.(rivertype.Hook); ok { + // Only add if this middleware isn't already in hooks (it may + // have been passed to both config properties). 
+ alreadyInHooks := slices.Contains(hooks, hook) + if !alreadyInHooks { + hooks = append(hooks, hook) + } + } + } + + for _, hook := range config.Hooks { + if middleware, ok := hook.(rivertype.Middleware); ok { + alreadyInMiddleware := slices.Contains(middlewares, middleware) + if !alreadyInMiddleware { + middlewares = append(middlewares, middleware) + } + } + } + + client.hookLookupGlobal = hooklookup.NewHookLookup(hooks) + client.middlewareLookupGlobal = middlewarelookup.NewMiddlewareLookup(middlewares) } pluginDriver, _ := driver.(driverPlugin[TTx]) @@ -961,6 +994,16 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client client.testSignals.queueCleaner = &queueCleaner.TestSignals } + { + queueStateCounter := maintenance.NewQueueStateCounter(archetype, &maintenance.QueueStateCounterConfig{ + HookLookupGlobal: client.hookLookupGlobal, + QueueNames: maputil.Keys(config.Queues), + Schema: config.Schema, + }, driver.GetExecutor()) + maintenanceServices = append(maintenanceServices, queueStateCounter) + client.testSignals.queueStateCounter = &queueStateCounter.TestSignals + } + { var scheduleFunc func(time.Time) time.Time if config.ReindexerSchedule != nil { diff --git a/client_test.go b/client_test.go index f8d035c4..ce1ff215 100644 --- a/client_test.go +++ b/client_test.go @@ -24,6 +24,7 @@ import ( "github.com/tidwall/sjson" "github.com/riverqueue/river/internal/dbunique" + "github.com/riverqueue/river/internal/hooklookup" "github.com/riverqueue/river/internal/jobexecutor" "github.com/riverqueue/river/internal/maintenance" "github.com/riverqueue/river/internal/middlewarelookup" @@ -7628,6 +7629,46 @@ func Test_NewClient_Validations(t *testing.T) { require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1) }, }, + { + name: "Middleware implementing Hook is available in hook lookup", + configFunc: func(config *Config) { + config.Middleware = []rivertype.Middleware{&middlewareWithHook{}} + }, + 
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper + require.Len(t, client.hookLookupGlobal.ByHookKind(hooklookup.HookKindWorkBegin), 1) + }, + }, + { + name: "Hook implementing Middleware is available in middleware lookup", + configFunc: func(config *Config) { + config.Hooks = []rivertype.Hook{&hookWithMiddleware{}} + }, + validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper + require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1) + }, + }, + { + name: "Hook implementing Middleware in both configs is deduplicated in middleware lookup", + configFunc: func(config *Config) { + hm := &hookWithMiddleware{} + config.Hooks = []rivertype.Hook{hm} + config.Middleware = []rivertype.Middleware{hm} + }, + validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper + require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1) + }, + }, + { + name: "Middleware implementing Hook in both configs is deduplicated in hook lookup", + configFunc: func(config *Config) { + mh := &middlewareWithHook{} + config.Hooks = []rivertype.Hook{mh} + config.Middleware = []rivertype.Middleware{mh} + }, + validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper + require.Len(t, client.hookLookupGlobal.ByHookKind(hooklookup.HookKindWorkBegin), 1) + }, + }, { name: "Middleware not allowed with JobInsertMiddleware", configFunc: func(config *Config) { @@ -8627,3 +8668,31 @@ func (f JobArgsWithHooksFunc) Hooks() []rivertype.Hook { func (JobArgsWithHooksFunc) MarshalJSON() ([]byte, error) { return []byte("{}"), nil } func (JobArgsWithHooksFunc) UnmarshalJSON([]byte) error { return nil } + +// middlewareWithHook is a middleware that also implements HookWorkBegin, +// used to test cross-pollination from middleware to hooks. 
+type middlewareWithHook struct { + MiddlewareDefaults +} + +func (m *middlewareWithHook) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error { + return doInner(ctx) +} + +func (m *middlewareWithHook) IsHook() bool { return true } + +func (m *middlewareWithHook) WorkBegin(ctx context.Context, job *rivertype.JobRow) error { + return nil +} + +// hookWithMiddleware is a hook that also implements WorkerMiddleware, +// used to test cross-pollination from hooks to middleware. +type hookWithMiddleware struct { + HookDefaults +} + +func (h *hookWithMiddleware) IsMiddleware() bool { return true } + +func (h *hookWithMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error { + return doInner(ctx) +} diff --git a/hook_defaults_funcs.go b/hook_defaults_funcs.go index 93a7f4fe..2b6aee8c 100644 --- a/hook_defaults_funcs.go +++ b/hook_defaults_funcs.go @@ -33,6 +33,16 @@ func (f HookPeriodicJobsStartFunc) Start(ctx context.Context, params *rivertype. return f(ctx, params) } +// HookQueueStateCountFunc is a convenience helper for implementing +// rivertype.HookQueueStateCount using a simple function instead of a struct. +type HookQueueStateCountFunc func(ctx context.Context, params *rivertype.HookQueueStateCountParams) + +func (f HookQueueStateCountFunc) IsHook() bool { return true } + +func (f HookQueueStateCountFunc) QueueStateCount(ctx context.Context, params *rivertype.HookQueueStateCountParams) { + f(ctx, params) +} + // HookWorkBeginFunc is a convenience helper for implementing // rivertype.HookWorkBegin using a simple function instead of a struct. 
type HookWorkBeginFunc func(ctx context.Context, job *rivertype.JobRow) error diff --git a/internal/hooklookup/hook_lookup.go b/internal/hooklookup/hook_lookup.go index 420debb2..9dfd3a04 100644 --- a/internal/hooklookup/hook_lookup.go +++ b/internal/hooklookup/hook_lookup.go @@ -15,6 +15,7 @@ type HookKind string const ( HookKindInsertBegin HookKind = "insert_begin" HookKindPeriodicJobsStart HookKind = "periodic_job_start" + HookKindQueueStateCount HookKind = "queue_state_count" HookKindWorkBegin HookKind = "work_begin" HookKindWorkEnd HookKind = "work_end" ) @@ -90,6 +91,12 @@ func (c *hookLookup) ByHookKind(kind HookKind) []rivertype.Hook { c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook) } } + case HookKindQueueStateCount: + for _, hook := range c.hooks { + if typedHook, ok := hook.(rivertype.HookQueueStateCount); ok { + c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook) + } + } case HookKindWorkBegin: for _, hook := range c.hooks { if typedHook, ok := hook.(rivertype.HookWorkBegin); ok { diff --git a/internal/maintenance/queue_state_counter.go b/internal/maintenance/queue_state_counter.go new file mode 100644 index 00000000..0b82b51e --- /dev/null +++ b/internal/maintenance/queue_state_counter.go @@ -0,0 +1,183 @@ +package maintenance + +import ( + "cmp" + "context" + "errors" + "log/slog" + "time" + + "github.com/riverqueue/river/internal/hooklookup" + "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/riversharedmaintenance" + "github.com/riverqueue/river/rivershared/startstop" + "github.com/riverqueue/river/rivershared/testsignal" + "github.com/riverqueue/river/rivershared/util/testutil" + "github.com/riverqueue/river/rivershared/util/timeutil" + "github.com/riverqueue/river/rivertype" +) + +const QueueStateCounterIntervalDefault = 10 * time.Second + +var jobStateAll = rivertype.JobStates() //nolint:gochecknoglobals + +// 
QueueStateCounterTestSignals are internal signals used exclusively in tests. +type QueueStateCounterTestSignals struct { + CountedOnce testsignal.TestSignal[struct{}] // notifies when a count pass finishes +} + +func (ts *QueueStateCounterTestSignals) Init(tb testutil.TestingTB) { + ts.CountedOnce.Init(tb) +} + +type QueueStateCounterConfig struct { + // HookLookupGlobal provides access to globally registered hooks. + HookLookupGlobal hooklookup.HookLookupInterface + + // Interval is the amount of time between count runs. + Interval time.Duration + + // QueueNames is the list of configured queue names. Counts are emitted for + // all of these queues even if they have no jobs, with zero counts for all + // states. + QueueNames []string + + // Schema where River tables are located. Empty string omits schema, causing + // Postgres to default to `search_path`. + Schema string +} + +func (c *QueueStateCounterConfig) mustValidate() *QueueStateCounterConfig { + if c.Interval <= 0 { + panic("QueueStateCounterConfig.Interval must be above zero") + } + + return c +} + +// QueueStateCounter periodically counts jobs by queue and state, logging the +// results. This provides visibility into queue health without requiring +// external monitoring queries. The maintenance service only runs if there is a +// HookQueueStateCount hook registered that consumes the counts. 
+type QueueStateCounter struct { + riversharedmaintenance.QueueMaintainerServiceBase + startstop.BaseStartStop + + // exported for test purposes + Config *QueueStateCounterConfig + TestSignals QueueStateCounterTestSignals + + exec riverdriver.Executor +} + +func NewQueueStateCounter(archetype *baseservice.Archetype, config *QueueStateCounterConfig, exec riverdriver.Executor) *QueueStateCounter { + return baseservice.Init(archetype, &QueueStateCounter{ + Config: (&QueueStateCounterConfig{ + HookLookupGlobal: config.HookLookupGlobal, + Interval: cmp.Or(config.Interval, QueueStateCounterIntervalDefault), + QueueNames: config.QueueNames, + Schema: config.Schema, + }).mustValidate(), + exec: exec, + }) +} + +func (s *QueueStateCounter) Start(ctx context.Context) error { + ctx, shouldStart, started, stopped := s.StartInit(ctx) + if !shouldStart { + return nil + } + + s.StaggerStart(ctx) + + go func() { + started() + defer stopped() // this defer should come first so it's last out + + s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStarted) + defer s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStopped) + + // If no hooks are registered, there's no one to send counts to, so + // start, but don't do anything. 
+ if len(s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindQueueStateCount)) < 1 { + <-ctx.Done() + return + } + + ticker := timeutil.NewTickerWithInitialTick(ctx, s.Config.Interval) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + + if err := s.runOnce(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + s.Logger.ErrorContext(ctx, s.Name+": Error counting queue states", slog.String("error", err.Error())) + } + } + } + }() + + return nil +} + +func (s *QueueStateCounter) runOnce(ctx context.Context) error { + ctx, cancelFunc := context.WithTimeout(ctx, riversharedmaintenance.TimeoutDefault) + defer cancelFunc() + + rawResults, err := s.exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{ + QueueNames: s.Config.QueueNames, + Schema: s.Config.Schema, + }) + if err != nil { + return err + } + + byQueue := s.buildResults(ctx, rawResults) + + for _, hook := range s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindQueueStateCount) { + hook.(rivertype.HookQueueStateCount).QueueStateCount(ctx, &rivertype.HookQueueStateCountParams{ //nolint:forcetypeassert + ByQueue: byQueue, + }) + } + + s.TestSignals.CountedOnce.Signal(struct{}{}) + + return nil +} + +// buildResults converts raw driver counts into results with all configured +// queues and all job states filled in (zeroed where needed), logging one line +// per queue. 
+func (s *QueueStateCounter) buildResults(ctx context.Context, rawResults []*riverdriver.JobCountByQueueAndStateResult) map[string]rivertype.HookQueueStateCountResult { + countsByQueue := make(map[string]rivertype.HookQueueStateCountResult, len(rawResults)) + + for _, result := range rawResults { + stateCounts := result.States + + attrs := make([]slog.Attr, 0, 2+len(jobStateAll)) + attrs = append(attrs, slog.String("queue", result.Queue)) + total := 0 + + for _, state := range jobStateAll { + if _, ok := stateCounts[state]; !ok { + stateCounts[state] = 0 + } + total += stateCounts[state] + attrs = append(attrs, slog.Int(string(state), stateCounts[state])) + } + + attrs = append(attrs, slog.Int("total", total)) + s.Logger.LogAttrs(ctx, slog.LevelInfo, s.Name+": Queue state counts", attrs...) + + countsByQueue[result.Queue] = rivertype.HookQueueStateCountResult{ + ByState: stateCounts, + Total: total, + } + } + + return countsByQueue +} diff --git a/internal/maintenance/queue_state_counter_test.go b/internal/maintenance/queue_state_counter_test.go new file mode 100644 index 00000000..257776ad --- /dev/null +++ b/internal/maintenance/queue_state_counter_test.go @@ -0,0 +1,161 @@ +package maintenance + +import ( + "context" + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/riverqueue/river/internal/hooklookup" + "github.com/riverqueue/river/riverdbtest" + "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/riversharedtest" + "github.com/riverqueue/river/rivershared/startstoptest" + "github.com/riverqueue/river/rivershared/testfactory" + "github.com/riverqueue/river/rivershared/util/ptrutil" + "github.com/riverqueue/river/rivertype" +) + +func TestQueueStateCounter(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + exec riverdriver.Executor + } + + setup := func(t *testing.T, hooks []rivertype.Hook, queueNames 
[]string) (*QueueStateCounter, *testBundle) { + t.Helper() + + var ( + tx = riverdbtest.TestTxPgx(ctx, t) + exec = riverpgxv5.New(nil).UnwrapExecutor(tx) + ) + + svc := NewQueueStateCounter( + riversharedtest.BaseServiceArchetype(t), + &QueueStateCounterConfig{ + HookLookupGlobal: hooklookup.NewHookLookup(hooks), + QueueNames: queueNames, + }, + exec, + ) + svc.StaggerStartupDisable(true) + svc.TestSignals.Init(t) + t.Cleanup(svc.Stop) + + return svc, &testBundle{exec: exec} + } + + t.Run("CountsJobsByQueueAndState", func(t *testing.T) { + t.Parallel() + + hook := &capturingQueueStateCountHook{} + svc, bundle := setup(t, []rivertype.Hook{hook}, []string{"queue1", "queue2", "queue_empty"}) + + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateDiscarded)}) + + require.NoError(t, svc.Start(ctx)) + svc.TestSignals.CountedOnce.WaitOrTimeout() + + counts := hook.LastParams().ByQueue + + // All three configured queues are present, including queue_empty + // which has no jobs. 
+ require.Len(t, counts, 3) + + // All job states are present for each queue, even those with zero jobs. + for _, queue := range []string{"queue1", "queue2", "queue_empty"} { + require.Len(t, counts[queue].ByState, len(rivertype.JobStates()), "queue %q should have all states", queue) + } + + require.Equal(t, 2, counts["queue1"].ByState[rivertype.JobStateAvailable]) + require.Equal(t, 1, counts["queue1"].ByState[rivertype.JobStateRunning]) + require.Equal(t, 0, counts["queue1"].ByState[rivertype.JobStateCompleted]) + require.Equal(t, 3, counts["queue1"].Total) + + require.Equal(t, 1, counts["queue2"].ByState[rivertype.JobStateAvailable]) + require.Equal(t, 2, counts["queue2"].ByState[rivertype.JobStateCompleted]) + require.Equal(t, 1, counts["queue2"].ByState[rivertype.JobStateDiscarded]) + require.Equal(t, 0, counts["queue2"].ByState[rivertype.JobStateRunning]) + require.Equal(t, 4, counts["queue2"].Total) + + // queue_empty has all states present, all zero. + for _, state := range rivertype.JobStates() { + require.Equal(t, 0, counts["queue_empty"].ByState[state], "queue_empty[%s] should be 0", state) + } + require.Equal(t, 0, counts["queue_empty"].Total) + }) + + t.Run("NoopsWithoutHooks", func(t *testing.T) { + t.Parallel() + + svc, _ := setup(t, nil, nil) + + // With no hooks, Start returns immediately without starting the service. + require.NoError(t, svc.Start(ctx)) + + // Service should not have started, so calling Start again should also + // succeed (StartInit would return false if it were already running). 
+ require.NoError(t, svc.Start(ctx)) + }) + + t.Run("StartStop", func(t *testing.T) { + t.Parallel() + + svc, _ := setup(t, []rivertype.Hook{&capturingQueueStateCountHook{}}, nil) + + require.NoError(t, svc.Start(ctx)) + svc.TestSignals.CountedOnce.WaitOrTimeout() + }) + + t.Run("StartStopStress", func(t *testing.T) { + t.Parallel() + + svc, _ := setup(t, []rivertype.Hook{&capturingQueueStateCountHook{}}, nil) + svc.Logger = riversharedtest.LoggerWarn(t) // loop started/stop log is very noisy; suppress + svc.TestSignals = QueueStateCounterTestSignals{} // deinit so channels don't fill + + startstoptest.Stress(ctx, t, svc) + }) + + t.Run("StartStopStressNoHooks", func(t *testing.T) { + t.Parallel() + + svc, _ := setup(t, nil, nil) + svc.Logger = riversharedtest.LoggerWarn(t) // loop started/stop log is very noisy; suppress + svc.TestSignals = QueueStateCounterTestSignals{} // deinit so channels don't fill + + startstoptest.Stress(ctx, t, svc) + }) +} + +// capturingQueueStateCountHook captures the last params received from the hook +// invocation. 
+type capturingQueueStateCountHook struct { + mu sync.Mutex + lastParams *rivertype.HookQueueStateCountParams +} + +func (h *capturingQueueStateCountHook) IsHook() bool { return true } + +func (h *capturingQueueStateCountHook) QueueStateCount(_ context.Context, params *rivertype.HookQueueStateCountParams) { + h.mu.Lock() + defer h.mu.Unlock() + h.lastParams = params +} + +func (h *capturingQueueStateCountHook) LastParams() *rivertype.HookQueueStateCountParams { + h.mu.Lock() + defer h.mu.Unlock() + return h.lastParams +} diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 8727a6e3..efcb7cc6 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -360,9 +360,8 @@ type JobCountByQueueAndStateParams struct { } type JobCountByQueueAndStateResult struct { - CountAvailable int64 - CountRunning int64 - Queue string + Queue string + States map[rivertype.JobState]int } type JobCountByStateParams struct { diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index ac01941a..d4fe28ce 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -132,48 +132,28 @@ func (q *Queries) JobCountByAllStates(ctx context.Context, db DBTX) ([]*JobCount } const jobCountByQueueAndState = `-- name: JobCountByQueueAndState :many -WITH all_queues AS ( - SELECT DISTINCT unnest($1::text[])::text AS queue -), - -running_job_counts AS ( - SELECT - queue, - COUNT(*) AS count - FROM /* TEMPLATE: schema */river_job - WHERE queue = ANY($1::text[]) - AND state = 'running' - GROUP BY queue -), - -available_job_counts AS ( - SELECT - queue, - COUNT(*) AS count - FROM - /* TEMPLATE: schema */river_job - WHERE queue = ANY($1::text[]) - AND state = 'available' - GROUP BY queue -) - -SELECT - all_queues.queue, - COALESCE(available_job_counts.count, 0) AS 
count_available, - COALESCE(running_job_counts.count, 0) AS count_running -FROM - all_queues -LEFT JOIN - running_job_counts ON all_queues.queue = running_job_counts.queue -LEFT JOIN - available_job_counts ON all_queues.queue = available_job_counts.queue -ORDER BY all_queues.queue ASC +SELECT queue, 'available'::text AS state, COUNT(*) AS count FROM /* TEMPLATE: schema */river_job WHERE queue = ANY($1::text[]) AND state = 'available' GROUP BY queue +UNION ALL +SELECT queue, 'cancelled', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = ANY($1::text[]) AND state = 'cancelled' GROUP BY queue +UNION ALL +SELECT queue, 'completed', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = ANY($1::text[]) AND state = 'completed' GROUP BY queue +UNION ALL +SELECT queue, 'discarded', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = ANY($1::text[]) AND state = 'discarded' GROUP BY queue +UNION ALL +SELECT queue, 'pending', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = ANY($1::text[]) AND state = 'pending' GROUP BY queue +UNION ALL +SELECT queue, 'retryable', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = ANY($1::text[]) AND state = 'retryable' GROUP BY queue +UNION ALL +SELECT queue, 'running', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = ANY($1::text[]) AND state = 'running' GROUP BY queue +UNION ALL +SELECT queue, 'scheduled', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = ANY($1::text[]) AND state = 'scheduled' GROUP BY queue +ORDER BY queue, state ` type JobCountByQueueAndStateRow struct { - Queue string - CountAvailable int64 - CountRunning int64 + Queue string + State string + Count int64 } func (q *Queries) JobCountByQueueAndState(ctx context.Context, db DBTX, queueNames []string) ([]*JobCountByQueueAndStateRow, error) { @@ -185,7 +165,7 @@ func (q *Queries) JobCountByQueueAndState(ctx context.Context, db DBTX, queueNam var items []*JobCountByQueueAndStateRow for rows.Next() { var i 
JobCountByQueueAndStateRow - if err := rows.Scan(&i.Queue, &i.CountAvailable, &i.CountRunning); err != nil { + if err := rows.Scan(&i.Queue, &i.State, &i.Count); err != nil { return nil, err } items = append(items, &i) diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index d8a2d445..ad0f89c0 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -227,14 +227,34 @@ func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdri if err != nil { return nil, interpretError(err) } - results := make([]*riverdriver.JobCountByQueueAndStateResult, len(rows)) - for i, row := range rows { - results[i] = &riverdriver.JobCountByQueueAndStateResult{ - CountAvailable: row.CountAvailable, - CountRunning: row.CountRunning, - Queue: row.Queue, + + rowsByQueue := make(map[string]*riverdriver.JobCountByQueueAndStateResult, len(params.QueueNames)) + for _, row := range rows { + result, ok := rowsByQueue[row.Queue] + if !ok { + result = &riverdriver.JobCountByQueueAndStateResult{ + Queue: row.Queue, + States: make(map[rivertype.JobState]int), + } + rowsByQueue[row.Queue] = result + } + result.States[rivertype.JobState(row.State)] = int(row.Count) + } + + // Build results in the order of requested queue names, filling in zero + // counts for queues with no jobs. 
+ results := make([]*riverdriver.JobCountByQueueAndStateResult, 0, len(params.QueueNames)) + for _, queueName := range params.QueueNames { + if result, ok := rowsByQueue[queueName]; ok { + results = append(results, result) + } else { + results = append(results, &riverdriver.JobCountByQueueAndStateResult{ + Queue: queueName, + States: make(map[rivertype.JobState]int), + }) } } + return results, nil } diff --git a/riverdriver/riverdrivertest/job_read.go b/riverdriver/riverdrivertest/job_read.go index 5649d910..b9a2c408 100644 --- a/riverdriver/riverdrivertest/job_read.go +++ b/riverdriver/riverdrivertest/job_read.go @@ -86,7 +86,7 @@ func exerciseJobRead[TTx any](ctx context.Context, t *testing.T, executorWithTx t.Run("JobCountByQueueAndState", func(t *testing.T) { t.Parallel() - t.Run("CountsJobsInAvailableAndRunningForEachOfTheSpecifiedQueues", func(t *testing.T) { + t.Run("CountsAllStatesForSpecifiedQueues", func(t *testing.T) { t.Parallel() exec, _ := setup(ctx, t) @@ -96,75 +96,47 @@ func exerciseJobRead[TTx any](ctx context.Context, t *testing.T, executorWithTx _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateCompleted)}) _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue3"), State: 
ptrutil.Ptr(rivertype.JobStateAvailable)}) - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue3"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) - countsByQueue, err := exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{ + results, err := exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{ QueueNames: []string{"queue1", "queue2"}, Schema: "", }) require.NoError(t, err) - require.Len(t, countsByQueue, 2) + require.Len(t, results, 2) - require.Equal(t, "queue1", countsByQueue[0].Queue) - require.Equal(t, int64(2), countsByQueue[0].CountAvailable) - require.Equal(t, int64(3), countsByQueue[0].CountRunning) - require.Equal(t, "queue2", countsByQueue[1].Queue) - require.Equal(t, int64(1), countsByQueue[1].CountAvailable) - require.Equal(t, int64(1), countsByQueue[1].CountRunning) - }) - - t.Run("IncludesRequestedQueuesThatHaveNoJobs", func(t *testing.T) { - t.Parallel() - - exec, _ := setup(ctx, t) + require.Equal(t, "queue1", results[0].Queue) + require.Equal(t, 2, results[0].States[rivertype.JobStateAvailable]) + require.Equal(t, 3, results[0].States[rivertype.JobStateRunning]) + require.Equal(t, 1, results[0].States[rivertype.JobStateCompleted]) + require.Equal(t, 0, results[0].States[rivertype.JobStateCancelled]) - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) - - countsByQueue, err := exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{ - QueueNames: []string{"queue1", "queue2"}, - Schema: "", - }) - require.NoError(t, err) - - require.Len(t, countsByQueue, 2) - - require.Equal(t, "queue1", countsByQueue[0].Queue) - require.Equal(t, int64(0), countsByQueue[0].CountAvailable) - require.Equal(t, int64(0), countsByQueue[0].CountRunning) - - 
require.Equal(t, "queue2", countsByQueue[1].Queue) - require.Equal(t, int64(1), countsByQueue[1].CountAvailable) - require.Equal(t, int64(1), countsByQueue[1].CountRunning) + require.Equal(t, "queue2", results[1].Queue) + require.Equal(t, 1, results[1].States[rivertype.JobStateAvailable]) + require.Equal(t, 1, results[1].States[rivertype.JobStateRunning]) }) - t.Run("InputQueueNamesAreDeduplicated", func(t *testing.T) { + t.Run("ExcludesQueuesNotRequested", func(t *testing.T) { t.Parallel() exec, _ := setup(ctx, t) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) - countsByQueue, err := exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{ - QueueNames: []string{"queue2", "queue1", "queue1"}, + results, err := exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{ + QueueNames: []string{"queue1"}, Schema: "", }) require.NoError(t, err) - require.Len(t, countsByQueue, 2) - - require.Equal(t, "queue1", countsByQueue[0].Queue) - require.Equal(t, int64(0), countsByQueue[0].CountAvailable) - require.Equal(t, int64(0), countsByQueue[0].CountRunning) - - require.Equal(t, "queue2", countsByQueue[1].Queue) - require.Equal(t, int64(1), countsByQueue[1].CountAvailable) - require.Equal(t, int64(1), countsByQueue[1].CountRunning) + require.Len(t, results, 1) + require.Equal(t, "queue1", results[0].Queue) + require.Equal(t, 1, results[0].States[rivertype.JobStateAvailable]) }) }) diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index dde85de4..e6a30fe1 100644 --- 
a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -86,42 +86,22 @@ FROM /* TEMPLATE: schema */ river_job GROUP BY state; -- name: JobCountByQueueAndState :many -WITH all_queues AS ( - SELECT DISTINCT unnest(@queue_names::text[])::text AS queue -), - -running_job_counts AS ( - SELECT - queue, - COUNT(*) AS count - FROM /* TEMPLATE: schema */river_job - WHERE queue = ANY(@queue_names::text[]) - AND state = 'running' - GROUP BY queue -), - -available_job_counts AS ( - SELECT - queue, - COUNT(*) AS count - FROM - /* TEMPLATE: schema */river_job - WHERE queue = ANY(@queue_names::text[]) - AND state = 'available' - GROUP BY queue -) - -SELECT - all_queues.queue, - COALESCE(available_job_counts.count, 0) AS count_available, - COALESCE(running_job_counts.count, 0) AS count_running -FROM - all_queues -LEFT JOIN - running_job_counts ON all_queues.queue = running_job_counts.queue -LEFT JOIN - available_job_counts ON all_queues.queue = available_job_counts.queue -ORDER BY all_queues.queue ASC; +SELECT queue, 'available'::text AS state, COUNT(*) AS count FROM /* TEMPLATE: schema */river_job WHERE queue = ANY(@queue_names::text[]) AND state = 'available' GROUP BY queue +UNION ALL +SELECT queue, 'cancelled', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = ANY(@queue_names::text[]) AND state = 'cancelled' GROUP BY queue +UNION ALL +SELECT queue, 'completed', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = ANY(@queue_names::text[]) AND state = 'completed' GROUP BY queue +UNION ALL +SELECT queue, 'discarded', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = ANY(@queue_names::text[]) AND state = 'discarded' GROUP BY queue +UNION ALL +SELECT queue, 'pending', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = ANY(@queue_names::text[]) AND state = 'pending' GROUP BY queue +UNION ALL +SELECT queue, 'retryable', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = 
ANY(@queue_names::text[]) AND state = 'retryable' GROUP BY queue +UNION ALL +SELECT queue, 'running', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = ANY(@queue_names::text[]) AND state = 'running' GROUP BY queue +UNION ALL +SELECT queue, 'scheduled', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = ANY(@queue_names::text[]) AND state = 'scheduled' GROUP BY queue +ORDER BY queue, state; -- name: JobCountByState :one SELECT count(*) diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 0be29561..d383fbfb 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -129,48 +129,28 @@ func (q *Queries) JobCountByAllStates(ctx context.Context, db DBTX) ([]*JobCount } const jobCountByQueueAndState = `-- name: JobCountByQueueAndState :many -WITH all_queues AS ( - SELECT DISTINCT unnest($1::text[])::text AS queue -), - -running_job_counts AS ( - SELECT - queue, - COUNT(*) AS count - FROM /* TEMPLATE: schema */river_job - WHERE queue = ANY($1::text[]) - AND state = 'running' - GROUP BY queue -), - -available_job_counts AS ( - SELECT - queue, - COUNT(*) AS count - FROM - /* TEMPLATE: schema */river_job - WHERE queue = ANY($1::text[]) - AND state = 'available' - GROUP BY queue -) - -SELECT - all_queues.queue, - COALESCE(available_job_counts.count, 0) AS count_available, - COALESCE(running_job_counts.count, 0) AS count_running -FROM - all_queues -LEFT JOIN - running_job_counts ON all_queues.queue = running_job_counts.queue -LEFT JOIN - available_job_counts ON all_queues.queue = available_job_counts.queue -ORDER BY all_queues.queue ASC +SELECT queue, 'available'::text AS state, COUNT(*) AS count FROM /* TEMPLATE: schema */river_job WHERE queue = ANY($1::text[]) AND state = 'available' GROUP BY queue +UNION ALL +SELECT queue, 'cancelled', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = 
ANY($1::text[]) AND state = 'cancelled' GROUP BY queue +UNION ALL +SELECT queue, 'completed', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = ANY($1::text[]) AND state = 'completed' GROUP BY queue +UNION ALL +SELECT queue, 'discarded', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = ANY($1::text[]) AND state = 'discarded' GROUP BY queue +UNION ALL +SELECT queue, 'pending', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = ANY($1::text[]) AND state = 'pending' GROUP BY queue +UNION ALL +SELECT queue, 'retryable', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = ANY($1::text[]) AND state = 'retryable' GROUP BY queue +UNION ALL +SELECT queue, 'running', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = ANY($1::text[]) AND state = 'running' GROUP BY queue +UNION ALL +SELECT queue, 'scheduled', COUNT(*) FROM /* TEMPLATE: schema */river_job WHERE queue = ANY($1::text[]) AND state = 'scheduled' GROUP BY queue +ORDER BY queue, state ` type JobCountByQueueAndStateRow struct { - Queue string - CountAvailable int64 - CountRunning int64 + Queue string + State string + Count int64 } func (q *Queries) JobCountByQueueAndState(ctx context.Context, db DBTX, queueNames []string) ([]*JobCountByQueueAndStateRow, error) { @@ -182,7 +162,7 @@ func (q *Queries) JobCountByQueueAndState(ctx context.Context, db DBTX, queueNam var items []*JobCountByQueueAndStateRow for rows.Next() { var i JobCountByQueueAndStateRow - if err := rows.Scan(&i.Queue, &i.CountAvailable, &i.CountRunning); err != nil { + if err := rows.Scan(&i.Queue, &i.State, &i.Count); err != nil { return nil, err } items = append(items, &i) diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index e0201a0e..d9356a5e 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -235,14 +235,34 @@ func (e *Executor) JobCountByQueueAndState(ctx context.Context, params 
*riverdri if err != nil { return nil, interpretError(err) } - results := make([]*riverdriver.JobCountByQueueAndStateResult, len(rows)) - for i, row := range rows { - results[i] = &riverdriver.JobCountByQueueAndStateResult{ - CountAvailable: row.CountAvailable, - CountRunning: row.CountRunning, - Queue: row.Queue, + + rowsByQueue := make(map[string]*riverdriver.JobCountByQueueAndStateResult, len(params.QueueNames)) + for _, row := range rows { + result, ok := rowsByQueue[row.Queue] + if !ok { + result = &riverdriver.JobCountByQueueAndStateResult{ + Queue: row.Queue, + States: make(map[rivertype.JobState]int), + } + rowsByQueue[row.Queue] = result + } + result.States[rivertype.JobState(row.State)] = int(row.Count) + } + + // Build results in the order of requested queue names, filling in zero + // counts for queues with no jobs. + results := make([]*riverdriver.JobCountByQueueAndStateResult, 0, len(params.QueueNames)) + for _, queueName := range params.QueueNames { + if result, ok := rowsByQueue[queueName]; ok { + results = append(results, result) + } else { + results = append(results, &riverdriver.JobCountByQueueAndStateResult{ + Queue: queueName, + States: make(map[rivertype.JobState]int), + }) } } + return results, nil } diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql index dfb72dc3..7fca6b2e 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql @@ -55,23 +55,20 @@ SELECT state, count(*) FROM /* TEMPLATE: schema */river_job GROUP BY state; +-- The Postgres drivers use a UNION ALL with one sub-select per state to +-- leverage the (state, queue) index, but sqlc's SQLite parser can't handle +-- sqlc.slice referenced multiple times (only the first is expanded) or +-- json_each (column resolution fails). GROUP BY queue, state is the best we +-- can do through sqlc for SQLite, and is fine given SQLite is local. 
-- name: JobCountByQueueAndState :many -WITH queue_stats AS ( - SELECT - river_job.queue, - COUNT(CASE WHEN river_job.state = 'available' THEN 1 END) AS count_available, - COUNT(CASE WHEN river_job.state = 'running' THEN 1 END) AS count_running - FROM /* TEMPLATE: schema */river_job - WHERE river_job.queue IN (sqlc.slice('queue_names')) - GROUP BY river_job.queue -) - SELECT - cast(queue AS text) AS queue, - count_available, - count_running -FROM queue_stats -ORDER BY queue ASC; + cast(river_job.queue AS text) AS queue, + cast(river_job.state AS text) AS state, + COUNT(*) AS count +FROM /* TEMPLATE: schema */river_job +WHERE river_job.queue IN (sqlc.slice('queue_names')) +GROUP BY river_job.queue, river_job.state +ORDER BY river_job.queue ASC, river_job.state ASC; -- name: JobCountByState :one SELECT count(*) diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go index 1963523d..75ceb469 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go @@ -103,30 +103,27 @@ func (q *Queries) JobCountByAllStates(ctx context.Context, db DBTX) ([]*JobCount } const jobCountByQueueAndState = `-- name: JobCountByQueueAndState :many -WITH queue_stats AS ( - SELECT - river_job.queue, - COUNT(CASE WHEN river_job.state = 'available' THEN 1 END) AS count_available, - COUNT(CASE WHEN river_job.state = 'running' THEN 1 END) AS count_running - FROM /* TEMPLATE: schema */river_job - WHERE river_job.queue IN (/*SLICE:queue_names*/?) - GROUP BY river_job.queue -) - SELECT - cast(queue AS text) AS queue, - count_available, - count_running -FROM queue_stats -ORDER BY queue ASC + cast(river_job.queue AS text) AS queue, + cast(river_job.state AS text) AS state, + COUNT(*) AS count +FROM /* TEMPLATE: schema */river_job +WHERE river_job.queue IN (/*SLICE:queue_names*/?) 
+GROUP BY river_job.queue, river_job.state +ORDER BY river_job.queue ASC, river_job.state ASC ` type JobCountByQueueAndStateRow struct { - Queue string - CountAvailable int64 - CountRunning int64 + Queue string + State string + Count int64 } +// The Postgres drivers use a UNION ALL with one sub-select per state to +// leverage the (state, queue) index, but sqlc's SQLite parser can't handle +// sqlc.slice referenced multiple times (only the first is expanded) or +// json_each (column resolution fails). GROUP BY queue, state is the best we +// can do through sqlc for SQLite, and is fine given SQLite is local. func (q *Queries) JobCountByQueueAndState(ctx context.Context, db DBTX, queueNames []string) ([]*JobCountByQueueAndStateRow, error) { query := jobCountByQueueAndState var queryParams []interface{} @@ -146,7 +143,7 @@ func (q *Queries) JobCountByQueueAndState(ctx context.Context, db DBTX, queueNam var items []*JobCountByQueueAndStateRow for rows.Next() { var i JobCountByQueueAndStateRow - if err := rows.Scan(&i.Queue, &i.CountAvailable, &i.CountRunning); err != nil { + if err := rows.Scan(&i.Queue, &i.State, &i.Count); err != nil { return nil, err } items = append(items, &i) diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index 132d0e46..944300c1 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -296,43 +296,31 @@ func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdri return nil, interpretError(err) } - // The PostgreSQL drivers implement this query with an `all_queues` CTE and - // LEFT JOINs, so they return one row per requested queue, including queues - // that currently have no jobs. The input queue list is deduplicated in SQL. 
- // The SQLite sqlc driver only reliably supports `sqlc.slice` in `IN (...)`, - // and we haven't found a workable way to bind a parameterized list through - // `json_each(...)` to produce equivalent SQL. The SQLite SQL query therefore - // returns only queues with matching rows, and this wrapper fills in missing - // queues to match PostgreSQL behavior. - countsByQueue := make(map[string]struct { - CountAvailable int64 - CountRunning int64 - }, len(rows)) + rowsByQueue := make(map[string]*riverdriver.JobCountByQueueAndStateResult, len(params.QueueNames)) for _, row := range rows { - countsByQueue[row.Queue] = struct { - CountAvailable int64 - CountRunning int64 - }{ - CountAvailable: row.CountAvailable, - CountRunning: row.CountRunning, - } - } - - queueNames := slices.Clone(params.QueueNames) - slices.Sort(queueNames) - queueNames = slices.Compact(queueNames) - - results := make([]*riverdriver.JobCountByQueueAndStateResult, 0, len(queueNames)) - for _, queueName := range queueNames { - result := &riverdriver.JobCountByQueueAndStateResult{ - Queue: queueName, + result, ok := rowsByQueue[row.Queue] + if !ok { + result = &riverdriver.JobCountByQueueAndStateResult{ + Queue: row.Queue, + States: make(map[rivertype.JobState]int), + } + rowsByQueue[row.Queue] = result } - if counts, ok := countsByQueue[queueName]; ok { - result.CountAvailable = counts.CountAvailable - result.CountRunning = counts.CountRunning + result.States[rivertype.JobState(row.State)] = int(row.Count) + } + + // Build results in the order of requested queue names, filling in zero + // counts for queues with no jobs. 
+ results := make([]*riverdriver.JobCountByQueueAndStateResult, 0, len(params.QueueNames)) + for _, queueName := range params.QueueNames { + if result, ok := rowsByQueue[queueName]; ok { + results = append(results, result) + } else { + results = append(results, &riverdriver.JobCountByQueueAndStateResult{ + Queue: queueName, + States: make(map[rivertype.JobState]int), + }) } - - results = append(results, result) } return results, nil diff --git a/rivertype/river_type.go b/rivertype/river_type.go index 13d07f63..0233eca4 100644 --- a/rivertype/river_type.go +++ b/rivertype/river_type.go @@ -351,6 +351,35 @@ type HookPeriodicJobsStartParams struct { DurableJobs []*DurablePeriodicJob } +// HookQueueStateCount is an interface to a hook that runs each time the queue +// state counter maintenance service performs a count. +type HookQueueStateCount interface { + Hook + + // QueueStateCount is invoked after the queue state counter has completed a + // count of jobs by queue and state. + QueueStateCount(ctx context.Context, params *HookQueueStateCountParams) +} + +// HookQueueStateCountParams are parameters for HookQueueStateCount. +type HookQueueStateCountParams struct { + // ByQueue is a map of queue name to its count results. All queues + // configured with this client are included, even if they don't currently + // contain jobs. + ByQueue map[string]HookQueueStateCountResult +} + +// HookQueueStateCountResult contains counts for a single queue. +type HookQueueStateCountResult struct { + // ByState is a map of job state to the number of jobs in that state for the + // queue. All job states are included, even if they have no jobs in that + // state. + ByState map[JobState]int + + // Total is the total number of jobs across all states for the queue. + Total int +} + // HookWorkBegin is an interface to a hook that runs after a job has been locked // for work and before it's worked. type HookWorkBegin interface {