-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathoption.go
347 lines (289 loc) · 9.89 KB
/
option.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
package broker
import "time"
// Option options functional option
type Option func(o *Options)
// Options broker option
type Options struct {
Addrs []string // client connection address list
Prefix string // client mq prefix
User string // user
Password string // password
// ========pulsar mq===============
// ListenerName Configure the net model for vpc user to connect the pulsar broker
ListenerName string
// AuthToken auth token
AuthToken string
// OperationTimeout operation timeout
OperationTimeout time.Duration
// ConnectionTimeout timeout for the establishment of a TCP connection (default: 10 seconds)
ConnectionTimeout time.Duration
// MaxConnectionsPerBroker the max number of connections to a single broker
// that will keep in the pool. (Default: 1 connection)
// this param for pulsar connection per broker
MaxConnectionsPerBroker int
// =======redis mq================
RedisConf *RedisConf
// graceful exit time
GracefulWait time.Duration
// no data wait second
NoDataWaitSec int
// ConsumerAutoCommitInterval consumer auto commit interval (default: 1s)
ConsumerAutoCommitInterval time.Duration
// Logger logger
Logger Logger
}
// WithBrokerAddress set broker address
func WithBrokerAddress(addrs ...string) Option {
return func(o *Options) {
o.Addrs = addrs
}
}
// WithBrokerPrefix set broker prefix
func WithBrokerPrefix(prefix string) Option {
return func(o *Options) {
o.Prefix = prefix
}
}
// WithUser set broker user
func WithUser(user string) Option {
return func(o *Options) {
o.User = user
}
}
// WithPassword set broker password
func WithPassword(pwd string) Option {
return func(o *Options) {
o.Password = pwd
}
}
// WithListenerName set broker listener name
func WithListenerName(name string) Option {
return func(o *Options) {
o.ListenerName = name
}
}
// WithAuthToken set broker token eg:pulsar broker
func WithAuthToken(token string) Option {
return func(o *Options) {
o.AuthToken = token
}
}
// WithOperationTimeout set broker op timeout
func WithOperationTimeout(t time.Duration) Option {
return func(o *Options) {
o.OperationTimeout = t
}
}
// WithConnectionTimeout set broker connection timeout
func WithConnectionTimeout(t time.Duration) Option {
return func(o *Options) {
o.ConnectionTimeout = t
}
}
// WithMaxConnectionsPerBroker set max connection
func WithMaxConnectionsPerBroker(num int) Option {
return func(o *Options) {
o.MaxConnectionsPerBroker = num
}
}
// WithGracefulWait set sub graceful exit time
func WithGracefulWait(t time.Duration) Option {
return func(s *Options) {
s.GracefulWait = t
}
}
// WithNoDataWaitSec no data wait second
func WithNoDataWaitSec(sec int) Option {
return func(o *Options) {
o.NoDataWaitSec = sec
}
}
// WithLogger set broker logger
func WithLogger(logger Logger) Option {
return func(o *Options) {
o.Logger = logger
}
}
// WithConsumerAutoCommitInterval set consumer auto commit interval.
func WithConsumerAutoCommitInterval(interval time.Duration) Option {
return func(o *Options) {
o.ConsumerAutoCommitInterval = interval
}
}
// PubOption publish option
type PubOption func(p *PublishOptions)
// PublishOptions publish message option
type PublishOptions struct {
// PublishDelay specifies the time period within which the messages sent will be batched (default: 10ms)
// if message is enabled. If set to a non zero value, messages will be queued until this time
// interval or until
PublishDelay time.Duration
// Name specifies a name for the producer.
// if you use pulsar mq,if not assigned, the system will generate
// a globally unique name which can be access with
// Producer.ProducerName().
//
// for kafka publish message key
// The partitioning key for this message. Pre-existing Encoders include
// StringEncoder and ByteEncoder.
Name string
// DisableBatching controls whether automatic batching of messages is enabled for the producer.
// By default batching is enabled.
// When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the
// broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
// messages will be compressed at the batch level, leading to a much better compression ratio
// for similar headers or contents.
// When enabled default batch delay is set to 1 ms and default batch size is 1000 messages
// Setting `DisableBatching: true` will make the producer to send messages individually
DisableBatching bool
// SendTimeout specifies the timeout for a message that has not been acknowledged by the server since sent.
// Send and SendAsync returns an error after timeout.
// Default is 30 seconds, negative such as -1 to disable.
SendTimeout time.Duration
}
// WithPublishDelay set publish delay time
func WithPublishDelay(t time.Duration) PubOption {
return func(p *PublishOptions) {
p.PublishDelay = t
}
}
// WithPublishName set publish script name
func WithPublishName(name string) PubOption {
return func(p *PublishOptions) {
p.Name = name
}
}
// WithDisableBatching disable batch publish
func WithDisableBatching() PubOption {
return func(p *PublishOptions) {
p.DisableBatching = true
}
}
// WithSendTimeout set publish send msg timeout
func WithSendTimeout(t time.Duration) PubOption {
return func(p *PublishOptions) {
p.SendTimeout = t
}
}
// SubOption subscribe option
type SubOption func(s *SubscribeOptions)
// SubscribeOptions subscribe message option
type SubscribeOptions struct {
// specifies the consumer name
Name string
// KeyHandlers for kafka consumer message key handler map
// for redis sub,you can specify different message subscriber functions to handle msg.
KeyHandlers map[string]SubHandler
// Receive messages from channel. The channel returns a struct which contains message and the consumer from where
// the message was received. It's not necessary here since we have 1 single consumer, but the channel could be
// shared across multiple consumers as well
MessageChannel bool // default:false
MessageChannelSize int // default:100
// subscribe concurrency count,default:1
// Note: this param for redis or pulsar consumer message
ConcurrencySize int
Offset int64
// Commit the offset to the backend for kafka
// Note: calling Commit performs a blocking synchronous operation.
CommitOffsetBlock bool
// SubInterval subscribe interval,default:0
SubInterval time.Duration
// ===========pulsar mq=======
// subType specifies the subscription type to be used when subscribing to a topic.
// Default is `Shared` 1:N
// Exclusive there can be only 1 consumer on the same topic with the same subscription name
//
// Shared 1:N
// Shared subscription mode, multiple consumer will be able to use the same subscription name
// and the messages will be dispatched according to
//
// Failover subscription mode, multiple consumer will be able to use the same subscription name
// but only 1 consumer will receive the messages.
// If that consumer disconnects, one of the other connected consumers will start receiving messages.
SubType SubscriptionType
// ReceiverQueueSize sets the size of the consumer receive queue.
// The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
// application calls `Consumer.receive()`. Using a higher value could potentially increase the consumer
// throughput at the expense of bigger memory utilization.
// Default value is `1000` messages and should be good for most use cases.
ReceiverQueueSize int
// retryEnable for pulsar sub RetryEnable
RetryEnable bool
}
// WithSubName set sub name
func WithSubName(name string) SubOption {
return func(s *SubscribeOptions) {
s.Name = name
}
}
// WithSubKeyHandlers set sub key => subHandler map
func WithSubKeyHandlers(keyHandlers map[string]SubHandler) SubOption {
return func(s *SubscribeOptions) {
s.KeyHandlers = keyHandlers
}
}
// WithMessageChannel set sub message channel
func WithMessageChannel() SubOption {
return func(s *SubscribeOptions) {
s.MessageChannel = true
}
}
// WithMessageChannelSize set sub message channel size
func WithMessageChannelSize(size int) SubOption {
return func(s *SubscribeOptions) {
s.MessageChannelSize = size
}
}
// WithSubConcurrencySize set subscribe size
func WithSubConcurrencySize(size int) SubOption {
return func(s *SubscribeOptions) {
s.ConcurrencySize = size
}
}
// WithSubOffset set sub offset
func WithSubOffset(offset int64) SubOption {
return func(s *SubscribeOptions) {
s.Offset = offset
}
}
// WithSubInterval set sub interval
func WithSubInterval(t time.Duration) SubOption {
return func(s *SubscribeOptions) {
s.SubInterval = t
}
}
// WithSubType set subType
func WithSubType(t SubscriptionType) SubOption {
return func(s *SubscribeOptions) {
s.SubType = t
}
}
// WithSubRetryEnable set sub retry
func WithSubRetryEnable() SubOption {
return func(s *SubscribeOptions) {
s.RetryEnable = true
}
}
// WithCommitOffsetBlock commit offset block when message consumer.
func WithCommitOffsetBlock() SubOption {
return func(s *SubscribeOptions) {
s.CommitOffsetBlock = true
}
}
// SubscriptionType of subscription supported by Pulsar
type SubscriptionType int
const (
// Exclusive there can be only 1 consumer on the same topic with the same subscription name
Exclusive SubscriptionType = iota
// Shared subscription mode, multiple consumer will be able to use the same subscription name
// and the messages will be dispatched according to
// a round-robin rotation between the connected consumers
Shared
// Failover subscription mode, multiple consumer will be able to use the same subscription name
// but only 1 consumer will receive the messages.
// If that consumer disconnects, one of the other connected consumers will start receiving messages.
Failover
// KeyShared subscription mode, multiple consumer will be able to use the same
// subscription and all messages with the same key will be dispatched to only one consumer
KeyShared
)