Skip to content

Commit

Permalink
feat: support consumer consume tps option (apache#1101)
Browse files Browse the repository at this point in the history
* feat: support consumer consume tps option

* feat: support consumer consume tps option
  • Loading branch information
tuweizhong authored Oct 16, 2023
1 parent da20ee7 commit 3493a47
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 16 deletions.
8 changes: 8 additions & 0 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,14 @@ func (dc *defaultConsumer) doBalance() {
return (mqAll[i].QueueId - mqAll[j].QueueId) < 0
})
allocateResult := dc.allocate(dc.consumerGroup, dc.client.ClientID(), mqAll, cidAll)

// Principle of flow control: pull TPS = 1000ms/PullInterval * BatchSize * len(allocateResult)
if consumeTPS := dc.option.ConsumeTPS.Load(); consumeTPS > 0 && len(allocateResult) > 0 {
pullBatchSize := dc.option.PullBatchSize.Load()
pullTimesPerSecond := float64(consumeTPS) / float64(pullBatchSize*int32(len(allocateResult)))
dc.option.PullInterval.Store(time.Duration(float64(time.Second) / pullTimesPerSecond))
}

changed := dc.updateProcessQueueTable(topic, allocateResult)
if changed {
dc.mqChanged(topic, mqAll, allocateResult)
Expand Down
28 changes: 20 additions & 8 deletions consumer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"strings"
"time"

"go.uber.org/atomic"

"github.com/apache/rocketmq-client-go/v2/hooks"
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/primitive"
Expand Down Expand Up @@ -55,30 +57,33 @@ type consumerOptions struct {

// Flow control threshold on topic level, default value is -1(Unlimited)
//
// The value of {@code pullThresholdForQueue} will be overwrote and calculated based on
// {@code pullThresholdForTopic} if it is't unlimited
// The value of {@code pullThresholdForQueue} will be overwritten and calculated based on
// {@code pullThresholdForTopic} if it isn't unlimited
//
// For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
// then pullThresholdForQueue will be set to 100
PullThresholdForTopic int

// Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
//
// The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on
// {@code pullThresholdSizeForTopic} if it is't unlimited
// The value of {@code pullThresholdSizeForQueue} will be overwritten and calculated based on
// {@code pullThresholdSizeForTopic} if it isn't unlimited
//
// For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are
// assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB
PullThresholdSizeForTopic int

// Message pull Interval
PullInterval time.Duration
PullInterval atomic.Duration

// Message consumer tps
ConsumeTPS atomic.Int32

// Batch consumption size
ConsumeMessageBatchMaxSize int

// Batch pull size
PullBatchSize int32
PullBatchSize atomic.Int32

// Whether update subscription relationship when every pull
PostSubscriptionWhenPull bool
Expand Down Expand Up @@ -283,7 +288,7 @@ func WithStrategy(strategy AllocateStrategy) Option {

func WithPullBatchSize(batchSize int32) Option {
return func(options *consumerOptions) {
options.PullBatchSize = batchSize
options.PullBatchSize.Store(batchSize)
}
}

Expand All @@ -307,7 +312,14 @@ func WithSuspendCurrentQueueTimeMillis(suspendT time.Duration) Option {

func WithPullInterval(interval time.Duration) Option {
return func(options *consumerOptions) {
options.PullInterval = interval
options.PullInterval.Store(interval)
}
}

// WithConsumeTPS set single-machine consumption tps
func WithConsumeTPS(tps int32) Option {
return func(options *consumerOptions) {
options.ConsumeTPS.Store(tps)
}
}

Expand Down
4 changes: 2 additions & 2 deletions consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
time.Sleep(sleepTime)
}
// reset time
sleepTime = pc.option.PullInterval
sleepTime = pc.option.PullInterval.Load()
pq.lastPullTime.Store(time.Now())
err := pc.makeSureStateOK()
if err != nil {
Expand Down Expand Up @@ -736,7 +736,7 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
Topic: request.mq.Topic,
QueueId: int32(request.mq.QueueId),
QueueOffset: request.nextOffset,
MaxMsgNums: pc.option.PullBatchSize,
MaxMsgNums: pc.option.PullBatchSize.Load(),
SysFlag: sysFlag,
CommitOffset: 0,
SubExpression: sd.SubString,
Expand Down
12 changes: 6 additions & 6 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ func (pc *pushConsumer) validate() error {
}
}

if pc.option.PullInterval < 0 || pc.option.PullInterval > 65535*time.Millisecond {
if interval := pc.option.PullInterval.Load(); interval < 0 || interval > 65535*time.Millisecond {
return errors.New("option.PullInterval out of range [0, 65535]")
}

Expand All @@ -608,9 +608,9 @@ func (pc *pushConsumer) validate() error {
}
}

if pc.option.PullBatchSize < 1 || pc.option.PullBatchSize > 1024 {
if pc.option.PullBatchSize == 0 {
pc.option.PullBatchSize = 32
if pullBatchSize := pc.option.PullBatchSize.Load(); pullBatchSize < 1 || pullBatchSize > 1024 {
if pullBatchSize == 0 {
pc.option.PullBatchSize.Store(32)
} else {
return errors.New("option.PullBatchSize out of range [1, 1024]")
}
Expand Down Expand Up @@ -674,7 +674,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
time.Sleep(sleepTime)
}
// reset time
sleepTime = pc.option.PullInterval
sleepTime = pc.option.PullInterval.Load()
pq.lastPullTime.Store(time.Now())
err := pc.makeSureStateOK()
if err != nil {
Expand Down Expand Up @@ -813,7 +813,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
Topic: request.mq.Topic,
QueueId: int32(request.mq.QueueId),
QueueOffset: request.nextOffset,
MaxMsgNums: pc.option.PullBatchSize,
MaxMsgNums: pc.option.PullBatchSize.Load(),
SysFlag: sysFlag,
CommitOffset: commitOffsetValue,
SubExpression: subExpression,
Expand Down

0 comments on commit 3493a47

Please sign in to comment.