Skip to content

Commit

Permalink
Hide observation API from public API on template and listener (spring…
Browse files Browse the repository at this point in the history
…-projects#389)

* Remove observation registry and convention from
  constructors on PulsarTemplate and CPLCF

* Lookup observation registry and convention from
  app context at start time
  • Loading branch information
onobc authored Apr 11, 2023
1 parent 86c95f8 commit 9e00c2b
Show file tree
Hide file tree
Showing 16 changed files with 395 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
containerProperties.setSchemaResolver(schemaResolver);
containerProperties.setTopicResolver(topicResolver);
containerProperties.setSubscriptionType(this.pulsarProperties.getConsumer().getSubscriptionType());
containerProperties.setObservationConvention(observationConventionProvider.getIfUnique());
containerProperties.setObservationEnabled(this.pulsarProperties.getListener().isObservationsEnabled());

PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
PulsarProperties.Listener listenerProperties = this.pulsarProperties.getListener();
Expand All @@ -79,8 +79,7 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
map.from(listenerProperties::getMaxNumMessages).to(containerProperties::setMaxNumMessages);

return new ConcurrentPulsarListenerContainerFactory<>(consumerFactoryProvider.getIfAvailable(),
containerProperties, this.pulsarProperties.getListener().isObservationsEnabled()
? observationRegistryProvider.getIfUnique() : null);
containerProperties);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@
import org.springframework.pulsar.function.PulsarFunctionAdministration;
import org.springframework.pulsar.function.PulsarSink;
import org.springframework.pulsar.function.PulsarSource;
import org.springframework.pulsar.observation.PulsarTemplateObservationConvention;

import io.micrometer.observation.ObservationRegistry;

/**
* {@link EnableAutoConfiguration Auto-configuration} for Apache Pulsar.
Expand Down Expand Up @@ -101,12 +98,9 @@ public PulsarProducerFactory<?> cachingPulsarProducerFactory(PulsarClient pulsar
@ConditionalOnMissingBean
public PulsarTemplate<?> pulsarTemplate(PulsarProducerFactory<?> pulsarProducerFactory,
ObjectProvider<ProducerInterceptor> interceptorsProvider, SchemaResolver schemaResolver,
TopicResolver topicResolver, ObjectProvider<ObservationRegistry> observationRegistryProvider,
ObjectProvider<PulsarTemplateObservationConvention> observationConventionProvider) {
TopicResolver topicResolver) {
return new PulsarTemplate<>(pulsarProducerFactory, interceptorsProvider.orderedStream().toList(),
schemaResolver, topicResolver, this.properties.getTemplate().isObservationsEnabled()
? observationRegistryProvider.getIfUnique() : null,
observationConventionProvider.getIfUnique());
schemaResolver, topicResolver, this.properties.getTemplate().isObservationsEnabled());
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,8 @@
import org.springframework.pulsar.function.PulsarFunctionAdministration;
import org.springframework.pulsar.listener.AckMode;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.observation.PulsarListenerObservationConvention;
import org.springframework.pulsar.observation.PulsarTemplateObservationConvention;

import com.github.benmanes.caffeine.cache.Caffeine;
import io.micrometer.observation.ObservationRegistry;

/**
* Autoconfiguration tests for {@link PulsarAutoConfiguration}.
Expand Down Expand Up @@ -420,61 +417,42 @@ class ObservationAutoConfigurationTests {

@Test
void templateObservationsEnabledByDefault() {
ObservationRegistry observationRegistry = mock(ObservationRegistry.class);
contextRunner.withBean("observationRegistry", ObservationRegistry.class, () -> observationRegistry)
.run((context -> assertThat(context).hasNotFailed().getBean(PulsarTemplate.class)
.extracting("observationRegistry").isSameAs(observationRegistry)));
contextRunner.run((context -> assertThat(context).getBean(PulsarTemplate.class)
.hasFieldOrPropertyWithValue("observationEnabled", true)));
}

@Test
void templateObservationsCanBeDisabled() {
ObservationRegistry observationRegistry = mock(ObservationRegistry.class);
contextRunner.withPropertyValues("spring.pulsar.template.observations-enabled=false")
.withBean("observationRegistry", ObservationRegistry.class, () -> observationRegistry)
.run((context -> assertThat(context).hasNotFailed().getBean(PulsarTemplate.class)
.extracting("observationRegistry").isNull()));
void templateObservationsEnabledExplicitly() {
contextRunner.withPropertyValues("spring.pulsar.template.observations-enabled=true")
.run((context -> assertThat(context).getBean(PulsarTemplate.class)
.hasFieldOrPropertyWithValue("observationEnabled", true)));
}

@Test
void templateObservationsWithCustomConvention() {
ObservationRegistry observationRegistry = mock(ObservationRegistry.class);
PulsarTemplateObservationConvention customConvention = mock(PulsarTemplateObservationConvention.class);
contextRunner.withBean("observationRegistry", ObservationRegistry.class, () -> observationRegistry)
.withBean("customConvention", PulsarTemplateObservationConvention.class, () -> customConvention)
.run((context -> assertThat(context).hasNotFailed().getBean(PulsarTemplate.class)
.extracting("observationConvention").isSameAs(customConvention)));
void templateObservationsCanBeDisabled() {
contextRunner.withPropertyValues("spring.pulsar.template.observations-enabled=false")
.run((context -> assertThat(context).getBean(PulsarTemplate.class)
.hasFieldOrPropertyWithValue("observationEnabled", false)));
}

@Test
void listenerObservationsEnabledByDefault() {
ObservationRegistry observationRegistry = mock(ObservationRegistry.class);
contextRunner.withBean("observationRegistry", ObservationRegistry.class, () -> observationRegistry)
.run((context -> assertThat(context).hasNotFailed()
.getBean(ConcurrentPulsarListenerContainerFactory.class).extracting("observationRegistry")
.isSameAs(observationRegistry)));
contextRunner.run((context -> assertThat(context).getBean(ConcurrentPulsarListenerContainerFactory.class)
.hasFieldOrPropertyWithValue("containerProperties.observationEnabled", true)));
}

@Test
void listenerObservationsCanBeDisabled() {
ObservationRegistry observationRegistry = mock(ObservationRegistry.class);
contextRunner.withPropertyValues("spring.pulsar.listener.observations-enabled=false")
.withBean("observationRegistry", ObservationRegistry.class, () -> observationRegistry)
.run((context -> assertThat(context).hasNotFailed()
.getBean(ConcurrentPulsarListenerContainerFactory.class).extracting("observationRegistry")
.isNull()));
void listenerObservationsEnabledExplicitly() {
contextRunner.withPropertyValues("spring.pulsar.listener.observations-enabled=true")
.run((context -> assertThat(context).getBean(ConcurrentPulsarListenerContainerFactory.class)
.hasFieldOrPropertyWithValue("containerProperties.observationEnabled", true)));
}

@Test
void listenerObservationsWithCustomConvention() {
ObservationRegistry observationRegistry = mock(ObservationRegistry.class);
PulsarListenerObservationConvention customConvention = mock(PulsarListenerObservationConvention.class);
contextRunner.withBean("observationRegistry", ObservationRegistry.class, () -> observationRegistry)
.withBean("customConvention", PulsarListenerObservationConvention.class, () -> customConvention)
.run((context -> assertThat(context).hasNotFailed()
.getBean(ConcurrentPulsarListenerContainerFactory.class)
.extracting(ConcurrentPulsarListenerContainerFactory<Object>::getContainerProperties)
.extracting(PulsarContainerProperties::getObservationConvention)
.isSameAs(customConvention)));
void listenerObservationsCanBeDisabled() {
contextRunner.withPropertyValues("spring.pulsar.listener.observations-enabled=false")
.run((context -> assertThat(context).getBean(ConcurrentPulsarListenerContainerFactory.class)
.hasFieldOrPropertyWithValue("containerProperties.observationEnabled", false)));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,13 @@
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.listener.AbstractPulsarMessageListenerContainer;
import org.springframework.pulsar.listener.AckMode;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.support.JavaUtils;
import org.springframework.pulsar.support.MessageConverter;

import io.micrometer.observation.ObservationRegistry;

/**
* Base {@link PulsarListenerContainerFactory} implementation.
*
Expand All @@ -51,8 +48,6 @@ public abstract class AbstractPulsarListenerContainerFactory<C extends AbstractP

private final PulsarContainerProperties containerProperties;

private final ObservationRegistry observationRegistry;

private Boolean autoStartup;

private Integer phase;
Expand All @@ -66,20 +61,15 @@ public abstract class AbstractPulsarListenerContainerFactory<C extends AbstractP
private ApplicationContext applicationContext;

protected AbstractPulsarListenerContainerFactory(PulsarConsumerFactory<? super T> consumerFactory,
PulsarContainerProperties containerProperties, @Nullable ObservationRegistry observationRegistry) {
PulsarContainerProperties containerProperties) {
this.consumerFactory = consumerFactory;
this.containerProperties = containerProperties;
this.observationRegistry = observationRegistry;
}

protected PulsarConsumerFactory<? super T> getConsumerFactory() {
return this.consumerFactory;
}

protected ObservationRegistry getObservationRegistry() {
return this.observationRegistry;
}

public PulsarContainerProperties getContainerProperties() {
return this.containerProperties;
}
Expand Down Expand Up @@ -171,7 +161,7 @@ else if (this.autoStartup != null) {
instanceProperties.setMaxNumMessages(this.containerProperties.getMaxNumMessages());
instanceProperties.setMaxNumBytes(this.containerProperties.getMaxNumBytes());
instanceProperties.setBatchTimeoutMillis(this.containerProperties.getBatchTimeoutMillis());
instanceProperties.setObservationConvention(this.containerProperties.getObservationConvention());
instanceProperties.setObservationEnabled(this.containerProperties.isObservationEnabled());

JavaUtils.INSTANCE.acceptIfNotNull(this.phase, instance::setPhase)
.acceptIfNotNull(this.applicationContext, instance::setApplicationContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,12 @@
import java.util.Collection;
import java.util.HashSet;

import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.listener.ConcurrentPulsarMessageListenerContainer;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import io.micrometer.observation.ObservationRegistry;

/**
* Concrete implementation for {@link PulsarListenerContainerFactory}.
*
Expand All @@ -43,8 +40,8 @@ public class ConcurrentPulsarListenerContainerFactory<T>
private Integer concurrency;

public ConcurrentPulsarListenerContainerFactory(PulsarConsumerFactory<? super T> consumerFactory,
PulsarContainerProperties containerProperties, @Nullable ObservationRegistry observationRegistry) {
super(consumerFactory, containerProperties, observationRegistry);
PulsarContainerProperties containerProperties) {
super(consumerFactory, containerProperties);
}

/**
Expand All @@ -55,6 +52,22 @@ public void setConcurrency(Integer concurrency) {
this.concurrency = concurrency;
}

@Override
public ConcurrentPulsarMessageListenerContainer<T> createContainer(String... topics) {
PulsarListenerEndpoint endpoint = new PulsarListenerEndpointAdapter() {

@Override
public Collection<String> getTopics() {
return Arrays.asList(topics);
}

};
ConcurrentPulsarMessageListenerContainer<T> container = createContainerInstance(endpoint);
initializeContainer(container, endpoint);
// customizeContainer(container);
return container;
}

@Override
protected ConcurrentPulsarMessageListenerContainer<T> createContainerInstance(PulsarListenerEndpoint endpoint) {

Expand Down Expand Up @@ -84,8 +97,7 @@ protected ConcurrentPulsarMessageListenerContainer<T> createContainerInstance(Pu

properties.setSchemaType(endpoint.getSchemaType());

return new ConcurrentPulsarMessageListenerContainer<>(this.getConsumerFactory(), properties,
this.getObservationRegistry());
return new ConcurrentPulsarMessageListenerContainer<>(this.getConsumerFactory(), properties);
}

@Override
Expand All @@ -100,20 +112,4 @@ else if (this.concurrency != null) {
}
}

@Override
public ConcurrentPulsarMessageListenerContainer<T> createContainer(String... topics) {
PulsarListenerEndpoint endpoint = new PulsarListenerEndpointAdapter() {

@Override
public Collection<String> getTopics() {
return Arrays.asList(topics);
}

};
ConcurrentPulsarMessageListenerContainer<T> container = createContainerInstance(endpoint);
initializeContainer(container, endpoint);
// customizeContainer(container);
return container;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;

import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.observation.DefaultPulsarTemplateObservationConvention;
Expand All @@ -51,7 +54,8 @@
* @author Alexander Preuß
* @author Christophe Bornet
*/
public class PulsarTemplate<T> implements PulsarOperations<T>, BeanNameAware {
public class PulsarTemplate<T>
implements PulsarOperations<T>, ApplicationContextAware, BeanNameAware, SmartInitializingSingleton {

private final LogAccessor logger = new LogAccessor(this.getClass());

Expand All @@ -63,11 +67,25 @@ public class PulsarTemplate<T> implements PulsarOperations<T>, BeanNameAware {

private final TopicResolver topicResolver;

/**
* Whether to record observations.
*/
private boolean observationEnabled;

/**
* The registry to record observations with.
*/
@Nullable
private final ObservationRegistry observationRegistry;
private ObservationRegistry observationRegistry;

/**
* The optional custom observation convention to use when recording observations.
*/
@Nullable
private final PulsarTemplateObservationConvention observationConvention;
private PulsarTemplateObservationConvention observationConvention;

@Nullable
private ApplicationContext applicationContext;

private String beanName = "";

Expand All @@ -82,12 +100,12 @@ public PulsarTemplate(PulsarProducerFactory<T> producerFactory) {

/**
* Construct a template instance with interceptors that uses the default schema
* resolver and default topic resolver.
* resolver and default topic resolver and enables observation recording.
* @param producerFactory the factory used to create the backing Pulsar producers.
* @param interceptors the interceptors to add to the producer.
*/
public PulsarTemplate(PulsarProducerFactory<T> producerFactory, List<ProducerInterceptor> interceptors) {
this(producerFactory, interceptors, new DefaultSchemaResolver(), new DefaultTopicResolver(), null, null);
this(producerFactory, interceptors, new DefaultSchemaResolver(), new DefaultTopicResolver(), true);
}

/**
Expand All @@ -96,21 +114,40 @@ public PulsarTemplate(PulsarProducerFactory<T> producerFactory, List<ProducerInt
* @param interceptors the list of interceptors to add to the producer
* @param schemaResolver the schema resolver to use
* @param topicResolver the topic resolver to use
* @param observationRegistry the registry to record observations with or {@code null}
* to not record observations
* @param observationConvention the optional custom observation convention to use when
* recording observations
* @param observationEnabled whether to record observations
*/
public PulsarTemplate(PulsarProducerFactory<T> producerFactory, List<ProducerInterceptor> interceptors,
SchemaResolver schemaResolver, TopicResolver topicResolver,
@Nullable ObservationRegistry observationRegistry,
@Nullable PulsarTemplateObservationConvention observationConvention) {
SchemaResolver schemaResolver, TopicResolver topicResolver, boolean observationEnabled) {
this.producerFactory = producerFactory;
this.interceptors = interceptors;
this.schemaResolver = schemaResolver;
this.topicResolver = topicResolver;
this.observationRegistry = observationRegistry;
this.observationConvention = observationConvention;
this.observationEnabled = observationEnabled;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}

/**
* If observations are enabled, attempt to obtain the Observation registry and
* convention.
*/
@Override
public void afterSingletonsInstantiated() {
if (!this.observationEnabled) {
this.logger.debug(() -> "Observations are not enabled - not recording");
return;
}
if (this.applicationContext == null) {
this.logger.warn(() -> "Observations enabled but application context null - not recording");
return;
}
this.observationRegistry = this.applicationContext.getBeanProvider(ObservationRegistry.class)
.getIfUnique(() -> this.observationRegistry);
this.observationConvention = this.applicationContext.getBeanProvider(PulsarTemplateObservationConvention.class)
.getIfUnique(() -> this.observationConvention);
}

@Override
Expand Down
Loading

0 comments on commit 9e00c2b

Please sign in to comment.