STORM-4055 Kafka Consumer ConcurrentModificationException fix (#3691)
* Use the Kafka Admin client instead of the consumer object to collect metrics
* Return a mock admin client instead of null in tests
anand-h-codes authored Sep 4, 2024
1 parent e7eb179 commit 14f0944
Showing 15 changed files with 240 additions and 94 deletions.
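Background on the fix: KafkaConsumer is not thread-safe — it detects access from a second thread and throws ConcurrentModificationException. The offset gauges were calling beginningOffsets/endOffsets on the spout's consumer from the metrics-reporting thread while the executor thread was polling, which is evidently the race behind STORM-4055. The Admin client is documented as thread-safe and exposes the same earliest/latest offset information through listOffsets. Below is a minimal, self-contained sketch of that lookup; the broker address, topic, and class name are illustrative placeholders, not part of the commit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class AdminOffsetLookup {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Placeholder broker address; adjust for your environment.
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Admin is documented as thread-safe, so a metrics thread can use it
        // without disturbing the spout's KafkaConsumer.
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            Map<TopicPartition, OffsetSpec> query = new HashMap<>();
            query.put(tp, OffsetSpec.latest());

            // listOffsets(...).all() returns a KafkaFuture; get() blocks for the result.
            Map<TopicPartition, ListOffsetsResultInfo> result =
                    admin.listOffsets(query).all().get();
            System.out.println("End offset: " + result.get(tp).offset());
        }
    }
}
```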
KafkaSpout.java (org.apache.storm.kafka.spout)

@@ -40,6 +40,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.commons.lang.Validate;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -50,9 +51,9 @@
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.kafka.spout.internal.ClientFactory;
+import org.apache.storm.kafka.spout.internal.ClientFactoryDefault;
 import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
-import org.apache.storm.kafka.spout.internal.ConsumerFactory;
-import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault;
 import org.apache.storm.kafka.spout.internal.OffsetManager;
 import org.apache.storm.kafka.spout.internal.Timer;
 import org.apache.storm.kafka.spout.metrics2.KafkaOffsetMetricManager;
@@ -77,10 +78,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     // Kafka
     private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
-    private final ConsumerFactory<K, V> kafkaConsumerFactory;
+    private final ClientFactory<K, V> kafkaClientFactory;
     private final TopicAssigner topicAssigner;
     private transient Consumer<K, V> consumer;
 
+    private transient Admin admin;
     // Bookkeeping
     // Strategy to determine the fetch offset of the first realized by the spout upon activation
     private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
@@ -108,12 +110,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient KafkaSpoutConsumerRebalanceListener rebalanceListener;
 
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
-        this(kafkaSpoutConfig, new ConsumerFactoryDefault<>(), new TopicAssigner());
+        this(kafkaSpoutConfig, new ClientFactoryDefault<>(), new TopicAssigner());
     }
 
     @VisibleForTesting
-    KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, ConsumerFactory<K, V> kafkaConsumerFactory, TopicAssigner topicAssigner) {
-        this.kafkaConsumerFactory = kafkaConsumerFactory;
+    KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, ClientFactory<K, V> kafkaClientFactory, TopicAssigner topicAssigner) {
+        this.kafkaClientFactory = kafkaClientFactory;
         this.topicAssigner = topicAssigner;
         this.kafkaSpoutConfig = kafkaSpoutConfig;
     }
@@ -146,11 +148,13 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC
 
         rebalanceListener = new KafkaSpoutConsumerRebalanceListener();
 
-        consumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
+        consumer = kafkaClientFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
+        admin = kafkaClientFactory.createAdmin(kafkaSpoutConfig.getKafkaProps());
+
 
         tupleListener.open(conf, context);
         this.kafkaOffsetMetricManager
-            = new KafkaOffsetMetricManager<>(() -> Collections.unmodifiableMap(offsetManagers), () -> consumer, context);
+            = new KafkaOffsetMetricManager<>(() -> Collections.unmodifiableMap(offsetManagers), () -> admin, context);
 
         LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
     }
@@ -663,6 +667,7 @@ private void shutdown() {
             commitIfNecessary();
         } finally {
             //remove resources
+            admin.close();
             consumer.close();
         }
     }
ClientFactory.java (org.apache.storm.kafka.spout.internal; renamed from ConsumerFactory.java)

@@ -18,11 +18,15 @@
 
 import java.io.Serializable;
 import java.util.Map;
+
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
 
 /**
  * This is here to enable testing.
  */
-public interface ConsumerFactory<K, V> extends Serializable {
+public interface ClientFactory<K, V> extends Serializable {
     Consumer<K, V> createConsumer(Map<String, Object> consumerProps);
+
+    Admin createAdmin(Map<String, Object> adminProps);
 }
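ClientFactory is the seam through which the spout obtains both clients, so a custom implementation only needs to supply the two factory methods shown in the diff. A hypothetical example — the PrefixedClientFactory name and the client.id tagging are invented for illustration, not part of this commit:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.storm.kafka.spout.internal.ClientFactory;

// Hypothetical factory that tags every client with a distinct client.id.
public class PrefixedClientFactory<K, V> implements ClientFactory<K, V> {
    private final String prefix;

    public PrefixedClientFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Consumer<K, V> createConsumer(Map<String, Object> consumerProps) {
        Map<String, Object> props = new HashMap<>(consumerProps);
        props.put("client.id", prefix + "-consumer");
        return new KafkaConsumer<>(props);
    }

    @Override
    public Admin createAdmin(Map<String, Object> adminProps) {
        Map<String, Object> props = new HashMap<>(adminProps);
        props.put("client.id", prefix + "-admin");
        return Admin.create(props);
    }
}
```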
ClientFactoryDefault.java (org.apache.storm.kafka.spout.internal; renamed from ConsumerFactoryDefault.java)

@@ -17,13 +17,21 @@
 package org.apache.storm.kafka.spout.internal;
 
 import java.util.Map;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 
-public class ConsumerFactoryDefault<K, V> implements ConsumerFactory<K, V> {
+public class ClientFactoryDefault<K, V> implements ClientFactory<K, V> {
 
     @Override
     public KafkaConsumer<K, V> createConsumer(Map<String, Object> consumerProps) {
         return new KafkaConsumer<>(consumerProps);
     }
+
+
+    @Override
+    public Admin createAdmin(Map<String, Object> adminProps) {
+        return Admin.create(adminProps);
+    }
 
 }
KafkaOffsetMetricManager.java (org.apache.storm.kafka.spout.metrics2)

@@ -23,7 +23,7 @@
 import java.util.Set;
 import java.util.function.Supplier;
 
-import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.internal.OffsetManager;
 import org.apache.storm.task.TopologyContext;
@@ -36,17 +36,17 @@
 public class KafkaOffsetMetricManager<K, V> {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetricManager.class);
     private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
-    private final Supplier<Consumer<K, V>> consumerSupplier;
+    private final Supplier<Admin> adminSupplier;
     private TopologyContext topologyContext;
 
     private Map<String, KafkaOffsetTopicMetrics> topicMetricsMap;
     private Map<TopicPartition, KafkaOffsetPartitionMetrics> topicPartitionMetricsMap;
 
     public KafkaOffsetMetricManager(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier,
-                                    Supplier<Consumer<K, V>> consumerSupplier,
+                                    Supplier<Admin> adminSupplier,
                                     TopologyContext topologyContext) {
         this.offsetManagerSupplier = offsetManagerSupplier;
-        this.consumerSupplier = consumerSupplier;
+        this.adminSupplier = adminSupplier;
         this.topologyContext = topologyContext;
 
         this.topicMetricsMap = new HashMap<>();
@@ -68,7 +68,7 @@ public void registerMetricsForNewTopicPartitions(Set<TopicPartition> newAssignme
             }
 
             KafkaOffsetPartitionMetrics topicPartitionMetricSet
-                = new KafkaOffsetPartitionMetrics<>(offsetManagerSupplier, consumerSupplier, topicPartition, topicMetrics);
+                = new KafkaOffsetPartitionMetrics<>(offsetManagerSupplier, adminSupplier, topicPartition, topicMetrics);
             topicPartitionMetricsMap.put(topicPartition, topicPartitionMetricSet);
             topologyContext.registerMetricSet("kafkaOffset", topicPartitionMetricSet);
         }
KafkaOffsetPartitionMetrics.java (org.apache.storm.kafka.spout.metrics2)

@@ -26,9 +26,13 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
 
-import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.storm.kafka.spout.internal.OffsetManager;
@@ -49,17 +53,17 @@
 public class KafkaOffsetPartitionMetrics<K, V> implements MetricSet {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetPartitionMetrics.class);
     private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
-    private final Supplier<Consumer<K, V>> consumerSupplier;
+    private final Supplier<Admin> adminSupplier;
 
     private TopicPartition topicPartition;
     private KafkaOffsetTopicMetrics topicMetrics;
 
     public KafkaOffsetPartitionMetrics(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier,
-                                       Supplier<Consumer<K, V>> consumerSupplier,
+                                       Supplier<Admin> adminSupplier,
                                        TopicPartition topicPartition,
                                        KafkaOffsetTopicMetrics topicMetrics) {
         this.offsetManagerSupplier = offsetManagerSupplier;
-        this.consumerSupplier = consumerSupplier;
+        this.adminSupplier = adminSupplier;
         this.topicPartition = topicPartition;
         this.topicMetrics = topicMetrics;
 
@@ -170,36 +174,54 @@ public Long getValue() {
     }
 
     private Map<TopicPartition, Long> getBeginningOffsets(Set<TopicPartition> topicPartitions) {
-        Consumer<K, V> consumer = consumerSupplier.get();
-        if (consumer == null) {
-            LOG.error("Kafka consumer object is null, returning 0.");
+        Admin admin = adminSupplier.get();
+        if (admin == null) {
+            LOG.error("Kafka admin object is null, returning 0.");
             return Collections.EMPTY_MAP;
         }
 
         Map<TopicPartition, Long> beginningOffsets;
         try {
-            beginningOffsets = consumer.beginningOffsets(topicPartitions);
-        } catch (RetriableException e) {
+            beginningOffsets = getOffsets(admin, topicPartitions, OffsetSpec.earliest());
+        } catch (RetriableException | ExecutionException | InterruptedException e) {
             LOG.error("Failed to get offset from Kafka for topic partitions: {}.", topicPartition, e);
             return Collections.EMPTY_MAP;
         }
         return beginningOffsets;
     }
 
     private Map<TopicPartition, Long> getEndOffsets(Set<TopicPartition> topicPartitions) {
-        Consumer<K, V> consumer = consumerSupplier.get();
-        if (consumer == null) {
-            LOG.error("Kafka consumer object is null, returning 0.");
+        Admin admin = adminSupplier.get();
+        if (admin == null) {
+            LOG.error("Kafka admin object is null, returning 0.");
             return Collections.EMPTY_MAP;
         }
 
         Map<TopicPartition, Long> endOffsets;
         try {
-            endOffsets = consumer.endOffsets(topicPartitions);
-        } catch (RetriableException e) {
+            endOffsets = getOffsets(admin, topicPartitions, OffsetSpec.latest());
+        } catch (RetriableException | ExecutionException | InterruptedException e) {
             LOG.error("Failed to get offset from Kafka for topic partitions: {}.", topicPartition, e);
             return Collections.EMPTY_MAP;
         }
         return endOffsets;
     }
 
+    private static Map<TopicPartition, Long> getOffsets(Admin admin, Set<TopicPartition> topicPartitions, OffsetSpec offsetSpec)
+            throws InterruptedException, ExecutionException {
+
+        Map<TopicPartition, OffsetSpec> offsetSpecMap = new HashMap<>();
+        for (TopicPartition topicPartition : topicPartitions) {
+            offsetSpecMap.put(topicPartition, offsetSpec);
+        }
+        Map<TopicPartition, Long> ret = new HashMap<>();
+        ListOffsetsResult listOffsetsResult = admin.listOffsets(offsetSpecMap);
+        KafkaFuture<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> all = listOffsetsResult.all();
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionListOffsetsResultInfoMap = all.get();
+        for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> entry :
+                topicPartitionListOffsetsResultInfoMap.entrySet()) {
+            ret.put(entry.getKey(), entry.getValue().offset());
+        }
+        return ret;
+    }
 }
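As an aside, the new getOffsets helper builds its two maps with explicit loops. A behavior-equivalent, stream-based sketch of the same Admin.listOffsets call — shown only for comparison, not what the commit ships:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

// Stream-based variant of the commit's getOffsets helper.
final class OffsetLookup {
    private OffsetLookup() {
    }

    static Map<TopicPartition, Long> getOffsets(Admin admin, Set<TopicPartition> topicPartitions, OffsetSpec offsetSpec)
            throws InterruptedException, ExecutionException {
        // Ask for the same OffsetSpec (earliest or latest) on every partition.
        Map<TopicPartition, OffsetSpec> query = topicPartitions.stream()
                .collect(Collectors.toMap(Function.identity(), tp -> offsetSpec));
        // all() returns a KafkaFuture; get() blocks until the broker responds.
        return admin.listOffsets(query).all().get().entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
    }
}
```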
KafkaTridentSpoutCoordinator.java (org.apache.storm.kafka.spout.trident)

@@ -26,9 +26,8 @@
 import java.util.stream.Collectors;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-import org.apache.storm.kafka.spout.internal.ConsumerFactory;
-import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault;
+import org.apache.storm.kafka.spout.internal.ClientFactory;
+import org.apache.storm.kafka.spout.internal.ClientFactoryDefault;
 import org.apache.storm.kafka.spout.internal.Timer;
 import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
 import org.apache.storm.trident.spout.IPartitionedTridentSpout;
@@ -55,13 +54,13 @@ public class KafkaTridentSpoutCoordinator<K, V> implements
      * @param kafkaSpoutConfig The spout config to use
      */
     public KafkaTridentSpoutCoordinator(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig) {
-        this(kafkaSpoutConfig, new ConsumerFactoryDefault<>());
+        this(kafkaSpoutConfig, new ClientFactoryDefault<>());
     }
 
-    KafkaTridentSpoutCoordinator(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig, ConsumerFactory<K, V> consumerFactory) {
+    KafkaTridentSpoutCoordinator(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig, ClientFactory<K, V> clientFactory) {
         this.kafkaSpoutConfig = kafkaSpoutConfig;
         this.refreshAssignmentTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
-        this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
+        this.consumer = clientFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
         LOG.debug("Created {}", this.toString());
     }
 
KafkaTridentSpoutEmitter.java (org.apache.storm.kafka.spout.trident)

@@ -46,8 +46,8 @@
 import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.RecordTranslator;
 import org.apache.storm.kafka.spout.TopicPartitionComparator;
-import org.apache.storm.kafka.spout.internal.ConsumerFactory;
-import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault;
+import org.apache.storm.kafka.spout.internal.ClientFactory;
+import org.apache.storm.kafka.spout.internal.ClientFactoryDefault;
 import org.apache.storm.kafka.spout.subscription.TopicAssigner;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.trident.operation.TridentCollector;
@@ -82,14 +82,14 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
      * @param topologyContext The topology context
      */
     public KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig, TopologyContext topologyContext) {
-        this(kafkaSpoutConfig, topologyContext, new ConsumerFactoryDefault<>(), new TopicAssigner());
+        this(kafkaSpoutConfig, topologyContext, new ClientFactoryDefault<>(), new TopicAssigner());
     }
 
     @VisibleForTesting
     KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig, TopologyContext topologyContext,
-            ConsumerFactory<K, V> consumerFactory, TopicAssigner topicAssigner) {
+            ClientFactory<K, V> clientFactory, TopicAssigner topicAssigner) {
         this.kafkaSpoutConfig = kafkaSpoutConfig;
-        this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
+        this.consumer = clientFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
         this.topologyContext = topologyContext;
         this.translator = kafkaSpoutConfig.getTranslator();
         this.topicAssigner = topicAssigner;
KafkaSpoutAbstractTest.java (org.apache.storm.kafka.spout, test sources)

@@ -28,13 +28,16 @@
 
 import java.util.HashMap;
 import java.util.Map;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.KafkaUnitExtension;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.ConsumerFactory;
-import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault;
+import org.apache.storm.kafka.spout.internal.ClientFactory;
+import org.apache.storm.kafka.spout.internal.ClientFactoryDefault;
 import org.apache.storm.kafka.spout.subscription.TopicAssigner;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -60,6 +63,8 @@ public abstract class KafkaSpoutAbstractTest {
     final long commitOffsetPeriodMs;
 
     private KafkaConsumer<String, String> consumerSpy;
+
+    private Admin adminSpy;
     KafkaSpout<String, String> spout;
 
     @Captor
@@ -81,6 +86,8 @@ public void setUp() {
 
         consumerSpy = createConsumerSpy();
 
+        adminSpy = createAdminSpy();
+
         spout = new KafkaSpout<>(spoutConfig, createConsumerFactory(), new TopicAssigner());
 
         simulatedTime = new Time.SimulatedTime();
@@ -90,19 +97,26 @@ protected KafkaConsumer<String, String> getKafkaConsumer() {
         return consumerSpy;
     }
 
-    private ConsumerFactory<String, String> createConsumerFactory() {
+    private ClientFactory<String, String> createConsumerFactory() {
 
-        return new ConsumerFactory<String, String>() {
+        return new ClientFactory<String, String>() {
             @Override
             public KafkaConsumer<String, String> createConsumer(Map<String, Object> consumerProps) {
                 return consumerSpy;
             }
 
+            @Override
+            public Admin createAdmin(Map<String, Object> adminProps) {
+                return adminSpy;
+            }
         };
     }
 
     KafkaConsumer<String, String> createConsumerSpy() {
-        return spy(new ConsumerFactoryDefault<String, String>().createConsumer(spoutConfig.getKafkaProps()));
+        return spy(new ClientFactoryDefault<String, String>().createConsumer(spoutConfig.getKafkaProps()));
     }
 
+    Admin createAdminSpy(){
+        return spy(new ClientFactoryDefault<String,String>().createAdmin(spoutConfig.getKafkaProps()));
+    }
 
     @AfterEach
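The test base class above spies on real clients backed by the KafkaUnit broker. For broker-less unit tests, a Mockito mock of Admin can play the same role as the spy. A sketch under the assumptions that Mockito is on the classpath and that ListOffsetsResult's public constructor is available (it is in recent kafka-clients releases):

```java
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.Optional;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;

public class MockAdminExample {
    // Builds an Admin mock whose listOffsets(...) always reports the given offset.
    public static Admin mockAdminReturning(TopicPartition tp, long offset) {
        ListOffsetsResultInfo info =
                new ListOffsetsResultInfo(offset, -1L, Optional.empty());
        ListOffsetsResult result = new ListOffsetsResult(
                Collections.singletonMap(tp, KafkaFuture.completedFuture(info)));

        Admin admin = mock(Admin.class);
        when(admin.listOffsets(anyMap())).thenReturn(result);
        return admin;
    }
}
```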
(The remaining 7 of the 15 changed files did not load in this view.)
