Skip to content

Commit

Permalink
Option to preserve publish order
Browse files Browse the repository at this point in the history
Issue: SPR-13989
  • Loading branch information
rstoyanchev committed Jul 24, 2018
1 parent 430250c commit 7500b14
Show file tree
Hide file tree
Showing 17 changed files with 496 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public abstract class AbstractBrokerMessageHandler

private final Collection<String> destinationPrefixes;

private boolean preservePublishOrder = false;

@Nullable
private ApplicationEventPublisher eventPublisher;

Expand Down Expand Up @@ -132,6 +134,31 @@ public void setApplicationEventPublisher(@Nullable ApplicationEventPublisher pub
this.eventPublisher = publisher;
}

/**
* Whether the client must receive messages in the order of publication.
* <p>By default messages sent to the {@code "clientOutboundChannel"} may
* not be processed in the same order because the channel is backed by a
* ThreadPoolExecutor that in turn does not guarantee processing in order.
* <p>When this flag is set to {@code true} messages within the same session
* will be sent to the {@code "clientOutboundChannel"} one at a time in
* order to preserve the order of publication. Enable this only if needed
* since there is some performance overhead to keep messages in order.
* @param preservePublishOrder whether to publish in order
* @since 5.1
*/
public void setPreservePublishOrder(boolean preservePublishOrder) {
OrderedMessageSender.configureOutboundChannel(this.clientOutboundChannel, preservePublishOrder);
this.preservePublishOrder = preservePublishOrder;
}

/**
* Whether to ensure messages are received in the order of publication.
* @since 5.1
*/
public boolean isPreservePublishOrder() {
return this.preservePublishOrder;
}

@Nullable
public ApplicationEventPublisher getApplicationEventPublisher() {
return this.eventPublisher;
Expand Down Expand Up @@ -269,6 +296,16 @@ protected void publishBrokerUnavailableEvent() {
}
}

/**
* Get the MessageChannel to use for sending messages to clients, possibly
* a per-session wrapper when {@code preservePublishOrder=true}.
* @since 5.1
*/
protected MessageChannel getClientOutboundChannelForSession(String sessionId) {
return this.preservePublishOrder ?
new OrderedMessageSender(getClientOutboundChannel(), logger) : getClientOutboundChannel();
}


/**
* Detect unsent DISCONNECT messages and process them anyway.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.broker;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.support.ExecutorChannelInterceptor;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.util.Assert;

/**
* Submit messages to an ExecutorSubscribableChannel, one at a time. The channel
* must have been configured with {@link #configureOutboundChannel}.
*
* @author Rossen Stoyanchev
* @since 5.1
*/
class OrderedMessageSender implements MessageChannel {

static final String COMPLETION_TASK_HEADER = "simpSendCompletionTask";


private final MessageChannel channel;

private final Log logger;

private final Queue<Message<?>> messages = new ConcurrentLinkedQueue<>();

private final AtomicBoolean sendInProgress = new AtomicBoolean(false);


public OrderedMessageSender(MessageChannel channel, Log logger) {
this.channel = channel;
this.logger = logger;
}


public boolean send(Message<?> message) {
return send(message, -1);
}

@Override
public boolean send(Message<?> message, long timeout) {
this.messages.add(message);
trySend();
return true;
}

private void trySend() {

// Take sendInProgress flag only if queue is not empty
if (this.messages.isEmpty()) {
return;
}

if (this.sendInProgress.compareAndSet(false, true)) {
sendNextMessage();
}
}

private void sendNextMessage() {
for (;;) {
Message<?> message = this.messages.poll();
if (message != null) {
try {
addCompletionCallback(message);
if (this.channel.send(message)) {
return;
}
}
catch (Throwable ex) {
if (logger.isErrorEnabled()) {
logger.error("Failed to send " + message, ex);
}
}
}
else {
// We ran out of messages..
this.sendInProgress.set(false);
trySend();
break;
}
}
}

private void addCompletionCallback(Message<?> msg) {
SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(msg, SimpMessageHeaderAccessor.class);
Assert.isTrue(accessor != null && accessor.isMutable(), "Expected mutable SimpMessageHeaderAccessor");
accessor.setHeader(COMPLETION_TASK_HEADER, (Runnable) this::sendNextMessage);
}


/**
* Install or remove an {@link ExecutorChannelInterceptor} that invokes a
* completion task once the message is handled.
* @param channel the channel to configure
* @param preservePublishOrder whether preserve order is on or off based on
* which an interceptor is either added or removed.
*/
static void configureOutboundChannel(MessageChannel channel, boolean preservePublishOrder) {
if (preservePublishOrder) {
Assert.isInstanceOf(ExecutorSubscribableChannel.class, channel,
"An ExecutorSubscribableChannel is required for `preservePublishOrder`");
ExecutorSubscribableChannel execChannel = (ExecutorSubscribableChannel) channel;
if (execChannel.getInterceptors().stream().noneMatch(i -> i instanceof CallbackInterceptor)) {
execChannel.addInterceptor(0, new CallbackInterceptor());
}
}
else if (channel instanceof ExecutorSubscribableChannel) {
ExecutorSubscribableChannel execChannel = (ExecutorSubscribableChannel) channel;
execChannel.getInterceptors().stream().filter(i -> i instanceof CallbackInterceptor)
.findFirst()
.map(execChannel::removeInterceptor);

}
}


private static class CallbackInterceptor implements ExecutorChannelInterceptor {

@Override
public void afterMessageHandled(Message<?> msg, MessageChannel ch, MessageHandler handler, Exception ex) {
Runnable task = (Runnable) msg.getHeaders().get(OrderedMessageSender.COMPLETION_TASK_HEADER);
if (task != null) {
task.run();
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -306,18 +306,19 @@ protected void handleMessageInternal(Message<?> message) {
else if (SimpMessageType.CONNECT.equals(messageType)) {
logMessage(message);
if (sessionId != null) {
long[] clientHeartbeat = SimpMessageHeaderAccessor.getHeartbeat(headers);
long[] serverHeartbeat = getHeartbeatValue();
long[] heartbeatIn = SimpMessageHeaderAccessor.getHeartbeat(headers);
long[] heartbeatOut = getHeartbeatValue();
Principal user = SimpMessageHeaderAccessor.getUser(headers);
this.sessions.put(sessionId, new SessionInfo(sessionId, user, clientHeartbeat, serverHeartbeat));
MessageChannel outChannel = getClientOutboundChannelForSession(sessionId);
this.sessions.put(sessionId, new SessionInfo(sessionId, user, outChannel, heartbeatIn, heartbeatOut));
SimpMessageHeaderAccessor connectAck = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
initHeaders(connectAck);
connectAck.setSessionId(sessionId);
if (user != null) {
connectAck.setUser(user);
}
connectAck.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message);
connectAck.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, serverHeartbeat);
connectAck.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, heartbeatOut);
Message<byte[]> messageOut = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectAck.getMessageHeaders());
getClientOutboundChannel().send(messageOut);
}
Expand Down Expand Up @@ -391,19 +392,20 @@ protected void sendMessageToSubscribers(@Nullable String destination, Message<?>
headerAccessor.setSessionId(sessionId);
headerAccessor.setSubscriptionId(subscriptionId);
headerAccessor.copyHeadersIfAbsent(message.getHeaders());
headerAccessor.setLeaveMutable(true);
Object payload = message.getPayload();
Message<?> reply = MessageBuilder.createMessage(payload, headerAccessor.getMessageHeaders());
try {
getClientOutboundChannel().send(reply);
}
catch (Throwable ex) {
if (logger.isErrorEnabled()) {
logger.error("Failed to send " + message, ex);
SessionInfo info = this.sessions.get(sessionId);
if (info != null) {
try {
info.getClientOutboundChannel().send(reply);
}
}
finally {
SessionInfo info = this.sessions.get(sessionId);
if (info != null) {
catch (Throwable ex) {
if (logger.isErrorEnabled()) {
logger.error("Failed to send " + message, ex);
}
}
finally {
info.setLastWriteTime(now);
}
}
Expand All @@ -427,6 +429,8 @@ private static class SessionInfo {
@Nullable
private final Principal user;

private final MessageChannel clientOutboundChannel;

private final long readInterval;

private final long writeInterval;
Expand All @@ -435,11 +439,13 @@ private static class SessionInfo {

private volatile long lastWriteTime;

public SessionInfo(String sessionId, @Nullable Principal user,

public SessionInfo(String sessionId, @Nullable Principal user, MessageChannel outboundChannel,
@Nullable long[] clientHeartbeat, @Nullable long[] serverHeartbeat) {

this.sessionId = sessionId;
this.user = user;
this.clientOutboundChannel = outboundChannel;
if (clientHeartbeat != null && serverHeartbeat != null) {
this.readInterval = (clientHeartbeat[0] > 0 && serverHeartbeat[1] > 0 ?
Math.max(clientHeartbeat[0], serverHeartbeat[1]) * HEARTBEAT_MULTIPLIER : 0);
Expand All @@ -462,6 +468,10 @@ public Principal getUser() {
return this.user;
}

public MessageChannel getClientOutboundChannel() {
return this.clientOutboundChannel;
}

public long getReadInterval() {
return this.readInterval;
}
Expand Down Expand Up @@ -505,8 +515,9 @@ public void run() {
accessor.setUser(user);
}
initHeaders(accessor);
accessor.setLeaveMutable(true);
MessageHeaders headers = accessor.getMessageHeaders();
getClientOutboundChannel().send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers));
info.getClientOutboundChannel().send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class MessageBrokerRegistry {
@Nullable
private String userDestinationPrefix;

private boolean preservePublishOrder;

@Nullable
private PathMatcher pathMatcher;

Expand Down Expand Up @@ -160,6 +162,30 @@ protected String getUserDestinationPrefix() {
return this.userDestinationPrefix;
}

/**
* Whether the client must receive messages in the order of publication.
* <p>By default messages sent to the {@code "clientOutboundChannel"} may
* not be processed in the same order because the channel is backed by a
* ThreadPoolExecutor that in turn does not guarantee processing in order.
* <p>When this flag is set to {@code true} messages within the same session
* will be sent to the {@code "clientOutboundChannel"} one at a time in
* order to preserve the order of publication. Enable this only if needed
* since there is some performance overhead to keep messages in order.
* @param preservePublishOrder whether to publish in order
* @since 5.1
*/
public void setPreservePublishOrder(boolean preservePublishOrder) {
this.preservePublishOrder = preservePublishOrder;
}

/**
* Whether to ensure messages are received in the order of publication.
* @since 5.1
*/
protected boolean isPreservePublishOrder() {
return this.preservePublishOrder;
}

/**
* Configure the PathMatcher to use to match the destinations of incoming
* messages to {@code @MessageMapping} and {@code @SubscribeMapping} methods.
Expand Down Expand Up @@ -209,6 +235,7 @@ protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerC
SimpleBrokerMessageHandler handler = this.simpleBrokerRegistration.getMessageHandler(brokerChannel);
handler.setPathMatcher(this.pathMatcher);
handler.setCacheLimit(this.cacheLimit);
handler.setPreservePublishOrder(this.preservePublishOrder);
return handler;
}
return null;
Expand All @@ -217,7 +244,9 @@ protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerC
@Nullable
protected StompBrokerRelayMessageHandler getStompBrokerRelay(SubscribableChannel brokerChannel) {
if (this.brokerRelayRegistration != null) {
return this.brokerRelayRegistration.getMessageHandler(brokerChannel);
StompBrokerRelayMessageHandler relay = this.brokerRelayRegistration.getMessageHandler(brokerChannel);
relay.setPreservePublishOrder(this.preservePublishOrder);
return relay;
}
return null;
}
Expand Down
Loading

0 comments on commit 7500b14

Please sign in to comment.