Helena Edelson

Akka, Spark, Scala, Cassandra, Big Data, Data Science, Cloud Computing, Machine Learning, Distributed Architecture

Archive for the 'Messaging' Category

Upgraded ZeroMQ Scala Binding to work with ZeroMQ 3.2.0

Posted by Helena Edelson on 9th October 2012

I just pushed an experimental upgrade of zeromq-scala-binding to the branch wip-scala-2.10.0-M7-zeromq-upgrade-3.2 here: http://github.com/helena/zeromq-scala-binding.

Here is a summary of changes:

  • Compatible with:
    • ZeroMQ 3.2.0
    • SBT 0.12.0
    • Scala 2.10.0-M7
  • Tested against 2.2.0 and 3.2.0 on MacOSX
  • See Changes.md for full list of additions, removals, modifications based on the ZeroMQ 3.2 API and Upgrading from libzmq 2.x to 3.2 docs
  • Added scalariform plugin
  • Added sbt-run test duration output
  • Removed build.sbt and replaced it with project/Build.scala
  • Removed the sbt plugin – that should live in the developer’s env vs the codebase
  • Removed sbt build file (see above)
  • Upgraded jnr
  • Upgraded jna

I call it experimental because it has been tested against ZeroMQ 3.2 and 2.2 only, and on one OS only – Mac. The test coverage in the API was originally sparse and still needs further coverage, particularly around socket options. If anyone feels like trying it out, let me know what you think. I will try to pick away at test coverage, and at testing on a few other OS VMs, over the next few months.


Posted in Akka, Messaging, Open Source, Scala, ZeroMQ | No Comments »

The Importance of Patterns in Messaging

Posted by Helena Edelson on 1st March 2012

Organizations that dealt successfully with scalability challenges, such as Google, eBay, Amazon and Yahoo, went through architecture changes and eventually arrived at a similar pattern: asynchronous, event-driven design.

Claim Check – Handling large file sizes

Messaging is best suited to fast, frequent data transfers and notifications; it is not suited to transporting large files. The Claim Check Enterprise Integration Pattern (EIP), however, handles this case very well.

How can we reduce the data volume of message sent across the system without sacrificing information content?

  • Store message data in a persistent store that has HA and is highly scalable
    • A NOSQL store is more appropriate than a relational database
      • Document Store: If you choose to store the file itself
        • CouchDB – REST protocol, JSON API
      • Key Value: If you choose to store the location URI of the file
        • Riak – REST protocol, JSON API
  • Pass a Claim Check (unique key) to subsequent components
  • These components use the Claim Check to retrieve the stored information

The Claim Check allows you to replace message content with a unique key for later retrieval. This pattern is very useful when message content is very large and would be too expensive to send around. The key point is that not all components require all information – some just need the key, while others need the actual data in the file.

It can also be useful in situations where you cannot trust the information with an outside party; in this case, you can use the Claim Check to hide the sensitive portions of data.
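As a sketch of the volume-reduction form of the pattern, here is what the producer side might look like using the RabbitMQ Java client purely for illustration; the ClaimCheckStore interface, the "files" exchange, and the routing key are illustrative assumptions, not part of the pattern itself:

import com.rabbitmq.client.Channel;
import java.io.IOException;

// Hypothetical store abstraction: could be backed by CouchDB (storing the file itself)
// or Riak (storing the file's location URI), as described above.
interface ClaimCheckStore {
    String put(byte[] payload);      // persist the large payload, return a unique claim check key
    byte[] get(String claimCheck);   // retrieve the payload later using the claim check
}

final class ClaimCheckProducer {
    private final ClaimCheckStore store;
    private final Channel channel;

    ClaimCheckProducer(ClaimCheckStore store, Channel channel) {
        this.store = store;
        this.channel = channel;
    }

    // Store the heavy content, then publish only the lightweight claim check key.
    // Downstream components that need the data call store.get(claimCheck); the rest
    // can route, audit, or correlate using the key alone.
    void publishLargeFile(byte[] largeFile) throws IOException {
        String claimCheck = store.put(largeFile);
        channel.basicPublish("files", "files.available", null, claimCheck.getBytes());
    }
}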

Share/Save

Posted in Messaging | No Comments »

Routing Topologies for Performance and Scalability with RabbitMQ

Posted by Helena Edelson on 1st August 2011

Designing a good routing topology for a highly-scalable system can be like mapping a graph. Many things need to be considered, for instance the problem, constraints of the environment, those of the messaging implementation, and performance strategies. What we often run up against is a lack of flexibility and expressivity in fitting routing to our needs. Here is where RabbitMQ stands out.

Basic Concepts

Anyone familiar with messaging in general knows the concept of routing messages from A to B. Routing can be simplistic or quite complex, and when designing a routing topology for a scalable, complex system it must be elegant. Kept clean and decoupled, components can throttle nicely under varying loads. This can be expressed as a simple map or a complex graph; in its simplest form, a routing topology can be expressed as a set of nodes, for instance hierarchical nodes.

For those new to RabbitMQ or AMQP (note that Rabbit works with many protocols including STOMP, HTTP, HTTPS, XMPP, and SMTP), here are some basic component descriptions:

  • Exchange – The entity within the server which receives messages from producer applications and optionally routes these to message queues within the server
  • Exchange type – The algorithm and implementation of a particular model of exchange, in contrast to the "exchange instance", which is the entity that receives and routes messages within the server
  • Message queue – A named entity that holds messages and forwards them to consumer applications
  • Binding – An entity that creates a relationship between a message queue and an exchange
  • Routing key – A virtual address that an exchange may use to decide how to route a specific message

For point-to-point routing, the routing key is usually the name of a message queue. For topic pub-sub routing the routing key is usually hierarchical in nature:

api.agents.agent-{id}.operations.{operationName}

In more complex cases the routing key may be combined with routing on message header fields and/or its content. An exchange examines a message’s properties, header fields, body content, and possibly data from other sources, then decides how to route the message. A binding pattern derived from the above routing key idea might look like api.agents.*.operations.* where we bind exchange E1 to queue Q1 with binding pattern api.agents.*.operations.* so that any messages sent to E1 route to Q1 if their routing key matches the binding pattern.

A Rabbit broker is structured differently than a JMS broker. Each RabbitMQ server consists of at least one node (broker), or more typically, nodes in a cluster. Each node has a default virtual host, "/", and further virtual hosts can be created, such as "/development". Rabbit virtual hosts are like Tomcat virtual hosts and partition broker data into sub-sets. Within these virtual hosts are exchanges and queues. When a user connects with its credentials, it is connecting to a virtual host on a Rabbit node.
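For example, with the Java client you select the virtual host on the ConnectionFactory before opening the connection; a minimal sketch, assuming a "/development" vhost exists and using the default guest credentials:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/development");   // defaults to "/" when not set
factory.setUsername("guest");             // the user must have permissions on this vhost
factory.setPassword("guest");
Connection connection = factory.newConnection();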

Here we connect to a Rabbit node, declare an exchange to publish to, a queue to consume from, a binding pattern, then publish a few messages, using the RabbitMQ java client api:

package org.demo.simple.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer; // used by the consumer snippet below
import java.io.IOException;
import java.util.List;

public final class RocketSender {

    public void sendRockets() throws IOException {
        List<String> rocketsWithRoutings = new RocketRouter().build();

        Connection connection = new ConnectionFactory().newConnection();
        Channel channel = connection.createChannel();

        // Declare a topic exchange, a server-named queue, and bind them with a wildcard pattern
        String rocketExchange = "rockets.launched";
        channel.exchangeDeclare(rocketExchange, "topic");
        String rocketQueue = channel.queueDeclare().getQueue();
        channel.queueBind(rocketQueue, rocketExchange, "galaxies.*.planets.*");

        for (String rocketTo : rocketsWithRoutings) {
            // Note: the "*" here is just a literal word in the routing key; wildcards only have
            // special meaning in binding patterns, so in practice you would publish with a concrete word.
            channel.basicPublish(rocketExchange, "galaxies.*.planets." + rocketTo, null, rocketTo.getBytes());
        }

        channel.close();
        connection.close();
    }
}

A simple consume of “landed” rockets could look like:

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(rocketQueue, false, queueingConsumer);   // manual acks

int landed = 0;
while (landed < launched) {   // 'launched' is the number of rockets published above
    QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();   // blocks; throws InterruptedException
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    String rocketLanded = new String(delivery.getBody());

    if (rocketLanded.equalsIgnoreCase("Alderaan")) {
        System.out.println("That's no moon, that's a space station.");
    }
    landed++;
}

The Problem

In considering which routing strategies perform best in scalable environments, and where performance itself can be improved, there are many options. One of the great things about messaging in general is the variety of configurations available; the work is in figuring out the ones that solve both current and growing requirements.

To keep things simple let's consider two strategies:

  1. Highly-partitioned routing with hierarchical routing keys, fewer topic exchanges
  2. A larger number of direct exchanges and queues with far less routing partitions

Each scenario follows the same use case: each application that must scale is both a producer and a consumer.

Where To Start

It is a good idea to take stock of your environment and its components before delving into a routing solution that will scale cleanly and efficiently over time. For example, what lends to scaling? Generally, decoupling, distribution, asynchrony, parallelism, levels of abstraction and indirection to name a few. Then consider what elements are current or potential bottlenecks. It is a basic principle that high traffic/volume pathways require more efficient throughput or you incur risk of bottlenecks in your distribution. One exercise is to rank these in terms of traffic or as a heat map. Next, can you classify your traffic – are there overarching patterns, topics or similar message types, and what are the relationships? Now start to consider consolidation, how and where might efficiency be improved, and apply tested patterns that resolve those heat points, decouple for scale, and increase performance.

General Routing Considerations

All exchange types behave differently. Here are a few general rules:

  • If you have a finite domain of routing keys in an application's graph then many fanout exchanges might be the right fit (a 1:1 mapping of exchange per routing key)
  • If you have a potentially infinite number of routing keys, consider topic exchanges
  • For topic routing, performance decreases as the number of bindings increase
  • Fanout exchanges are very fast because they have no routing to process, but if one is bound to a large number of queues that changes
  • Direct exchanges are a faster form of topic exchanges, provided you do not need the wild card
  • Troubleshooting problems across 100,000+ queues could be tedious versus a topology with more bindings, fewer exchanges and queues
  • A very high number of exchanges and queues takes up more memory, which may be significant, though how much this matters depends on your deployment

As of RabbitMQ 2.4.0, released March 23, 2011, a new topic routing algorithm optimization is available that is 60 times faster at peak than the previous topic algorithm. Because of this, one recommendation is to go for fewer exchanges and queues and more routing, since the routing time cost is now minimal.

Performance

What is Cheap?

In terms of memory cost, exchanges and bindings are cheap. In Erlang, which RabbitMQ is built on, each node (broker) is a process, as is each queue. By default the Erlang VM process limit is set to 1M, which can be raised. However, an exchange is not a process, for scalability reasons; it is simply a row in RabbitMQ's built-in Mnesia database. In a cluster, declaring an exchange causes it to appear on all nodes of the cluster, while declaring a queue creates it on only one of the nodes. This explains why exchanges survive node restarts or creating a node in a cluster, yet queues do not.

Be wary of binding churn. In strategy number 2, if you create many new queues and their bindings, whenever consumers attach you might run into problems. For instance, given exchanges E1...En to which many messages are being published, whenever consumer Cm connects, it creates bindings from its own queue to all of E1...En which may cause problems, depending on the rate of connections.

To alleviate binding churn, consider exchange-to-exchange bindings, new as of version 2.3.1. Each consumer could have its own secondary exchange Ym which must not be auto-delete. Then bind all of E1...En to Ym. In this way these bindings always exist. In this scenario, whenever consumer Cm connects it simply has to declare its queue and bind that queue to Ym. If Ym is a fanout exchange, it will be very fast and reduce the binding churn rate to 1 per connection, rather than potentially n per connection.
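A minimal sketch of that wiring with the RabbitMQ Java client, reusing a channel as in the earlier example; the exchange names are assumptions, and it presumes E1...En are topic exchanges (hence the '#' binding key):

// Declare the consumer's secondary exchange Ym as a durable fanout; it is not
// auto-delete, so its exchange-to-exchange bindings survive consumer restarts.
String ym = "consumer-42.inbound";
channel.exchangeDeclare(ym, "fanout", true);

// Bind each upstream exchange E1..En to Ym once. With topic exchanges, '#'
// forwards everything; use a narrower pattern if Ym should not see all traffic.
String[] upstreamExchanges = { "e1", "e2", "e3" };
for (String source : upstreamExchanges) {
    channel.exchangeBind(ym, source, "#");   // arguments: destination, source, routing key
}

// When consumer Cm connects, it only declares its queue and adds a single binding
// to Ym, so the churn rate is 1 binding per connection instead of potentially n.
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, ym, "");            // routing key is ignored by a fanout exchange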

Exchange-to-Exchange Binding

Use Cases

Exchange-to-Exchange Scalable Use Case

Consider a server application with autonomous agents. Each agent is on a virtual machine that is part of an elastically-scaled system. As each agent starts up it sends a message to the server that it is online, followed by many other messages, such as authentication and data transfer. If we have 1,000 agents, each declaring 50 direct exchanges, queues and bindings, then each agent must know the server's queues in order to fulfill the binding contract on queue.declare operations. That is not a scalable solution.

Now consider creating shared topic exchanges: one for the agent-to-server pathway, another for the server-to-agent pathway, and a third to handle unauthenticated agents, which routes only to those queues that do not require security. We then partition with binding patterns and message routing keys, and bring one set of these exchanges up for each server, shared by all agents that connect to it. In its simplest form, as each agent comes online it declares a private exchange and queue, and binds its exchange to the shared topic exchanges.

Our relationships are now expressed by exchange-to-exchange mappings, which reduces churn rate and decouples agents from having to 'know' the server queues. Using this pattern the system is clean, decoupled, and scalable.

Basic Routing Topology

Elastic-Scaling Use Case

Let's take the previous scenario a step further. We are already using topic pub-sub routing over scenario 2: many direct routings. Now let's say the system requirement bumps up to scale clusters of our server application in a data center with 50,000 or more agents. How can we throttle varying loads?

The authenticated client exchange routes messages from agent to server. It handles all operations publishing messages to single-consumer queues, including those producing the highest frequency of messages. This is a potential bottleneck under the current topology, with roughly 60,000 messages per minute for 10,000 clients, or 86,400,000 messages per day. This is easily resolvable: RabbitMQ can handle over 1 billion messages per day, depending on your configuration, for example whether or not you are persisting messages.

Our server applications are running a RabbitMQ cluster. Remember that in a cluster, declaring an exchange causes it to appear on all nodes, while declaring a queue creates it on only one of the nodes, so we have to configure a solution.

Load-Balancing Between Producers and Consumers

To efficiently handle these potentially very high loads as more client applications (Agents) come online, we can modify this topology in several ways. The first is an optimization of the above configuration that load-balances messages across the Rabbit cluster: create one queue per node in the cluster. If we have four nodes, for each high-traffic queue we create hfq.{0,1,2,3} for that operation. Each agent can then pick a node, either randomly by a number between zero and three or via a more sophisticated round-robin implementation, and publish to it. With RabbitMQ there are RPC calls, or you can use the Rabbit management plugin to GET the number of nodes for use in your round-robin algorithm, as sketched below.
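A rough sketch of the publish side of this idea follows; the hfq.N names follow the convention above, while the node count, counter, and payload are assumptions you would replace with values from the management plugin and your own round-robin implementation:

// Assumes a com.rabbitmq.client.Channel 'channel' as in the earlier examples and a
// java.util.concurrent.atomic.AtomicInteger shared by the agent's publishing code.
int nodeCount = 4;                           // e.g. fetched via the management plugin
AtomicInteger counter = new AtomicInteger();

// Round-robin pick of one of the per-node high-frequency queues hfq.0 .. hfq.3
String queueName = "hfq." + (counter.getAndIncrement() % nodeCount);

byte[] payload = "agent heartbeat".getBytes();

// Publishing to the default ("") exchange routes directly to the queue whose name
// matches the routing key, so no additional binding is required for this scheme.
channel.basicPublish("", queueName, null, payload);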

Worker Queues With Round-Robin Dispatching

Worker Queues, or Task Queues, are generally used to distribute time-consuming tasks among multiple workers efficiently and to parallelise work easily. This topology also avoids executing a resource-intensive task inline and blocking until it completes. Running several worker queues allows these tasks to be distributed among them.

AMQP Work Queues, Round Robin Load Balancing

With Work Queues, by default, Rabbit uses a round-robin distribution method, sending each message to the next consumer in sequence, so each consumer receives roughly the same number of messages. If you declare a queue, spin up 3 competing consumers, bind them to the exchange, and send 20,000 messages, message zero routes to the first consumer, message one to the second, message two to the third, and so on. If we begin building up a backlog of tasks, we can simply add more workers, allowing the system to scale easily.
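A minimal competing-consumer sketch, reusing the channel from the earlier examples; the queue name "task.queue" and the doWork() call are placeholders for your own task queue and processing code, and you would run several copies of this loop to see the round-robin dispatching described above:

// Each worker runs this loop against a shared, durable queue with manual acks.
channel.queueDeclare("task.queue", true, false, false, null);
channel.basicQos(1);   // optional fair dispatch: no new task until the last one is acked

QueueingConsumer worker = new QueueingConsumer(channel);
channel.basicConsume("task.queue", false, worker);

while (true) {
    QueueingConsumer.Delivery delivery = worker.nextDelivery();   // blocks; throws InterruptedException
    doWork(new String(delivery.getBody()));                       // your task-processing code
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}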

Performance

Memory

Neither option above will necessarily induce high load in RabbitMQ. There are no hard limits to the number of exchanges and queues one can create, and running 100,000 queues on one broker is fine. With the right tuning and enough RAM you can run well over a million queues.

RabbitMQ dynamically pushes messages to disk to free up RAM, so the memory footprint of a queue is not dependent on its contents. After a queue has been idle for 10 seconds or more it will "hibernate", which triggers GC on that queue. As a result, the amount of memory a queue requires can shrink dramatically. For example, 1000 empty, idle queues might take up 10MB of RAM; when they are all active (even if empty), they may consume much more memory, depending on memory fragmentation. Forcing them back into hibernation to test this behavior is difficult because the Erlang VM does not hand memory back to the OS immediately.

You can, however, observe a large process that hibernates and has very fragmented memory because the amount reclaimed can be sufficient to force the VM to hand back memory to the OS. If you run a test that steadily increases the memory footprint of Rabbit you could observe the effect of hibernation on idle processes as it reduces the rate of increase of memory use.

Erlang is a multi-threaded VM which takes advantage of multiple cores. It presents green threads to the developer which are called 'processes' because unlike threads, they conceptually do not share an address space.

Transactions

Transactions on 10,000 messages can take as long as four minutes to publish. A newer RabbitMQ feature called Publisher Confirms is more than 100 times faster than the same, but transactional, code. If you are not explicitly required to implement transactions but do need the verification, consider this option.
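For illustration, a rough comparison of the two styles with the Java client; txChannel, confirmChannel, the "orders" exchange, and the message body are assumptions, and note that transactions and confirms cannot be enabled on the same channel:

// Transactional publishing: each commit is a synchronous round trip to the broker,
// which is what makes publishing 10,000 messages this way so slow.
txChannel.txSelect();
for (int i = 0; i < 10000; i++) {
    txChannel.basicPublish("orders", "orders.created", null, body);
    txChannel.txCommit();
}

// Publisher Confirms: publish in a stream, then wait once for the broker to confirm
// all outstanding messages. Far less waiting for a similar publish-side guarantee.
confirmChannel.confirmSelect();
for (int i = 0; i < 10000; i++) {
    confirmChannel.basicPublish("orders", "orders.created", null, body);
}
confirmChannel.waitForConfirms();   // blocks until every published message is confirmed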

The Take-Away

Here are some final takeaways to help you squeeze the greatest performance gains out of your implementation:

  • The new topic routing algorithm optimization is 60 times faster at peak
  • Topic binding patterns using the wildcard '*', which matches a single word, are much faster than those using '#', which matches zero or more words; '#' takes longer to process in the routing table than '*'
  • Exchange-to-exchange bindings improve decoupling, increase topology flexibility, reduce binding churn, and help increase performance
  • Exchanges and bindings are very lightweight
  • RabbitMQ Publisher Confirms are more than 100 times faster than AMQP transactions
  • After a queue is idle for >=10 seconds it will "hibernate", inducing GC on the queue, resulting in a dramatic decrease in memory required for that queue
  • Worker Queues help parallelize and distribute workloads
  • Distributing worker queues in the Rabbit cluster helps scale
  • Load-balance your topology

This is by no means a thesis on the subject; there are many more patterns, topologies and performance details to consider. A strategy, as always, depends on many factors, but I hope this encapsulates enough to help, or at least get you thinking in the right directions.

Get It

RabbitMQ Source on GitHub
RabbitMQ Binary Downloads and Plugins
Erlang Downloads
Spring AMQP API for RabbitMQ in Java and .NET
Hyperic to monitor RabbitMQ
Maven


<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>${rabbitmq.version}</version>
    <exclusions>
        <exclusion>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
</dependency>


Posted in AMQP, Broker Topology, Cloud, Erlang, Messaging, Open Source, Performance, RabbitMQ, Scalability | No Comments »

Routing, Performance and Scalability with RabbitMQ

Posted by Helena Edelson on 7th April 2011

My latest post on the Spring Team blog
Routing Topologies for Performance and Scalability with RabbitMQ


Posted in AMQP, Cloud, Erlang, JMS, Messaging, Open Source, Performance, RabbitMQ, Scalability | No Comments »

Why RabbitMQ for Cloud?

Posted by Helena Edelson on 2nd February 2011

We know that messaging itself affords moving from a SQL -> Database -> File System model into the network layer itself, with:

  • Logical decoupling
  • Physical decoupling
  • Temporal decoupling

but I've been asked why I would choose to implement RabbitMQ in a cloud solution. This is a big topic, and there is a great deal more I'd say conversationally, but for a relatively quick overview let's start from the ground up.

Why AMQP?

AMQP is an open, wire-level binary protocol for Message Oriented Middleware, created and continuously developed in collaboration with users in mission-critical, real-world scenarios. AMQP solutions are deployed for high-performance clustering and grid computing problems and are designed for typical business scenarios in which a system:

1. distributes data to hundreds of clients in real time across a global network
2. accepts workloads to process from those clients and handles them reliably in a clustered transaction processing engine
3. connects to other partners over the Internet to exchange data
4. carries out all these functions in a secure manner which would pass an audit
5. monitors application traffic to build up a real-time picture of activity

AMQP implementations offer fire-and-forget delivery where you can absolutely trust that your messages will reach their destination, extremely fast delivery of data to a large number of consumers (publish-subscribe event notification), and secure, firewall-friendly file transfer.

“AMQP is a good fit for cloud MaaS and other wide-area, Internet-based messaging purposes, because it defines the wire protocol and communication semantics without restricting the details of the messaging system implementation or the API. AMQP’s goal is to enable interoperability among disparate vendors’ platforms.”
- Roy Shulte (Gartner), Review of AMQP and other protocol options.

More: amqp.org

Why Erlang?

All major implementations of AMQP, such as OpenAMQ, Qpid and Red Hat MRG, are built on a foundational technology. All but one are built on C++; RabbitMQ is built on Erlang. Erlang provides concise binary pattern matching, has built-in support for concurrency, distribution and fault tolerance, has a built-in distributed database, is open source, and runs on many platforms. Erlang is also a runtime environment, similar to the Java Virtual Machine, allowing code compiled on one architecture to run anywhere. The runtime system also allows code in a running system to be updated without interrupting the program.

More: erlang.org

Why RabbitMQ?

Many large-scale projects are using RabbitMQ for complex, scalable cloud topologies, such as the Ocean Observatories Initiative (OOI), NASA Nebula, Heroku, the Unique Identification Authority of India (200 million messages per day at peak), and CERN, because

  • It’s wicked fast and reliable
  • Supports security and transactions, persistence, client acks, and flow control
  • Widely proven and used heavily on Amazon EC2
  • It is open source with support services and several very active user lists
  • Hugely interoperable
    • RabbitMQ brokers run on many platforms and offer several clients (Java, .NET and others), all of which speak the AMQP protocol to each other. Anyone can write a RabbitMQ client in any language: Ruby, Java, Spring, Python, C, C#, Perl, Erlang, Lisp, Haskell, PHP, _your_language_here.
  • Easily clusterable
  • Works with several protocols: AMQP, STOMP, XMPP, 0MQ
  • Highly-flexible topologies
  • Can work with Pacemaker and associated tools to provide High Availability
  • Supports several extension points via plug-ins, e.g. exchange types and backing queue storage
  • Provides management and monitoring of brokers
    • vFabric Hyperic
    • RabbitMQ command line tool and plugin for real-time activity and management
  • Messaging services hosted in the cloud, with RabbitMQ as part of the overall cloud infrastructure services, are well supported

An open source RabbitMQ solution stack might look like this: Spring, Tomcat, Hibernate, MySQL, Hadoop, MapReduce, Chef, Apache httpd, RabbitMQ.

More: rabbitmq.com

Resource List And Use Cases


Posted in AMQP, Cloud, Cloud Ops, Concurrency, Erlang, Messaging, Open Source, RabbitMQ | No Comments »

Intro To RabbitMQ for Java

Posted by Helena Edelson on 26th November 2010

Nice intro from the Rabbit team: http://www.rabbitmq.com/tutorial-one-java.html


Posted in Java, Messaging, Open Source, RabbitMQ | No Comments »

Open Artificial Intelligence for the Cloud – Initial Diagram

Posted by Helena Edelson on 20th September 2010

This is a working (initial) idea of where an AI appliance might fit into a virtualized, elastic environment. I will update this concept soon, as I created it early this summer and now have many changes to make to it. The parent post: Open Source Artificial Intelligence for the Cloud

© Copyright – not for re-use.


Posted in AI, Artificial Intelligence, Cloud, Cloud Ops, Messaging, Open Source, RabbitMQ, Spring | No Comments »

Open Source Artificial Intelligence for the Cloud

Posted by Helena Edelson on 18th September 2010

The idea of introducing intelligent computing so that software can make decisions autonomously and in real time is not new, but it is relatively new to the cloud. Microsoft seems to be the strongest force in this realm currently, but its work is not open source. The idea of clouds that manage themselves steps well beyond Cloud Ops as we know it, and it is the only logical, and necessary, next step.

I've been researching various open source AI platforms and learning algorithms to start building a prototype for cloud-based learning and action for massively distributed Cloud Ops systems and applications. One could offer an AI cloud appliance eventually, but I will start with a prototype and build on that using RabbitMQ (an AMQP messaging implementation), CEP, Spring, and Java as a base. I've been looking into OpenStack, the open source cloud computing software being developed by NASA, Rackspace, and many others. Here is the future GitHub repo: http://github.com/helena/OCAI.

Generally, to do these sorts of projects you need a large amount of funding and engineers with PhDs, neither of which I have. So my only alternative is to see how we might take this concept and create a lightweight open source solution.

Imagine having some of these (not a comprehensive list) available to your systems:

AI Technology

  • Planning and Scheduling
  • Heuristic Search
  • Temporal Reasoning
  • Learning Models
  • Intelligent Virtual Agents
  • Autonomic and Adaptive Clustering of Distributed Agents
  • Clustering Autonomous Entities
  • Constraint Predictability and Resolution
  • Automated Intelligent Provisioning
  • Real-time learning
  • Real-world action
  • Uncertainty and Probabilistic Reasoning
  • Decision Making
  • Knowledge Representation

I will go into these topics further in future posts.

About me: I am currently an engineer at SpringSource/VMware on the vFabric Cloud Application Platform side of things; however, this project is wholly outside of that, as I feel the need to pursue something I'm interested in. I work with the RabbitMQ team and am a member of the AMQP Working Group.


Posted in AI, AMQP, Artificial Intelligence, CEP, Cloud, Cloud Ops, Java, Messaging, Open Source, RabbitMQ, Software Development, Spring | No Comments »

Installing Erlang and RabbitMQ on RedHat

Posted by Helena Edelson on 13th September 2010

I am setting up a thin-provisioned VM running RedHat to host Erlang and RabbitMQ for our QA team to test my code. Having not done anything on RedHat (just Ubuntu) for a long time, I had forgotten everything, and yet it was still incredibly simple:

Find the latest release – the versions here were current at the time of my install; the paths will give you the right idea for the current version:

yum install erlang

For some reason yum has a pretty old version, so you may want to install it manually. Then choose your RabbitMQ server version – 2.1.0 was the current version for this install.

wget http://www.rabbitmq.com/releases/rabbitmq-server/v2.1.0/rabbitmq-server-2.1.0-1.noarch.rpm

rpm --install rabbitmq-server-2.1.0-1.noarch.rpm

Then I verified in a few directories that RabbitMQ was installed, and ran

/sbin/service rabbitmq-server start

Verify that your RabbitMQ node is running:

rabbitmqctl status

If you have no nodes up rabbitmqctl will run but tell you no nodes are up to report status on. Simply restart at least one node.

If you want RabbitMQ to start after everything else on startup/reboot do:

ln -s ../init.d/rabbitmq-server /etc/rc3.d/S99rabbitmq-server


Posted in Erlang, Messaging, RabbitMQ | No Comments »

RabbitMQ 2.0.0 Released

Posted by Helena Edelson on 25th August 2010

More details here.

Release posts are about all I have time for these days…been heads down for a month getting an amqp/rabbitmq thing ready for release.


Posted in AMQP, Erlang, Messaging, RabbitMQ | No Comments »

RabbitMQ and Erlang Installation for Mac OSX

Posted by Helena Edelson on 11th June 2010

I've been a Linux developer for years, so I was all thumbs and had to look up everything when setting up a new dev environment on a Mac. Here are some tips for setting up a local RabbitMQ server to play with (i.e. not using MacPorts locally, not the EC2 install, etc.):

Requirements

  1. Apple XCode to compile Erlang with gcc. The XCode download could not take longer. Start it, go build an enterprise app in ROO, deploy it to the cloud. When you come back it may have completed.
  2. wget from http://ftp.gnu.org/pub/gnu/wget
    1. tar -xzf wget-{version}.tar.gz
    2. cd wget-{version}
    3. ./configure
    4. make
    5. sudo make install

Install and build Erlang

First, check for the latest Erlang GA release and note the version.

  1. cd to the dir you wish to install into
  2. wget http://erlang.org/download/otp_src_{version}.tar.gz
  3. tar xzvf otp_src_{version}.tar.gz
  4. cd otp_src_{version}
  5. NOTE: run ./configure --help to see what options you want to run with
  6. ./configure
  7. make
  8. sudo make install

Install RabbitMQ Server

First check the latest version of RabbitMQ Server and note the filename.

  1. wget http://www.rabbitmq.com/releases/rabbitmq-server/v{version}/rabbitmq-server-generic-unix-{version}.tar.gz
  2. tar xzvf rabbitmq-server-generic-unix-{version}.tar.gz
  3. Complete the install and config from http://www.rabbitmq.com/install.html#generic-unix, making sure the logs, clustering, data and config dirs are set up
  4. Start RabbitMQ: run sbin/rabbitmq-server from the extracted rabbitmq_server-{version} directory

Infinitely easier on Ubuntu.


Posted in Cloud, Erlang, Messaging, RabbitMQ, Spring ROO | No Comments »

Future Posts: Esper, RabbitMQ for Cloud Messaging

Posted by Helena Edelson on 9th May 2010

If I had more time I would add a few new posts on Esper and RabbitMQ for messaging in the cloud, but I just got home from 2 weeks of coast-to-coast consulting and technology presentations. Pretty exhausting. I hope to have time to post on these and a few other fun technologies. I've got some code samples and tests I might post with that, plus some cool stuff on Esper queries with Spring Integration.


Posted in Concurrency, Erlang, Java, Messaging, RabbitMQ | 2 Comments »

JMS ActiveMQ Broker Topologies and High-Availability Configuration

Posted by Helena Edelson on 22nd December 2009

(Diagrams: Pure Master Slave broker topology and simplified topology)
I am not actually going to go into broker topologies; there are many great resources for that, such as this presentation by Bruce Snyder: http://www.slideshare.net/bruce.snyder/messaging-with-activemq-presentation or http://activemq.apache.org/topologies.html, all great stuff. This example uses a store-and-forward topology, or distributed queues, and incorporates basic authentication:

My use case was to handle downed JMS servers. What I needed to do was implement failover as well as master/slave strategies and a topology for message redundancy in case of hardware failure, etc. The client could not tolerate any message loss. With failover, you can see how ActiveMQ switches from the main broker to the second, third, etc. on failure. I have four JMS servers in production, and each host they run on is load balanced.

There are just a few configurations to add or modify in order to set up JMS Failover with Master/Slave for your broker topology. Here is a basic configuration. For this use case, all JMS servers are configured as standalone versus embedded.

I. Client URI

You will need to add the Failover protocol, either with a basic URI pattern or a composite. In this use case, there are load balanced servers in Production and multiple Development and QA environments which require different configurations for master/slave and failover.

In your application’s properties file for messaging add a modified version of this with your mappings:
activemq.broker.uri=failover://(tcp://localhost:61616,tcp://slaveh2:61616,tcp://master2:61616,tcp://slave2:61616,network:static://(tcp://localhost:61616,tcp://master2:61616,tcp://slave2:61616))?randomize=false

Note: I set connections as locked down (static) communication configurations vs multicast or dynamic discovery so that I know exactly what servers can communicate with each other and how. Also this is assuming you have one set per environment to account for mapping the appropriate IP’s in development, qa, production, dr, etc.

Note: Do not configure networkConnectors for master/slave; they are handled on the slave with the following configuration:
<masterConnector remoteURI="tcp://masterhost:61616" userName="wooty" password="woo"/>

II. Spring Configuration

<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<constructor-arg value="${amq.broker.uri}"/>
<property name="userName" value="${activemq.username}"/>
<property name="password" value="${activemq.password}"/>
</bean>
</property
</bean>

III. Broker Configuration

Master

<broker brokerName="{hostname}" waitForSlave="true" xmlns="http://activemq.apache.org/schema/core" dataDirectory="${activemq.base}/data">
<networkConnectors>
<!-- passed in by the client broker URI so you can easily manager per environment: sweet -->
</networkConnectors>

<transportConnectors>
<!-- TCP uses the OpenWire marshaling protocol to convert messages to stream of bytes (and back) -->
<transportConnector name="tcp" uri="tcp://localhost:61616?trace=true" />
<transportConnector name="nio" uri="nio://localhost:61618?trace=true" />
<!-- <transportConnector name="ssl" uri="ssl://localhost:61617"/>
<transportConnector name="http" uri="http://localhost:61613"/
<transportConnector name="https" uri="https://localhost:61222"/> -->
<transportConnectors>
</transportConnectors>

<!-- Basic security and credentials -->
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="system" password="manager" groups="admin, publishers,consumers"/>
</users>
</simpleAuthenticationPlugin>
</plugins>

..more configuration
</broker>

Slave: for ActiveMQ 4.1 or later, which also allows for authentication, as shown below

<broker brokerName="{hostname}Slave" deleteAllMessagesOnStartup="true" xmlns="http://activemq.apache.org/schema/core">
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
</transportConnectors>

<services>
<masterConnector remoteURI= "tcp://masterhost:62001" userName="wooty" password="woo"/
</services>
<!-- Basic security and credentials -->
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="system" password="manager" groups="admin, publishers,consumers"/>
</users>
</simpleAuthenticationPlugin>
</plugins>

</broker>


Posted in ActiveMQ, Java, JMS, JMS Broker, Messaging, Spring JMS | No Comments »

Decoupling Asynchronous Messaging With Spring JMS and Spring Integration

Posted by Helena Edelson on 4th October 2009

Decoupling in applications is vital, but it is not easy to do well, and I am constantly working to improve my strategies. Even more important is the role of messaging in an enterprise context and in the design. I think message-driven architecture is perhaps the most important one to integrate into applications, in terms of its scope of applicability and how it lends itself to scalability. In this vein, Spring Integration becomes a highly intriguing element worthy of study, testing, and integration. I barely touch on Spring Integration here, particularly in its standard usage; I simply use a few classes programmatically to decouple the JMS implementation from its clients.

Messaging and Decoupling

Messaging is everywhere and so subtle we are not aware of it, it just happens all around us, all the time. Consider this: in Genetics on a cellular level, which is nongranular in the scope of genetics itself, the major elements in a cell communicate, but how? They are not connected by any physical construct save the mutual environment they are in so how do they do it? They message each other with signals, receptors and a means to translate those messages, which contain instructions.

In Gene expression, DNA/RNA within eukaryote cells (think systems within systems within systems…elements moving in space and time under specific, ever changing constraints and environmental fluctuations (put that in your Agile timebox!) ) communicate by transmitting messages, intercepting, translating and even performing message amplification. There are specialized elements, mRNA specifically, Messenger RNA, which are translated by signal recognition particles… Cool, right? But this happens all around us, outside, in space, everywhere. And it is all decoupled, and therein lies the beauty of messaging.

So what about our applications? Here is one very simple, isolated application of using Spring Integration (a low-level usage, not very sophisticated) to decouple your client and server messaging code:

So I wrote a little Java package that integrates Spring JMS, ActiveMQ, Spring Integration and Flex Messaging for the front end, which hooks into either BlazeDS or LiveCycle Data Services. I had a bunch of constraints to solve for: all Destinations had to come from the database, and there were lifecycle issues around the timing of element initialization with IoC bean creation and the creation of the Flex Messaging elements, such as the Flex Messaging adapters, which I resolved with Flex Java bootstrapping. In another post I will go into the JMS package further. For the topic here, let's focus on the JMS – Spring JMS – Spring Integration bridge.

The image to the left shows the layout of my jms package to facilitate the mapping. In this system, messages come in from two areas: the client and Java services on the server. Complicated systems will have many more, but let's talk about the two that most would have. The client sends messages to the server that are both user messages and operational messages from the system. Java services send messages when certain business rules and criteria are triggered, passing business data, and any message-aware object could be sending messages.

Sending on the server

To ensure proper usage I created an interface that a service must implement to send to ActiveMQ, called JMSClientSupport. Note that all code in this post is simplified. Here, I actually have it returning a validation message if errors occurred, so that a business service developer could implement handling per requirements.

A Business Entity

public class Foo implements Serializable {...}

A Service

public class FooServiceImpl implements BusinessService, FooService, SMTPClientSupport, JMSClientSupport {

    public void insert(Foo foo) {
        // ...do some important business stuff
        publish(foo);
    }

    public void publish(Object object) {
        if (someBusinessValidation((Foo) object)) {
            jmsService.send(destinationId, object);
        }
    }
}

public interface JMSClientSupport {
    void publish(Object object);
}

Sending from the Client

You could have any client sending messages of any nature to the server. In this case I am using Flex. Messages of type <T> are wrapped on the client as an IMessage {AsyncMessage, CommandMessage, etc}. When these messages make their way through Blaze or LiveCycle, I have them wired to hit this Java adapter, which is represented in a Hash per FlexDestination for a 1:1 FlexDestination : JMS Destination mapping by Flex.

For this example I am implementing the JMS MessageListener to show tight coupling as well as decoupling:

public class FlexMessagingAdapter extends MessagingAdapter implements MessageListener {

    // Invoked by Flex when a message comes in from the client to this adapter's Destination
    public Object invoke(Message message) {
        // a custom interceptor that extracts partition Destination info, like an ActiveMQ message group
        // or subtopics such as STOCKS.NASDAQ, for more specific routing
        String partition = new DestinationPartitionInterceptor(message).intercept();
        jmsService.send(destination, new IntegrationMessageCreator(message, partition));
        return null;
    }

    // Decoupled: Invoked when a Message is received from the Spring Integration channel
    public void handleMessage(org.springframework.integration.core.Message<?> message) {....}

    // Sets the Spring Integration MessageChannel for sending and receiving messages
    public void setMessageChannel(MessageChannel messageChannel) {
        this.messageChannel = messageChannel;
    }

    // Tightly coupled with JMS by the MessageListener.onMessage() method
    public void onMessage(javax.jms.Message jmsMessage) {
        flex.messaging.messages.Message message =
            new IntegrationMessageCreator(jmsMessage).createMessage(getDestination().getId(), getSubscribers());

        if (getMessageService().getMessageBroker().getChannelIds().contains("streaming-amf")) {
            MessageBroker broker = MessageBroker.getMessageBroker(null);
            broker.routeMessageToService(message, null);
        } else {
            getMessageService().pushMessageToClients(message, true);
        }
    }
}

The Transformer: Where Messages Intersect

I have a second post that shows an even more decoupled messaging strategy with Spring Integration, but this is purely a basic idea using Flex Messaging, Spring Integration, Spring JMS and ActiveMQ. I will post the broader strategy next :)

Step 1: Client messages are transformed here by extending the JMS MessageCreator. In this class I pull out the data from any Object type, but specifically the Flex Message and JMSMessage types.

public class IntegrationMessageCreator implements MessageCreator {

    // a few constructors here to handle multiple message types: JMSMessage, Flex Message, Object message, etc.

    private MessageBuilder createBuilder() {
        MessageBuilder builder = null;
        if (this.object != null) {
            builder = MessageBuilder.withPayload(object);
        } else if (this.flexMessage != null && flexMessage.getBody() != null) {
            builder = MessageBuilder.withPayload(flexMessage.getBody()).copyHeaders(flexMessage.getHeaders());
        }
        // ActiveMQ Message Groups
        if (this.partition != null) builder.setHeader(MessageConstants.Headers.JMSXGROUPID, partition);

        return builder;
    }

    // to JMS
    public javax.jms.Message createMessage(Session session) throws JMSException {
        return new IntegrationMessageConverter().toMessage(createBuilder().build(), session);
    }

    // to Flex
    public flex.messaging.messages.Message createMessage(String destinationId, int subscribers) {
        Message integrationMessage = (Message) new IntegrationMessageConverter().fromMessage(this.jmsMessage);

        flex.messaging.messages.Message flexMessage = new AsyncMessage();
        flexMessage.setBody(integrationMessage.getPayload());
        flexMessage.setDestination(destinationId);
        flexMessage.setHeaders(integrationMessage.getHeaders());
        // ...and other good JMS-to-Flex data

        return flexMessage;
    }
}

The Converter


import org.springframework.integration.jms.HeaderMappingMessageConverter;
import org.springframework.integration.core.Message;
import javax.jms.Session;

public class IntegrationMessageConverter extends HeaderMappingMessageConverter {

    // Converts from a JMS Message to an Integration Message. You should do a try/catch but I cut it out for brevity
    public Object fromMessage(javax.jms.Message jmsMessage) throws Exception {
        return (Message) super.fromMessage(jmsMessage);
    }

    // Converts from an Integration Message to a JMS Message. You should do a try/catch but I cut it out for brevity
    public javax.jms.Message toMessage(Object object, Session session) throws Exception {
        return super.toMessage(object, session);
    }
}

JMS Asynchronous Reception

In my jmsConfig.xml I configured one Spring MessageListenerAdapter which I have wired with a message delegate, a POJO and its overloaded handleMessage method name:

<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <property name="delegate" ref="defaultMessageDelegate"/>
    <property name="defaultListenerMethod" value="handleMessage"/>
    <property name="messageConverter" ref="simpleMessageConverter"/>
</bean>

As the application loads and all Spring beans are initialized, I initialize all of my JMS Destinations. As I do this, I also initialize a MessageListenerAdapter for each Destination. I have a stateless JMSService, which is called by another service, MessagingGateway, to initialize each Destination, and which calls PollingListenerContainerFactory to create a child message listener container for each Destination. The containers are configured based on an abstract parent configuration:

<bean id="abstractListenerContainer" abstract="true" destroy-method="destroy">
    <property name="connectionFactory" ref="pooledConnectionFactory"/>
    <property name="transactionManager" ref="jmsTransActionManager"/>
    <property name="cacheLevel" value="3"/>
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="autoStartup" value="true"/>
</bean>

Snippet from PollingListenerContainerFactory:

/**
 * Gets the parent from the IoC container to reduce runtime config and
 * the resources needed to create children.
 * <p/>
 * DefaultMessageListenerContainer is responsible for all threading
 * of message reception and dispatches into the listener for processing.
 * Supports dynamic scaling for higher throughput during peak loads.
 *
 * @param destination
 * @param messageListener
 * @return
 */
public static DefaultMessageListenerContainer createMessageListenerContainer(Destination destination, MessageListener messageListener) {

    ChildBeanDefinition childBeanDefinition = new ChildBeanDefinition("abstractListenerContainer", configureListenerContainer(destination, messageListener));

    String beanID = IdGeneratorUtil.getStringId();
    ConfigurableListableBeanFactory beanFactory = ApplicationContextAware.getConfigurableListableBeanFactory();
    ((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(beanID, childBeanDefinition);

    DefaultMessageListenerContainer container = (DefaultMessageListenerContainer) ApplicationContextAware.getBean(beanID);
    container.setDestination(destination);
    return container;
}

/**
 * Configures the child listener container, based on the parent in jmsConfig.xml.
 * <p>Configures Queue or Topic consumers:
 * Queue: stick with 1 consumer for low-volume queues (the default is 1)
 * Topic: there's no need for more than one concurrent consumer
 * Durable Subscription: only 1 concurrent consumer supported
 * <p/>
 * props.addPropertyValue("messageSelector", ""); sets a message selector for this listener
 *
 * @param destination
 * @param messageListener
 * @return
 */
private static MutablePropertyValues configureListenerContainer(Destination destination, MessageListener messageListener) {
    MutablePropertyValues props = new MutablePropertyValues();
    props.addPropertyValue("destination", destination);
    props.addPropertyValue("messageListener", messageListener);

    // Enable throttling on peak loads
    if (destination instanceof Queue) {
        props.addPropertyValue("maxConcurrentConsumers", 50); // modify to needs
    }
    // Override the default Point-to-Point (Queue) setting
    if (destination instanceof Topic) {
        props.addPropertyValue("pubSubDomain", true);
    }

    return props;
}

This is overkill for this topic, but it's cool stuff. So we now have a JMS listener for asynchronous JMS reception as well as Flex, but now let's look at the message delegate we wired into the MessageListenerAdapter:

public interface MessageDelegate {

void handleMessage(String message);

void handleMessage(Map message);

void handleMessage(Serializable message);
}

Pretty simple, right? It’s a POJO with absolutely no JMS code whatsoever for asynchronous message reception. How does it work? Spring abstracts the JMS code and calls it behind the scenes, if you look at the source code for Spring’s SimpleMessageConverter, it does the fromMessage() toMessage() handling for you and throws the resultant “message” into the appropriate overloaded method above. Now this is great for simple message abstraction but the above with JMS-Flex and Spring Integration is an example of more complicated handling. With clients you often need to translate and transfer the data from message type 1 to message type 2. In the adapter code above, you would use the handleMessage() method to get the message from Spring Integration and into the message type of your choice, here, a Flex message.
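For completeness, a delegate implementation might look like the following sketch; the class name and the println calls are assumptions, since the post only shows the interface:

import java.io.Serializable;
import java.util.Map;

// A plain POJO: Spring's MessageListenerAdapter converts the incoming javax.jms.Message
// with SimpleMessageConverter and invokes the overload that matches the converted payload.
public class DefaultMessageDelegate implements MessageDelegate {

    public void handleMessage(String message) {
        // TextMessage payloads arrive here as Strings
        System.out.println("Received text: " + message);
    }

    public void handleMessage(Map message) {
        // MapMessage payloads arrive here as Maps
        System.out.println("Received map with " + message.size() + " entries");
    }

    public void handleMessage(Serializable message) {
        // ObjectMessage payloads arrive here as the deserialized object
        System.out.println("Received object: " + message);
    }
}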


Posted in ActiveMQ, Annotations, Application Cofiguration, Broker Topology, Configuration Management, Flex, Java, JMS, Messaging, Software Development, Spring JMS | No Comments »

SuMQ Messaging Framework

Posted by Helena Edelson on 24th March 2009

I've just started staging my new project, SuMQ, and I appreciate your patience as it will take me a while to get the code standardized and onto Google Code.

SuMQ is a light-weight enterprise messaging framework built in Java, leveraging Spring, JMS, and ActiveMQ. It plugs into Flex Messaging via BlazeDS for the client. This can also be configured for other clients aside from Flex.

The sample will be ready for clustered BlazeDS instances and load balanced Application Servers.

code.google.com/p/sumq


Posted in ActiveMQ, Annotations, AOP, Application Cofiguration, BlazeDS & LCDS, Broker Topology, Java, JMS, JMS Broker, Messaging, Spring, Spring JMS | No Comments »

 
a2a_init("page"); a2a_init("page"); a2a_init("page"); a2a_init("page"); a2a_init("page"); a2a_init("page"); a2a_init("page"); a2a_init("page"); a2a_init("page"); a2a_init("page"); a2a_init("page"); a2a_init("page"); a2a_init("page"); a2a_init("page");