Skip to content

Commit

Permalink
Port auto-config changes from 0.2.x (spring-projects#402)
Browse files Browse the repository at this point in the history
* Port "Use builder to autoconfigure PulsarProducerFactory (spring-projects#397)" from 0.2.x

* Port "Use builder to autoconfigure PulsarConsumerFactory (spring-projects#399)" from 0.2.x

* Port "Use builder to autoconfigure PulsarReaderFactory (spring-projects#400)" from 0.2.x

* Port "Use builder to autoconfigure PulsarAdministration (spring-projects#401)" from 0.2.x
  • Loading branch information
onobc authored May 7, 2023
1 parent 5471b3a commit c043d8c
Show file tree
Hide file tree
Showing 28 changed files with 518 additions and 520 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
Expand All @@ -30,7 +28,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -107,7 +104,7 @@ public static class TopLevelConfig {

@Bean
public PulsarProducerFactory<String> pulsarProducerFactory(PulsarClient pulsarClient) {
return new DefaultPulsarProducerFactory<>(pulsarClient, new HashMap<>());
return new DefaultPulsarProducerFactory<>(pulsarClient);
}

@Bean
Expand Down Expand Up @@ -139,8 +136,7 @@ ReactivePulsarListenerContainerFactory<String> reactivePulsarListenerContainerFa

@Bean
PulsarAdministration pulsarAdministration() {
return new PulsarAdministration(
PulsarAdmin.builder().serviceHttpUrl(PulsarTestContainerSupport.getHttpServiceUrl()));
return new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl());
}

@Bean
Expand Down Expand Up @@ -347,8 +343,7 @@ class SchemaTestCases {

@Test
void jsonSchema() throws Exception {
PulsarProducerFactory<User> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
Collections.emptyMap());
PulsarProducerFactory<User> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient);
PulsarTemplate<User> template = new PulsarTemplate<>(pulsarProducerFactory);
for (int i = 0; i < 3; i++) {
template.send("json-topic", new User("Jason", i), JSONSchema.of(User.class));
Expand All @@ -358,8 +353,7 @@ void jsonSchema() throws Exception {

@Test
void avroSchema() throws Exception {
PulsarProducerFactory<User> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
Collections.emptyMap());
PulsarProducerFactory<User> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient);
PulsarTemplate<User> template = new PulsarTemplate<>(pulsarProducerFactory);
for (int i = 0; i < 3; i++) {
template.send("avro-topic", new User("Avi", i), AvroSchema.of(User.class));
Expand All @@ -370,7 +364,7 @@ void avroSchema() throws Exception {
@Test
void keyvalueSchema() throws Exception {
PulsarProducerFactory<KeyValue<String, Integer>> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(
pulsarClient, Collections.emptyMap());
pulsarClient);
PulsarTemplate<KeyValue<String, Integer>> template = new PulsarTemplate<>(pulsarProducerFactory);
Schema<KeyValue<String, Integer>> kvSchema = Schema.KeyValue(Schema.STRING, Schema.INT32,
KeyValueEncodingType.INLINE);
Expand All @@ -382,8 +376,8 @@ void keyvalueSchema() throws Exception {

@Test
void protobufSchema() throws Exception {
PulsarProducerFactory<Proto.Person> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
Collections.emptyMap());
PulsarProducerFactory<Proto.Person> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(
pulsarClient);
PulsarTemplate<Proto.Person> template = new PulsarTemplate<>(pulsarProducerFactory);
for (int i = 0; i < 3; i++) {
template.send("protobuf-topic", Proto.Person.newBuilder().setId(i).setName("Paul").build(),
Expand Down Expand Up @@ -494,8 +488,7 @@ class SchemaCustomMappingsTestCases {

@Test
void jsonSchema() throws Exception {
PulsarProducerFactory<User2> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
Collections.emptyMap());
PulsarProducerFactory<User2> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient);
PulsarTemplate<User2> template = new PulsarTemplate<>(pulsarProducerFactory);
for (int i = 0; i < 3; i++) {
template.send("json-custom-schema-topic", new User2("Jason", i), JSONSchema.of(User2.class));
Expand All @@ -505,8 +498,7 @@ void jsonSchema() throws Exception {

@Test
void avroSchema() throws Exception {
PulsarProducerFactory<User> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
Collections.emptyMap());
PulsarProducerFactory<User> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient);
PulsarTemplate<User> template = new PulsarTemplate<>(pulsarProducerFactory);
for (int i = 0; i < 3; i++) {
template.send("avro-custom-schema-topic", new User("Avi", i), AvroSchema.of(User.class));
Expand All @@ -517,7 +509,7 @@ void avroSchema() throws Exception {
@Test
void keyvalueSchema() throws Exception {
PulsarProducerFactory<KeyValue<String, User2>> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(
pulsarClient, Collections.emptyMap());
pulsarClient);
PulsarTemplate<KeyValue<String, User2>> template = new PulsarTemplate<>(pulsarProducerFactory);
Schema<KeyValue<String, User2>> kvSchema = Schema.KeyValue(Schema.STRING, Schema.JSON(User2.class),
KeyValueEncodingType.INLINE);
Expand All @@ -530,8 +522,8 @@ void keyvalueSchema() throws Exception {

@Test
void protobufSchema() throws Exception {
PulsarProducerFactory<Proto.Person> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
Collections.emptyMap());
PulsarProducerFactory<Proto.Person> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(
pulsarClient);
PulsarTemplate<Proto.Person> template = new PulsarTemplate<>(pulsarProducerFactory);
for (int i = 0; i < 3; i++) {
template.send("protobuf-custom-schema-topic",
Expand Down Expand Up @@ -664,8 +656,7 @@ class TopicCustomMappingsTestCases {

@Test
void complexMessageTypeTopicMapping() throws Exception {
PulsarProducerFactory<User2> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
Collections.emptyMap());
PulsarProducerFactory<User2> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient);
PulsarTemplate<User2> template = new PulsarTemplate<>(pulsarProducerFactory);
Schema<User2> schema = Schema.JSON(User2.class);
for (int i = 0; i < 3; i++) {
Expand All @@ -676,8 +667,7 @@ void complexMessageTypeTopicMapping() throws Exception {

@Test
void primitiveMessageTypeTopicMapping() throws Exception {
PulsarProducerFactory<String> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
Collections.emptyMap());
PulsarProducerFactory<String> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient);
PulsarTemplate<String> template = new PulsarTemplate<>(pulsarProducerFactory);
for (int i = 0; i < 3; i++) {
template.send("rplt-topicMapping-string-topic", "Susan " + i, Schema.STRING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -67,16 +66,17 @@ public class CachingPulsarProducerFactory<T> extends DefaultPulsarProducerFactor
* Construct a caching producer factory with the specified values for the cache
* configuration.
* @param pulsarClient the client used to create the producers
* @param producerConfig the configuration to use when creating a producer
* @param defaultTopic the default topic to use for the producers
* @param defaultConfigCustomizer the default configuration to apply to the producers
* @param topicResolver the topic resolver to use
* @param cacheExpireAfterAccess time period to expire unused entries in the cache
* @param cacheMaximumSize maximum size of cache (entries)
* @param cacheInitialCapacity the initial size of cache
*/
public CachingPulsarProducerFactory(PulsarClient pulsarClient, Map<String, Object> producerConfig,
TopicResolver topicResolver, Duration cacheExpireAfterAccess, Long cacheMaximumSize,
Integer cacheInitialCapacity) {
super(pulsarClient, producerConfig, topicResolver);
public CachingPulsarProducerFactory(PulsarClient pulsarClient, @Nullable String defaultTopic,
ProducerBuilderCustomizer<T> defaultConfigCustomizer, TopicResolver topicResolver,
Duration cacheExpireAfterAccess, Long cacheMaximumSize, Integer cacheInitialCapacity) {
super(pulsarClient, defaultTopic, defaultConfigCustomizer, topicResolver);
var cacheFactory = CacheProviderFactory.<ProducerCacheKey<T>, Producer<T>>load();
this.producerCache = cacheFactory.create(cacheExpireAfterAccess, cacheMaximumSize, cacheInitialCapacity,
(key, producer, cause) -> {
Expand All @@ -90,8 +90,8 @@ public CachingPulsarProducerFactory(PulsarClient pulsarClient, Map<String, Objec
protected Producer<T> doCreateProducer(Schema<T> schema, @Nullable String topic,
@Nullable Collection<String> encryptionKeys, @Nullable List<ProducerBuilderCustomizer<T>> customizers) {
Objects.requireNonNull(schema, "Schema must be specified");
String resolveTopicName = resolveTopicName(topic);
ProducerCacheKey<T> producerCacheKey = new ProducerCacheKey<>(schema, resolveTopicName,
var resolveTopicName = resolveTopicName(topic);
var producerCacheKey = new ProducerCacheKey<>(schema, resolveTopicName,
encryptionKeys == null ? null : new HashSet<>(encryptionKeys), customizers);
return this.producerCache.getOrCreateIfAbsent(producerCacheKey,
(st) -> createCacheableProducer(st.schema, st.topic, st.encryptionKeys, customizers));
Expand All @@ -100,7 +100,7 @@ protected Producer<T> doCreateProducer(Schema<T> schema, @Nullable String topic,
private Producer<T> createCacheableProducer(Schema<T> schema, String topic,
@Nullable Collection<String> encryptionKeys, @Nullable List<ProducerBuilderCustomizer<T>> customizers) {
try {
Producer<T> producer = super.doCreateProducer(schema, topic, encryptionKeys, customizers);
var producer = super.doCreateProducer(schema, topic, encryptionKeys, customizers);
return new ProducerWithCloseCallback<>(producer,
(p) -> this.logger.trace(() -> "Client closed producer %s but will skip actual closing"
.formatted(ProducerUtils.formatProducer(producer))));
Expand Down Expand Up @@ -174,7 +174,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
ProducerCacheKey<?> that = (ProducerCacheKey<?>) o;
var that = (ProducerCacheKey<?>) o;
return this.topic.equals(that.topic) && this.schemaHash.equals(that.schemaHash)
&& Objects.equals(this.encryptionKeys, that.encryptionKeys)
&& Objects.equals(this.customizers, that.customizers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class DefaultPulsarClientFactory implements PulsarClientFactory {
* @param serviceUrl the service url
*/
public DefaultPulsarClientFactory(String serviceUrl) {
this((clientBuilder -> clientBuilder.serviceUrl(serviceUrl)));
this((clientBuilder) -> clientBuilder.serviceUrl(serviceUrl));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -30,6 +29,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;

import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;
Expand All @@ -41,22 +41,25 @@
* @author Soby Chacko
* @author Alexander Preuß
* @author Christophe Bornet
* @author Chris Bono
*/
public class DefaultPulsarConsumerFactory<T> implements PulsarConsumerFactory<T> {

private final Map<String, Object> consumerConfig;

private final PulsarClient pulsarClient;

@Nullable
private final ConsumerBuilderCustomizer<T> defaultConfigCustomizer;

/**
* Construct a consumer factory instance.
* @param pulsarClient the client used to consume
* @param consumerConfig default configuration to apply to the created consumer or
* empty map to use no default configuration
* @param defaultConfigCustomizer the default configuration to apply to the consumers
* or null to use no default configuration
*/
public DefaultPulsarConsumerFactory(PulsarClient pulsarClient, Map<String, Object> consumerConfig) {
public DefaultPulsarConsumerFactory(PulsarClient pulsarClient,
ConsumerBuilderCustomizer<T> defaultConfigCustomizer) {
this.pulsarClient = pulsarClient;
this.consumerConfig = Collections.unmodifiableMap(consumerConfig);
this.defaultConfigCustomizer = defaultConfigCustomizer;
}

@Override
Expand All @@ -72,25 +75,35 @@ public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String>
@Nullable List<ConsumerBuilderCustomizer<T>> customizers) throws PulsarClientException {
Objects.requireNonNull(schema, "Schema must be specified");
ConsumerBuilder<T> consumerBuilder = this.pulsarClient.newConsumer(schema);
Map<String, Object> config = new HashMap<>(this.consumerConfig);
if (topics != null) {
config.put("topicNames", new HashSet<>(topics));

// Apply the default config customizer (preserve the topic)
if (this.defaultConfigCustomizer != null) {
this.defaultConfigCustomizer.customize(consumerBuilder);
}
if (metadataProperties != null) {
config.put("properties", new TreeMap<>(metadataProperties));
if (topics != null) {
replaceTopicsOnBuilder(consumerBuilder, topics);
}
if (subscriptionName != null) {
config.put("subscriptionName", subscriptionName);
consumerBuilder.subscriptionName(subscriptionName);
}
if (metadataProperties != null) {
replaceMetadataPropertiesOnBuilder(consumerBuilder, metadataProperties);
}
ConsumerBuilderConfigurationUtil.loadConf(consumerBuilder, config);
if (!CollectionUtils.isEmpty(customizers)) {
customizers.forEach(customizer -> customizer.customize(consumerBuilder));
}
return consumerBuilder.subscribe();
}

public Map<String, Object> getConsumerConfig() {
return this.consumerConfig;
private void replaceTopicsOnBuilder(ConsumerBuilder<T> builder, Collection<String> topics) {
var builderImpl = (ConsumerBuilderImpl<T>) builder;
builderImpl.getConf().setTopicNames(new HashSet<>(topics));
}

private void replaceMetadataPropertiesOnBuilder(ConsumerBuilder<T> builder,
Map<String, String> metadataProperties) {
var builderImpl = (ConsumerBuilderImpl<T>) builder;
builderImpl.getConf().setProperties(new TreeMap<>(metadataProperties));
}

}
Loading

0 comments on commit c043d8c

Please sign in to comment.