-
Notifications
You must be signed in to change notification settings - Fork 641
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #4556] Implement retry strategy based on pulsar message middleware #4769
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #4769 +/- ##
============================================
- Coverage 17.59% 16.37% -1.23%
+ Complexity 1774 1734 -40
============================================
Files 797 856 +59
Lines 29786 31265 +1479
Branches 2573 2700 +127
============================================
- Hits 5242 5120 -122
- Misses 24063 25665 +1602
+ Partials 481 480 -1 ☔ View full report in Codecov by Sentry. |
This PR only implements a part of issue #4556, so please create a new issue and "Fixes it". |
CloudEvent event = configuration.getEvent(); | ||
String topic = configuration.getTopic(); | ||
String consumerGroupName = configuration.getConsumerGroupName(); | ||
String retryTopicName = consumerGroupName + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not familiar with Pulsar, but I have two questions about your code:
1 Pulsar's message retry feature is turned off by default and needs to be turned on manually, how did you turn it on?
2 I don't understand how you realize that Pulsar automatically retries messages in the "xxx-RETRY" queue. Could you explain?
@HarshSawarkar
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @pandaapo, @xwm1992 the retry feature is not turned on. For turning it on, can the retryAsynchronously(Supplier<CompletableFuture> supplier, Backoff backoff, ScheduledExecutorService scheduledExecutorService, CompletableFuture callback) method of RetryUtil class of pulsar-client-2.11.1.jar library be considered as it provides with the method scheduledExecutorService.execute() to schedule and execute a retry.
As in eventmesh-storage-RocketMQ the class ConsumeMessageConcurrentlyService has the implemenetation for retrying failed message of RocketMQ. Can you please guide?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@HarshSawarkar
RocketMQ can automatically retry messages in the "% RETRY%..."queue, so RocketMQRetryStrategyImpl
utilizes RocketMQ's retry queue feature and sends failed messages to a queue with the prefix "% RETRY%".
When it comes to Pulsar, we also need Pulsar to automatically retry failed messages. Currently your code does not implement this feature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you provide some implementation ideas on how to implement the feature for automatically retrying failed messages of Pulsar?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@HarshSawarkar I wish I could give more useful help, but as I said earlier, I'm not familiar with Pulsar.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@HarshSawarkar This PR has hit a standstill. After reading the conversation, it appears that Pulsar doesn't have a built-in retry queue, so you'll need to resend the messages through EventMesh. You can achieve this using an in-memory queue and a timer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you'll need to resend the messages through EventMesh. You can achieve this using an in-memory queue and a timer.
This suggestion is incorrect. Because the task of this PR is to implement retry in Pulsar without relying on EventMesh's memory. Otherwise, the PR would be redundant since EventMesh retries within the memory by default.
Fixes #4556
Motivation
Earlier the retry strategy based on pulsar middleware were only supported
Modifications
Implemented retry strategy based on the Pulsar message middleware.
Documentation