Documentation ¶
Index ¶
- Variables
- func ParseMessage(msg interface{}) ([]byte, error)
- func Recovery(logger Logger)
- type Broker
- type Logger
- type LoggerFunc
- type Option
- func WithAuthToken(token string) Option
- func WithBrokerAddress(addrs ...string) Option
- func WithBrokerPrefix(prefix string) Option
- func WithConnectionTimeout(t time.Duration) Option
- func WithConsumerAutoCommitInterval(interval time.Duration) Option
- func WithGracefulWait(t time.Duration) Option
- func WithListenerName(name string) Option
- func WithLogger(logger Logger) Option
- func WithMaxConnectionsPerBroker(num int) Option
- func WithNoDataWaitSec(sec int) Option
- func WithOperationTimeout(t time.Duration) Option
- func WithPassword(pwd string) Option
- func WithRedisConf(conf *RedisConf) Option
- func WithUser(user string) Option
- type Options
- type PubOption
- type PublishOptions
- type RedisConf
- type SubHandler
- type SubOption
- func WithCommitOffsetBlock() SubOption
- func WithMessageChannel() SubOption
- func WithMessageChannelSize(size int) SubOption
- func WithSubConcurrencySize(size int) SubOption
- func WithSubInterval(t time.Duration) SubOption
- func WithSubKeyHandlers(keyHandlers map[string]SubHandler) SubOption
- func WithSubName(name string) SubOption
- func WithSubOffset(offset int64) SubOption
- func WithSubRetryEnable() SubOption
- func WithSubType(t SubscriptionType) SubOption
- type SubscribeOptions
- type SubscriptionType
Constants ¶
This section is empty.
Variables ¶
var DummyLogger = LoggerFunc(func(string, ...interface{}) {})
DummyLogger is a no-op logger that writes nothing.
Functions ¶
Types ¶
type Broker ¶
type Broker interface {
    // Publish pub message to topic
    Publish(ctx context.Context, topic string, msg interface{}, opts ...PubOption) error

    // Subscribe sub message from topic + channel
    Subscribe(ctx context.Context, topic string, channel string, subHandler SubHandler, opts ...SubOption) error

    // Shutdown graceful shutdown broker
    Shutdown(ctx context.Context) error
}
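Broker is the message broker interface: it publishes messages to a topic, subscribes to a topic and channel, and shuts down gracefully.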
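To make the shape of the interface concrete, here is a minimal sketch of an in-memory implementation. It is not how the package implements any of its brokers. The `SubHandler` signature is an assumption (its definition is not shown on this page), `PubOption` and `SubOption` are reduced to placeholders, and `memBroker`, `newMemBroker` and `deliver` are hypothetical names used only for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// SubHandler is a HYPOTHETICAL signature; the real definition is not shown in this page.
type SubHandler func(ctx context.Context, data []byte) error

// PubOption and SubOption are placeholders standing in for the package's option types.
type PubOption func()
type SubOption func()

// Broker mirrors the documented interface.
type Broker interface {
	Publish(ctx context.Context, topic string, msg interface{}, opts ...PubOption) error
	Subscribe(ctx context.Context, topic string, channel string, subHandler SubHandler, opts ...SubOption) error
	Shutdown(ctx context.Context) error
}

// memBroker is an illustrative in-memory broker that calls handlers synchronously.
type memBroker struct {
	mu       sync.Mutex
	closed   bool
	handlers map[string][]SubHandler // topic -> registered handlers
}

var _ Broker = (*memBroker)(nil) // compile-time check that memBroker satisfies Broker

func newMemBroker() *memBroker {
	return &memBroker{handlers: make(map[string][]SubHandler)}
}

func (b *memBroker) Publish(ctx context.Context, topic string, msg interface{}, _ ...PubOption) error {
	b.mu.Lock()
	if b.closed {
		b.mu.Unlock()
		return errors.New("broker is shut down")
	}
	hs := append([]SubHandler(nil), b.handlers[topic]...) // copy so handlers run without the lock
	b.mu.Unlock()

	data := []byte(fmt.Sprint(msg)) // simplistic encoding, for illustration only
	for _, h := range hs {
		if err := h(ctx, data); err != nil {
			return err
		}
	}
	return nil
}

func (b *memBroker) Subscribe(_ context.Context, topic, _ string, h SubHandler, _ ...SubOption) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return errors.New("broker is shut down")
	}
	b.handlers[topic] = append(b.handlers[topic], h)
	return nil
}

func (b *memBroker) Shutdown(context.Context) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.closed = true
	return nil
}

// deliver subscribes, publishes one message, and returns what the handler received.
func deliver(b Broker, topic, msg string) (string, error) {
	var got string
	ctx := context.Background()
	err := b.Subscribe(ctx, topic, "ch", func(_ context.Context, data []byte) error {
		got = string(data)
		return nil
	})
	if err != nil {
		return "", err
	}
	if err := b.Publish(ctx, topic, msg); err != nil {
		return "", err
	}
	return got, nil
}

func main() {
	b := newMemBroker()
	got, err := deliver(b, "orders", "hello")
	fmt.Println(got, err)
	_ = b.Shutdown(context.Background())
	fmt.Println(b.Publish(context.Background(), "orders", "late"))
}
```

A fake like this can be handy in unit tests of code that depends only on the `Broker` interface. A real `Subscribe` may block or run consumers in the background, which this sketch does not model.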
type LoggerFunc ¶
type LoggerFunc func(string, ...interface{})
LoggerFunc is a bridge between Logger and any third party logger.
func (LoggerFunc) Printf ¶
func (f LoggerFunc) Printf(msg string, args ...interface{})
Printf implements Logger interface.
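The bridge works because a function type can carry methods. The sketch below mirrors `LoggerFunc` and assumes `Logger` is a single-method interface, which is inferred from "Printf implements Logger interface" rather than shown on this page. `render` is a hypothetical helper that logs into a buffer so the result can be inspected.

```go
package main

import (
	"fmt"
	"strings"
)

// Logger is ASSUMED to be a single-method interface; its definition is not shown in this page.
type Logger interface {
	Printf(msg string, args ...interface{})
}

// LoggerFunc mirrors the documented type.
type LoggerFunc func(string, ...interface{})

// Printf calls f itself, which is what lets a plain function satisfy Logger.
func (f LoggerFunc) Printf(msg string, args ...interface{}) { f(msg, args...) }

// render is a hypothetical helper: it logs through a Logger backed by a
// strings.Builder and returns what was written.
func render(format string, args ...interface{}) string {
	var buf strings.Builder
	var logger Logger = LoggerFunc(func(msg string, a ...interface{}) {
		fmt.Fprintf(&buf, msg+"\n", a...)
	})
	logger.Printf(format, args...)
	return buf.String()
}

func main() {
	fmt.Print(render("connected to %s after %d attempts", "broker-1", 2))
}
```

Any function with a matching signature converts the same way. For example, the standard library's `log.Printf` has the type `func(string, ...any)`, so `LoggerFunc(log.Printf)` compiles as a bridge to the standard logger.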
type Option ¶
type Option func(o *Options)
Option is a functional option that configures Options.
func WithAuthToken ¶
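func WithAuthToken(token string) Option
WithAuthToken sets the broker auth token, e.g. for a pulsar broker.
func WithBrokerAddress ¶
func WithBrokerAddress(addrs ...string) Option
WithBrokerAddress sets the broker addresses.
func WithBrokerPrefix ¶
func WithBrokerPrefix(prefix string) Option
WithBrokerPrefix sets the broker prefix.
func WithConnectionTimeout ¶
func WithConnectionTimeout(t time.Duration) Option
WithConnectionTimeout sets the broker connection timeout.
func WithConsumerAutoCommitInterval ¶ added in v1.3.8
func WithConsumerAutoCommitInterval(interval time.Duration) Option
WithConsumerAutoCommitInterval sets the consumer auto commit interval.
func WithGracefulWait ¶
func WithGracefulWait(t time.Duration) Option
WithGracefulWait sets the graceful exit time for subscribers.
func WithListenerName ¶
func WithListenerName(name string) Option
WithListenerName sets the broker listener name.
func WithMaxConnectionsPerBroker ¶
func WithMaxConnectionsPerBroker(num int) Option
WithMaxConnectionsPerBroker sets the max number of connections to a single broker.
func WithNoDataWaitSec ¶
func WithNoDataWaitSec(sec int) Option
WithNoDataWaitSec sets the number of seconds to wait when there is no data.
func WithOperationTimeout ¶
func WithOperationTimeout(t time.Duration) Option
WithOperationTimeout sets the broker operation timeout.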
type Options ¶
type Options struct {
    Addrs    []string // client connection address list
    Prefix   string   // client mq prefix
    User     string   // user
    Password string   // password

    // ========pulsar mq===============
    // ListenerName Configure the net model for vpc user to connect the pulsar broker
    ListenerName string
    // AuthToken auth token
    AuthToken string
    // OperationTimeout operation timeout
    OperationTimeout time.Duration
    // ConnectionTimeout timeout for the establishment of a TCP connection (default: 10 seconds)
    ConnectionTimeout time.Duration
    // MaxConnectionsPerBroker the max number of connections to a single broker
    // that will keep in the pool. (Default: 1 connection)
    // this param for pulsar connection per broker
    MaxConnectionsPerBroker int

    // =======redis mq================
    RedisConf *RedisConf

    // graceful exit time
    GracefulWait time.Duration
    // no data wait second
    NoDataWaitSec int
    // ConsumerAutoCommitInterval consumer auto commit interval (default: 1s)
    ConsumerAutoCommitInterval time.Duration
    // Logger logger
    Logger Logger
}
Options holds the broker options.
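Option and Options follow the functional options pattern. The sketch below shows how the two could fit together on a trimmed copy of the struct. The With* signatures come from the index, but their bodies and the `newOptions` constructor are assumptions made for illustration, not the package's code.

```go
package main

import (
	"fmt"
	"time"
)

// Options is a trimmed copy of the documented struct (three fields only).
type Options struct {
	Addrs             []string
	ConnectionTimeout time.Duration
	GracefulWait      time.Duration
}

// Option mirrors the documented functional option type.
type Option func(o *Options)

// The bodies below are ASSUMED; only the signatures appear in the index.
func WithBrokerAddress(addrs ...string) Option {
	return func(o *Options) { o.Addrs = addrs }
}

func WithConnectionTimeout(t time.Duration) Option {
	return func(o *Options) { o.ConnectionTimeout = t }
}

func WithGracefulWait(t time.Duration) Option {
	return func(o *Options) { o.GracefulWait = t }
}

// newOptions is a hypothetical constructor: defaults first, then each option in order.
func newOptions(opts ...Option) Options {
	o := Options{ConnectionTimeout: 10 * time.Second} // documented default for ConnectionTimeout
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := newOptions(
		WithBrokerAddress("127.0.0.1:6650"),
		WithGracefulWait(5*time.Second),
	)
	fmt.Println(o.Addrs, o.ConnectionTimeout, o.GracefulWait)
}
```

Because options are applied in order, a later option overrides an earlier one that touches the same field, and any field no option touches keeps its default.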
type PubOption ¶
type PubOption func(p *PublishOptions)
PubOption is a functional option that configures PublishOptions.
func WithDisableBatching ¶
func WithDisableBatching() PubOption
WithDisableBatching disables batch publishing.
func WithPublishDelay ¶
WithPublishDelay sets the publish delay time.
func WithPublishName ¶
WithPublishName sets the publish name (the Name field of PublishOptions).
func WithSendTimeout ¶
WithSendTimeout sets the send timeout for published messages.
type PublishOptions ¶
type PublishOptions struct {
    // PublishDelay specifies the time period within which the messages sent will be batched (default: 10ms)
    // if message is enabled. If set to a non zero value, messages will be queued until this time
    // interval or until
    PublishDelay time.Duration

    // Name specifies a name for the producer.
    // if you use pulsar mq,if not assigned, the system will generate
    // a globally unique name which can be access with
    // Producer.ProducerName().
    //
    // for kafka publish message key
    // The partitioning key for this message. Pre-existing Encoders include
    // StringEncoder and ByteEncoder.
    Name string

    // DisableBatching controls whether automatic batching of messages is enabled for the producer.
    // By default batching is enabled.
    // When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the
    // broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
    // messages will be compressed at the batch level, leading to a much better compression ratio
    // for similar headers or contents.
    // When enabled default batch delay is set to 1 ms and default batch size is 1000 messages
    // Setting `DisableBatching: true` will make the producer to send messages individually
    DisableBatching bool

    // SendTimeout specifies the timeout for a message that has not been acknowledged by the server since sent.
    // Send and SendAsync returns an error after timeout.
    // Default is 30 seconds, negative such as -1 to disable.
    SendTimeout time.Duration
}
PublishOptions holds the options for publishing a message.
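The SendTimeout comment describes three cases: a default of 30 seconds, an explicit value, and a negative value that disables the timeout. The sketch below spells that rule out. `effectiveSendTimeout` is a hypothetical helper, and treating zero as "not set" is an assumption; the page does not say how the package detects an unset value.

```go
package main

import (
	"fmt"
	"time"
)

// effectiveSendTimeout returns the timeout to apply and whether a timeout is enabled.
// Treating zero as "not set" is an ASSUMPTION made for this sketch.
func effectiveSendTimeout(d time.Duration) (time.Duration, bool) {
	switch {
	case d < 0:
		return 0, false // negative (such as -1) disables the timeout
	case d == 0:
		return 30 * time.Second, true // documented default
	default:
		return d, true
	}
}

func main() {
	for _, d := range []time.Duration{0, 5 * time.Second, -1} {
		timeout, enabled := effectiveSendTimeout(d)
		fmt.Println(d, timeout, enabled)
	}
}
```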
type RedisConf ¶
type RedisConf struct {
    // host:port address.
    Address string

    // Optional password. Must match the password specified in the
    // require pass server configuration option.
    Password string

    // Database to be selected after connecting to the server.
    DB int

    // Maximum number of retries before giving up.
    // Default is to not retry failed commands.
    MaxRetries int

    // Dial timeout for establishing new connections.
    // Default is 5 seconds.
    DialTimeout time.Duration

    // Timeout for socket reads. If reached, commands will fail
    // with a timeout instead of blocking. Use value -1 for no timeout and 0 for default.
    // Default is 3 seconds.
    ReadTimeout time.Duration

    // Timeout for socket writes. If reached, commands will fail
    // with a timeout instead of blocking.
    // Default is ReadTimeout.
    WriteTimeout time.Duration

    // Maximum number of socket connections.
    // Default is 10 connections per every CPU as reported by runtime.NumCPU.
    PoolSize int

    // Amount of time client waits for connection if all connections
    // are busy before returning an error.
    // Default is ReadTimeout + 1 second.
    PoolTimeout time.Duration

    // Minimum number of idle connections which is useful when establishing
    // new connection is slow.
    MinIdleConns int

    // Amount of time after which client closes idle connections.
    // Should be less than server's timeout.
    // Default is 5 minutes. -1 disables idle timeout check.
    IdleTimeout time.Duration

    // Connection age at which client retires (closes) the connection.
    // go redis Default is to not close aged connections
    // but 1800s is recommended.
    MaxConnAge time.Duration
}
RedisConf is the redis client config.
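Several of the defaults above depend on other fields (WriteTimeout on ReadTimeout, PoolTimeout on ReadTimeout, PoolSize on the CPU count). The sketch below works through that chain on a trimmed copy of the struct. `withDefaults` is a hypothetical helper, and the zero-means-unset handling is an assumption; in practice the underlying redis client is expected to apply these defaults itself.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// RedisConf is a trimmed copy of the documented struct.
type RedisConf struct {
	Address      string
	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration
	PoolSize     int
	PoolTimeout  time.Duration
}

// withDefaults is HYPOTHETICAL: it fills zero-valued fields with the documented defaults.
func withDefaults(c RedisConf) RedisConf {
	if c.DialTimeout == 0 {
		c.DialTimeout = 5 * time.Second
	}
	if c.ReadTimeout == 0 {
		c.ReadTimeout = 3 * time.Second
	}
	if c.WriteTimeout == 0 {
		c.WriteTimeout = c.ReadTimeout // "Default is ReadTimeout"
	}
	if c.PoolSize == 0 {
		c.PoolSize = 10 * runtime.NumCPU()
	}
	if c.PoolTimeout == 0 {
		read := c.ReadTimeout
		if read < 0 {
			read = 0 // -1 means "no timeout"; treated as zero here (an assumption)
		}
		c.PoolTimeout = read + time.Second // "Default is ReadTimeout + 1 second"
	}
	return c
}

func main() {
	c := withDefaults(RedisConf{Address: "127.0.0.1:6379"})
	fmt.Printf("%+v\n", c)
}
```

With every timeout left at zero, this yields a 5s dial timeout, 3s read and write timeouts, and a 4s pool timeout.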
type SubHandler ¶
SubHandler is the handler func called for subscribed messages.
type SubOption ¶
type SubOption func(s *SubscribeOptions)
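SubOption is a functional option that configures SubscribeOptions.
func WithCommitOffsetBlock ¶ added in v1.3.7
func WithCommitOffsetBlock() SubOption
WithCommitOffsetBlock makes the consumer commit offsets with a blocking, synchronous call (see CommitOffsetBlock in SubscribeOptions).
func WithMessageChannel ¶
func WithMessageChannel() SubOption
WithMessageChannel enables receiving messages from a channel.
func WithMessageChannelSize ¶
func WithMessageChannelSize(size int) SubOption
WithMessageChannelSize sets the size of the subscribe message channel.
func WithSubConcurrencySize ¶
func WithSubConcurrencySize(size int) SubOption
WithSubConcurrencySize sets the subscribe concurrency size.
func WithSubInterval ¶
func WithSubInterval(t time.Duration) SubOption
WithSubInterval sets the subscribe interval.
func WithSubKeyHandlers ¶ added in v1.3.6
func WithSubKeyHandlers(keyHandlers map[string]SubHandler) SubOption
WithSubKeyHandlers sets the map from message key to SubHandler.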
type SubscribeOptions ¶
type SubscribeOptions struct {
    // specifies the consumer name
    Name string

    // KeyHandlers for kafka consumer message key handler map
    // for redis sub,you can specify different message subscriber functions to handle msg.
    KeyHandlers map[string]SubHandler

    // Receive messages from channel. The channel returns a struct which contains message and the consumer from where
    // the message was received. It's not necessary here since we have 1 single consumer, but the channel could be
    // shared across multiple consumers as well
    MessageChannel     bool // default:false
    MessageChannelSize int  // default:100

    // subscribe concurrency count,default:1
    // Note: this param for redis or pulsar consumer message
    ConcurrencySize int

    Offset int64

    // Commit the offset to the backend for kafka
    // Note: calling Commit performs a blocking synchronous operation.
    CommitOffsetBlock bool

    // SubInterval subscribe interval,default:0
    SubInterval time.Duration

    // ===========pulsar mq=======
    // subType specifies the subscription type to be used when subscribing to a topic.
    // Default is `Shared` 1:N
    // Exclusive there can be only 1 consumer on the same topic with the same subscription name
    //
    // Shared 1:N
    // Shared subscription mode, multiple consumer will be able to use the same subscription name
    // and the messages will be dispatched according to
    // a round-robin rotation between the connected consumers
    //
    // Failover subscription mode, multiple consumer will be able to use the same subscription name
    // but only 1 consumer will receive the messages.
    // If that consumer disconnects, one of the other connected consumers will start receiving messages.
    SubType SubscriptionType

    // ReceiverQueueSize sets the size of the consumer receive queue.
    // The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
    // application calls `Consumer.receive()`. Using a higher value could potentially increase the consumer
    // throughput at the expense of bigger memory utilization.
    // Default value is `1000` messages and should be good for most use cases.
    ReceiverQueueSize int

    // retryEnable for pulsar sub RetryEnable
    RetryEnable bool
}
SubscribeOptions holds the options for subscribing to messages.
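KeyHandlers maps a message key to its own handler. One plausible way to route by key is sketched below. The `SubHandler` signature is an assumption (it is not shown on this page), and `dispatch` and `route` are hypothetical helpers. Whether the package falls back to the handler passed to Subscribe when no key matches is also an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// SubHandler is a HYPOTHETICAL signature; the real definition is not shown in this page.
type SubHandler func(ctx context.Context, data []byte) error

// dispatch runs the handler registered for key, or falls back to def when no key matches.
func dispatch(ctx context.Context, key string, data []byte, def SubHandler, keyHandlers map[string]SubHandler) error {
	if h, ok := keyHandlers[key]; ok {
		return h(ctx, data)
	}
	if def == nil {
		return fmt.Errorf("no handler for key %q", key)
	}
	return def(ctx, data)
}

// route is a small demo: it reports which handler ran for a given key.
func route(key string) string {
	var ran string
	keyHandlers := map[string]SubHandler{
		"created": func(context.Context, []byte) error { ran = "created-handler"; return nil },
		"deleted": func(context.Context, []byte) error { ran = "deleted-handler"; return nil },
	}
	def := func(context.Context, []byte) error { ran = "default"; return nil }
	if err := dispatch(context.Background(), key, []byte("payload"), def, keyHandlers); err != nil {
		return "error: " + err.Error()
	}
	return ran
}

func main() {
	fmt.Println(route("created"))
	fmt.Println(route("unknown"))
}
```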
type SubscriptionType ¶
type SubscriptionType int
SubscriptionType is the type of subscription supported by Pulsar.
const (
    // Exclusive there can be only 1 consumer on the same topic with the same subscription name
    Exclusive SubscriptionType = iota

    // Shared subscription mode, multiple consumer will be able to use the same subscription name
    // and the messages will be dispatched according to
    // a round-robin rotation between the connected consumers
    Shared

    // Failover subscription mode, multiple consumer will be able to use the same subscription name
    // but only 1 consumer will receive the messages.
    // If that consumer disconnects, one of the other connected consumers will start receiving messages.
    Failover

    // KeyShared subscription mode, multiple consumer will be able to use the same
    // subscription and all messages with the same key will be dispatched to only one consumer
    KeyShared
)
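Since the constants are declared with iota, their numeric values follow declaration order. The sketch below mirrors the declaration and adds a `String` method for readable output. The `String` method is an illustration only; this page does not show whether the package defines one.

```go
package main

import "fmt"

// SubscriptionType mirrors the documented type and constants.
type SubscriptionType int

const (
	Exclusive SubscriptionType = iota // 0
	Shared                            // 1
	Failover                          // 2
	KeyShared                         // 3
)

// String is a HYPOTHETICAL convenience method, not confirmed to exist in the package.
func (t SubscriptionType) String() string {
	switch t {
	case Exclusive:
		return "Exclusive"
	case Shared:
		return "Shared"
	case Failover:
		return "Failover"
	case KeyShared:
		return "KeyShared"
	default:
		return fmt.Sprintf("SubscriptionType(%d)", int(t))
	}
}

func main() {
	for _, t := range []SubscriptionType{Exclusive, Shared, Failover, KeyShared} {
		fmt.Printf("%d %s\n", int(t), t)
	}
}
```

Note that the zero value is Exclusive, while SubscribeOptions documents Shared as the default. A default of Shared therefore has to be set explicitly rather than relying on the zero value.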