
STORM-4076 KafkaTridentSpoutEmitters can poll all partitions at once instead of one at a time #3679

Merged
merged 5 commits into from
Sep 7, 2024

Conversation

anand-h-codes
Contributor

What is the purpose of the change

Currently, 'KafkaTridentTransactionalSpoutEmitter' and 'KafkaTridentOpaqueEmitter' poll every partition assigned to the spout one at a time while emitting new batches.

This can be improved by leveraging the Kafka consumer's usual polling strategy, in which the Kafka broker takes care of choosing the right partitions.

The advantages of this are:

  1. If a spout is assigned multiple topic partitions, the consumer doesn't waste time polling partitions with no data.
  2. It gives better control over the Trident batch size, by adjusting Kafka consumer properties.

Note: This change takes effect only when a batch is emitted for the first time.
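The difference between the two polling strategies can be sketched with a toy consumer. This is hypothetical illustration code, not the actual Storm classes: FakeConsumer, pollOne, and pollAll are stand-ins for the Kafka consumer API, and the maps replace real broker fetches.

```java
import java.util.List;
import java.util.Map;

public class PollStrategies {

    // Fake consumer that counts how many poll round trips were made.
    static class FakeConsumer {
        final Map<String, List<String>> byPartition;
        int pollCount = 0;

        FakeConsumer(Map<String, List<String>> byPartition) {
            this.byPartition = byPartition;
        }

        // Stands in for a poll that returns only one partition's records.
        List<String> pollOne(String tp) {
            pollCount++;
            return byPartition.getOrDefault(tp, List.of());
        }

        // Stands in for a single poll that can return records
        // for every assigned partition at once.
        Map<String, List<String>> pollAll() {
            pollCount++;
            return byPartition;
        }
    }

    // Old approach: one poll per assigned partition, even for empty ones.
    static int emitOldStyle(FakeConsumer consumer, List<String> partitions) {
        int emitted = 0;
        for (String tp : partitions) {
            for (String rec : consumer.pollOne(tp)) {
                emitted++;                 // stands in for emitTuple(collector, record)
            }
        }
        return emitted;
    }

    // New approach: a single poll, then records are grouped by partition.
    static int emitNewStyle(FakeConsumer consumer, List<String> partitions) {
        int emitted = 0;
        Map<String, List<String>> polled = consumer.pollAll();
        for (String tp : partitions) {
            for (String rec : polled.getOrDefault(tp, List.of())) {
                emitted++;                 // stands in for emitTuple(collector, record)
            }
        }
        return emitted;
    }
}
```

Both styles emit the same records; the new style just does it in one poll, so an empty partition like t-1 below costs nothing extra.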

How was the change tested

Tested by running a Kafka Trident topology locally.

@anand-h-codes anand-h-codes changed the title STORM-4076 KafkaTridentSpoutEmitters can poll all partitions at once instead of one at a time STORM-4076 [WIP] KafkaTridentSpoutEmitters can poll all partitions at once instead of one at a time Aug 17, 2024
@rzo1
Contributor

rzo1 commented Aug 23, 2024

@anand-h-codes Thanks for your PR. Since it is marked WIP, is it already ready for review, or are you planning to update it?

@anand-h-codes
Contributor Author

anand-h-codes commented Aug 23, 2024

@rzo1 the PR is functionally complete. But I will need to modify the test cases accordingly. I shall proceed if you think the change is good enough.

@anand-h-codes
Contributor Author

@rzo1 the PR is ready for review.

@anand-h-codes anand-h-codes changed the title STORM-4076 [WIP] KafkaTridentSpoutEmitters can poll all partitions at once instead of one at a time STORM-4076 KafkaTridentSpoutEmitters can poll all partitions at once instead of one at a time Aug 25, 2024
@rzo1
Contributor

rzo1 commented Aug 26, 2024

@avermeer Can you have a look? I think that Kafka is in your area of expertise :) (I am not a Kafka user)

@rzo1 rzo1 requested a review from avermeer August 26, 2024 11:39
@reiabreu
Contributor

reiabreu commented Sep 5, 2024

I don't have experience with Trident, but I do with Kafka. I'm having a look.

refactor emitNewBatch() to emitBatchNew()
@reiabreu
Contributor

reiabreu commented Sep 6, 2024

@anand-h-codes I don't have experience with Trident, so my review of those specific classes is not very consequential.
The Kafka-related changes seem fine, but I do have a question about the strategy being applied. It seems the biggest gain is that we only poll the partitions once inside the spout emitter, instead of once per partition.

@anand-h-codes
Contributor Author

@reiabreu Yes. Even KafkaSpout uses this strategy of polling all partitions at once. Plus, with this we can have better control over the Trident batch size with consumer properties like max.poll.records and max.partition.fetch.bytes. Currently there is no direct way to control the batch size.

@reiabreu
Contributor

reiabreu commented Sep 6, 2024

> @reiabreu Yes. Even KafkaSpout uses this strategy of polling all partitions at once. Plus, with this we can have better control over the Trident batch size with consumer properties like max.poll.records and max.partition.fetch.bytes. Currently there is no direct way to control the batch size.

KafkaTridentSpoutEmitter is presently using Consumer<K, V> consumer; to poll the partitions, so that control is already there, correct?
Sorry for all the questions, just trying to better understand the changes.

@anand-h-codes
Contributor Author

anand-h-codes commented Sep 6, 2024

@reiabreu I understand your concern. The main problem with the existing method is that even if we set max.poll.records, we can't guarantee that one Trident batch will contain no more than max.poll.records tuples. That is only guaranteed per partition, so currently one Trident batch can contain up to (number of partitions assigned) * max.poll.records tuples.

With the changes in this PR, it is possible to guarantee that one batch will not contain more than max.poll.records records.

Say I have 1000 partitions assigned to one KafkaTridentSpout and my topology is optimised to process up to 500 records per batch and not more. With the existing per-partition polling, a single batch could still grow far beyond 500 records; with this change, setting max.poll.records=500 caps the batch.
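The worst-case bound described above can be sketched as simple arithmetic. These helper names are hypothetical, not Storm or Kafka APIs; they just encode the per-partition vs. single-poll reasoning.

```java
public class BatchSizeBound {

    // Old behaviour: one poll per partition, so each of the assigned
    // partitions can contribute up to max.poll.records to a single batch.
    static long worstCaseOld(int assignedPartitions, int maxPollRecords) {
        return (long) assignedPartitions * maxPollRecords;
    }

    // New behaviour: one poll per batch, so the batch as a whole is capped
    // at max.poll.records. (The partition count is kept as a parameter only
    // for symmetry with the old bound; it no longer affects the result.)
    static long worstCaseNew(int assignedPartitions, int maxPollRecords) {
        return maxPollRecords;
    }
}
```

With 1000 partitions and max.poll.records=500, the old bound is 500,000 records per batch while the new bound is 500.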

@reiabreu
Contributor

reiabreu commented Sep 6, 2024

emitter.refreshPartitions(taskPartitions); assigns the task's partitions to the emitter object.
Then the emitter polls all the assigned partitions and only emits the records related to currBatchTp:

    final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
    LOG.debug("Polled [{}] records from Kafka.", records.size());

    if (!records.isEmpty()) {
        for (ConsumerRecord<K, V> record : records) {
            emitTuple(collector, record);
        }
    }

The same logic is kept in the changes, with the difference that you will be emitting records for all the partitions instead of just one:

    ConsumerRecords<K, V> poll = consumer.poll(Duration.ofMillis(pollTimeoutMs));
    for (KafkaTridentSpoutTopicPartition partition : partitions) {
        final List<ConsumerRecord<K, V>> records = poll.records(partition.getTopicPartition());
        if (!records.isEmpty()) {
            for (ConsumerRecord<K, V> record : records) {
                emitTuple(collector, record);
            }
        }
    }

Since the assignment of partitions to the consumer is kept, the number of returned records will be the same under the same conditions.

Your change is more efficient in the sense that you perform fewer polls for the same partitions of a task.

Is my understanding of this correct?
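The per-partition grouping that poll.records(partition.getTopicPartition()) performs in the new code path can be sketched with plain collections. Rec and SinglePollGrouping here are hypothetical illustration types, not Storm or Kafka classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SinglePollGrouping {

    // Minimal stand-in for a ConsumerRecord: a value tagged with its partition.
    static final class Rec {
        final String partition;
        final String value;
        Rec(String partition, String value) {
            this.partition = partition;
            this.value = value;
        }
    }

    // One "poll" returns records from several partitions mixed together;
    // this groups them per partition (preserving arrival order), mirroring
    // what ConsumerRecords.records(TopicPartition) gives the emitter.
    static Map<String, List<String>> groupByPartition(List<Rec> polled) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (Rec r : polled) {
            grouped.computeIfAbsent(r.partition, k -> new ArrayList<>()).add(r.value);
        }
        return grouped;
    }
}
```

The emitter can then iterate the groups and emit each partition's records, just as the per-partition loop in the PR does.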

@anand-h-codes
Contributor Author

@reiabreu You are correct. More than efficiency, the important feature this PR adds is control over the batch size.

> Since the assignment of partitions to the consumer is kept, the number of returned records will be the same under the same conditions.

Yes. The number of tuples associated with the current batch will not exceed max.poll.records now. Previously it could be more than that.

@reiabreu reiabreu merged commit 9227ad9 into apache:master Sep 7, 2024
18 checks passed
@reiabreu
Contributor

reiabreu commented Sep 7, 2024

@anand-h-codes thanks for taking the time to go over the changes with me.
And thank you for your PR.
