diff --git a/client.go b/client.go index 3f29e545..839d8a01 100644 --- a/client.go +++ b/client.go @@ -206,9 +206,11 @@ type Config struct { // The effect of hooks in this list will depend on the specific hook // interfaces they implement, so for example implementing // rivertype.HookInsertBegin will cause the hook to be invoked before a job - // is inserted, or implementing rivertype.HookWorkBegin will cause it to be - // invoked before a job is worked. Hook structs may implement multiple hook - // interfaces. + // is inserted, implementing rivertype.HookMetricEmit will cause the hook to + // be invoked when River emits a metric, or implementing + // rivertype.HookWorkBegin will cause it to be invoked before a job is + // worked. Hook structs may + // implement multiple hook interfaces. // // Order in this list is significant. A hook that appears first will be // entered before a hook that appears later. For any particular phase, order diff --git a/hook_defaults_funcs.go b/hook_defaults_funcs.go index 93a7f4fe..0e0fc93a 100644 --- a/hook_defaults_funcs.go +++ b/hook_defaults_funcs.go @@ -23,6 +23,16 @@ func (f HookInsertBeginFunc) InsertBegin(ctx context.Context, params *rivertype. func (f HookInsertBeginFunc) IsHook() bool { return true } +// HookMetricEmitFunc is a convenience helper for implementing +// rivertype.HookMetricEmit using a simple function instead of a struct. +type HookMetricEmitFunc func(ctx context.Context, params *rivertype.HookMetricEmitParams) + +func (f HookMetricEmitFunc) IsHook() bool { return true } + +func (f HookMetricEmitFunc) MetricEmit(ctx context.Context, params *rivertype.HookMetricEmitParams) { + f(ctx, params) +} + // HookPeriodicJobsStartFunc is a convenience helper for implementing // rivertype.HookPeriodicJobsStart using a simple function instead of a struct. type HookPeriodicJobsStartFunc func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error diff --git a/hook_defaults_funcs_test.go b/hook_defaults_funcs_test.go index 43a20297..32eb3e07 100644 --- a/hook_defaults_funcs_test.go +++ b/hook_defaults_funcs_test.go @@ -11,9 +11,15 @@ var ( _ rivertype.Hook = HookInsertBeginFunc(func(ctx context.Context, params *rivertype.JobInsertParams) error { return nil }) _ rivertype.HookInsertBegin = HookInsertBeginFunc(func(ctx context.Context, params *rivertype.JobInsertParams) error { return nil }) + _ rivertype.Hook = HookMetricEmitFunc(func(ctx context.Context, params *rivertype.HookMetricEmitParams) {}) + _ rivertype.HookMetricEmit = HookMetricEmitFunc(func(ctx context.Context, params *rivertype.HookMetricEmitParams) {}) + _ rivertype.Hook = HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error { return nil }) _ rivertype.HookPeriodicJobsStart = HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error { return nil }) _ rivertype.Hook = HookWorkBeginFunc(func(ctx context.Context, job *rivertype.JobRow) error { return nil }) _ rivertype.HookWorkBegin = HookWorkBeginFunc(func(ctx context.Context, job *rivertype.JobRow) error { return nil }) + + _ rivertype.Hook = HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error { return err }) + _ rivertype.HookWorkEnd = HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error { return err }) ) diff --git a/internal/hooklookup/hook_lookup.go b/internal/hooklookup/hook_lookup.go index 420debb2..abd53165 100644 --- a/internal/hooklookup/hook_lookup.go +++ b/internal/hooklookup/hook_lookup.go @@ -14,6 +14,7 @@ type HookKind string const ( HookKindInsertBegin HookKind = "insert_begin" + HookKindMetricEmit HookKind = "metric_emit" HookKindPeriodicJobsStart HookKind = "periodic_job_start" HookKindWorkBegin HookKind = "work_begin" HookKindWorkEnd HookKind = "work_end" @@ -84,6 +85,12 @@ func (c *hookLookup) ByHookKind(kind HookKind) []rivertype.Hook { c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook) } } + case HookKindMetricEmit: + for _, hook := range c.hooks { + if typedHook, ok := hook.(rivertype.HookMetricEmit); ok { + c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook) + } + } case HookKindPeriodicJobsStart: for _, hook := range c.hooks { if typedHook, ok := hook.(rivertype.HookPeriodicJobsStart); ok { diff --git a/internal/hooklookup/hook_lookup_test.go b/internal/hooklookup/hook_lookup_test.go index 4390ca6c..5fc330af 100644 --- a/internal/hooklookup/hook_lookup_test.go +++ b/internal/hooklookup/hook_lookup_test.go @@ -21,6 +21,7 @@ func TestHookLookup(t *testing.T) { return NewHookLookup([]rivertype.Hook{ //nolint:forcetypeassert &testHookInsertAndWorkBegin{}, &testHookInsertBegin{}, + &testHookMetricEmit{}, &testHookWorkBegin{}, &testHookWorkEnd{}, }).(*hookLookup), &testBundle{} @@ -35,6 +36,9 @@ func TestHookLookup(t *testing.T) { &testHookInsertAndWorkBegin{}, &testHookInsertBegin{}, }, hookLookup.ByHookKind(HookKindInsertBegin)) + require.Equal(t, []rivertype.Hook{ + &testHookMetricEmit{}, + }, hookLookup.ByHookKind(HookKindMetricEmit)) require.Equal(t, []rivertype.Hook{ &testHookInsertAndWorkBegin{}, &testHookWorkBegin{}, @@ -43,13 +47,16 @@ func TestHookLookup(t *testing.T) { &testHookWorkEnd{}, }, hookLookup.ByHookKind(HookKindWorkEnd)) - require.Len(t, hookLookup.hooksByKind, 3) + require.Len(t, hookLookup.hooksByKind, 4) // Repeat lookups to make sure we get the same result. require.Equal(t, []rivertype.Hook{ &testHookInsertAndWorkBegin{}, &testHookInsertBegin{}, }, hookLookup.ByHookKind(HookKindInsertBegin)) + require.Equal(t, []rivertype.Hook{ + &testHookMetricEmit{}, + }, hookLookup.ByHookKind(HookKindMetricEmit)) require.Equal(t, []rivertype.Hook{ &testHookInsertAndWorkBegin{}, &testHookWorkBegin{}, @@ -75,6 +82,7 @@ func TestHookLookup(t *testing.T) { } parallelLookupLoop(HookKindInsertBegin) + parallelLookupLoop(HookKindMetricEmit) parallelLookupLoop(HookKindWorkBegin) parallelLookupLoop(HookKindInsertBegin) parallelLookupLoop(HookKindWorkBegin) @@ -100,6 +108,7 @@ func TestEmptyHookLookup(t *testing.T) { hookLookup, _ := setup(t) require.Nil(t, hookLookup.ByHookKind(HookKindInsertBegin)) + require.Nil(t, hookLookup.ByHookKind(HookKindMetricEmit)) require.Nil(t, hookLookup.ByHookKind(HookKindWorkBegin)) }) } @@ -241,6 +250,17 @@ func (t *testHookInsertBegin) InsertBegin(ctx context.Context, params *rivertype return nil } +// +// testHookMetricEmit +// + +var _ rivertype.HookMetricEmit = &testHookMetricEmit{} + +type testHookMetricEmit struct{ rivertype.Hook } + +func (t *testHookMetricEmit) MetricEmit(ctx context.Context, params *rivertype.HookMetricEmitParams) { +} + // // testHookWorkBegin // diff --git a/producer.go b/producer.go index dbd40653..f7285225 100644 --- a/producer.go +++ b/producer.go @@ -181,15 +181,16 @@ type producer struct { // Jobs which are currently being worked. Only used by main goroutine. activeJobs map[int64]*jobexecutor.JobExecutor - completer jobcompleter.JobCompleter - config *producerConfig - id atomic.Int64 // atomic because it's written at startup and read during shutdown - exec riverdriver.Executor - errorHandler jobexecutor.ErrorHandler - fetchLimiter *chanutil.DebouncedChan - state riverpilot.ProducerState - pilot riverpilot.Pilot - workers *Workers + completer jobcompleter.JobCompleter + config *producerConfig + id atomic.Int64 // atomic because it's written at startup and read during shutdown + exec riverdriver.Executor + errorHandler jobexecutor.ErrorHandler + fetchLimiter *chanutil.DebouncedChan + metricEmitHooks []rivertype.HookMetricEmit // memoized hooks of type HookMetricEmit for reuse in dispatchWork + state riverpilot.ProducerState + pilot riverpilot.Pilot + workers *Workers // Receives job IDs to cancel. Written by notifier goroutine, only read from // main goroutine. @@ -233,7 +234,7 @@ func newProducer(archetype *baseservice.Archetype, exec riverdriver.Executor, pi errorHandler = &errorHandlerAdapter{config.ErrorHandler} } - return baseservice.Init(archetype, &producer{ + producer := baseservice.Init(archetype, &producer{ activeJobs: make(map[int64]*jobexecutor.JobExecutor), cancelCh: make(chan int64, 1000), completer: config.Completer, @@ -247,6 +248,10 @@ func newProducer(archetype *baseservice.Archetype, exec riverdriver.Executor, pi retryPolicy: config.RetryPolicy, workers: config.Workers, }) + + producer.metricEmitHooks = producer.metricEmitHooksFromLookup() + + return producer } // Start starts the producer. It backgrounds a goroutine which is stopped when @@ -743,6 +748,25 @@ func (p *producer) maybeCancelJob(ctx context.Context, id int64) { executor.Cancel(ctx) } +func (p *producer) metricEmitHooksFromLookup() []rivertype.HookMetricEmit { + hookLookup := p.config.HookLookupGlobal + if hookLookup == nil { + return nil + } + + hooks := hookLookup.ByHookKind(hooklookup.HookKindMetricEmit) + if len(hooks) < 1 { + return nil + } + + metricEmitHooks := make([]rivertype.HookMetricEmit, len(hooks)) + for i, hook := range hooks { + metricEmitHooks[i] = hook.(rivertype.HookMetricEmit) //nolint:forcetypeassert + } + + return metricEmitHooks +} + func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultCh chan<- producerFetchResult) { // This intentionally removes any deadlines or cancellation from the parent // context because we don't want it to get cancelled if the producer is asked @@ -757,6 +781,11 @@ func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultC // rarely hit, but exists to protect against degenerate cases. const maxAttemptedBy = 100 + var startedAt time.Time + if len(p.metricEmitHooks) > 0 { + startedAt = time.Now() + } + jobs, err := p.pilot.JobGetAvailable(ctx, p.exec, p.state, &riverdriver.JobGetAvailableParams{ ClientID: p.config.ClientID, MaxAttemptedBy: maxAttemptedBy, @@ -771,9 +800,30 @@ func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultC return } + if len(p.metricEmitHooks) > 0 { + p.emitMetric(ctx, &rivertype.HookMetricEmitParams{ + Metric: &rivertype.JobGetAvailableDurationMetric{ + Duration: time.Since(startedAt), + Queue: p.config.Queue, + }, + }) + p.emitMetric(ctx, &rivertype.HookMetricEmitParams{ + Metric: &rivertype.JobGetAvailableCountMetric{ + Count: len(jobs), + Queue: p.config.Queue, + }, + }) + } + fetchResultCh <- producerFetchResult{jobs: jobs} } +func (p *producer) emitMetric(ctx context.Context, params *rivertype.HookMetricEmitParams) { + for _, hook := range p.metricEmitHooks { + hook.MetricEmit(ctx, params) + } +} + // Periodically logs an informational log line giving some insight into the // current state of the producer. func (p *producer) heartbeatLogLoop(ctx context.Context, wg *sync.WaitGroup) { diff --git a/producer_test.go b/producer_test.go index c03bd766..ace3a7c9 100644 --- a/producer_test.go +++ b/producer_test.go @@ -34,6 +34,17 @@ import ( const testClientID = "test-client-id" +type countingHookLookup struct { + hooklookup.HookLookupInterface + + count int +} + +func (l *countingHookLookup) ByHookKind(kind hooklookup.HookKind) []rivertype.Hook { + l.count++ + return l.HookLookupInterface.ByHookKind(kind) +} + func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) { // We have encountered previous data races with the list of active jobs on // Producer because we need to know the count of active jobs in order to @@ -161,6 +172,125 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) { } } +func TestProducer_MetricEmitHook(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + archetype *baseservice.Archetype + config *Config + exec riverdriver.Executor + hookLookup *countingHookLookup + metrics chan *rivertype.HookMetricEmitParams + producer *producer + queue string + schema string + } + + setup := func(t *testing.T) *testBundle { + t.Helper() + + var ( + archetype = riversharedtest.BaseServiceArchetype(t) + driver = riverpgxv5.New(riversharedtest.DBPool(ctx, t)) + exec = driver.GetExecutor() + jobUpdates = make(chan []jobcompleter.CompleterJobUpdated, 10) + metrics = make(chan *rivertype.HookMetricEmitParams, 10) + pilot = &riverpilot.StandardPilot{} + queueName = "test_producer_metric_hook" + schema = riverdbtest.TestSchema(ctx, t, driver, nil) + ) + + t.Cleanup(riverinternaltest.DiscardContinuously(jobUpdates)) + + completer := jobcompleter.NewInlineCompleter(archetype, schema, exec, pilot, jobUpdates) + t.Cleanup(completer.Stop) + + metricHook := HookMetricEmitFunc(func(ctx context.Context, params *rivertype.HookMetricEmitParams) { + paramsCopy := *params + metrics <- ¶msCopy + }) + hookLookup := &countingHookLookup{ + HookLookupInterface: hooklookup.NewHookLookup([]rivertype.Hook{metricHook}), + } + + producer := newProducer(archetype, exec, pilot, &producerConfig{ + ClientID: testClientID, + Completer: completer, + ErrorHandler: newTestErrorHandler(), + FetchCooldown: FetchCooldownDefault, + FetchPollInterval: 50 * time.Millisecond, + HookLookupByJob: hooklookup.NewJobHookLookup(), + HookLookupGlobal: hookLookup, + JobTimeout: JobTimeoutDefault, + MaxWorkers: 1_000, + MiddlewareLookupGlobal: middlewarelookup.NewMiddlewareLookup(nil), + Queue: queueName, + QueuePollInterval: queuePollIntervalDefault, + QueueReportInterval: queueReportIntervalDefault, + RetryPolicy: &DefaultClientRetryPolicy{}, + SchedulerInterval: riverinternaltest.SchedulerShortInterval, + Schema: schema, + StaleProducerRetentionPeriod: time.Minute, + Workers: NewWorkers(), + }) + + return &testBundle{ + archetype: archetype, + config: newTestConfig(t, schema), + exec: exec, + hookLookup: hookLookup, + metrics: metrics, + producer: producer, + queue: queueName, + schema: schema, + } + } + + bundle := setup(t) + + scheduledAt := time.Now().UTC().Add(-time.Second) + insertParams := make([]*riverdriver.JobInsertFastParams, 2) + for i := range insertParams { + params, err := insertParamsFromConfigArgsAndOptions(bundle.archetype, bundle.config, noOpArgs{}, &InsertOpts{ + Queue: bundle.queue, + }) + require.NoError(t, err) + params.ScheduledAt = &scheduledAt + insertParams[i] = (*riverdriver.JobInsertFastParams)(params) + } + + _, err := bundle.exec.JobInsertFastMany(ctx, &riverdriver.JobInsertFastManyParams{ + Jobs: insertParams, + Schema: bundle.schema, + }) + require.NoError(t, err) + + fetchResultCh := make(chan producerFetchResult, 1) + bundle.producer.dispatchWork(ctx, 2, fetchResultCh) + + fetchResult := riversharedtest.WaitOrTimeout(t, fetchResultCh) + require.NoError(t, fetchResult.err) + require.Len(t, fetchResult.jobs, 2) + require.Equal(t, 1, bundle.hookLookup.count) + + metricsByName := make(map[rivertype.MetricName]rivertype.Metric) + for _, metric := range riversharedtest.WaitOrTimeoutN(t, bundle.metrics, 2) { + metricsByName[metric.Metric.Name()] = metric.Metric + } + + durationMetric, durationMetricFound := metricsByName[rivertype.MetricNameJobGetAvailableDuration].(*rivertype.JobGetAvailableDurationMetric) + require.True(t, durationMetricFound) + require.Equal(t, bundle.queue, durationMetric.Queue) + require.GreaterOrEqual(t, durationMetric.Duration, time.Duration(0)) + + countMetric, countMetricFound := metricsByName[rivertype.MetricNameJobGetAvailableCount].(*rivertype.JobGetAvailableCountMetric) + require.True(t, countMetricFound) + require.Equal(t, bundle.queue, countMetric.Queue) + require.Equal(t, 2, countMetric.Count) +} + func TestProducer_PollOnly(t *testing.T) { t.Parallel() diff --git a/rivertype/river_type.go b/rivertype/river_type.go index 13d07f63..1df21c2f 100644 --- a/rivertype/river_type.go +++ b/rivertype/river_type.go @@ -235,6 +235,30 @@ func JobStates() []JobState { } } +// MetricName identifies a metric emitted through HookMetricEmit. +type MetricName string + +const ( + // MetricNameJobGetAvailableDuration is the duration of a successful + // JobGetAvailable call. + MetricNameJobGetAvailableDuration MetricName = "job_get_available_duration" + + // MetricNameJobGetAvailableCount is the number of jobs locked by a + // successful JobGetAvailable call. + MetricNameJobGetAvailableCount MetricName = "job_get_available_count" +) + +// Metric is a strongly typed metric payload emitted through HookMetricEmit. +// +// River provides all Metric implementations. New metric types may be added in +// future versions without changing HookMetricEmit's method signature. +type Metric interface { + // Name identifies the emitted metric. + Name() MetricName + + isMetric() +} + // AttemptError is an error from a single job attempt that failed due to an // error or a panic. type AttemptError struct { @@ -303,6 +327,8 @@ type JobInsertParams struct { // // List of hook interfaces that may be implemented: // - HookInsertBegin +// - HookMetricEmit +// - HookPeriodicJobsStart // - HookWorkBegin // - HookWorkEnd // @@ -323,6 +349,56 @@ type HookInsertBegin interface { InsertBegin(ctx context.Context, params *JobInsertParams) error } +// HookMetricEmit is an interface to a hook that receives metrics emitted by +// River. +type HookMetricEmit interface { + Hook + + // MetricEmit is invoked each time River emits a metric. It should not block + // on network I/O, and should usually pass metrics through to an asynchronous + // instrumentation package like OpenTelemetry. + MetricEmit(ctx context.Context, params *HookMetricEmitParams) +} + +// HookMetricEmitParams are parameters for HookMetricEmit. +type HookMetricEmitParams struct { + // Metric is the emitted metric payload. Use a type switch to access + // metric-specific fields. + Metric Metric +} + +// JobGetAvailableDurationMetric is emitted after a successful JobGetAvailable +// call with the call's duration. +type JobGetAvailableDurationMetric struct { + // Duration is how long the JobGetAvailable call took. + Duration time.Duration + + // Queue is the queue that jobs were locked from. + Queue string +} + +func (m *JobGetAvailableDurationMetric) Name() MetricName { + return MetricNameJobGetAvailableDuration +} + +func (m *JobGetAvailableDurationMetric) isMetric() {} + +// JobGetAvailableCountMetric is emitted after a successful JobGetAvailable +// call with the number of jobs locked. +type JobGetAvailableCountMetric struct { + // Count is the number of jobs locked. + Count int + + // Queue is the queue that jobs were locked from. + Queue string +} + +func (m *JobGetAvailableCountMetric) Name() MetricName { + return MetricNameJobGetAvailableCount +} + +func (m *JobGetAvailableCountMetric) isMetric() {} + // HookPeriodicJobsStart is an interface to a hook that runs when the periodic // job enqueuer starts on a newly elected leader. type HookPeriodicJobsStart interface {