Skip to content

Commit

Permalink
INT-2352 Support Control-Bus Atomic Router Updates
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-2352

Add `replaceChannelMappings()` to the `AMMR`.

INT-2352 Polishing - PR Comments

Consolidate tests.
Fix up `@ManagedAttribute` Vs `@ManagedOperation`
Expose `setChannelMappings` over JMX
  • Loading branch information
garyrussell authored and Artem Bilan committed Apr 15, 2014
1 parent 466af8a commit 66fe1c7
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
Expand Down Expand Up @@ -64,15 +67,13 @@ public abstract class AbstractMappingMessageRouter extends AbstractMessageRouter
*
* @param channelMappings The channel mappings.
*/
@Override
@ManagedAttribute
public void setChannelMappings(Map<String, String> channelMappings) {
Map<String, String> oldChannelMappings = this.channelMappings;
Assert.notNull(channelMappings, "'channelMappings' must not be null");
Map<String, String> newChannelMappings = new ConcurrentHashMap<String, String>();
newChannelMappings.putAll(channelMappings);
this.channelMappings = newChannelMappings;
if (logger.isDebugEnabled()) {
logger.debug("Channel mappings:" + oldChannelMappings
+ " replaced with:" + newChannelMappings);
}
this.doSetChannelMappings(newChannelMappings);
}

/**
Expand Down Expand Up @@ -122,7 +123,9 @@ public void setResolutionRequired(boolean resolutionRequired) {
*
* @return The channel mappings.
*/
protected Map<String, String> getChannelMappings() {
@Override
@ManagedAttribute
public Map<String, String> getChannelMappings() {
return Collections.unmodifiableMap(this.channelMappings);
}

Expand Down Expand Up @@ -176,6 +179,38 @@ protected Collection<MessageChannel> determineTargetChannels(Message<?> message)
return channels;
}

/**
* Convenience method allowing conversion of a list
* of mappings in a control-bus message.
* <p>This is intended to be called via a control-bus; keys and values that are not
* Strings will be ignored.
* <p>Mappings must be delimited with newlines, for example:
* <p>{@code "@'myRouter.handler'.replaceChannelMappings('foo=qux \n baz=bar')"}.
* @param channelMappings The channel mappings.
*
* @since 4.0
*/
@Override
@ManagedOperation
public void replaceChannelMappings(Properties channelMappings) {
Assert.notNull(channelMappings, "'channelMappings' must not be null");
Map<String, String> newChannelMappings = new ConcurrentHashMap<String, String>();
Set<String> keys = channelMappings.stringPropertyNames();
for (String key : keys) {
newChannelMappings.put(key.trim(), channelMappings.getProperty(key).trim());
}
this.doSetChannelMappings(newChannelMappings);
}

private void doSetChannelMappings(Map<String, String> newChannelMappings) {
Map<String, String> oldChannelMappings = this.channelMappings;
this.channelMappings = newChannelMappings;
if (logger.isDebugEnabled()) {
logger.debug("Channel mappings:" + oldChannelMappings
+ " replaced with:" + newChannelMappings);
}
}

private MessageChannel resolveChannelForName(String channelName, Message<?> message) {
if (this.channelResolver == null) {
this.onInit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,19 @@
*/
package org.springframework.integration.router;

import java.util.Map;
import java.util.Properties;

import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.messaging.core.DestinationResolver;

/**
* Exposes channel mapping operations when the router is proxied.
* {@link #setChannelMappings(Map)} is also exposed. This cannot
* be used with a control-bus, but it can be used programmatically
* or over JMX.
*
* @author Gary Russell
* @since 2.1
*
Expand All @@ -26,19 +36,45 @@ public interface MappingMessageRouterManagement {

/**
* Add a channel mapping from the provided key to channel name.
*
* @param key The key.
* @param channelName The channel name.
*/
@ManagedOperation
public abstract void setChannelMapping(String key, String channelName);
void setChannelMapping(String key, String channelName);

/**
* Remove a channel mapping for the given key if present.
*
* @param key The key.
*/
@ManagedOperation
public abstract void removeChannelMapping(String key);
void removeChannelMapping(String key);

/**
* Provide mappings from channel keys to channel names.
* @param channelMappings The channel mappings.
*
* @since 4.0
*/
@ManagedOperation
void replaceChannelMappings(Properties channelMappings);

/**
* @return an unmodifiable map of channel mappings.
*
* @since 4.0
*/
@ManagedAttribute
Map<String, String> getChannelMappings();

/**
* Provide mappings from channel keys to channel names.
* Channel names will be resolved by the {@link DestinationResolver}.
*
* @param channelMappings The channel mappings.
*
* @since 4.0
*/
@ManagedAttribute
void setChannelMappings(Map<String, String> channelMappings);

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,13 @@

<beans:bean id="service" class="org.springframework.integration.config.xml.ControlBusTests$Service" />

<router id="router" expression="headers['channel']" input-channel="routerIn">
<mapping value="foo" channel="bar" />
<mapping value="baz" channel="qux" />
</router>

<channel id="bar" />

<channel id="qux" />

</beans:beans>
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.Assert.assertTrue;

import java.util.Date;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand All @@ -40,6 +41,7 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

Expand All @@ -51,6 +53,7 @@
*/
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext
public class ControlBusTests {

@Autowired
Expand Down Expand Up @@ -115,6 +118,26 @@ public void testControlHeaderChannelReaper() throws InterruptedException {
this.registry.setReaperDelay(60000);
}

@Test
public void testRouterMappings() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setReceiveTimeout(1000);
messagingTemplate.convertAndSend(input, "@'router.handler'.getChannelMappings()");
Message<?> result = this.output.receive(0);
assertNotNull(result);
Map<?, ?> mappings = (Map<?, ?>) result.getPayload();
assertEquals("bar", mappings.get("foo"));
assertEquals("qux", mappings.get("baz"));
messagingTemplate.convertAndSend(input,
"@'router.handler'.replaceChannelMappings('foo=qux \n baz=bar')");
messagingTemplate.convertAndSend(input, "@'router.handler'.getChannelMappings()");
result = this.output.receive(0);
assertNotNull(result);
mappings = (Map<?, ?>) result.getPayload();
assertEquals("bar", mappings.get("baz"));
assertEquals("qux", mappings.get("foo"));
}

public static class Service {

private final CountDownLatch latch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,28 @@
*/
package org.springframework.integration.jmx;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

Expand All @@ -35,6 +47,7 @@
*/
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
public class UpdateMappingsTests {

@Autowired
Expand All @@ -46,6 +59,9 @@ public class UpdateMappingsTests {
@Autowired
private PollableChannel qux;

@Autowired
private MBeanServer server;

@Test
public void test() {
control.send(new GenericMessage<String>("@myRouter.setChannelMapping('baz', 'qux')"));
Expand All @@ -55,4 +71,44 @@ public void test() {
assertNotNull(qux.receive());
}

@Test
public void testChangeRouterMappings() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setReceiveTimeout(1000);
messagingTemplate.convertAndSend(control,
"@'router.handler'.replaceChannelMappings('foo=bar \n baz=qux')");
Map<?, ?> mappings = messagingTemplate.convertSendAndReceive(control, "@'router.handler'.getChannelMappings()", Map.class);
assertNotNull(mappings);
assertEquals(2, mappings.size());
assertEquals("bar", mappings.get("foo"));
assertEquals("qux", mappings.get("baz"));
messagingTemplate.convertAndSend(control,
"@'router.handler'.replaceChannelMappings('foo=qux \n baz=bar')");
mappings = messagingTemplate.convertSendAndReceive(control, "@'router.handler'.getChannelMappings()", Map.class);
assertEquals(2, mappings.size());
assertEquals("bar", mappings.get("baz"));
assertEquals("qux", mappings.get("foo"));
}

@Test
public void testJmx() throws Exception {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setReceiveTimeout(1000);
Set<ObjectName> names = this.server.queryNames(ObjectName
.getInstance("update.mapping.domain:type=HeaderValueRouter,name=router"),
null);
assertEquals(1, names.size());
Map<String, String> map = new HashMap<String, String>();
map.put("foo", "bar");
map.put("baz", "qux");
Object[] params = new Object[] {map};
this.server.invoke(names.iterator().next(), "setChannelMappings", params,
new String[] { "java.util.Map" });
Map<?, ?> mappings = messagingTemplate.convertSendAndReceive(control, "@'router.handler'.getChannelMappings()", Map.class);
assertNotNull(mappings);
assertEquals(2, mappings.size());
assertEquals("bar", mappings.get("foo"));
assertEquals("qux", mappings.get("baz"));
}

}
29 changes: 28 additions & 1 deletion src/reference/docbook/router.xml
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ public List<String> route(@Header("orderStatus") OrderStatus status)]]></program
</note>
<para>
Typically you would send a control message asking to invoke a
particular operation on a particular managed component (e.g. router). The two managed operations (methods) that are
particular operation on a particular managed component (e.g. router). Two managed operations (methods) that are
specific to changing the router resolution process are:
</para>
<itemizedlist>
Expand All @@ -946,6 +946,33 @@ public List<String> route(@Header("orderStatus") OrderStatus status)]]></program
<code>channel identifier</code> and <code>channel name</code> </para>
</listitem>
</itemizedlist>
<para>
Note that these methods can be used for simple changes (updating a single route or adding/removing a
route). However, if you want to remove one route and add another, the updates are not atomic. This
means the routing table may be in an indeterminate state betweent the updates. Starting with
<emphasis>version 4.0</emphasis>, you can now use the control bus to update the entire routing
table atomically.
</para>
<itemizedlist>
<listitem>
<para>
<code>public Map&lt;String, String&gt;getChannelMappings()</code> returns the current
mappings.
</para>
</listitem>
<listitem>
<para>
<code>public void replaceChannelMappings(Properties channelMappings)</code> updates the mappings.
Notice that the parameter is a properties object; this allows the use of the inbuilt
<code>StringToPropertiesConverter</code> by a control bus command, for example:
<programlisting>"@'router.handler'.replaceChannelMappings('foo=qux \n baz=bar')"</programlisting>
- note that each mapping is separted by a newline character (<code>\n</code>). For programmatic
changes to the map, it is recommended that the <code>setChannelMappings</code> method
is used instead, for type-safety. Any non-String keys or values passed into
<code>replaceChannelMappings</code> are ingnored.
</para>
</listitem>
</itemizedlist>
</section>
<section id="dynamic-routers-jmx">
<title>Manage Router Mappings using JMX</title>
Expand Down

0 comments on commit 66fe1c7

Please sign in to comment.