Skip to content

Commit

Permalink
Add more settings to Consumer in PulsarProperties (#223)
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet authored Nov 19, 2022
1 parent 9baa080 commit f577e6e
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@

import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.schema.SchemaType;

Expand Down Expand Up @@ -132,6 +135,16 @@ public static class Consumer {
*/
private SubscriptionType subscriptionType = SubscriptionType.Exclusive;

/**
* Map of properties to add to the subscription.
*/
private Map<String, String> subscriptionProperties = new HashMap<>();

/**
* Subscription mode to be used when subscribing to the topic.
*/
private SubscriptionMode subscriptionMode = SubscriptionMode.Durable;

/**
* Number of messages that can be accumulated before the consumer calls "receive".
*/
Expand Down Expand Up @@ -205,17 +218,60 @@ public static class Consumer {
*/
private RegexSubscriptionMode regexSubscriptionMode = RegexSubscriptionMode.PersistentOnly;

/**
* Dead letter policy to use.
*/
@Nullable
private DeadLetterPolicy deadLetterPolicy;

/**
* Whether to auto retry messages.
*/
private Boolean retryEnable = false;

/**
* Whether the consumer auto-subscribes for partition increase. This is only for
* partitioned consumers.
*/
private Boolean autoUpdatePartitions = true;

/**
* Interval of partitions discovery updates.
*/
private Duration autoUpdatePartitionsInterval = Duration.ofMinutes(1);

/**
* Whether to replicate subscription state.
*/
private Boolean replicateSubscriptionState = false;

/**
* Whether to include the given position of any reset operation like
* {@link org.apache.pulsar.client.api.Consumer#seek(long) or
* {@link Consumer#seek(MessageId)}}.
*/
private Boolean resetIncludeHead = false;

/**
* Whether the batch index acknowledgment is enabled.
*/
private Boolean batchIndexAckEnabled = false;

/**
* Whether an acknowledgement receipt is enabled.
*/
private Boolean ackReceiptEnabled = false;

/**
* Whether pooling of messages and the underlying data buffers is enabled.
*/
private Boolean poolMessages = false;

/**
* Whether to start the consumer in a paused state.
*/
private Boolean startPaused = false;

/**
* Whether to automatically drop outstanding un-acked messages if the queue is
* full.
Expand Down Expand Up @@ -257,6 +313,22 @@ public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}

public Map<String, String> getSubscriptionProperties() {
return this.subscriptionProperties;
}

public void setSubscriptionProperties(Map<String, String> subscriptionProperties) {
this.subscriptionProperties = subscriptionProperties;
}

public SubscriptionMode getSubscriptionMode() {
return this.subscriptionMode;
}

public void setSubscriptionMode(SubscriptionMode subscriptionMode) {
this.subscriptionMode = subscriptionMode;
}

public SubscriptionType getSubscriptionType() {
return this.subscriptionType;
}
Expand Down Expand Up @@ -345,7 +417,7 @@ public void setProperties(SortedMap<String, String> properties) {
this.properties = properties;
}

public Boolean isReadCompacted() {
public Boolean getReadCompacted() {
return this.readCompacted;
}

Expand Down Expand Up @@ -377,23 +449,88 @@ public void setRegexSubscriptionMode(RegexSubscriptionMode regexSubscriptionMode
this.regexSubscriptionMode = regexSubscriptionMode;
}

public Boolean isAutoUpdatePartitions() {
@Nullable
public DeadLetterPolicy getDeadLetterPolicy() {
return this.deadLetterPolicy;
}

public void setDeadLetterPolicy(@Nullable DeadLetterPolicy deadLetterPolicy) {
this.deadLetterPolicy = deadLetterPolicy;
}

public Boolean getRetryEnable() {
return this.retryEnable;
}

public void setRetryEnable(Boolean retryEnable) {
this.retryEnable = retryEnable;
}

public Boolean getAutoUpdatePartitions() {
return this.autoUpdatePartitions;
}

public void setAutoUpdatePartitions(Boolean autoUpdatePartitions) {
this.autoUpdatePartitions = autoUpdatePartitions;
}

public Boolean isReplicateSubscriptionState() {
public Duration getAutoUpdatePartitionsInterval() {
return this.autoUpdatePartitionsInterval;
}

public void setAutoUpdatePartitionsInterval(Duration autoUpdatePartitionsInterval) {
this.autoUpdatePartitionsInterval = autoUpdatePartitionsInterval;
}

public Boolean getReplicateSubscriptionState() {
return this.replicateSubscriptionState;
}

public void setReplicateSubscriptionState(Boolean replicateSubscriptionState) {
this.replicateSubscriptionState = replicateSubscriptionState;
}

public Boolean isAutoAckOldestChunkedMessageOnQueueFull() {
public Boolean getResetIncludeHead() {
return this.resetIncludeHead;
}

public void setResetIncludeHead(Boolean resetIncludeHead) {
this.resetIncludeHead = resetIncludeHead;
}

public Boolean getBatchIndexAckEnabled() {
return this.batchIndexAckEnabled;
}

public void setBatchIndexAckEnabled(Boolean batchIndexAckEnabled) {
this.batchIndexAckEnabled = batchIndexAckEnabled;
}

public Boolean getAckReceiptEnabled() {
return this.ackReceiptEnabled;
}

public void setAckReceiptEnabled(Boolean ackReceiptEnabled) {
this.ackReceiptEnabled = ackReceiptEnabled;
}

public Boolean getPoolMessages() {
return this.poolMessages;
}

public void setPoolMessages(Boolean poolMessages) {
this.poolMessages = poolMessages;
}

public Boolean getStartPaused() {
return this.startPaused;
}

public void setStartPaused(Boolean startPaused) {
this.startPaused = startPaused;
}

public Boolean getAutoAckOldestChunkedMessageOnQueueFull() {
return this.autoAckOldestChunkedMessageOnQueueFull;
}

Expand Down Expand Up @@ -426,6 +563,8 @@ public Map<String, Object> buildProperties() {
map.from(this::getTopicsPattern).to(properties.in("topicsPattern"));
map.from(this::getSubscriptionName).to(properties.in("subscriptionName"));
map.from(this::getSubscriptionType).to(properties.in("subscriptionType"));
map.from(this::getSubscriptionProperties).to(properties.in("subscriptionProperties"));
map.from(this::getSubscriptionMode).to(properties.in("subscriptionMode"));
map.from(this::getReceiverQueueSize).to(properties.in("receiverQueueSize"));
map.from(this::getAcknowledgementsGroupTime).as(it -> it.toNanos() / 1000)
.to(properties.in("acknowledgementsGroupTimeMicros"));
Expand All @@ -439,13 +578,22 @@ public Map<String, Object> buildProperties() {
map.from(this::getPriorityLevel).to(properties.in("priorityLevel"));
map.from(this::getCryptoFailureAction).to(properties.in("cryptoFailureAction"));
map.from(this::getProperties).to(properties.in("properties"));
map.from(this::isReadCompacted).to(properties.in("readCompacted"));
map.from(this::getReadCompacted).to(properties.in("readCompacted"));
map.from(this::getSubscriptionInitialPosition).to(properties.in("subscriptionInitialPosition"));
map.from(this::getPatternAutoDiscoveryPeriod).to(properties.in("patternAutoDiscoveryPeriod"));
map.from(this::getRegexSubscriptionMode).to(properties.in("regexSubscriptionMode"));
map.from(this::isAutoUpdatePartitions).to(properties.in("autoUpdatePartitions"));
map.from(this::isReplicateSubscriptionState).to(properties.in("replicateSubscriptionState"));
map.from(this::isAutoAckOldestChunkedMessageOnQueueFull)
map.from(this::getDeadLetterPolicy).to(properties.in("deadLetterPolicy"));
map.from(this::getRetryEnable).to(properties.in("retryEnable"));
map.from(this::getAutoUpdatePartitions).to(properties.in("autoUpdatePartitions"));
map.from(this::getAutoUpdatePartitionsInterval).as(Duration::toSeconds)
.to(properties.in("autoUpdatePartitionsIntervalSeconds"));
map.from(this::getReplicateSubscriptionState).to(properties.in("replicateSubscriptionState"));
map.from(this::getResetIncludeHead).to(properties.in("resetIncludeHead"));
map.from(this::getBatchIndexAckEnabled).to(properties.in("batchIndexAckEnabled"));
map.from(this::getAckReceiptEnabled).to(properties.in("ackReceiptEnabled"));
map.from(this::getPoolMessages).to(properties.in("poolMessages"));
map.from(this::getStartPaused).to(properties.in("startPaused"));
map.from(this::getAutoAckOldestChunkedMessageOnQueueFull)
.to(properties.in("autoAckOldestChunkedMessageOnQueueFull"));
map.from(this::getMaxPendingChunkedMessage).to(properties.in("maxPendingChunkedMessage"));
map.from(this::getExpireTimeOfIncompleteChunkedMessage).as(Duration::toMillis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.assertj.core.api.Assertions.assertThatNoException;

import java.util.Collection;
import java.util.Collections;
Expand All @@ -27,14 +28,17 @@

import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -145,8 +149,8 @@ void producerProperties() {
Map<String, Object> producerProps = properties.buildProducerProperties();

// Verify that the props can be loaded in a ProducerBuilder
ConfigurationDataUtils.loadData(producerProps, new ProducerConfigurationData(),
ProducerConfigurationData.class);
assertThatNoException().isThrownBy(() -> ConfigurationDataUtils.loadData(producerProps,
new ProducerConfigurationData(), ProducerConfigurationData.class));

assertThat(producerProps).containsEntry("topicName", "my-topic")
.containsEntry("producerName", "my-producer").containsEntry("sendTimeoutMs", 2_000)
Expand Down Expand Up @@ -181,6 +185,8 @@ void consumerProperties() {
props.put("spring.pulsar.consumer.topics-pattern", "my-pattern");
props.put("spring.pulsar.consumer.subscription-name", "my-subscription");
props.put("spring.pulsar.consumer.subscription-type", "Shared");
props.put("spring.pulsar.consumer.subscription-properties[my-sub-prop]", "my-sub-prop-value");
props.put("spring.pulsar.consumer.subscription-mode", "NonDurable");
props.put("spring.pulsar.consumer.receiver-queue-size", "1");
props.put("spring.pulsar.consumer.acknowledgements-group-time", "2s");
props.put("spring.pulsar.consumer.negative-ack-redelivery-delay", "3s");
Expand All @@ -195,22 +201,40 @@ void consumerProperties() {
props.put("spring.pulsar.consumer.subscription-initial-position", "Earliest");
props.put("spring.pulsar.consumer.pattern-auto-discovery-period", "9");
props.put("spring.pulsar.consumer.regex-subscription-mode", "AllTopics");
props.put("spring.pulsar.consumer.dead-letter-policy.max-redeliver-count", "4");
props.put("spring.pulsar.consumer.dead-letter-policy.retry-letter-topic", "my-retry-topic");
props.put("spring.pulsar.consumer.dead-letter-policy.dead-letter-topic", "my-dlt-topic");
props.put("spring.pulsar.consumer.dead-letter-policy.initial-subscription-name", "my-initial-subscription");
props.put("spring.pulsar.consumer.retry-enable", "true");
props.put("spring.pulsar.consumer.auto-update-partitions", "false");
props.put("spring.pulsar.consumer.auto-update-partitions-interval", "10s");
props.put("spring.pulsar.consumer.replicate-subscription-state", "true");
props.put("spring.pulsar.consumer.reset-include-head", "true");
props.put("spring.pulsar.consumer.batch-index-ack-enabled", "true");
props.put("spring.pulsar.consumer.ack-receipt-enabled", "true");
props.put("spring.pulsar.consumer.pool-messages", "true");
props.put("spring.pulsar.consumer.start-paused", "true");
props.put("spring.pulsar.consumer.auto-ack-oldest-chunked-message-on-queue-full", "false");
props.put("spring.pulsar.consumer.max-pending-chunked-message", "11");
props.put("spring.pulsar.consumer.expire-time-of-incomplete-chunked-message", "12s");

bind(props);
Map<String, Object> consumerProps = properties.buildConsumerProperties();

// Verify that the props can be loaded in a ConsumerBuilder
assertThatNoException().isThrownBy(() -> ConfigurationDataUtils.loadData(consumerProps,
new ConsumerConfigurationData<>(), ConsumerConfigurationData.class));

assertThat(consumerProps)
.hasEntrySatisfying("topicNames",
n -> assertThat((Collection<String>) n).containsExactly("my-topic"))
.hasEntrySatisfying("topicsPattern", p -> assertThat(p.toString()).isEqualTo("my-pattern"))
.containsEntry("subscriptionName", "my-subscription")
.containsEntry("subscriptionType", SubscriptionType.Shared).containsEntry("receiverQueueSize", 1)
.containsEntry("acknowledgementsGroupTimeMicros", 2_000_000L)
.containsEntry("subscriptionType", SubscriptionType.Shared)
.hasEntrySatisfying("subscriptionProperties",
p -> assertThat((Map<String, String>) p).containsEntry("my-sub-prop", "my-sub-prop-value"))
.containsEntry("subscriptionMode", SubscriptionMode.NonDurable)
.containsEntry("receiverQueueSize", 1).containsEntry("acknowledgementsGroupTimeMicros", 2_000_000L)
.containsEntry("negativeAckRedeliveryDelayMicros", 3_000_000L)
.containsEntry("maxTotalReceiverQueueSizeAcrossPartitions", 5)
.containsEntry("consumerName", "my-consumer").containsEntry("ackTimeoutMillis", 6_000L)
Expand All @@ -222,7 +246,17 @@ void consumerProperties() {
.containsEntry("subscriptionInitialPosition", SubscriptionInitialPosition.Earliest)
.containsEntry("patternAutoDiscoveryPeriod", 9)
.containsEntry("regexSubscriptionMode", RegexSubscriptionMode.AllTopics)
.containsEntry("autoUpdatePartitions", false).containsEntry("replicateSubscriptionState", true)
.hasEntrySatisfying("deadLetterPolicy", dlp -> {
DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) dlp;
assertThat(deadLetterPolicy.getMaxRedeliverCount()).isEqualTo(4);
assertThat(deadLetterPolicy.getRetryLetterTopic()).isEqualTo("my-retry-topic");
assertThat(deadLetterPolicy.getDeadLetterTopic()).isEqualTo("my-dlt-topic");
assertThat(deadLetterPolicy.getInitialSubscriptionName()).isEqualTo("my-initial-subscription");
}).containsEntry("retryEnable", true).containsEntry("autoUpdatePartitions", false)
.containsEntry("autoUpdatePartitionsIntervalSeconds", 10L)
.containsEntry("replicateSubscriptionState", true).containsEntry("resetIncludeHead", true)
.containsEntry("batchIndexAckEnabled", true).containsEntry("ackReceiptEnabled", true)
.containsEntry("poolMessages", true).containsEntry("startPaused", true)
.containsEntry("autoAckOldestChunkedMessageOnQueueFull", false)
.containsEntry("maxPendingChunkedMessage", 11)
.containsEntry("expireTimeOfIncompleteChunkedMessageMillis", 12_000L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public static <T> void loadConf(ConsumerBuilder<T> builder, Map<String, Object>
propertiesCopy.remove("cryptoKeyReader");
propertiesCopy.remove("messageCrypto");
propertiesCopy.remove("batchReceivePolicy");
propertiesCopy.remove("keySharedPolicy");
propertiesCopy.remove("payloadProcessor");

builder.loadConf(propertiesCopy);
Expand All @@ -79,6 +80,8 @@ public static <T> void loadConf(ConsumerBuilder<T> builder, Map<String, Object>
"messageCrypto");
applyValueToBuilderAfterLoadConf(builderConf::getBatchReceivePolicy, builder::batchReceivePolicy, properties,
"batchReceivePolicy");
applyValueToBuilderAfterLoadConf(builderConf::getKeySharedPolicy, builder::keySharedPolicy, properties,
"keySharedPolicy");
applyValueToBuilderAfterLoadConf(builderConf::getPayloadProcessor, builder::messagePayloadProcessor, properties,
"payloadProcessor");
}
Expand Down

0 comments on commit f577e6e

Please sign in to comment.