Category Archives: Spring JMS

JMS ActiveMQ Broker Topologies and High-Availability Configuration

[Figure: Pure Master Slave broker topology (simplified)]
I am not going to go into broker topologies here; there are many great resources for that, such as this presentation by Bruce Snyder: http://www.slideshare.net/bruce.snyder/messaging-with-activemq-presentation, or http://activemq.apache.org/topologies.html. This example uses a store-and-forward topology (distributed queues) and incorporates basic authentication.

My use case was to handle JMS servers going down. I needed to implement failover as well as master/slave strategies, plus a topology that provides message redundancy in case of hardware failure and the like. The client could not tolerate any message loss. With failover, you can see how ActiveMQ switches from the main broker to the second, third, and so on when a failure occurs. In my case there are four JMS servers in production, and the servers they run on are load balanced.

There are just a few configurations to add or modify in order to set up JMS Failover with Master/Slave for your broker topology. Here is a basic configuration. For this use case, all JMS servers are configured as standalone versus embedded.

I. Client URI

You will need to add the Failover protocol, either with a basic URI pattern or a composite. In this use case, there are load balanced servers in Production and multiple Development and QA environments which require different configurations for master/slave and failover.

In your application’s properties file for messaging add a modified version of this with your mappings:
activemq.broker.uri=failover://(tcp://localhost:61616,tcp://slaveh2:61616,tcp://master2:61616,tcp://slave2:61616,network:static://(tcp://localhost:61616,tcp://master2:61616,tcp://slave2:61616))?randomize=false

Note: I set connections up as locked-down (static) communication configurations rather than multicast or dynamic discovery, so that I know exactly which servers can communicate with each other and how. This also assumes you keep one set of properties per environment, to map the appropriate IPs in development, QA, production, DR, etc.
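If you keep one broker list per environment, the failover URI can also be assembled from that list rather than hand-edited. Here is a minimal sketch of that idea in plain Java. The class name, method name and host names are hypothetical, and it only builds the simple failover:(...)?randomize=false form, not the composite network URI shown above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FailoverUriSketch {

    /**
     * Builds a failover URI from broker hosts listed in priority order.
     * randomize=false keeps the client trying brokers in the order given.
     */
    public static String failoverUri(List<String> hostsInPriorityOrder, int port) {
        if (hostsInPriorityOrder.isEmpty()) {
            throw new IllegalArgumentException("at least one broker host is required");
        }
        String brokers = hostsInPriorityOrder.stream()
                .map(host -> "tcp://" + host + ":" + port)
                .collect(Collectors.joining(","));
        return "failover:(" + brokers + ")?randomize=false";
    }

    public static void main(String[] args) {
        // Hypothetical host names; substitute the mappings for your environment
        List<String> production = Arrays.asList("master1", "slave1", "master2", "slave2");
        System.out.println(failoverUri(production, 61616));
    }
}
```

The resulting string is what you would place in the activemq.broker.uri property for that environment.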

Note: Do not configure networkConnectors for master/slave; that link is handled on the slave with the following configuration:
<masterConnector remoteURI="tcp://masterhost:61616" userName="wooty" password="woo"/>

II. Spring Configuration

<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
      destroy-method="stop">
    <property name="connectionFactory">
        <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
            <!-- same key as the activemq.broker.uri entry in the properties file above -->
            <constructor-arg value="${activemq.broker.uri}"/>
            <property name="userName" value="${activemq.username}"/>
            <property name="password" value="${activemq.password}"/>
        </bean>
    </property>
</bean>

III. Broker Configuration

Master

<broker brokerName="{hostname}" waitForSlave="true" xmlns="http://activemq.apache.org/schema/core" dataDirectory="${activemq.base}/data">
    <networkConnectors>
        <!-- passed in by the client broker URI so you can easily manage per environment: sweet -->
    </networkConnectors>

    <transportConnectors>
        <!-- TCP uses the OpenWire marshaling protocol to convert messages to stream of bytes (and back) -->
        <transportConnector name="tcp" uri="tcp://localhost:61616?trace=true"/>
        <transportConnector name="nio" uri="nio://localhost:61618?trace=true"/>
        <!-- <transportConnector name="ssl" uri="ssl://localhost:61617"/>
        <transportConnector name="http" uri="http://localhost:61613"/>
        <transportConnector name="https" uri="https://localhost:61222"/> -->
    </transportConnectors>

    <!-- Basic security and credentials -->
    <plugins>
        <simpleAuthenticationPlugin>
            <users>
                <authenticationUser username="system" password="manager" groups="admin,publishers,consumers"/>
            </users>
        </simpleAuthenticationPlugin>
    </plugins>

    <!-- ...more configuration -->
</broker>

Slave: for ActiveMQ 4.1 or later, which also allows for authentication as shown below

<broker brokerName="{hostname}Slave" deleteAllMessagesOnStartup="true" xmlns="http://activemq.apache.org/schema/core">
    <transportConnectors>
        <transportConnector uri="tcp://localhost:61616"/>
    </transportConnectors>

    <services>
        <masterConnector remoteURI="tcp://masterhost:62001" userName="wooty" password="woo"/>
    </services>

    <!-- Basic security and credentials -->
    <plugins>
        <simpleAuthenticationPlugin>
            <users>
                <authenticationUser username="system" password="manager" groups="admin,publishers,consumers"/>
            </users>
        </simpleAuthenticationPlugin>
    </plugins>

</broker>


Decoupling Asynchronous Messaging With Spring JMS and Spring Integration

Decoupling is vital in applications, but it is not easy to do well, and I am constantly working to improve my strategies. Even more important is the role of messaging in an enterprise context and in the design. I think message-driven architecture is perhaps the most important style to integrate into applications, in terms of its scope of applicability and how it lends itself to scalability. In this vein, Spring Integration becomes a highly intriguing element worthy of study, testing, and integration. I barely touch on Spring Integration here, particularly its standard usage; I simply use a few classes programmatically to decouple the JMS implementation from its clients.

Messaging and Decoupling

Messaging is everywhere, and so subtle that we are not aware of it; it just happens all around us, all the time. Consider this: in genetics, at the cellular level (which is nongranular in the scope of genetics itself), the major elements in a cell communicate, but how? They are not connected by any physical construct save the mutual environment they are in, so how do they do it? They message each other with signals, receptors and a means to translate those messages, which contain instructions.

In gene expression, DNA and RNA within eukaryotic cells communicate by transmitting messages, intercepting them, translating them and even performing message amplification. Think systems within systems within systems: elements moving in space and time under specific, ever-changing constraints and environmental fluctuations (put that in your Agile timebox!). There are specialized elements, specifically mRNA (messenger RNA), which are translated by signal recognition particles… Cool, right? But this happens all around us, outside, in space, everywhere. And it is all decoupled, and therein lies the beauty of messaging.

So what about our applications? Here is one very simple, isolated application of Spring Integration (a low-level usage, not very sophisticated) to decouple your client and server messaging code:

I wrote a little Java package that integrates Spring JMS, ActiveMQ, Spring Integration and Flex Messaging for the front end, which hooks into either BlazeDS or LiveCycle Data Services. I had a bunch of constraints to solve for. All Destinations had to come from the database. There were also lifecycle issues around the timing of element initialization: IoC bean creation had to line up with the Flex Messaging elements being created and having what they needed, such as the Flex Messaging adapters, which I resolved by bootstrapping Flex from Java. In another post I will go into the JMS package further. For the topic here, let’s focus on the bridge between JMS, Spring JMS and Spring Integration.

The image to the left shows the layout of my jms package to facilitate the mapping. In this system, messages come in from two areas: the client and Java services on the server. Complicated systems will have many more, but let’s talk about the two that most would have. The client sends messages to the server that are both user messages and operational messages from the system. Java services send messages when certain business rules and criteria are triggered, passing business data; any message-aware object could be sending messages.

Sending on the server

To ensure proper usage I created an interface, called JMSClientSupport, that a service must implement to send to ActiveMQ. Note that all code in this post is simplified. In the real code, I actually have publish returning a validation message if errors occurred, so that a business service developer can implement handling per requirements.

A Business Entity
public class Foo implements Serializable {...}

A Service
public class FooServiceImpl implements BusinessService, FooService, SMTPClientSupport, JMSClientSupport {

    public void insert(Foo foo) {
        // ... do some important business stuff
        publish(foo);
    }

    // Only objects that pass business validation are sent to the JMS destination
    public void publish(Object object) {
        if (someBusinessValidation((Foo) object)) {
            jmsService.send(destinationId, object);
        }
    }
}


public interface JMSClientSupport {
void publish (Object object);
}
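
The simplified interface above returns void, but the version that hands a validation message back to the caller could look roughly like the sketch below. This is only an illustration under my own assumptions: ValidatingJmsClientSupport, InMemoryJmsService and the validation rule are hypothetical stand-ins written in plain Java, not the real package code, and the in-memory service simply records what would have been sent to ActiveMQ.

```java
import java.util.ArrayList;
import java.util.List;

public class PublishContractSketch {

    /** Hypothetical variant of JMSClientSupport that reports validation problems. */
    public interface ValidatingJmsClientSupport {
        /** @return null when the object was sent, otherwise a validation message */
        String publish(Object object);
    }

    /** Stand-in for the JMS-backed service; it only records what was sent. */
    public static class InMemoryJmsService {
        public final List<Object> sent = new ArrayList<>();

        public void send(String destinationId, Object payload) {
            sent.add(payload);
        }
    }

    /** A business service that validates before publishing. */
    public static class FooService implements ValidatingJmsClientSupport {
        private final InMemoryJmsService jmsService;
        private final String destinationId;

        public FooService(InMemoryJmsService jmsService, String destinationId) {
            this.jmsService = jmsService;
            this.destinationId = destinationId;
        }

        @Override
        public String publish(Object object) {
            // Hypothetical business rule: only non-empty Strings may be published
            if (!(object instanceof String) || ((String) object).isEmpty()) {
                return "payload must be a non-empty String";
            }
            jmsService.send(destinationId, object);
            return null;
        }
    }

    public static void main(String[] args) {
        InMemoryJmsService jms = new InMemoryJmsService();
        FooService service = new FooService(jms, "foo.events");
        System.out.println(service.publish("hello")); // sent, so no validation message
        System.out.println(service.publish(""));      // rejected, validation message returned
    }
}
```

The caller decides what to do with a non-null result, which keeps error handling a business decision rather than a messaging one.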

Sending from the Client

You could have any client sending messages of any nature to the server. In this case I am using Flex. Messages of type <T> are wrapped on the client as an IMessage (AsyncMessage, CommandMessage, etc.). When these messages make their way through BlazeDS or LiveCycle, I have them wired to hit this Java adapter. Adapters are held in a hash keyed by Flex Destination, so each Flex Destination maps 1:1 to a JMS Destination.

For this example I am implementing the JMS MessageListener to show tight coupling as well as decoupling:

public class FlexMessagingAdapter extends MessagingAdapter implements MessageListener {

    // Invoked by Flex when a message comes in from the client to this adapter's Destination
    public Object invoke(Message message) {
        // a custom interceptor that extracts partition Destination info like ActiveMQ message group or subtopics like STOCKS.NASDAQ for more specific routing
        String partition = new DestinationPartitionInterceptor(message).intercept();
        jmsService.send(destination, new IntegrationMessageCreator(message, partition));
        return null;
    }

    // Decoupled: Invoked when a Message is received from the Spring Integration channel
    public void handleMessage(org.springframework.integration.core.Message<?> message) {....}

    // Sets the Spring Integration MessageChannel for sending and receiving messages
    public void setMessageChannel(MessageChannel messageChannel) {
        this.messageChannel = messageChannel;
    }

    // Tightly coupled with JMS by the MessageListener.onMessage() method
    public void onMessage(javax.jms.Message jmsMessage) {
        flex.messaging.messages.Message message = new IntegrationMessageCreator(jmsMessage).createMessage(getDestination().getId(), getSubscribers());
        if (getMessageService().getMessageBroker().getChannelIds().contains("streaming-amf")) {
            MessageBroker broker = MessageBroker.getMessageBroker(null);
            broker.routeMessageToService(message, null);
        } else {
            getMessageService().pushMessageToClients(message, true);
        }
    }
}

The Transformer: Where Messages Intersect

I have a second post that shows an even more decoupled messaging strategy with Spring Integration, but this is purely a basic idea using Flex Messaging, Spring Integration, Spring JMS and ActiveMQ. I will post the broader strategy next :)

Step 1: Client messages are transformed here by implementing Spring JMS’s MessageCreator. In this class I pull out the data from any Object type, but specifically from Flex Message and JMS Message types.

public class IntegrationMessageCreator implements MessageCreator {
    // a few constructors here to handle multiple message types: JMSMessage, Flex Message, Object message, etc

    private MessageBuilder createBuilder() {
        MessageBuilder builder = null;
        if (this.object != null) {
            builder = MessageBuilder.withPayload(object);
        } else if (this.flexMessage != null && flexMessage.getBody() != null) {
            builder = MessageBuilder.withPayload(flexMessage.getBody()).copyHeaders(flexMessage.getHeaders());
        }
        if (builder == null) {
            // neither an Object nor a Flex message body was supplied, so fail fast instead of hitting a NullPointerException below
            throw new IllegalStateException("No payload available to build a message");
        }
        // ActiveMQ Message Groups
        if (this.partition != null) {
            builder.setHeader(MessageConstants.Headers.JMSXGROUPID, partition);
        }
        return builder;
    }

    // to JMS
    public javax.jms.Message createMessage(Session session) throws JMSException {
        return new IntegrationMessageConverter().toMessage(createBuilder().build(), session);
    }

    // To Flex
    public flex.messaging.messages.Message createMessage(String destinationId, int subscribers) {
        Message integrationMessage = (Message) new IntegrationMessageConverter().fromMessage(this.jmsMessage);

        flex.messaging.messages.Message flexMessage = new AsyncMessage();
        flexMessage.setBody(integrationMessage.getPayload());
        flexMessage.setDestination(destinationId);
        flexMessage.setHeaders(integrationMessage.getHeaders());
        // …and other good jms to flex data

        return flexMessage;
    }
}

The Converter


import org.springframework.integration.jms.HeaderMappingMessageConverter;
import org.springframework.integration.core.Message;
import javax.jms.JMSException;
import javax.jms.Session;

public class IntegrationMessageConverter extends HeaderMappingMessageConverter {

    // Converts from a JMS Message to an Integration Message. You should do a try catch but I cut it out for brevity
    public Object fromMessage(javax.jms.Message jmsMessage) throws JMSException {
        return (Message) super.fromMessage(jmsMessage);
    }

    // Converts from an Integration Message to a JMS Message. You should do a try catch but I cut it out for brevity
    public javax.jms.Message toMessage(Object object, Session session) throws JMSException {
        return super.toMessage(object, session);
    }
}

JMS Asynchronous Reception

In my jmsConfig.xml I configured one Spring MessageListenerAdapter which I have wired with a message delegate, a POJO and its overloaded handleMessage method name:

<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <property name="delegate" ref="defaultMessageDelegate"/>
    <property name="defaultListenerMethod" value="handleMessage"/>
    <property name="messageConverter" ref="simpleMessageConverter"/>
</bean>

As the application loads and all Spring beans are initialized, I initialize all of my JMS Destinations. As I do this, I also initialize a MessageListenerAdapter for each Destination. I have a stateless JMSService, which is called by another service, MessagingGateway, to initialize each Destination, and which calls PollingListenerContainerFactory to create a child listener container for each Destination. The containers are configured based on an abstract parent configuration:

<bean id="abstractListenerContainer" abstract="true" destroy-method="destroy">
    <property name="connectionFactory" ref="pooledConnectionFactory"/>
    <property name="transactionManager" ref="jmsTransActionManager"/>
    <property name="cacheLevel" value="3"/>
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="autoStartup" value="true"/>
</bean>

Snippet from PollingListenerContainerFactory:

/**
 * Gets the parent from the IoC to reduce runtime config and
 * resources to create children.
 * <p/>
 * DefaultMessageListenerContainer is responsible for all threading
 * of message reception and dispatches into the listener for processing.
 * Supports dynamic scaling to a higher number of consumers during peak loads.
 *
 * @param destination
 * @param messageListener
 * @return
 */
public static DefaultMessageListenerContainer createMessageListenerContainer(Destination destination, MessageListener messageListener) {

    ChildBeanDefinition childBeanDefinition = new ChildBeanDefinition("abstractListenerContainer", configureListenerContainer(destination, messageListener));

    String beanID = IdGeneratorUtil.getStringId();
    ConfigurableListableBeanFactory beanFactory = ApplicationContextAware.getConfigurableListableBeanFactory();
    ((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(beanID, childBeanDefinition);

    DefaultMessageListenerContainer container = (DefaultMessageListenerContainer) ApplicationContextAware.getBean(beanID);
    container.setDestination(destination);
    return container;
}

/**
 * Configures the child listener, based on the parent in jmsConfig.xml.
 * <p>Configures Queue or Topic consumers:
 * Queue: stick with 1 consumer for low-volume queues: default is
 * Topic: there’s no need for more than one concurrent consumer
 * Durable Subscription: Only 1 concurrent consumer supported
 * <p/>
 * props.addPropertyValue("messageSelector", ""); sets a message selector for this listener
 *
 * @param destination
 * @param messageListener
 * @return
 */
private static MutablePropertyValues configureListenerContainer(Destination destination, MessageListener messageListener) {
    MutablePropertyValues props = new MutablePropertyValues();
    props.addPropertyValue("destination", destination);
    props.addPropertyValue("messageListener", messageListener);

    // Enable throttling on peak loads
    if (destination instanceof Queue) {
        props.addPropertyValue("maxConcurrentConsumers", 50); // modify to needs
    }
    // Override default setting Point-to-Point (Queues)
    if (destination instanceof Topic) {
        props.addPropertyValue("pubSubDomain", true);
    }

    return props;
}

This is overkill for this topic, but it’s cool stuff. So we now have a JMS listener for asynchronous JMS reception as well as Flex, but now let’s look at the message delegate we wired into the MessageListenerAdapter:

public interface MessageDelegate {

void handleMessage(String message);

void handleMessage(Map message);

void handleMessage(Serializable message);
}

Pretty simple, right? It’s a POJO with absolutely no JMS code whatsoever for asynchronous message reception. How does it work? Spring abstracts the JMS code and calls it behind the scenes. If you look at the source code for Spring’s SimpleMessageConverter, it does the fromMessage() and toMessage() handling for you, and the MessageListenerAdapter then passes the resulting “message” to the appropriate overloaded method above. Now this is great for simple message abstraction, but the above with JMS-Flex and Spring Integration is an example of more complicated handling. With clients you often need to translate and transfer the data from message type 1 to message type 2. In the adapter code above, you would use the handleMessage() method to get the message from Spring Integration and into the message type of your choice, here, a Flex message.
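
To make the overload idea concrete, here is a minimal sketch in plain Java of routing an already-converted payload to the matching handleMessage overload. It is a simplified stand-in, not Spring’s actual algorithm: the real adapter resolves the listener method reflectively, while this sketch uses explicit type checks. DelegateDispatchSketch, RecordingDelegate and deliver are hypothetical names, and I typed the Map overload as Map<?, ?> to avoid raw-type warnings.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DelegateDispatchSketch {

    /** Same shape as the MessageDelegate interface in this post. */
    public interface MessageDelegate {
        void handleMessage(String message);
        void handleMessage(Map<?, ?> message);
        void handleMessage(Serializable message);
    }

    /** Records which overload was hit so the routing is observable. */
    public static class RecordingDelegate implements MessageDelegate {
        public final List<String> calls = new ArrayList<>();

        @Override public void handleMessage(String message) { calls.add("String"); }
        @Override public void handleMessage(Map<?, ?> message) { calls.add("Map"); }
        @Override public void handleMessage(Serializable message) { calls.add("Serializable"); }
    }

    /**
     * Routes a converted payload to the matching overload. The checks run from
     * most specific to least, because String and HashMap are also Serializable.
     */
    public static void deliver(MessageDelegate delegate, Object converted) {
        if (converted instanceof String) {
            delegate.handleMessage((String) converted);
        } else if (converted instanceof Map) {
            delegate.handleMessage((Map<?, ?>) converted);
        } else if (converted instanceof Serializable) {
            delegate.handleMessage((Serializable) converted);
        } else {
            throw new IllegalArgumentException("No handleMessage overload for " + converted);
        }
    }

    public static void main(String[] args) {
        RecordingDelegate delegate = new RecordingDelegate();
        deliver(delegate, "text body");                    // e.g. the body of a text message
        deliver(delegate, new HashMap<String, Object>());  // e.g. the body of a map message
        deliver(delegate, Integer.valueOf(42));            // e.g. the body of an object message
        System.out.println(delegate.calls);                // prints [String, Map, Serializable]
    }
}
```

The delegate never sees a javax.jms type, which is the whole point: conversion and routing live in the adapter layer, and the business code only deals with payloads.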


How To Run Two ActiveMQ Brokers On A Laptop

I needed to set up and test ActiveMQ failover URIs and play around with broker topologies for optimization in a safe environment (a controlled environment that no app servers or brokers outside of it could access). I have a Mac laptop with multiple VM operating systems so that I can easily work with any client or company requirements for a current project (Ubuntu, Windows, etc.). Here’s what I did to set up two local test brokers:

1. Know both native and virtual operating system host names. IP addresses will not work

Explicitly setting IP addresses in the tc config will fail on the native OS as it attempts to bind to the virtual IP

2. Give each Broker a unique name

Standard procedure for any multiple Broker topology on a network

Installing, configuring, and running ActiveMQ is a no-brainer so I won’t cover that here, but the specific, simple configuration that allowed me to run multiple instances on one machine was:

3. Super basic configuration of the connectors: In the activemq.xml Spring config file

Note: what I did here is set up two transport protocols, one for TCP, one for NIO


Instance One: on the native laptop OS

<networkConnectors>
    <networkConnector name="nc-1" uri="multicast://default" userName="system" password="manager"/>
</networkConnectors>

<transportConnectors>
    <transportConnector name="tcp" uri="tcp://nativeHostName:61616?trace=true" discoveryUri="multicast://default"/>
    <transportConnector name="nio" uri="nio://nativeHostName:61618?trace=true" discoveryUri="multicast://default"/>
</transportConnectors>


Instance Two: on the virtual machine’s OS

<networkConnectors>
    <networkConnector name="nc-2" uri="multicast://default" userName="system" password="manager"/>
</networkConnectors>

<transportConnectors>
    <transportConnector name="tcp" uri="tcp://virtualHostName:61616?trace=true" discoveryUri="multicast://default"/>
    <transportConnector name="nio" uri="nio://virtualHostName:61618?trace=true" discoveryUri="multicast://default"/>
</transportConnectors>

It’s that easy.

4. Start the native then virtual ActiveMQ servers

5. Open a browser on both OS’s and if you enabled the web console, at http://localhost:8161/admin/ you will see the broker interface with each unique broker name.


SuMQ Messaging Framework

I’ve just started staging my new project, SuMQ, and appreciate patience as this will take me a while to get the code standardized and on google.

SuMQ is a light-weight enterprise messaging framework built in Java, leveraging Spring, JMS, and ActiveMQ. It plugs into Flex Messaging via BlazeDS for the client. This can also be configured for other clients aside from Flex.

The sample will be ready for clustered BlazeDS instances and load balanced Application Servers.

code.google.com/p/sumq


Enterprise Messaging with JMS, Flex, Spring and ActiveMQ

I finally finished building an enterprise messaging system using Spring, JMS, and ActiveMQ, integrated with Flex Messaging from LiveCycle Data Services, with clients in both Flex and Java. The Flex-Java (Spring) integration is handled by Anvil for this project.

I plan to re-configure this in a simplified app using BlazeDS, Spring BlazeDS Integration with Spring, ActiveMQ, Flex, JMS and putting the source code here for a test drive in Tomcat, unless SpringSource beats me to the punch ;)

I probably won’t finish this for at least a month though since I’m still tuning the Spring/ActiveMQ configuration for enterprise use cases.


Configuring JMS in Spring

When I first looked into implementation strategies for building a JMS framework component into an enterprise application, it took quite a while to sort out which elements were necessary. From there, I began looking into what the configuration options were. What follows are the key components of my first JMS implementation with Spring 2.5.x and Java 1.6.x. Note that this was for a client where, while we were using the latest Java SDK, we were deliberately introducing annotations slowly. I started with annotations in Java, Spring, and JUnit for creating a parent test that rolled back database state after method execution and transaction completion. The core developers were new to Java, so we kept all Spring config files schema-based, and I kept my configurations, even for AOP, mostly annotation-free for now. Although I have written a SystemArchitecture.java for design-level assertions, it will not be implemented for at least a year :) Too bad, because it’s fun stuff.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
       http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-2.5.xsd
       http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
       http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">

…this post is in process…more soon
