STORM-4076 KafkaTridentSpoutEmitters can poll all partitions at once instead of one at a time #3679
Conversation
@anand-h-codes Thanks for your PR. Since it is marked WIP, is it already ready for review or are you planning to update it?
@rzo1 the PR is functionally complete, but I will need to modify the test cases accordingly. I shall proceed if you think the change is good enough.
@rzo1 the PR is ready for review.
@avermeer Can you have a look? I think Kafka is in your area of expertise :) (I am not a Kafka user)
I don't have experience with Trident, but I do with Kafka. I'm having a look.
...afka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
refactor emitNewBatch() to emitBatchNew()
@anand-h-codes I don't have experience with Trident, so my review of those specific classes is not very consequential.
@reiabreu Yes. Even KafkaSpout uses this strategy of polling all partitions at once. Plus, with this, we can have better control over the Trident batch size with consumer properties like …
@reiabreu I understand your concern. The main problem with the existing method is that even if we set …, the batch size cannot be bounded. With the changes in the PR, it is possible to guarantee that one batch will not contain more than … records. Say I have 1000 partitions assigned to one KafkaTridentSpout and my topology is optimised to process up to 500 records per batch and not more.
The same logic is kept in the changes, with the difference that you will be emitting records for all the partitions instead of just one.
Since the assignment of partitions to the consumer is kept, the number of returned records will be the same under the same conditions. Your change is more efficient in the sense that you perform fewer polls for the same partitions of a task. Is my understanding of this correct?
@reiabreu You are correct. More than efficiency, the important feature this PR gives is control over the batch size.
Yes. The number of tuples associated with the currBatch will not exceed the …
@anand-h-codes thanks for taking the time to go over the changes with me.
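The batch-size guarantee discussed above can be illustrated with a simplified, self-contained model. This is *not* the actual Storm or Kafka API — the class, method names, and data shapes below are hypothetical stand-ins — but it shows the core point: a single capped poll over all assigned partitions bounds the whole batch, whereas polling one partition at a time only bounds each partition's contribution.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch contrasting the two polling strategies.
public class BatchSizeSketch {

    // Old strategy (modeled): poll a single partition per batch. The batch
    // size depends on that one partition, and a global per-batch cap across
    // all partitions cannot be expressed this way.
    static List<String> pollOnePartition(Map<String, List<String>> partitions,
                                         String partition) {
        return new ArrayList<>(partitions.get(partition));
    }

    // New strategy (modeled): one poll over all assigned partitions, capped
    // at maxPollRecords, so no emitted batch exceeds the configured limit.
    static List<String> pollAllPartitions(Map<String, List<String>> partitions,
                                          int maxPollRecords) {
        List<String> batch = new ArrayList<>();
        for (List<String> records : partitions.values()) {
            for (String record : records) {
                if (batch.size() == maxPollRecords) {
                    return batch;
                }
                batch.add(record);
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        // 4 simulated partitions with 300 records each (1200 total).
        Map<String, List<String>> partitions = new LinkedHashMap<>();
        for (int p = 0; p < 4; p++) {
            List<String> records = new ArrayList<>();
            for (int i = 0; i < 300; i++) {
                records.add("p" + p + "-r" + i);
            }
            partitions.put("partition-" + p, records);
        }

        // With a 500-record cap, a single poll over all partitions never
        // yields a batch larger than 500 records.
        System.out.println(pollAllPartitions(partitions, 500).size()); // 500
    }
}
```

In this model, the 1000-partition scenario from the conversation works out the same way: however many partitions are assigned, the capped poll stops at the limit, so a topology tuned for 500 records per batch is never handed more.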
What is the purpose of the change
Currently `KafkaTridentTransactionalSpoutEmitter` and `KafkaTridentOpaqueEmitter` poll every partition assigned to the spout one by one while emitting new batches.
This can be improved by leveraging the Kafka consumer's usual polling strategy: a single poll covers all assigned partitions, and the Kafka broker takes care of choosing which partitions the returned records come from.
Advantages of this are:
- fewer polls per emitted batch for the same set of assigned partitions;
- better control over the Trident batch size through consumer properties.

Note: This change takes effect only when a batch is emitted for the first time.
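As a rough illustration of the batch-size control mentioned above — the specific property name is elided in the conversation, but Kafka's standard consumer setting `max.poll.records` is the usual knob for capping how many records one `poll()` returns. The broker address and group id below are placeholders, and the helper class is hypothetical, not part of this PR:

```java
import java.util.Properties;

// Hypothetical helper showing consumer properties that bound the number of
// records returned by a single poll, and hence (with the single-poll
// strategy in this PR) the size of one Trident batch.
public class SpoutConsumerConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "trident-spout");           // placeholder group id
        // Standard Kafka consumer setting: caps how many records a single
        // poll() returns across all assigned partitions.
        props.put("max.poll.records", "500");
        return props;
    }
}
```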
How was the change tested
Tested by running a Kafka Trident topology locally.