AMQP and RabbitMQ basic.ack, basic.reject and requeue

Exposing basic.ack and basic.reject behavior in an AMQP-based messaging abstraction can be critical.

Some misunderstandings about ack/reject: first, on the issue of either being an ‘extra’ operation – An AMQP ack, reject, or Rabbit nack to an AMQP broker is not equal to publishing a message to a broker. On the issue of this being an extraneous operation, when a consumer receives a message (removing it from a queue), the Rabbit server is aware of that and an acknowledge is needed to confirm this. Automatically ack-ing all received messages is not a case I agree with, no matter what abstraction you provide, nor is it good practice. It should to be configurable and provide the ability to do error handling in unsuccessful application scenarios.

basic.ack
Using the rabbit java client: where false is for multiple acks

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

basic.reject

channel.basicReject(delivery.getEnvelope().getDeliveryTag(), requeue);

Maybe an application is in a post-stop/pre-destroy phase. Or perhaps the message content was inappropriate (an error in the sender that composed the message) – nothing whatsoever to do with routing, AMQP, Rabbit but a user/programmer error, a lifecycle issue, and so forth.

basic.reject and requeue
basic.reject could either redeliver the message or dead-letter it

a) If you supply requeue=false the broker will discard the message – This is useful from a error handling point of view; if your application cannot process a particular message, you can get rid of it.

b) if you supply requeue=true, it will release it back on to the queue, to be delivered again – This  is useful for a consumer to decide not to deal with a delivered message, and place it back on the queue.

Rabbit re-enqueues the message and treats it as though it were completely new. This means a consumer can receive it again when message it has rejected. This could be a consumer for a ‘vcaf instance queue’ where for some reason, maybe it is dealing with a fatal error and going to be restarted, should send a basic.reject and requeue=true so that when it is restarted and initialized, it can successfully re-receive the message and process it.

AsyncConsumer consumer = new  AbstractQueueingConsumer(connectionFactory, queue, autoAck) {
     @Override public void handle(Channel channel,  QueueingConsumer.Delivery delivery) {
             ...application logic...
             if (some application case...) reject(channel,  delivery, true); //boolean requeue
             else acknowledge(channel, delivery);
        }
};

Share/Save

Why RabbitMQ for Cloud?

We know that Messaging itself affords moving from a SQL -> Database -> File System model and into the Network layer itself with:

  • Logical decoupling
  • Physical decoupling
  • Temporal decoupling

but I’ve been asked why I would choose implementing RabbitMQ in a cloud solution. This is a big topic, and there is a great deal more I’d say conversationally but for a relatively quick overview let’s start from the ground up.

Why AMQP?

AMQP is an open, wire-level binary protocol for Message Orientated Middleware that was created by and continues to developed collaboratively for users in mission-critical real-world scenarios. AMQP solutions are deployed to high performance clustering and grid computing problems and is developed for typical business scenarios:

1. distributes data to hundreds of clients in real time across a global network
2. accepts workloads to process from those clients and handles them reliably in a clustered transaction processing engine
4. connects to other partners over the Internet to exchange data
5. carries out all these functions in a secure manner which would pass an audit
6. monitors application traffic to build up a real-time activity

AMQP implementations offer fire and forget where you can absolutely trust that your messages will reach their destination, extremely fast delivery of data to a large number of consumers (publish sub-event notification), and
secure, firewall friendly file transfer.

“AMQP is a good fit for cloud MaaS and other wide-area, Internet-based messaging purposes, because it defines the wire protocol and communication semantics without restricting the details of the messaging system implementation or the API. AMQP’s goal is to enable interoperability among disparate vendors’ platforms.”
- Roy Shulte (Gartner), Review of AMQP and other protocol options.

More: amqp.org

Why Erlang?

All major implementations of AMQP such as Open AMQP, Qpid and RedHat MRG are built on a foundational technology. All but one are built on C++. RabbitMQ is built on Erlang. Erlang insures concise binary matching, has built-in support for concurrency, distribution and fault tolerance, has a built-in distributed database, is open source, and operates on many platforms. Erlang is also a runtime environment, similar to the java virtual machine, allowing code compiled on one architecture to run anywhere. The runtime system also allows code in a running system to be updated without interrupting the program.

More: erlang.org

Why RabbitMQ?

Many large-scale projects are using RabbitMQ for complex, scalable, cloud topologies, such as Observatories Initiative (OOI), NASA Nebula, Hiroku, Unique Identification Authority of India (200 million messages per day peak), and CERN because

  • It’s wicked fast and reliable
  • Supports security and transactions, persistence, client acks, and flow control
  • Widely proven and used heavily on Amazon EC2
  • It is open source with support services and several very active user lists
  • Hugely interoperable
    • RabbitMQ brokers run on many platforms and offer several clients (Java, .net and others), all which speak AMQP protocol to each other. Anyone can write a RabbitMQ client in any language: Ruby, Java, Spring, Python, C, C#, Perl, Erlang, Lisp, Haskell, PHP, _your_language_here.
  • Easily clusterable
  • Works with several protocols: AMQP, STOMP, XMPP, 0MQ
  • Highly-flexible topologies
  • Can work with Pacemaker and associated tools to provide High Availability
  • Supports several extension points via plug-ins, e.g. exchange types and backing queue storage
  • Provides management and monitoring of brokers
    • vFabric Hyperic
    • RabbitMQ command line tool and plugin for real-time activity and management
  • Messaging services hosted in the cloud with RabbitMQ as part of overall cloud infrastructure services are highly-supported

An open source RabbitMQ solution stack might look like this: Spring, Tomcat, Hibernate, MySQL, Hadoop, MapReduce, Chef, Apache httpd, RabbitMQ.

More: rabbitmq.com

Resource List And Use Cases

Share/Save

Building RabbitMQ Java Client Source from GitHub on Mac

For anyone that thinks of Python as a large snake, here are the steps I took to set up building the Rabbitmq java client on my mac with GitHub sources.

First, macs now come OOTB with python installed so you’re good there unless you wish to update the package. Simply type ‘python’ at the command line to see what you have. Go to http://pypi.python.org/pypi/simplejson#downloads to get the URI to pull for the source.

hedelson$ wget http://pypi.python.org/packages/source/s/simplejson/simplejson-{latest.version}.tar.gz#md5={autogenerated-hash}
hedelson$ cd /path/to/simplejson-2.1.2/
hedelson$ sudo python ez_setup.py
hedelson$ python setup.py build
hedelson$ python setup.py install

If you don’t have this already, create a local repo for rabbitmq-codegen:

hedelson$ git clone git://github.com/rabbitmq/rabbitmq-codegen.git
hedelson$ cd rabbitmq-codegen

To set up a fresh RabbitMQ java client repo from github and build from source (read only. From a fork is a bit different)

hedelson$ git clone git://github.com/rabbitmq/rabbitmq-java-client.git
hedelson$ cd /path/to/rabbitmq-java-client
hedelson$ ant dist

Share/Save

JInterface Can not connect to Peer Node

I ran into a nasty issue with jinterface when making a rapid succession of calls at the start of the application. I need to move this to a caching solution of the connections today but here is a quick solution for anyone else hitting their head against the wall with this. Jinterface swallows exception causes and it is very hard to hunt down the true underlying error. You will see the same error message for n-number of things that can go wrong farther down the stack.

First I’ve not added a boolean setter for if you want to use unique each time or not, so that’s something you can add. Also there’s a connection monitor for shared connections. I’ve left out all error handling for brevity. And finally, also for brevity, just used one constructor using the cookie to keep this shorter.

public class ConnectionFactory {

    private String cookie;
    private String selfNodeName;
    private String peerNodeName;
    private OtpSelf otpSelf;
    private OtpPeer otpPeer;
    private OtpConnection otpConnection;
    private final Object connectionMonitor = new Object();

    public ConnectionFactory(String selfNodeName, String cookie, String peerNodeName) {
		this.selfNodeName = selfNodeName;
		this.cookie = cookie;
		this.peerNodeName = peerNodeName;
                initialize();
    }

    public void initialize() {
		// assertions...
		String selfName = this.selfNodeName + "-" + UUID.randomUUID().toString();

		try {
			this.otpSelf = new OtpSelf(selfName.trim(), this.cookie);
		} catch (IOException e) {
			//
		}
		this.otpPeer = new OtpPeer(this.peerNodeName.trim());
	}

	public OtpConnection createConnection() {
		synchronized (this.connectionMonitor) {
			if (this.otpConnection == null) {
				try {
					doCreateConnection();
				} catch (IOException e) {
					//
				} catch (OtpAuthException e) {
                                 //
                         }
                  }
		return this.otpConnection;
		}
	}

	public void doCreateConnection() throws IOException, OtpAuthException {
		synchronized (this.connectionMonitor) {
			if (this.otpConnection != null) {
                             otpConnection.close();
			}

                       this.otpConnection = otpSelf.connect(otpPeer);
		}
	}

	public void destroy() {
		resetConnection();
	}

	public void resetConnection() {
		synchronized (this.connectionMonitor) {
			if (this.otpConnection != null) {
				otpConnection.close();
			}
			this.otpConnection = null;
		}
	}
}

Share/Save

Configuring @Configuration ApplicationContext for Spring Test Framework @ContextConfiguration

Here is a @Configuration class, RabbitTestConfiguration, truncated for the sake of a simple example. Bootstrapping this for testing using Spring’s Test Framework is simple. First make sure you have @ImportResource mapping via classpath to your xml which here has 2 simple declarations:


<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:beans="http://www.springframework.org/schema/beans"
xmlns="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

<component-scan base-package=”org.hyperic.hq.plugin.rabbitmq”/>

<property-placeholder location=”/etc/test.properties”/>

</beans:beans>

Next, make sure you remove the @Configuration annotation declared at the class level. We will be bootstrapping this a different way.

@ImportResource("classpath:/org/hyperic/hq/plugin/rabbitmq/*-context.xml")
public class RabbitTestConfiguration {

    private @Value("${hostname}") String hostname;

    private @Value("${username}") String username;

    private @Value("${password}") String password;

    @Bean
    public SingleConnectionFactory singleConnectionFactory() {
        SingleConnectionFactory connectionFactory = new SingleConnectionFactory(hostname);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }
// ... shortened for brevity
}

Now let’s build an abstract Spring base test

/**
 * AbstractSpringTest
 * @author Helena Edelson
 */
@ContextConfiguration(loader = TestContextLoader.class)
@RunWith(SpringJUnit4ClassRunner.class)
public abstract class AbstractSpringTest {
    /** Inheritable logger */
    protected final Log logger = LogFactory.getLog(this.getClass().getName());

    /** Now we can autowire our beans that all child tests will need. Note that they are protected. */
    @Autowired
    protected org.springframework.amqp.rabbit.connection.SingleConnectionFactory singleConnectionFactory;

   @Before
    public void before() {
        assertNotNull("singleConnectionFactory should not be null", singleConnectionFactory);
        //... more assertion checks for other beans, removed for brevity.
    }

And finally, build a test context loader, override customizeContext() and bootstrap your annotational config class. Since the config class bootstraps the minimal context xml config, now we’re all set.

/**
 * TestContextLoader
 * @author Helena Edelson
 */
public class TestContextLoader extends GenericXmlContextLoader {

    @Override
    protected void customizeContext(GenericApplicationContext context) {
        AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
        ctx.register(RabbitTestConfiguration.class);
        ctx.refresh();
        /** This is really the key */
        context.setParent(ctx);
        assertTrue(ctx.isRunning());
    }
}

Now all of my Spring test classes can simply extend the base class and freely call the inherited shared beans or declare any @Autowired dependency

public class RabbitGatewayTest extends AbstractSpringTest {
     @Autowired protected Queue marketDataQueue;

    @Test
    public void getConnections() throws Exception {
        com.rabbitmq.client.Connection conn = singleConnectionFactory.createConnection();
        // ... etc
    }
}

Share/Save

Open Artificial Intelligence for the Cloud – Initial Diagram

This is a working (initial) idea of where an AI appliance might fit into a virtualized, elastic environment. I will update this concept soon as I created it early this summer and have many changes to now make to it. The parent post: Open Source Artificial Intelligence for the Cloud

© Copyright – not for re-use.

Share/Save

Open Source Artificial Intelligence for the Cloud

The idea of introducing intelligent computing so that software can make decisions autonomously and in real time is not new but it is relatively new to the cloud. Microsoft seems to be the strongest force in this realm currently but this is not open source. The idea of clouds that manage themselves steps way beyond Cloud Ops as we know it, and is the only logical, and necessary, next step.

I’ve been researching various open source AI platforms and learning algorithms to start building a prototype for cloud-based learning and action for massive distributed Cloud Ops systems and applications. Once could off er an AI cloud appliance eventually but I will start with a prototype and build on that using RabbitMQ (an AMQP messaging implementation), CEP, Spring, Java as a base. I’ve been looking into OpenStack, the open source cloud computing software being developed by NASA, RackSpace, and many others. Here is the future GitHub repo: http://github.com/helena/OCAI.

Generally to do these sort of projects you need a large amount of funding and engineers with PhD’s, none of which I have. So my only alternative is to see how we might take this concept and create a light weight open source solution.

Imagine having some of these (this being not a comprehensive list) available to your systems:

AI Technology

  • Planning and Scheduling
  • Heuristic Search
  • Temporal Reasoning
  • Learning Models
  • Intelligent Virtual Agents
  • Autonomic and Adaptive Clustering of Distributed Agents
  • Clustering Autonomous Entities
  • Constraint Predictability and Resolution
  • Automated Intelligent Provisioning
  • Real-time learning
  • Real-world action
  • Uncertainty and Probabilistic Reasoning
  • Decision Making
  • Knowledge Representation

I will go into these topics further in future posts.

About me: I am currently an engineer at SpringSource/VMware on the vFabric Cloud Application Platform side of things, however this project is wholly outside of that, I feel the need to pursue something I’m interested in. I work with the RabbitMQ team and a member of the AMQP Working Group.

Share/Save

Installing Erlang and RabbitMQ on RedHat

I am setting up a thin-provisioned vm running RedHat to host erlang and RabbitMQ for our QA team to test my code. Having not done anything on RedHat (just Ubuntu) for a long time I had forgotton everything and yet it was still incredibly simple:

Find the latest release – this was at the time of my install. The path will give you the right idea for the current version:

yum install erlang

For some reason yum has a pretty old version so you may want to manually install it. Choose your RabbitMQ server version -  2.1.0 was the current for this install.

wget http://www.rabbitmq.com/releases/rabbitmq-server/v2.1.0/rabbitmq-server-2.1.0-1.noarch.rpm

rpm –install rabbitmq-server-2.0.0-1.noarch.rpm

Then I verified in a few directories that RabbitMQ was installed, and ran

/sbin/service rabbitmq-server start

Verify that your RabbitMQ node is running:

rabbitmqctl status

If you have no nodes up rabbitmqctl will run but tell you no nodes are up to report status on. Simply restart at least one node.

If you want RabbitMQ to start after everything else on startup/reboot do:

ln -s /etc/rc3.d/S99rabbitmq-server ../init.d/rabbitmq-server

Share/Save