Skip to content

Commit

Permalink
INT-3527: WebSocket and 'o-c-a' Improvements
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-3527

* `WebSocketInboundChannelAdapter` now handles `CONNECT` STOMP message and sends `CONNECT_ACK` message to the `WebSocketSession` immediately
* `ExpressionMessageProducerSupport` implementations now checks the result of `expression` and if it is a `Message<?>` it is sent to channel without creating a new one
which previously wrapped that `Message<?>` as the `payload`.
* Add `<script>` support to the `<outbound-channel-adapter>`
* Upgrade to the SF 4.1.1
* Add appropriate notes to the Docs
  • Loading branch information
artembilan authored and garyrussell committed Oct 14, 2014
1 parent d5f1bb6 commit 252c53d
Show file tree
Hide file tree
Showing 13 changed files with 138 additions and 45 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ subprojects { subproject ->
springSecurityVersion = '3.2.5.RELEASE'
springSocialTwitterVersion = '1.1.0.RELEASE'
springRetryVersion = '1.1.1.RELEASE'
springVersion = project.hasProperty('springVersion') ? project.springVersion : '4.1.0.RELEASE'
springVersion = project.hasProperty('springVersion') ? project.springVersion : '4.1.1.RELEASE'
springWsVersion = '2.2.0.RELEASE'
xmlUnitVersion = '1.5'
xstreamVersion = '1.4.7'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import org.w3c.dom.Element;

import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.RootBeanDefinition;
Expand All @@ -27,6 +28,7 @@
import org.springframework.integration.handler.ExpressionEvaluatingMessageHandler;
import org.springframework.integration.handler.MethodInvokingMessageHandler;
import org.springframework.util.StringUtils;
import org.springframework.util.xml.DomUtils;

/**
* Parser for the &lt;outbound-channel-adapter/&gt; element.
Expand All @@ -39,20 +41,32 @@ public class DefaultOutboundChannelAdapterParser extends AbstractOutboundChannel

@Override
protected AbstractBeanDefinition parseConsumer(Element element, ParserContext parserContext) {
BeanComponentDefinition innerConsumerDefinition = IntegrationNamespaceUtils.parseInnerHandlerDefinition(element, parserContext);
Object source = parserContext.extractSource(element);
BeanComponentDefinition innerConsumerDefinition =
IntegrationNamespaceUtils.parseInnerHandlerDefinition(element, parserContext);

String consumerRef = element.getAttribute(IntegrationNamespaceUtils.REF_ATTRIBUTE);
String methodName = element.getAttribute(IntegrationNamespaceUtils.METHOD_ATTRIBUTE);
String consumerExpressionString = element.getAttribute(IntegrationNamespaceUtils.EXPRESSION_ATTRIBUTE);
Element scriptElement = DomUtils.getChildElementByTagName(element, "script");

boolean isInnerConsumer = innerConsumerDefinition != null;
boolean isRef = StringUtils.hasText(consumerRef);
boolean isExpression = StringUtils.hasText(consumerExpressionString);
boolean hasMethod = StringUtils.hasText(methodName);
boolean hasScript = scriptElement != null;

if (!(isInnerConsumer ^ (isRef ^ isExpression))) {
if (!isInnerConsumer & !isRef & !isExpression & !hasScript) {
parserContext.getReaderContext().error(
"Exactly one of the 'ref', 'expression' or inner bean is required.", element);
"Exactly one of the 'ref', 'expression', <script> or inner bean is required.", source);
}

if (hasScript) {
if (isRef | isExpression) {
parserContext.getReaderContext().error(
"Neither 'ref' nor 'expression' are permitted when an inner script element is configured.",
source);
}
}

if (hasMethod & isExpression) {
Expand All @@ -68,6 +82,13 @@ protected AbstractBeanDefinition parseConsumer(Element element, ParserContext pa
expressionDef.getConstructorArgumentValues().addGenericArgumentValue(consumerExpressionString);
consumerBuilder.addConstructorArgValue(expressionDef);
}
else if (hasScript) {
consumerBuilder = BeanDefinitionBuilder.genericBeanDefinition(MethodInvokingMessageHandler.class);
BeanDefinition scriptBeanDefinition = parserContext.getDelegate().parseCustomElement(scriptElement,
consumerBuilder.getBeanDefinition());
consumerBuilder.addConstructorArgValue(scriptBeanDefinition);
consumerBuilder.addConstructorArgValue("processMessage");
}
else {
consumerBuilder = BeanDefinitionBuilder.genericBeanDefinition(MethodInvokingMessageHandler.class);
if (isRef) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1152,11 +1152,11 @@
</xsd:complexType>

<xsd:complexType name="outboundChannelAdapterType">
<xsd:all>
<xsd:choice minOccurs="0" maxOccurs="3">
<xsd:element name="poller" type="basePollerType" minOccurs="0" maxOccurs="1" />
<xsd:element ref="beans:bean" minOccurs="0" maxOccurs="1" />
<xsd:element name="request-handler-advice-chain" type="handlerAdviceChainType" minOccurs="0" maxOccurs="1" />
</xsd:all>
<xsd:any namespace="##other" processContents="strict" minOccurs="0" maxOccurs="1" />
</xsd:choice>
<xsd:attributeGroup ref="channelAdapterAttributes" />
<xsd:attribute name="order">
<xsd:annotation>
Expand All @@ -1171,9 +1171,9 @@
</xsd:complexType>

<xsd:complexType name="outboundChannelAdapterTypeChain">
<xsd:all>
<xsd:element ref="beans:bean" minOccurs="0" maxOccurs="1" />
</xsd:all>
<xsd:choice minOccurs="0">
<xsd:any namespace="##other" processContents="strict" minOccurs="0" maxOccurs="1" />
</xsd:choice>
<xsd:attribute name="id" type="xsd:string" />
</xsd:complexType>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ public class ApplicationEventListeningMessageProducer extends ExpressionMessageP
* In addition, this method re-registers the current instance as a {@link ApplicationListener}
* with the {@link ApplicationEventMulticaster} which clears the listener cache. The cache will be
* refreshed on the next appropriate {@link ApplicationEvent}.
*
* @param eventTypes The event types.
*
* @see ApplicationEventMulticaster#addApplicationListener
* @see #supportsEventType
*/
Expand Down Expand Up @@ -104,8 +102,15 @@ public void onApplicationEvent(ApplicationEvent event) {
this.sendMessage((Message<?>) event.getSource());
}
else {
Object payload = this.evaluatePayloadExpression(event);
this.sendMessage(this.getMessageBuilderFactory().withPayload(payload).build());
Message<?> message = null;
Object result = this.evaluatePayloadExpression(event);
if (result instanceof Message) {
message = (Message<?>) result;
}
else {
message = this.getMessageBuilderFactory().withPayload(result).build();
}
this.sendMessage(message);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.commons.logging.LogFactory;

import org.springframework.integration.endpoint.ExpressionMessageProducerSupport;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

import com.gemstone.gemfire.cache.CacheClosedException;
Expand All @@ -41,6 +42,7 @@
*
* @author Mark Fisher
* @author David Turanski
* @author Artem Bilan
* @since 2.1
*/
@SuppressWarnings({"rawtypes", "unchecked"})
Expand Down Expand Up @@ -131,8 +133,15 @@ private void processEvent(EntryEvent event) {

}

private void publish(Object payload) {
sendMessage(CacheListeningMessageProducer.this.getMessageBuilderFactory().withPayload(payload).build());
private void publish(Object object) {
Message<?> message = null;
if (object instanceof Message) {
message = (Message<?>) object;
}
else {
message = getMessageBuilderFactory().withPayload(object).build();
}
sendMessage(message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.HashSet;
import java.util.Set;

import com.gemstone.gemfire.cache.query.CqEvent;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand All @@ -31,6 +30,8 @@
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

import com.gemstone.gemfire.cache.query.CqEvent;

/**
* Responds to a Gemfire continuous query (set using the #query field) that is
* constantly evaluated against a cache
Expand All @@ -39,6 +40,7 @@
*
* @author Josh Long
* @author David Turanski
* @author Artem Bilan
* @since 2.1
*
*/
Expand All @@ -55,12 +57,11 @@ public class ContinuousQueryMessageProducer extends ExpressionMessageProducerSup

private boolean durable;

private volatile Set<CqEventType> supportedEventTypes = new HashSet<CqEventType>(Arrays.asList(CqEventType.CREATED,
CqEventType.UPDATED));
private volatile Set<CqEventType> supportedEventTypes =
new HashSet<CqEventType>(Arrays.asList(CqEventType.CREATED, CqEventType.UPDATED));

/**
*
* @param queryListenerContainer a {@link org.springframework.data.gemfire.listener.ContinuousQueryListenerContainer}
* @param queryListenerContainer a {@link ContinuousQueryListenerContainer}
* @param query the query string
*/
public ContinuousQueryMessageProducer(ContinuousQueryListenerContainer queryListenerContainer, String query) {
Expand All @@ -71,15 +72,13 @@ public ContinuousQueryMessageProducer(ContinuousQueryListenerContainer queryList
}

/**
*
* @param queryName optional query name
*/
public void setQueryName(String queryName) {
this.queryName = queryName;
}

/**
*
* @param durable true if the query is a durable subscription
*/
public void setDurable(boolean durable) {
Expand Down Expand Up @@ -110,10 +109,7 @@ protected void onInit() {

/*
* (non-Javadoc)
*
* @see
* org.springframework.data.gemfire.listener.QueryListener#onEvent(com.gemstone
* .gemfire.cache.query.CqEvent)
* @see org.springframework.data.gemfire.listener.QueryListener#onEvent(com.gemstone.gemfire.cache.query.CqEvent)
*/
@Override
public void onEvent(CqEvent event) {
Expand All @@ -122,9 +118,15 @@ public void onEvent(CqEvent event) {
logger.debug(String.format("processing cq event key [%s] event [%s]", event.getQueryOperation()
.toString(), event.getKey()));
}
Message<?> cqEventMessage = this.getMessageBuilderFactory().withPayload(evaluatePayloadExpression(event))
.build();
sendMessage(cqEventMessage);
Message<?> message = null;
Object object = evaluatePayloadExpression(event);
if (object instanceof Message) {
message = (Message<?>) object;
}
else {
message = getMessageBuilderFactory().withPayload(object).build();
}
sendMessage(message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,12 @@
</groovy:script>
</service-activator>

<beans:bean id="invoked" class="java.util.concurrent.atomic.AtomicBoolean"/>

<outbound-channel-adapter id="outboundChannelAdapterWithGroovy">
<groovy:script>
invoked.set(true)
</groovy:script>
</outbound-channel-adapter>

</beans:beans>
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import groovy.lang.GroovyObject;
import groovy.lang.MissingPropertyException;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.hamcrest.Matchers;
import org.junit.Test;
Expand All @@ -38,20 +37,23 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.parsing.BeanDefinitionParsingException;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.handler.ReplyRequiredException;
import org.springframework.integration.scripting.ScriptVariableGenerator;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.scripting.groovy.GroovyObjectCustomizer;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import groovy.lang.GroovyObject;
import groovy.lang.MissingPropertyException;

/**
* @author Mark Fisher
* @author Oleg Zhurakousky
Expand Down Expand Up @@ -81,6 +83,12 @@ public class GroovyServiceActivatorTests {
@Autowired
private MyGroovyCustomizer groovyCustomizer;

@Autowired
private AtomicBoolean invoked;

@Autowired
private MessageChannel outboundChannelAdapterWithGroovy;


@Test
public void referencedScriptAndCustomiser() throws Exception{
Expand Down Expand Up @@ -193,6 +201,12 @@ public void variablesAndScriptVariableGenerator() throws Exception{
new ClassPathXmlApplicationContext("GroovyServiceActivatorTests-fail-withgenerator-context.xml", this.getClass());
}

@Test
public void testGroovyScriptForOutboundChannelAdapter() {
this.outboundChannelAdapterWithGroovy.send(new GenericMessage<String>("foo"));
assertTrue(this.invoked.get());
}


public static class SampleScriptVariSource implements ScriptVariableGenerator{
public Map<String, Object> generateScriptVariables(Message<?> message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void setHeaders(HttpHeaders headers) {
* @return the {@link #clientSession}, if established.
*/
@Override
public WebSocketSession getSession(String sessionId) throws Exception {
public WebSocketSession getSession(String sessionId) {
if (this.isRunning()) {
try {
this.connectionLatch.await(10, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public Map<String, WebSocketSession> getSessions() {
return Collections.unmodifiableMap(this.sessions);
}

public WebSocketSession getSession(String sessionId) throws Exception {
public WebSocketSession getSession(String sessionId) {
WebSocketSession session = this.sessions.get(sessionId);
Assert.notNull(session, "Session not found for id '" + sessionId + "'");
return session;
Expand Down
Loading

0 comments on commit 252c53d

Please sign in to comment.