Skip to content

Commit

Permalink
Add topic resolver (spring-projects#293)
Browse files Browse the repository at this point in the history
* Add topic resolver

- Topic resolver with configurable
message type to topic name mappings

See spring-projects#269

* Sample app uses topic and schema resolver

* Add topic resolver docs
  • Loading branch information
onobc authored Jan 30, 2023
1 parent 82e055a commit 7294580
Show file tree
Hide file tree
Showing 36 changed files with 1,089 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ void documentConfigurationProperties() throws IOException {
c.accept("spring.pulsar.consumer");
c.accept("spring.pulsar.listener");
});
snippets.add("application-properties.pulsar-defaults", "Pulsar Defaults Properties", (c) -> c.accept("spring.pulsar.defaults"));
snippets.add("application-properties.pulsar-function", "Pulsar Function Properties", (c) -> c.accept("spring.pulsar.function"));
snippets.add("application-properties.pulsar-administration", "Pulsar Administration Properties", (c) -> c.accept("spring.pulsar.administration"));
snippets.add("application-properties.pulsar-reactive-sender", "Pulsar Reactive Sender Properties", (c) -> c.accept("spring.pulsar.reactive.sender"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ include::application-properties/pulsar-producer.adoc[]

include::application-properties/pulsar-consumer.adoc[]

include::application-properties/pulsar-defaults.adoc[]

include::application-properties/pulsar-function.adoc[]

include::application-properties/pulsar-administration.adoc[]
Expand Down
2 changes: 2 additions & 0 deletions spring-pulsar-docs/src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ In addition to this reference documentation, we recommend a number of other reso

include::application-properties.adoc[leveloffset=+2]

include::topic-resolution.adoc[leveloffset=+2]

include::non-ga-versions.adoc[leveloffset=+2]

include::native-image.adoc[leveloffset=+2]
16 changes: 11 additions & 5 deletions spring-pulsar-docs/src/main/asciidoc/pulsar.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ They return the `MessageId` of the message that was published once the message i
The `sendAsync` method calls are asynchronous calls that are non-blocking.
They return a `CompletableFuture`, which you can use to asynchronously receive the message ID once the messages are published.

==== Simple API
The template provides a handful of methods ({javadocs}/org/springframework/pulsar/core/PulsarOperations.html[prefixed with _'send'_]) for simple send requests that contain only a message or a destination topic. For more complicated send requests, a fluent API lets you configure more options.
NOTE: For the API variants that do not include a topic parameter, a <<topic-resolution.adoc#appendix.topic-resolution,topic resolution process>> is used to determine the destination topic.

TIP: Both `send` and `sendAsync` methods have a variety that allows publishing with only the message.
When you do that, the application must provide the topic name by setting the property `spring.pulsar.producer.topic-name`.
==== Simple API
The template provides a handful of methods ({javadocs}/org/springframework/pulsar/core/PulsarOperations.html[prefixed with _'send'_]) for simple send requests. For more complicated send requests, a fluent API lets you configure more options.

==== Fluent API
The template provides a {javadocs}/org/springframework/pulsar/core/PulsarOperations.html#newMessage(T)[fluent builder] to handle more complicated send requests.
Expand Down Expand Up @@ -94,6 +93,8 @@ include::schema-info/schema-info-template.adoc[leveloffset=+1]
The `PulsarTemplate` relies on a `PulsarProducerFactory` to actually create the underlying producer. Spring Boot auto-configuration also provides this producer factory. Additionally, you can configure the factory by specifying any of the available producer-centric application properties.
See the <<application-properties.adoc#appendix.application-properties.pulsar-producer,Appendix>>.

NOTE: If topic information is not specified when using the producer factory APIs directly, the same <<topic-resolution.adoc#appendix.topic-resolution,topic resolution process>> used by the `PulsarTemplate` is used with the one exception that the "Message type default" step is **omitted**.

[[producer-caching]]
==== Pulsar Producer Caching
Each underlying Pulsar producer consumes resources. To improve performance and avoid continual creation of producers, the producer factory caches the producers that it creates. They are cached in an LRU fashion and evicted when they have not been used within a configured time period. The link:{github}/blob/8e33ac0b122bc0e75df299919c956cacabcc9809/spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java#L159[cache key] is composed of just enough information to ensure that callers are returned the same producer on subsequent creation requests.
Expand Down Expand Up @@ -169,10 +170,13 @@ In this most basic form, you must provide the following two properties with thei
[source,yaml,indent=0,subs="verbatim"]
----
spring.pulsar.consumer:
topic-names: hello-pulsar
topics: hello-pulsar
subscription-name: hello-pulsar-subscription
----

NOTE: If the topic information is not directly provided, a <<topic-resolution.adoc#appendix.topic-resolution,topic resolution process>> is used to determine the destination topic.


In the `PulsarListener` method shown earlier, we receive the data as `String`, but we do not specify any schema types.
Internally, the framework relies on Pulsar's schema mechanism to convert the data to the required type.
The framework detects that you expect the `String` type and then infers the schema type based on that information.
Expand Down Expand Up @@ -410,6 +414,8 @@ return pulsarListenerContainer;
----
====

NOTE: If topic information is not specified when using the listener containers directly, the same <<topic-resolution.adoc#appendix.topic-resolution,topic resolution process>> used by the `PulsarListener` is used with the one exception that the "Message type default" step is **omitted**.

`DefaultPulsarMessageListenerContainer` creates only a single consumer.
If you want to have multiple consumers managed through multiple threads, you need to use `ConcurrentPulsarMessageListenerContainer`.

Expand Down
11 changes: 9 additions & 2 deletions spring-pulsar-docs/src/main/asciidoc/reactive-pulsar.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ On the Pulsar producer side, Spring Boot auto-configuration provides a `Reactive
The template provides send methods that accept a single message and return a `Mono<MessageId>`.
It also provides send methods that accept multiple messages (in the form of the ReactiveStreams `Publisher` type) and return a `Flux<MessageId>`.

TIP: The send methods that do not have a topic input parameter require the topic name to be provided via the property `spring.pulsar.reactive.sender.topic-name`.
NOTE: For the API variants that do not include a topic parameter, a <<topic-resolution.adoc#appendix.topic-resolution,topic resolution process>> is used to determine the destination topic.

==== Fluent API
The template provides a {javadocs}/org/springframework/pulsar/reactive/core/ReactivePulsarOperations.html#newMessage(T)[fluent builder] to handle more complicated send requests.
Expand Down Expand Up @@ -110,6 +110,8 @@ The `ReactivePulsarTemplate` relies on a `ReactivePulsarSenderFactory` to actual

Spring Boot provides this sender factory which can be configured with any of the <<application-properties.adoc#appendix.application-properties.pulsar-reactive-sender,`spring.pulsar.reactive.sender`>> prefixed application properties.

NOTE: If topic information is not specified when using the sender factory APIs directly, the same <<topic-resolution.adoc#appendix.topic-resolution,topic resolution process>> used by the `ReactivePulsarTemplate` is used with the one exception that the "Message type default" step is **omitted**.

==== Producer Caching
Each underlying Pulsar producer consumes resources.
To improve performance and avoid continual creation of producers, the `ReactiveMessageSenderCache` in the underlying Apache Pulsar Reactive client caches the producers that it creates.
Expand Down Expand Up @@ -162,7 +164,10 @@ spring.pulsar.reactive.consumer:
topic-names: hello-pulsar-topic
----

NOTE: If `subscription-name` is not provided an auto-generated subscription name will be used.
When `subscription-name` is not provided an auto-generated subscription name will be used.

NOTE: If the topic information is not directly provided, a <<topic-resolution.adoc#appendix.topic-resolution,topic resolution process>> is used to determine the destination topic.


In the `ReactivePulsarListener` method shown earlier, we receive the data as `String`, but we do not specify any schema types.
Internally, the framework relies on Pulsar's schema mechanism to convert the data to the required type.
Expand Down Expand Up @@ -320,6 +325,8 @@ The "listener" aspect is provided by the `ReactivePulsarMessageHandler` of which
* `ReactivePulsarOneByOneMessageHandler` - handles a single message one-by-one
* `ReactivePulsarStreamingHandler` - handles multiple messages via a `Flux`

NOTE: If topic information is not specified when using the listener containers directly, the same <<topic-resolution.adoc#appendix.topic-resolution,topic resolution process>> used by the `ReactivePulsarListener` is used with the one exception that the "Message type default" step is **omitted**.

[[reactive-concurrency]]
=== Concurrency
When consuming records in streaming mode (`stream = true`) concurrency comes naturally via the underlying Reactive support in the client implementation.
Expand Down
57 changes: 57 additions & 0 deletions spring-pulsar-docs/src/main/asciidoc/topic-resolution.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
[appendix]
[[appendix.topic-resolution]]
= Topic Resolution

A destination topic is needed when producing or consuming messages.
The framework looks in the following ordered locations to determine a topic (stopping at the first find):

* User specified
* Message type default
* Global default

When a topic is found via one of the default mechanisms, there is no need to specify the topic on the produce or consume API.

When a topic is not found, the API will throw an exception accordingly.

== User specified
A topic passed into the API being used has the highest precedence (eg. `PulsarTemplate.send("my-topic", myMessage)` or `@PulsarListener(topics = "my-topic"`).

== Message type default
When no topic is passed into the API, the system looks for a message type to topic mapping configured for the type of the message being produced or consumed.

Mappings can be configured with the `spring.pulsar.defaults.type-mappings` property.
The following example uses `application.yml` to configure default topics to use when consuming or producing `Foo` or `Bar` messages:

[source,yaml,indent=0,subs="verbatim"]
----
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.Foo
topic-name: foo-topic
- message-type: com.acme.Bar
topic-name: bar-topic
----

NOTE: The `message-type` is the fully-qualified name of the message class.

=== Custom topic resolver
The preferred method of adding mappings is via the property mentioned above.
However, if more control is needed you can replace the default resolver by proving your own implementation, for example:

[source,java,indent=0,subs="verbatim"]
----
@Bean
public MyTopicResolver topicResolver() {
return new MyTopicResolver();
}
----

== Producer global default
The final location consulted (when producing) is the system-wide producer default topic.
It is configured via the `spring.pulsar.producer.topic-name` property when using the imperative API and the `spring.pulsar.reactive.sender.topic-name` property when using the reactive API.

== Consumer global default
The final location consulted (when consuming) is the system-wide consumer default topic.
It is configured via the `spring.pulsar.consumer.topics` or `spring.pulsar.consumer.topics-pattern` property when using the imperative API and one of the `spring.pulsar.reactive.consumer.topics` or `spring.pulsar.reactive.consumer.topics-pattern` property when using the reactive API.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public DefaultReactivePulsarMessageListenerContainer<T> createContainerInstance(

ReactivePulsarContainerProperties<T> properties = new ReactivePulsarContainerProperties<>();
properties.setSchemaResolver(this.getContainerProperties().getSchemaResolver());
properties.setTopicResolver(this.getContainerProperties().getTopicResolver());

if (!CollectionUtils.isEmpty(endpoint.getTopics())) {
properties.setTopics(endpoint.getTopics());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import org.apache.commons.logging.LogFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
Expand All @@ -31,14 +31,14 @@

import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.core.log.LogAccessor;
import org.springframework.expression.BeanResolver;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.listener.Acknowledgement;
import org.springframework.pulsar.listener.adapter.HandlerAdapter;
import org.springframework.pulsar.listener.adapter.PulsarMessagingMessageListenerAdapter;
Expand All @@ -51,6 +51,7 @@
import org.springframework.pulsar.support.MessageConverter;
import org.springframework.pulsar.support.converter.PulsarRecordMessageConverter;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

import reactor.core.publisher.Flux;

Expand All @@ -64,8 +65,6 @@
*/
public class MethodReactivePulsarListenerEndpoint<V> extends AbstractReactivePulsarListenerEndpoint<V> {

private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));

private Object bean;

private Method method;
Expand Down Expand Up @@ -149,6 +148,15 @@ protected PulsarMessagingMessageListenerAdapter<V> createMessageHandler(
pulsarContainerProperties.setSchemaType(type);
}

// If no topic info is set on endpoint attempt to resolve via message type
TopicResolver topicResolver = pulsarContainerProperties.getTopicResolver();
boolean hasTopicInfo = pulsarContainerProperties.getTopicsPattern() != null
|| !ObjectUtils.isEmpty(pulsarContainerProperties.getTopics());
if (!hasTopicInfo) {
topicResolver.resolveTopic(null, messageType.getRawClass(), () -> null)
.ifPresent((topic) -> pulsarContainerProperties.setTopics(Collections.singleton(topic)));
}

ReactiveMessageConsumerBuilderCustomizer<V> customizer1 = b -> b.deadLetterPolicy(this.deadLetterPolicy);
container.setConsumerCustomizer(b -> {
if (this.consumerCustomizer != null) {
Expand Down Expand Up @@ -191,10 +199,10 @@ protected PulsarMessagingMessageListenerAdapter<V> createMessageListenerInstance

PulsarMessagingMessageListenerAdapter<V> listener;
if (isFluxListener()) {
listener = new PulsarReactiveStreamingMessagingMessageListenerAdapter<V>(this.bean, this.method);
listener = new PulsarReactiveStreamingMessagingMessageListenerAdapter<>(this.bean, this.method);
}
else {
listener = new PulsarReactiveOneByOneMessagingMessageListenerAdapter<V>(this.bean, this.method);
listener = new PulsarReactiveOneByOneMessagingMessageListenerAdapter<>(this.bean, this.method);
}

if (messageConverter instanceof PulsarRecordMessageConverter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ public interface ReactivePulsarOperations<T> {
*/
SendMessageBuilder<T> newMessages(Publisher<T> messages);

/**
* Create a {@link SendMessageBuilder builder} for configuring and sending multiple
* messages reactively.
* @param messages the messages to send
* @param messageType the type of messages being sent - helpful for topic resolution
* when schema is not specified during a {@code sendMany} operation
* @return the builder to configure and send the message
*/
SendMessageBuilder<T> newMessages(Publisher<T> messages, Class<T> messageType);

/**
* Builder that can be used to configure and send a message. Provides more options
* than the send methods provided by {@link ReactivePulsarOperations}.
Expand Down
Loading

0 comments on commit 7294580

Please sign in to comment.