-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhooks.go
More file actions
238 lines (210 loc) · 8.43 KB
/
hooks.go
File metadata and controls
238 lines (210 loc) · 8.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
package dispatch
import (
"context"
"time"
)
// OnParseFunc is the signature of a hook that runs once a source has
// successfully parsed a message. It receives the name of the source and the
// key extracted from the message, and returns the context that is used for
// the rest of the request. Typical uses are attaching logging fields or
// starting a trace span.
type OnParseFunc func(ctx context.Context, source string, key string) context.Context
// OnDispatchFunc is the signature of a hook that runs immediately before the
// handler executes. It receives the request context, the source name and the
// key; it returns nothing.
type OnDispatchFunc func(ctx context.Context, source string, key string)
// OnSuccessFunc is the signature of a hook that runs after the handler has
// completed successfully. In addition to the source name and key it receives
// a duration value for the handler run.
type OnSuccessFunc func(ctx context.Context, source string, key string, duration time.Duration)
// OnFailureFunc is the signature of a hook that runs after the handler has
// failed. It receives the source name, the key, the handler's error and a
// duration value for the handler run.
type OnFailureFunc func(ctx context.Context, source string, key string, err error, duration time.Duration)
// OnNoSourceFunc is the signature of a hook that runs when no source can
// parse the message. raw is the message as a byte slice.
// Return nil to skip the message; return a non-nil error to fail.
type OnNoSourceFunc func(ctx context.Context, raw []byte) error
// OnParseErrorFunc is the signature of a hook that runs when a source's Parse
// method returns an error. source names the source whose Parse failed and err
// is the error it returned.
// Return nil to skip the message; return a non-nil error to fail.
type OnParseErrorFunc func(ctx context.Context, source string, err error) error
// OnNoHandlerFunc is the signature of a hook that runs when no handler is
// registered for the routing key. Returning nil skips the message; returning
// a non-nil error fails it.
type OnNoHandlerFunc func(ctx context.Context, source string, key string) error
// OnUnmarshalErrorFunc is the signature of a hook that runs when JSON
// unmarshaling fails. err is the unmarshaling error. Returning nil skips the
// message; returning a non-nil error fails it.
type OnUnmarshalErrorFunc func(ctx context.Context, source string, key string, err error) error
// OnValidationErrorFunc is the signature of a hook that runs when payload
// validation fails. err is the validation error. Returning nil skips the
// message; returning a non-nil error fails it.
type OnValidationErrorFunc func(ctx context.Context, source string, key string, err error) error
// hooks holds all configured hook functions, one slice per hook point.
// Each With* option in this file appends to exactly one of these slices, so
// a slice lists its hooks in the order the options were applied.
type hooks struct {
// onParse holds the hooks added through WithOnParse.
onParse []OnParseFunc
// onDispatch holds the hooks added through WithOnDispatch.
onDispatch []OnDispatchFunc
// onSuccess holds the hooks added through WithOnSuccess.
onSuccess []OnSuccessFunc
// onFailure holds the hooks added through WithOnFailure.
onFailure []OnFailureFunc
// onNoSource holds the hooks added through WithOnNoSource.
onNoSource []OnNoSourceFunc
// onParseError holds the hooks added through WithOnParseError.
onParseError []OnParseErrorFunc
// onNoHandler holds the hooks added through WithOnNoHandler.
onNoHandler []OnNoHandlerFunc
// onUnmarshalError holds the hooks added through WithOnUnmarshalError.
onUnmarshalError []OnUnmarshalErrorFunc
// onValidationError holds the hooks added through WithOnValidationError.
onValidationError []OnValidationErrorFunc
}
// Option configures Router behavior.
// An Option is a function that receives the Router and modifies it in place;
// the With* constructors in this file return Options that register hooks.
type Option func(*Router)
// WithOnParse adds a hook called after a source successfully parses a message.
// Multiple hooks are called in order, with context chaining through each.
//
// A nil fn is ignored: the returned Option leaves the Router unchanged, so the
// hook list never holds a nil function value (calling one would panic).
//
// Example:
//
//	dispatch.WithOnParse(func(ctx context.Context, source, key string) context.Context {
//		return logx.WithCtx(ctx, slog.String("source", source))
//	})
func WithOnParse(fn OnParseFunc) Option {
	return func(r *Router) {
		if fn == nil {
			return
		}
		r.hooks.onParse = append(r.hooks.onParse, fn)
	}
}
// WithOnDispatch adds a hook called just before the handler executes.
// Multiple hooks are called in order.
//
// A nil fn is ignored: the returned Option leaves the Router unchanged, so the
// hook list never holds a nil function value (calling one would panic).
//
// Example:
//
//	dispatch.WithOnDispatch(func(ctx context.Context, source, key string) {
//		logger.Info(ctx, "dispatching event", "key", key)
//	})
func WithOnDispatch(fn OnDispatchFunc) Option {
	return func(r *Router) {
		if fn == nil {
			return
		}
		r.hooks.onDispatch = append(r.hooks.onDispatch, fn)
	}
}
// WithOnSuccess adds a hook called after the handler completes successfully.
// Multiple hooks are called in order.
//
// A nil fn is ignored: the returned Option leaves the Router unchanged, so the
// hook list never holds a nil function value (calling one would panic).
//
// Example:
//
//	dispatch.WithOnSuccess(func(ctx context.Context, source, key string, d time.Duration) {
//		metrics.Timing("dispatch.success", d, "source:"+source)
//	})
func WithOnSuccess(fn OnSuccessFunc) Option {
	return func(r *Router) {
		if fn == nil {
			return
		}
		r.hooks.onSuccess = append(r.hooks.onSuccess, fn)
	}
}
// WithOnFailure adds a hook called after the handler fails.
// Multiple hooks are called in order.
//
// A nil fn is ignored: the returned Option leaves the Router unchanged, so the
// hook list never holds a nil function value (calling one would panic).
//
// Example:
//
//	dispatch.WithOnFailure(func(ctx context.Context, source, key string, err error, d time.Duration) {
//		metrics.Incr("dispatch.failure", "source:"+source)
//		logger.Error(ctx, "handler failed", "error", err)
//	})
func WithOnFailure(fn OnFailureFunc) Option {
	return func(r *Router) {
		if fn == nil {
			return
		}
		r.hooks.onFailure = append(r.hooks.onFailure, fn)
	}
}
// WithOnNoSource adds a hook called when no source can parse the message.
// The hook returns nil to skip the message, or an error to fail.
// Multiple hooks are called in order; first error wins.
//
// A nil fn is ignored: the returned Option leaves the Router unchanged, so the
// hook list never holds a nil function value (calling one would panic).
//
// Example:
//
//	dispatch.WithOnNoSource(func(ctx context.Context, raw []byte) error {
//		logger.Warn(ctx, "unknown message format")
//		return nil // skip the message
//	})
func WithOnNoSource(fn OnNoSourceFunc) Option {
	return func(r *Router) {
		if fn == nil {
			return
		}
		r.hooks.onNoSource = append(r.hooks.onNoSource, fn)
	}
}
// WithOnParseError adds a hook called when a source's Parse method returns an
// error. The hook returns nil to skip the message, or an error to fail.
// Multiple hooks are called in order; first error wins.
//
// A nil fn is ignored: the returned Option leaves the Router unchanged, so the
// hook list never holds a nil function value (calling one would panic).
//
// Example:
//
//	dispatch.WithOnParseError(func(ctx context.Context, source string, err error) error {
//		logger.Error(ctx, "parse failed", "source", source, "error", err)
//		return nil // skip bad messages
//	})
func WithOnParseError(fn OnParseErrorFunc) Option {
	return func(r *Router) {
		if fn == nil {
			return
		}
		r.hooks.onParseError = append(r.hooks.onParseError, fn)
	}
}
// WithOnNoHandler adds a hook called when no handler is registered for the key.
// The hook returns nil to skip the message, or an error to fail.
// Multiple hooks are called in order; first error wins.
//
// A nil fn is ignored: the returned Option leaves the Router unchanged, so the
// hook list never holds a nil function value (calling one would panic).
//
// Example:
//
//	dispatch.WithOnNoHandler(func(ctx context.Context, source, key string) error {
//		logger.Warn(ctx, "no handler", "key", key)
//		return nil // skip
//	})
func WithOnNoHandler(fn OnNoHandlerFunc) Option {
	return func(r *Router) {
		if fn == nil {
			return
		}
		r.hooks.onNoHandler = append(r.hooks.onNoHandler, fn)
	}
}
// WithOnUnmarshalError adds a hook called when JSON unmarshaling fails.
// The hook returns nil to skip the message, or an error to fail.
// Multiple hooks are called in order; first error wins.
//
// A nil fn is ignored: the returned Option leaves the Router unchanged, so the
// hook list never holds a nil function value (calling one would panic).
//
// Example:
//
//	dispatch.WithOnUnmarshalError(func(ctx context.Context, source, key string, err error) error {
//		logger.Error(ctx, "bad payload", "error", err)
//		return nil // skip bad payloads
//	})
func WithOnUnmarshalError(fn OnUnmarshalErrorFunc) Option {
	return func(r *Router) {
		if fn == nil {
			return
		}
		r.hooks.onUnmarshalError = append(r.hooks.onUnmarshalError, fn)
	}
}
// WithOnValidationError adds a hook called when payload validation fails.
// The hook returns nil to skip the message, or an error to fail.
// Multiple hooks are called in order; first error wins.
//
// A nil fn is ignored: the returned Option leaves the Router unchanged, so the
// hook list never holds a nil function value (calling one would panic).
//
// Example:
//
//	dispatch.WithOnValidationError(func(ctx context.Context, source, key string, err error) error {
//		logger.Error(ctx, "validation failed", "error", err)
//		return nil // skip invalid payloads
//	})
func WithOnValidationError(fn OnValidationErrorFunc) Option {
	return func(r *Router) {
		if fn == nil {
			return
		}
		r.hooks.onValidationError = append(r.hooks.onValidationError, fn)
	}
}
// OnParseHook is an optional interface that sources can implement to add
// source-specific context enrichment. Called after global OnParse hooks.
//
// Unlike OnParseFunc, the method takes no source argument; the implementing
// source is the receiver.
type OnParseHook interface {
// OnParse receives the current context and the message key and returns
// the context to use from then on.
OnParse(ctx context.Context, key string) context.Context
}
// OnDispatchHook is an optional interface that sources can implement to add
// source-specific pre-dispatch behavior. Called after global OnDispatch hooks.
//
// Unlike OnDispatchFunc, the method takes no source argument; the implementing
// source is the receiver.
type OnDispatchHook interface {
// OnDispatch receives the request context and the message key.
OnDispatch(ctx context.Context, key string)
}
// OnSuccessHook is an optional interface that sources can implement to add
// source-specific behavior on handler success. Called after global OnSuccess hooks.
//
// Unlike OnSuccessFunc, the method takes no source argument; the implementing
// source is the receiver.
type OnSuccessHook interface {
// OnSuccess receives the request context, the message key and a duration
// value for the handler run.
OnSuccess(ctx context.Context, key string, duration time.Duration)
}
// OnFailureHook is an optional interface that sources can implement to add
// source-specific behavior on handler failure. Called after global OnFailure hooks.
//
// Unlike OnFailureFunc, the method takes no source argument; the implementing
// source is the receiver.
type OnFailureHook interface {
// OnFailure receives the request context, the message key, the handler's
// error and a duration value for the handler run.
OnFailure(ctx context.Context, key string, err error, duration time.Duration)
}
// OnNoHandlerHook is an optional interface that sources can implement to add
// source-specific behavior when no handler is found. Called after global hooks;
// if either returns an error, that error is used.
//
// Unlike OnNoHandlerFunc, the method takes no source argument; the
// implementing source is the receiver.
type OnNoHandlerHook interface {
// OnNoHandler receives the request context and the key that has no
// handler. It returns nil or an error, like OnNoHandlerFunc.
OnNoHandler(ctx context.Context, key string) error
}
// OnUnmarshalErrorHook is an optional interface that sources can implement to
// add source-specific behavior on unmarshal errors. Called after global hooks;
// if either returns an error, that error is used.
//
// Unlike OnUnmarshalErrorFunc, the method takes no source argument; the
// implementing source is the receiver.
type OnUnmarshalErrorHook interface {
// OnUnmarshalError receives the request context, the message key and the
// unmarshal error. It returns nil or an error, like OnUnmarshalErrorFunc.
OnUnmarshalError(ctx context.Context, key string, err error) error
}
// OnValidationErrorHook is an optional interface that sources can implement to
// add source-specific behavior on validation errors. Called after global hooks;
// if either returns an error, that error is used.
//
// Unlike OnValidationErrorFunc, the method takes no source argument; the
// implementing source is the receiver.
type OnValidationErrorHook interface {
// OnValidationError receives the request context, the message key and the
// validation error. It returns nil or an error, like OnValidationErrorFunc.
OnValidationError(ctx context.Context, key string, err error) error
}