From c71f4c351511b8683b7b28528259774a7f11c3de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Fri, 8 Dec 2017 12:44:44 +0100 Subject: [PATCH] STORM-2850: Make ManualPartitionSubscription call rebalance listener on revoke hook before assigning new partitions to the consumer --- .../ManualPartitionSubscription.java | 2 +- .../ManualPartitionSubscriptionTest.java | 77 +++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java index 32376d40483..ebfd30ca544 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java @@ -56,11 +56,11 @@ public void refreshAssignment() { Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE); Set newAssignment = new HashSet<>(partitioner.partition(allPartitions, context)); if (!newAssignment.equals(currentAssignment)) { - consumer.assign(newAssignment); if (currentAssignment != null) { listener.onPartitionsRevoked(currentAssignment); } currentAssignment = newAssignment; + consumer.assign(newAssignment); listener.onPartitionsAssigned(newAssignment); } } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java new file mode 100644 index 00000000000..8a7b316d89b --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java @@ -0,0 +1,77 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.subscription; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.task.TopologyContext; +import org.junit.Test; +import org.mockito.InOrder; + +public class ManualPartitionSubscriptionTest { + + @Test + public void testCanReassignPartitions() { + ManualPartitioner partitionerMock = mock(ManualPartitioner.class); + TopicFilter filterMock = mock(TopicFilter.class); + KafkaConsumer consumerMock = mock(KafkaConsumer.class); + ConsumerRebalanceListener listenerMock = mock(ConsumerRebalanceListener.class); + TopologyContext contextMock = mock(TopologyContext.class); + ManualPartitionSubscription subscription = new ManualPartitionSubscription(partitionerMock, filterMock); + + List onePartition = Collections.singletonList(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0)); + List twoPartitions = new ArrayList<>(); + twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0)); + twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1)); + when(partitionerMock.partition(anyList(), any(TopologyContext.class))) + .thenReturn(onePartition) + .thenReturn(twoPartitions); + + //Set the first assignment + subscription.subscribe(consumerMock, listenerMock, contextMock); + + InOrder inOrder = inOrder(consumerMock, listenerMock); + inOrder.verify(consumerMock).assign(new HashSet<>(onePartition)); + inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(onePartition)); + + clearInvocations(consumerMock, listenerMock); + + //Update to set the second assignment + subscription.refreshAssignment(); + + //The partition revocation hook must be called before the new partitions are assigned to the consumer, + //to allow the revocation hook to commit offsets for the revoked partitions. + inOrder.verify(listenerMock).onPartitionsRevoked(new HashSet<>(onePartition)); + inOrder.verify(consumerMock).assign(new HashSet<>(twoPartitions)); + inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(twoPartitions)); + } + +}