Skip to content

Commit

Permalink
Minor cleanup and refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
onobc committed Jul 11, 2022
1 parent fa606af commit 61d104e
Show file tree
Hide file tree
Showing 15 changed files with 141 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.annotation.EnablePulsar;
import org.springframework.pulsar.config.PulsarListenerConfigUtils;
import org.springframework.pulsar.config.PulsarListenerContainerFactoryImpl;
import org.springframework.pulsar.config.DefaultPulsarListenerContainerFactory;
import org.springframework.pulsar.config.PulsarListenerBeanNames;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.listener.PulsarContainerProperties;

Expand All @@ -44,9 +44,9 @@ public PulsarAnnotationDrivenConfiguration(PulsarProperties pulsarProperties) {

@Bean
@ConditionalOnMissingBean(name = "pulsarListenerContainerFactory")
PulsarListenerContainerFactoryImpl<?, ?> pulsarListenerContainerFactory(
DefaultPulsarListenerContainerFactory<?, ?> pulsarListenerContainerFactory(
ObjectProvider<PulsarConsumerFactory<Object>> pulsarConsumerFactory) {
PulsarListenerContainerFactoryImpl<Object, Object> factory = new PulsarListenerContainerFactoryImpl<>();
DefaultPulsarListenerContainerFactory<Object, Object> factory = new DefaultPulsarListenerContainerFactory<>();

final PulsarConsumerFactory<Object> pulsarConsumerFactory1 = pulsarConsumerFactory.getIfAvailable();
factory.setPulsarConsumerFactory(pulsarConsumerFactory1);
Expand All @@ -68,7 +68,7 @@ public PulsarAnnotationDrivenConfiguration(PulsarProperties pulsarProperties) {

@Configuration(proxyBeanMethods = false)
@EnablePulsar
@ConditionalOnMissingBean(name = PulsarListenerConfigUtils.PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@ConditionalOnMissingBean(name = PulsarListenerBeanNames.PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
static class EnableKafkaConfiguration {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void listen(String foo) {
@Import(PulsarAutoConfiguration.class)
public static class BatchListenerConfig {

@PulsarListener(subscriptionName = "test-exclusive-sub-2", topics = "hello-pulsar-exclusive", batch = "true")
@PulsarListener(subscriptionName = "test-exclusive-sub-2", topics = "hello-pulsar-exclusive", batch = true)
public void listen(List<String> foo) {
foo.forEach(t -> latch2.countDown());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
import org.springframework.context.annotation.Import;

/**
* Enable Pulsar listener annotated endpoints that are created under the covers by a
* {@link org.springframework.pulsar.config.AbstractPulsarListenerContainerFactory}.
* Enables detection of {@link PulsarListener} annotations on any Spring-managed bean in the container.
*
* @author Soby Chacko
*/
Expand All @@ -36,4 +35,3 @@
@Import(PulsarListenerConfigurationSelector.class)
public @interface EnablePulsar {
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.pulsar.config.PulsarListenerConfigUtils;
import org.springframework.pulsar.config.PulsarListenerBeanNames;
import org.springframework.pulsar.config.PulsarListenerEndpointRegistry;

/**
Expand All @@ -41,15 +41,13 @@ public class PulsarBootstrapConfiguration implements ImportBeanDefinitionRegistr

@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
if (!registry.containsBeanDefinition(
PulsarListenerConfigUtils.PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {

registry.registerBeanDefinition(PulsarListenerConfigUtils.PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
if (!registry.containsBeanDefinition(PulsarListenerBeanNames.PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
registry.registerBeanDefinition(PulsarListenerBeanNames.PULSAR_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
new RootBeanDefinition(PulsarListenerAnnotationBeanPostProcessor.class));
}

if (!registry.containsBeanDefinition(PulsarListenerConfigUtils.PULSAR_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
registry.registerBeanDefinition(PulsarListenerConfigUtils.PULSAR_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
if (!registry.containsBeanDefinition(PulsarListenerBeanNames.PULSAR_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
registry.registerBeanDefinition(PulsarListenerBeanNames.PULSAR_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
new RootBeanDefinition(PulsarListenerEndpointRegistry.class));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.pulsar.common.schema.SchemaType;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.pulsar.config.PulsarListenerContainerFactory;
import org.springframework.pulsar.config.PulsarListenerEndpointRegistry;

/**
* Annotation that marks a method to be the target of a Pulsar message listener on the
Expand Down Expand Up @@ -55,7 +57,7 @@
* <p>If none is specified an auto-generated id is used.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* @return the {@code id} for the container managing for this endpoint.
* @see org.springframework.pulsar.config.PulsarListenerEndpointRegistry#getListenerContainer(String)
* @see PulsarListenerEndpointRegistry#getListenerContainer(String)
*/
String id() default "";

Expand All @@ -74,8 +76,13 @@
SchemaType schemaType() default SchemaType.NONE;

/**
* Specific container factory to use on this listener.
* @return {@code containerFactory} to use on this Pulsar listener.
* The bean name of the {@link PulsarListenerContainerFactory}
* to use to create the message listener container responsible to serve this endpoint.
* <p>
* If not specified, the default container factory is used, if any. If a SpEL
* expression is provided ({@code #{...}}), the expression can either evaluate to a
* container factory instance or a bean name.
* @return the container factory bean name.
*/
String containerFactory() default "";

Expand Down Expand Up @@ -108,7 +115,7 @@
*
* @return whether this listener is in batch mode or not.
*/
String batch() default "";
boolean batch() default false;

/**
* A pseudo bean name used in SpEL expressions within this annotation to reference
Expand All @@ -120,5 +127,25 @@
*/
String beanRef() default "__listener";

/**
* Pulsar consumer properties; they will supersede any properties with the same name
* defined in the consumer factory (if the consumer factory supports property overrides).
* <p>
* <b>Supported Syntax</b>
* <p>The supported syntax for key-value pairs is the same as the
* syntax defined for entries in a Java
* {@linkplain java.util.Properties#load(java.io.Reader) properties file}:
* <ul>
* <li>{@code key=value}</li>
* <li>{@code key:value}</li>
* <li>{@code key value}</li>
* </ul>
* {@code group.id} and {@code client.id} are ignored.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* SpEL expressions must resolve to a {@link String}, a @{link String[]} or a
* {@code Collection<String>} where each member of the array or collection is a
* property name + value with the above formats.
* @return the properties.
*/
String[] properties() default {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.SmartInitializingSingleton;
Expand All @@ -61,7 +60,6 @@
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.OrderComparator;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.AnnotationUtils;
Expand All @@ -80,8 +78,9 @@
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.pulsar.config.MethodPulsarListenerEndpoint;
import org.springframework.pulsar.config.PulsarListenerConfigUtils;
import org.springframework.pulsar.config.PulsarListenerBeanNames;
import org.springframework.pulsar.config.PulsarListenerContainerFactory;
import org.springframework.pulsar.config.PulsarListenerEndpoint;
import org.springframework.pulsar.config.PulsarListenerEndpointRegistrar;
import org.springframework.pulsar.config.PulsarListenerEndpointRegistry;
import org.springframework.util.Assert;
Expand All @@ -92,12 +91,12 @@
/**
* Bean post-processor that registers methods annotated with {@link PulsarListener}
* to be invoked by a Pulsar message listener container created under the covers
* by a {@link org.springframework.pulsar.config.PulsarListenerContainerFactory}
* by a {@link PulsarListenerContainerFactory}
* according to the parameters of the annotation.
*
* <p>Annotated methods can use flexible arguments as defined by {@link PulsarListener}.
*
* <p>This post-processor is automatically registered by Spring's {@link EnablePulsar}
* <p>This post-processor is automatically registered by the {@link EnablePulsar}
* annotation.
*
* <p>Auto-detect any {@link PulsarListenerConfigurer} instances in the container,
Expand All @@ -115,7 +114,7 @@
* @see PulsarListenerConfigurer
* @see PulsarListenerEndpointRegistrar
* @see PulsarListenerEndpointRegistry
* @see org.springframework.pulsar.config.PulsarListenerEndpoint
* @see PulsarListenerEndpoint
* @see MethodPulsarListenerEndpoint
*/
public class PulsarListenerAnnotationBeanPostProcessor<K, V> implements BeanPostProcessor, Ordered, ApplicationContextAware, InitializingBean, SmartInitializingSingleton {
Expand All @@ -136,17 +135,20 @@ public class PulsarListenerAnnotationBeanPostProcessor<K, V> implements BeanPost
private static final String GENERATED_ID_PREFIX = "org.springframework.Pulsar.PulsarListenerEndpointContainer#";

private ApplicationContext applicationContext;

private BeanFactory beanFactory;

private BeanExpressionResolver resolver;

private BeanExpressionContext expressionContext;

private PulsarListenerEndpointRegistry endpointRegistry;

private String defaultContainerFactoryBeanName = DEFAULT_PULSAR_LISTENER_CONTAINER_FACTORY_BEAN_NAME;

private final PulsarListenerEndpointRegistrar registrar = new PulsarListenerEndpointRegistrar();
private final PulsarHandlerMethodFactoryAdapter messageHandlerMethodFactory =
new PulsarHandlerMethodFactoryAdapter();

private final PulsarHandlerMethodFactoryAdapter messageHandlerMethodFactory = new PulsarHandlerMethodFactoryAdapter();

private Charset charset = StandardCharsets.UTF_8;

Expand Down Expand Up @@ -179,23 +181,20 @@ public void setCharset(Charset charset) {
}

@Override
public void afterPropertiesSet() throws Exception {
public void afterPropertiesSet() {
buildEnhancer();
}

private void buildEnhancer() {
if (this.applicationContext != null) {
Map<String, AnnotationEnhancer> enhancersMap =
this.applicationContext.getBeansOfType(AnnotationEnhancer.class, false, false);
if (enhancersMap.size() > 0) {
List<AnnotationEnhancer> enhancers = enhancersMap.values()
.stream()
.sorted(new OrderComparator())
.collect(Collectors.toList());
List<AnnotationEnhancer> enhancers = this.applicationContext
.getBeanProvider(AnnotationEnhancer.class, false)
.orderedStream()
.toList();
if (!enhancers.isEmpty()) {
this.enhancer = (attrs, element) -> {
Map<String, Object> newAttrs = attrs;
for (AnnotationEnhancer enh : enhancers) {
newAttrs = enh.apply(newAttrs, element);
attrs = enh.apply(attrs, element);
}
return attrs;
};
Expand All @@ -207,20 +206,15 @@ private void buildEnhancer() {
public void afterSingletonsInstantiated() {
this.registrar.setBeanFactory(this.beanFactory);

if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, PulsarListenerConfigurer> instances =
((ListableBeanFactory) this.beanFactory).getBeansOfType(PulsarListenerConfigurer.class);
for (PulsarListenerConfigurer configurer : instances.values()) {
configurer.configurePulsarListeners(this.registrar);
}
}
this.beanFactory.getBeanProvider(PulsarListenerConfigurer.class)
.forEach(c -> c.configurePulsarListeners(this.registrar));

if (this.registrar.getEndpointRegistry() == null) {
if (this.endpointRegistry == null) {
Assert.state(this.beanFactory != null,
"BeanFactory must be set to find endpoint registry by bean name");
this.endpointRegistry = this.beanFactory.getBean(
PulsarListenerConfigUtils.PULSAR_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
PulsarListenerBeanNames.PULSAR_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
PulsarListenerEndpointRegistry.class);
}
this.registrar.setEndpointRegistry(this.endpointRegistry);
Expand Down Expand Up @@ -360,9 +354,7 @@ private void processPulsarListenerAnnotation(MethodPulsarListenerEndpoint<?> end
endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
}
resolvePulsarProperties(endpoint, pulsarListener.properties());
if (StringUtils.hasText(pulsarListener.batch())) {
endpoint.setBatchListener(Boolean.parseBoolean(pulsarListener.batch()));
}
endpoint.setBatchListener(pulsarListener.batch());
endpoint.setBeanFactory(this.beanFactory);
}

Expand Down Expand Up @@ -588,26 +580,9 @@ private PulsarListener enhance(AnnotatedElement element, PulsarListener ann) {


private void addFormatters(FormatterRegistry registry) {
for (Converter<?, ?> converter : getBeansOfType(Converter.class)) {
registry.addConverter(converter);
}
for (GenericConverter converter : getBeansOfType(GenericConverter.class)) {
registry.addConverter(converter);
}
for (Formatter<?> formatter : getBeansOfType(Formatter.class)) {
registry.addFormatter(formatter);
}
}

private <T> Collection<T> getBeansOfType(Class<T> type) {
if (PulsarListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ListableBeanFactory) {
return ((ListableBeanFactory) PulsarListenerAnnotationBeanPostProcessor.this.beanFactory)
.getBeansOfType(type)
.values();
}
else {
return Collections.emptySet();
}
this.beanFactory.getBeanProvider(Converter.class).forEach(registry::addConverter);
this.beanFactory.getBeanProvider(GenericConverter.class).forEach(registry::addConverter);
this.beanFactory.getBeanProvider(Formatter.class).forEach(registry::addFormatter);
}

@Override
Expand Down Expand Up @@ -802,4 +777,3 @@ public interface AnnotationEnhancer extends BiFunction<Map<String, Object>, Anno


}

Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,31 @@

package org.springframework.pulsar.annotation;

import org.springframework.pulsar.config.PulsarListenerContainerFactory;
import org.springframework.pulsar.config.PulsarListenerEndpoint;
import org.springframework.pulsar.config.PulsarListenerEndpointRegistrar;
import org.springframework.pulsar.config.PulsarListenerEndpointRegistry;

/**
* Allow custom configuration on PulsarListener endpoint registry.
* Optional interface to be implemented by Spring managed bean willing to
* customize how Pulsar listener endpoints are configured. Typically used
* to define the default {@link PulsarListenerContainerFactory} to use or
* for registering Pulsar endpoints in a <em>programmatic</em> fashion as
* opposed to the <em>declarative</em> approach of using the
* {@link PulsarListener} annotation.
*
* @author Soby Chacko
*
* @see PulsarListenerEndpointRegistrar
*/
public interface PulsarListenerConfigurer {

/**
* Callback allowing a {@link PulsarListenerEndpointRegistry} and specific
* {@link PulsarListenerEndpoint} instances to be registered against the
* given {@link PulsarListenerEndpointRegistrar}. The default
* {@link PulsarListenerContainerFactory} can also be customized.
* @param registrar the registrar to be configured
*/
void configurePulsarListeners(PulsarListenerEndpointRegistrar registrar);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@
import java.lang.annotation.Target;

/**
* Container annotation for aggregating several {@link PulsarListener} annotations.
* Container annotation that aggregates several {@link PulsarListener} annotations.
* <p>
* Can be used natively, declaring several nested {@link PulsarListener} annotations.
* Can also be used in conjunction with Java 8's support for repeatable annotations,
* where {@link PulsarListener} can simply be declared several times on the same method
* (or class), implicitly generating this container annotation.
*
* @author Soby Chacko
*
* @see PulsarListener
*/
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
*
* @author Soby Chacko
*/
public class PulsarListenerContainerFactoryImpl<C, T> extends AbstractPulsarListenerContainerFactory<DefaultPulsarMessageListenerContainer<T>, T> {
public class DefaultPulsarListenerContainerFactory<C, T> extends AbstractPulsarListenerContainerFactory<DefaultPulsarMessageListenerContainer<T>, T> {

@Override
protected DefaultPulsarMessageListenerContainer<T> createContainerInstance(PulsarListenerEndpoint endpoint) {
Expand Down
Loading

0 comments on commit 61d104e

Please sign in to comment.