Skip to content

Commit

Permalink
Switch to reactor snapshots and make use of API change
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev committed Oct 23, 2013
1 parent 6b0a625 commit f3ca3c1
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 98 deletions.
9 changes: 7 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,8 @@ project("spring-messaging") {
optional(project(":spring-websocket"))
optional(project(":spring-webmvc"))
optional("com.fasterxml.jackson.core:jackson-databind:2.2.0")
optional("org.projectreactor:reactor-core:1.0.0.RC1")
optional("org.projectreactor:reactor-tcp:1.0.0.RC1")
optional("org.projectreactor:reactor-core:1.0.0.BUILD-SNAPSHOT")
optional("org.projectreactor:reactor-tcp:1.0.0.BUILD-SNAPSHOT")
optional("org.eclipse.jetty.websocket:websocket-server:9.0.5.v20130815")
optional("org.eclipse.jetty.websocket:websocket-client:9.0.5.v20130815")
testCompile(project(":spring-test"))
Expand All @@ -406,6 +406,11 @@ project("spring-messaging") {
testCompile("org.apache.tomcat.embed:tomcat-embed-logging-juli:8.0.0-RC3")
testCompile("org.slf4j:slf4j-jcl:${slf4jVersion}")
}

repositories {
maven { url "http://repo.spring.io/snapshot" } // for reactor (until RC2)
}

}

project("spring-tx") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler;
import org.springframework.messaging.support.MessageBuilder;
Expand Down Expand Up @@ -67,10 +68,15 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler

private static final byte[] EMPTY_PAYLOAD = new byte[0];

private static final Message<byte[]> HEARTBEAT_MESSAGE = MessageBuilder.withPayload(new byte[] {'\n'}).build();
private static final Message<byte[]> HEARTBEAT_MESSAGE;

private static final long HEARTBEAT_MULTIPLIER = 3;

static {
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.HEARTBEAT);
HEARTBEAT_MESSAGE = MessageBuilder.withPayload(new byte[] {'\n'}).setHeaders(headers).build();
}


private final MessageChannel messageChannel;

Expand Down Expand Up @@ -464,11 +470,11 @@ public void run() {
TcpConnection<byte[]> conn = tcpConnection;
if (conn != null) {
conn.send(HEARTBEAT_MESSAGE).addCallback(
new ListenableFutureCallback<Boolean>() {
new ListenableFutureCallback<Void>() {
public void onFailure(Throwable t) {
handleTcpConnectionFailure("Failed to send heartbeat", null);
handleTcpConnectionFailure("Failed to send heartbeat", t);
}
public void onSuccess(Boolean result) {}
public void onSuccess(Void result) {}
});
}
}
Expand All @@ -492,16 +498,16 @@ public void afterConnectionClosed() {
sendStompErrorToClient("Connection to broker closed");
}

public ListenableFuture<Boolean> forward(final Message<?> message) {
public ListenableFuture<Void> forward(final Message<?> message) {

if (!this.isStompConnected) {
if (logger.isWarnEnabled()) {
logger.warn("Connection to broker inactive or not ready, ignoring message=" + message);
}
return new ListenableFutureTask<Boolean>(new Callable<Boolean>() {
return new ListenableFutureTask<Void>(new Callable<Void>() {
@Override
public Boolean call() throws Exception {
return Boolean.FALSE;
public Void call() throws Exception {
return null;
}
});
}
Expand All @@ -511,11 +517,11 @@ public Boolean call() throws Exception {
}

@SuppressWarnings("unchecked")
ListenableFuture<Boolean> future = this.tcpConnection.send((Message<byte[]>) message);
ListenableFuture<Void> future = this.tcpConnection.send((Message<byte[]>) message);

future.addCallback(new ListenableFutureCallback<Boolean>() {
future.addCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(Boolean result) {
public void onSuccess(Void result) {
StompCommand command = StompHeaderAccessor.wrap(message).getCommand();
if (command == StompCommand.DISCONNECT) {
resetTcpConnection();
Expand Down Expand Up @@ -574,12 +580,10 @@ public void afterConnectionClosed() {
}

@Override
public ListenableFuture<Boolean> forward(Message<?> message) {
public ListenableFuture<Void> forward(Message<?> message) {
try {
ListenableFuture<Boolean> future = super.forward(message);
if (!future.get()) {
throw new MessageDeliveryException(message);
}
ListenableFuture<Void> future = super.forward(message);
future.get();
return future;
}
catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@
* @author Rossen Stoyanchev
* @since 4.0
*/
abstract class PromiseToListenableFutureAdapter<S, T> implements ListenableFuture<T> {
abstract class AbstractPromiseToListenableFutureAdapter<S, T> implements ListenableFuture<T> {

private final Promise<S> promise;

private final ListenableFutureCallbackRegistry<T> registry = new ListenableFutureCallbackRegistry<T>();


protected PromiseToListenableFutureAdapter(Promise<S> promise) {
protected AbstractPromiseToListenableFutureAdapter(Promise<S> promise) {

Assert.notNull(promise, "promise is required");
this.promise = promise;
Expand All @@ -70,11 +70,11 @@ public void accept(Throwable t) {
});
}

protected abstract T adapt(S adapteeResult);
protected abstract T adapt(S result);

@Override
public T get() {
S result = this.promise.get();
public T get() throws InterruptedException {
S result = this.promise.await();
return adapt(result);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.messaging.support.tcp;

import reactor.core.composable.Promise;


/**
* A Promise-to-ListenableFutureAdapter where the source and the target from the Promise and
* the ListenableFuture respectively are of the same type.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
class PassThroughPromiseToListenableFutureAdapter<T> extends AbstractPromiseToListenableFutureAdapter<T, T> {


public PassThroughPromiseToListenableFutureAdapter(Promise<T> promise) {
super(promise);
}

@Override
protected T adapt(T result) {
return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void accept(Message<P> message) {
public ListenableFuture<Void> shutdown() {
try {
Promise<Void> promise = this.tcpClient.close();
return new PromiseToListenableFutureAdapter<Void, Void>(promise) {
return new AbstractPromiseToListenableFutureAdapter<Void, Void>(promise) {
@Override
protected Void adapt(Void result) {
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ public ReactorTcpConnection(reactor.tcp.TcpConnection<Message<P>, Message<P>> co
}

@Override
public ListenableFuture<Boolean> send(Message<P> message) {
ConsumerListenableFuture future = new ConsumerListenableFuture();
this.reactorTcpConnection.send(message, future);
return future;
public ListenableFuture<Void> send(Message<P> message) {
Promise<Void> promise = this.reactorTcpConnection.send(message);
return new PassThroughPromiseToListenableFutureAdapter<Void>(promise);
}

@Override
Expand All @@ -62,73 +61,4 @@ public void close() {
this.reactorTcpConnection.close();
}


// Use this temporarily until reactor provides a send method returning a Promise


private static class ConsumerListenableFuture implements ListenableFuture<Boolean>, Consumer<Boolean> {

final Deferred<Boolean, Promise<Boolean>> deferred = new DeferredPromiseSpec<Boolean>().get();

private final ListenableFutureCallbackRegistry<Boolean> registry =
new ListenableFutureCallbackRegistry<Boolean>();

@Override
public void accept(Boolean result) {

this.deferred.accept(result);

if (result == null) {
this.registry.failure(new TimeoutException());
}
else if (result) {
this.registry.success(result);
}
else {
this.registry.failure(new Exception("Failed send message"));
}
}

@Override
public Boolean get() {
try {
return this.deferred.compose().await();
}
catch (InterruptedException e) {
return Boolean.FALSE;
}
}

@Override
public Boolean get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {

Boolean result = this.deferred.compose().await(timeout, unit);
if (result == null) {
throw new TimeoutException();
}
return result;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}

@Override
public boolean isCancelled() {
return false;
}

@Override
public boolean isDone() {
return this.deferred.compose().isComplete();
}

@Override
public void addCallback(ListenableFutureCallback<? super Boolean> callback) {
this.registry.addCallback(callback);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface TcpConnection<P> {
* @param message the message
* @return whether the send succeeded or not
*/
ListenableFuture<Boolean> send(Message<P> message);
ListenableFuture<Void> send(Message<P> message);

/**
* Register a task to invoke after a period of of read inactivity.
Expand Down

0 comments on commit f3ca3c1

Please sign in to comment.