Yet Another Distributed Systems Reading List

I started putting together a reading list for engineers fairly new to distributed systems and theory, with little hands-on experience yet. Having previously worked on teams made up entirely of senior engineers, I am now running a team that is mostly very senior but also includes one or two relatively recent grads. In compiling this list to cherry-pick from, with input from many Twitter friends who responded to my plea, I realized there are some I have not yet read, so this is super useful – many thanks to everyone who replied.

Some pages make this a list of lists, or a post of posts :) Ping me on Twitter if you have more gems to add: https://twitter.com/helenaedelson. I hope to make some sense of this at some point, like a 'start here' chunk of readings that progresses to the others.

In No Order:


Comparing and Choosing Technologies

Unable to sleep at 2:30 AM, I just read an interesting case study from a large company on how they have started to apply ad tech to TV households versus mobile and web, with frequency capping for viewers, and so on. Not surprisingly, they use Kafka. It made me think of a situation I was in where someone brought up Solace versus Kafka.

When I listen to people talk about any technology, I try to really listen for:

  • Whether they are speaking from personal experience, or from what they have read and surmised from blogs by users who may or may not know what they are doing with that technology, may or may not have applied it properly, and so forth
  • Whether they have merely played around with the technology, or have actual production experience
  • And if they do have production experience, whether it was actually at scale, under real-world load

Or, are they talking about a solution whose required features are not yet complete – currently vaporware, in a sense. These sorts of claims must be weighed carefully, as opposed to proven ones.

For example, trying to compare Solace to Kafka is comparing apples to oranges. I have seen this sort of comparison attempted many times over the decade I have been in messaging, and it is (pun intended) fruitless :)

They each serve a different purpose in an architecture, and while they overlap in some functionality, Solace fits more particular use cases – and handles those use cases very well – while some features come out of the box with Kafka and are proven in production at extremely high load and scale. No technology is a one-size-fits-all solution. For example, I know a production system that uses both an Aerospike cluster to solve some problems and a Cassandra cluster to solve others – both NoSQL stores serving different applications in different areas of the architecture. Taking this a step further, a deployment architecture for one technology, such as Cassandra, would have multiple clusters: one cluster for analytics, one for time series, and so forth.

The concept I want to highlight is event-driven architecture – nothing new, and I have been working in it for years – but it is quite different from more conventional ETL data flows. Leveraging this architecture, done right, can both decouple components cleanly and improve speed. In ad tech, if you do not respond in time, or correctly, you lose the bid and thus lose money. But this is only one part of the system. It is not the high-frequency trading use case, but it shares some similar requirements.

If you are promised features or an MVP by a team whose customer you may become – including as an internal customer – consider whether that team is motivated enough to have your team/company as a customer, and willing as well as able to do the work quickly to support it. For example, I would dig further into this with their product manager, project manager and engineering lead to find out if they have the a) roadmap, b) priorities, c) resources, and d) QA, test automation and perf engineers to do this. I wouldn't want to sign up for it only to find they can't deliver, or can't deliver in the necessary time frame.

Things like Hadoop or Teradata are very nice, but they do not scale for these workloads and are not applicable to streaming, event-driven architectures. You can, however, scale with Kafka, and leverage Kafka to handle the ingestion stream, run analytics, and respond back in-line – for example a system run on Kafka that can do distributed RPC, leveraging Akka (or Storm, but Akka is easier). You can do a lot in 20 ms. For example, you can write a system where the consumer sees the produced data within 2 ms, and a response via Akka remoting is roughly another 1 ms hop back. There are many ways to solve this, and here again you can use Kafka with Storm, or Kafka with Akka feeding into Spark, if you can micro-batch and do not need a very low-latency analytics response. It all depends on your requirements – what you are solving for, your competition and your constraints.
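To make that Kafka-to-Akka response path concrete, here is a minimal sketch – the names and message types are illustrative only, not a production design – of an actor that computes an in-line response and replies over Akka remoting while handing the event off to an analytics stream:

import akka.actor.{Actor, ActorRef}

final case class BidRequest(id: String, payload: Array[Byte]) // hypothetical event type
final case class BidResponse(id: String, decision: String)

// Fed by a Kafka consumer; the requesting node is the sender.
class BidResponder(analytics: ActorRef) extends Actor {
  def receive = {
    case BidRequest(id, payload) =>
      sender ! BidResponse(id, decide(payload)) // in-line response, roughly a 1 ms remoting hop
      analytics ! payload                       // fan the event out to the analytics stream
  }

  private def decide(payload: Array[Byte]): String = "bid" // placeholder decision logic
}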

 


Changes

It seems the word is out that I left DataStax. I made so many friends there, even recruited an old friend, and know many of us will continue to cross paths and collaborate. Thanks to everyone for the great memories.

Over the last year I have spoken at several Spark, Scala and machine learning meetups, and internationally at Scala conferences like Scala Days Amsterdam and big data conferences such as Spark Summit, Strata and emerging tech conferences – all of which I hope to continue doing.

  • Spark Summit East 2015
  • Scala Days Amsterdam, 2015
  • Philly ETE – Emerging Tech 2015

In October 2015 I’ll be speaking at Spark Summit EU, and on the Spark panel at Grace Hopper. I’ll also be at Strata NYC again, this year hosting the Spark track on Thursday. I’m adding some cool stuff, and new code, to my Big Data Lambda talk.

What’s next – I’m humbled to have been contacted by many engineers I know and highly respect about working at their companies, where in some cases they are the founders. This is making for a very difficult decision because they are all doing really interesting and exciting work with open source technologies I like. In the meantime I am taking a month or so to enjoy the last part of summer, something I haven’t had time to do for years :)


Spark Streaming with Kafka and Cassandra

This is a lightning post, ahead of a future more in-depth post, on how to integrate Spark Streaming, Kafka and Cassandra using the Spark Cassandra Connector.

First, here are some of the dependencies used below:

import com.datastax.spark.connector.streaming._
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkEnv, SparkConf, Logging}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import com.datastax.spark.connector.demo.streaming.embedded._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.demo.Assertions

Start by configuring Spark:

  val sc = new SparkConf(true)
    .set("spark.cassandra.connection.host", SparkMaster)
    .set("spark.cleaner.ttl", "3600")
    .setMaster("local[12]")
    .setAppName("Streaming Kafka App")

Here ‘SparkMaster’ holds the value for spark.cassandra.connection.host, which can be a comma-separated list of Cassandra seed nodes.

Using the SparkConf object you can easily configure as well as populate Cassandra with a keyspace, table and data for quick prototyping. This creates the keyspace and table in Cassandra:

  CassandraConnector(sc).withSessionDo { session =>
    session.execute(s"CREATE KEYSPACE IF NOT EXISTS streaming_test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 3 }")
    session.execute(s"CREATE TABLE IF NOT EXISTS streaming_test.key_value (key VARCHAR PRIMARY KEY, value INT)")
    session.execute(s"TRUNCATE streaming_test.key_value")
  }

A custom embedded class starts an embedded local ZooKeeper server and an embedded Kafka server and client (all very basic here):

val kafka = new EmbeddedKafka

Create the Spark Streaming context.

val ssc =  new StreamingContext(sc, Seconds(2))

One of several ways to ensure shutdown of ZooKeeper and Kafka for this basic prototype:

SparkEnv.get.actorSystem.registerOnTermination(kafka.shutdown())

This assumes you already have a topic in Kafka and are sending messages to it, but here is a very basic way to create a topic and produce messages if not:

import java.util.Properties
import kafka.admin.CreateTopicCommand
import kafka.producer.{Producer, ProducerConfig}
import kafka.serializer.StringEncoder

// 'client' (a ZkClient) and 'kafkaConfig' come from the embedded Kafka above
def createTopic(topic: String): Unit =
  CreateTopicCommand.createTopic(client, topic, 1, 1, "0")

def produceAndSendMessage(topic: String, sent: Map[String, Int]): Unit = {
  val p = new Properties()
  p.put("metadata.broker.list", kafkaConfig.hostName + ":" + kafkaConfig.port)
  p.put("serializer.class", classOf[StringEncoder].getName)

  val producer = new Producer[String, String](new ProducerConfig(p))
  producer.send(createTestMessage(topic, sent): _*)
  producer.close()
}

Here the createTestMessage function simply generates test data for these purposes.
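The createTestMessage implementation is not shown in the original; a plausible sketch, assuming the Kafka 0.8 producer API used above, might be:

import kafka.producer.KeyedMessage

// Hypothetical: one KeyedMessage per entry, matching the producer.send signature above
def createTestMessage(topic: String, sent: Map[String, Int]): Seq[KeyedMessage[String, String]] =
  (for ((key, count) <- sent) yield new KeyedMessage[String, String](topic, key, count.toString)).toSeq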

Create an input stream that pulls messages from a Kafka broker:

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafka.kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)

Import the implicits enabling the DStream’s ‘saveToCassandra’ functions:

import com.datastax.spark.connector.streaming._

This defines the work to do in the stream. Configure the stream computations (here we do a very rudimentary computation) and write the DStream output to Cassandra:

stream.map { case (_, v) => v }
  .map(x => (x, 1))
  .reduceByKey(_ + _)
  .saveToCassandra("streaming_test", "key_value", SomeColumns("key", "value"), 1)

ssc.start()

Now we can query our Cassandra keyspace and table to see if the expected data has been persisted:

import scala.concurrent.duration._

val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value")
awaitCond(rdd.collect.size == sent.size, 5.seconds)
val rows = rdd.collect
sent.forall(rows.contains(_))

And finally, our assertions being successful, we can explicitly do a graceful shutdown of the Spark Streaming context with:

ssc.stop(stopSparkContext = true, stopGracefully = true)


Orderly Load-Time Provisioning And Graceful Shutdown of Akka Nodes

A simple (and truncated) example of the CloudExtension’s load-time ordered provisioning and ordered graceful shutdown. Unfortunately this had to be written in an older version of Scala and Akka – for now. MyNodeGuardian.scala is started in CloudExtension.register() and is a sample of using ProvisioningGuardian, which extends OrderedGracefulShutdown: View Gist


Closing Over An Akka Actor Sender In The Receive

Accidentally closing over the sender is easy to do, and causes mayhem when you do. The rule is simple: never close over the sender method in a block of code that is potentially executed on another thread, such as a scheduled task or a Future. The trick is to capture the current sender in a val, as illustrated below:

Unsafe


Here are three examples of what not to do. It all depends on what is actually done in the receive. For example, if Futures or other threads are involved and a call to forward or sender is executed in callbacks (such as Future.onSuccess), then you have a problem:

1. Calling sender on another thread than the one processing the original message in Actor.receive

def receive = {
  case SomeMessage =>
    // via the scheduler; requires an implicit ExecutionContext, e.g. import context.dispatcher
    context.system.scheduler.scheduleOnce(5.seconds) {
      sender ! DelayedResponse  // Danger!
    }
}

The thing to watch out for here is { sender ! DelayedResponse }, because it runs on another thread than the one processing the original message. The sender is associated with the current message being processed, and in the scheduler case illustrated above, several other messages might have been processed during the 5 seconds before the call to the sender method, so the correct sender is no longer associated with the message that was being processed when the task was scheduled.

2. Shared mutable state

class MyActor extends Actor {
  var state = State()
  def receive = {
    case _ =>
      Future { state = newState }
      (otherActor ? message) onSuccess { case r => state = r }
  }
}

The above code will break your application in weird ways: the Future and the onSuccess callback both mutate the actor’s internal state from other threads, a shared mutable state bug.

3. Calling sender in a Future during Actor.receive could return null or the wrong ActorRef

class MyActor extends Actor {
  def receive = {
    case _ => Future { expensiveCalculation(sender) } // painful, and you owe me a beer if you do it
  }
}

The return value of the sender method after the current message has been processed might be null until the next message is received, but that is beside the point: simply use a val if you close over sender in a block of code that might be executed by another thread.

Safe


Here are a few examples that are safe, the first because we simply freeze the sender reference:

class MyActor extends Actor {
  def receive = {
    case SomeMessage =>
      val requestor = sender
      context.system.scheduler.scheduleOnce(5.seconds) {
        requestor ! DelayedResponse
      }
  }
}

Or better yet, keep Actor.receive clean altogether – a best practice:

class MyActor extends Actor {
  def receive = {
    case msg: SomeMessage => receiveSomeMessage(sender, msg)
  }

  def receiveSomeMessage(requestor: ActorRef, msg: SomeMessage): Unit =
    context.system.scheduler.scheduleOnce(5.seconds) {
      requestor ! DelayedResponse
    }
}

It is safe to close over self (an ActorRef is thread-safe):

class MyActor extends Actor {
  def receive = {
    case _ => Future { expensiveCalculation() } onComplete { t => self ! t.get } // t is a Try
  }
}

It is safe to close over a fixed value and a thread-safe ActorRef

class MyActor extends Actor {
  def receive = {
    case _ =>
      val currentSender = sender // freeze the sender to a val
      Future { expensiveCalculation(currentSender) }
  }
}

Find Out More

Some examples were taken from “Akka Concurrency” by Derek Wyatt, others from the Akka Docs.


Scala 2.10.0 Introduction of Try

One of my favorite things about Scala 2.10, which I have been using since this summer, is encapsulated in Try.scala. You can remove lines and lines of try/catch blocks and the horrible (or absent) error-handling strategies I have seen strewn around applications over the years. Try gives you a clean, very often one-line, function with error handling built in, and it is incredibly flexible for the succinctness you can glean from it.

The docs describe it as representing “a computation that may either result in an exception or return a success value. It is analogous to the Either type but encodes common idioms for handling exceptional cases (such as rescue/ensure which is analogous to try/finally)”.

Looking at this simplistically, here are a few samples. The first follows the very Java-like try/catch pattern riddled through Java code, while the Risky class uses simple one-line handling with scala.util.Try.
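The embedded code sample is not shown here; a hypothetical reconstruction, consistent with the output below, might look like this:

import scala.util.Try

object Risky {
  // The Java-like pattern: explicit try/catch with a fallback value
  def javaLike: Int = try 1 / 0 catch { case _: ArithmeticException => 1 }

  def one   = Try(1 / 0).toOption                    // None on failure
  def two   = Try(1 / 0).isSuccess                   // false
  def three = Try(1 / 0).getOrElse(1)                // fallback value
  def four  = Try(1 / 0).recover { case _ => 1 }.get // fallback via recover
  def five  = Try(1 / 0)                             // Failure(ArithmeticException)
  def six   = Try(1 / 0).recover { case _ => 1 }     // Success(1)
  def seven = Try(1 / 0).orElse(Try(1))              // Success(1)
  def eight = Try(2 / 2)                             // Success(1)
}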

If you run this you would see the following, where the entries showing a scala.util.Success and/or a plain value result from the fallback:
[info] javaLike [1]
[info] one [None]
[info] two [false]
[info] three [1]
[info] four [1]
[info] five [Failure(java.lang.ArithmeticException: / by zero)]
[info] six [Success(1)]
[info] seven [Success(1)]
[info] eight [Success(1)]


Functional Data Structure Resources

By request – and not exactly the ‘short list’ – here are some excellent resources on functional data structures. Note that some cost money, but you can view them for 24 hours for a few USD:

These are good too but not directly related:


Build Configuration For Scala 2.10.0-RC1 with Akka 2.1.0-RC1 and Slick 0.11.1

Assuming you know the basics, this is just meant to be a quick, abridged shortcut ;) I created a sample project to spin up Scala 2.10.0-RC1 with Akka 2.1.0-RC1, using the latest PostgreSQL and Slick 0.11.1.

build.properties – I have not yet tried this with sbt 0.12.1, just released:

sbt.version=0.12.0

In my Build.scala I added

lazy val buildSettings = Seq(
  organization := "com.helenaedelson",
  version      := "1.0-SNAPSHOT",
  scalaVersion := "2.10.0-RC1" // FIXME: use 2.10.0 for final
)

These resolvers:

resolvers ++= Seq(
  "Sonatype Repo"      at "https://oss.sonatype.org/content/repositories/releases/",
  "Typesafe Releases"  at "http://repo.typesafe.com/typesafe/releases/",
  "Typesafe Snapshots" at "http://repo.typesafe.com/typesafe/snapshots/"
)

And the dependencies:

object Dependency {
  val akka       = "com.typesafe.akka"  % "akka-actor_2.10.0-RC1" % "2.1.0-RC1" withSources ()
  val slick      = "com.typesafe"       % "slick_2.10.0-M7"       % "0.11.1" withSources ()
  val postgresql = "postgresql"         % "postgresql"            % "9.1-901.jdbc4" % "runtime"
  val logback    = "ch.qos.logback"     % "logback-classic"       % "1.0.7" // EPL 1.0 / LGPL 2.1
  val slf4j      = "org.slf4j"          % "slf4j-api"             % "1.6.4" // MIT

  object Test {
    val akkaTestKit = "com.typesafe.akka" % "akka-testkit_2.10.0-RC1" % "2.1.0-RC1" withSources ()
    val junit       = "junit"             % "junit"                   % "4.10"   % "test" // Common Public License 1.0
    val scalatest   = "org.scalatest"     % "scalatest"               % "1.8"    % "test" cross CrossVersion.full // ApacheV2
    // Note that this version gives the message "-oDF is deprecated, use stdout instead."
    // val scalatest = "org.scalatest"    % "scalatest"               % "2.0.M4-2.10.0-RC1-B1" % "test" cross CrossVersion.full // ApacheV2
    val scalacheck  = "org.scalacheck"    % "scalacheck"              % "1.10.0" % "test" cross CrossVersion.full // New BSD
  }
}
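The post does not show the glue, but a plausible wiring of these dependencies into the project – inside the same Build object, assuming the sbt 0.12 conventions above – would be:

// Hypothetical wiring, not from the original post:
lazy val coreDeps = Seq(Dependency.akka, Dependency.slick, Dependency.postgresql,
  Dependency.logback, Dependency.slf4j)

lazy val testDeps = Seq(Dependency.Test.akkaTestKit, Dependency.Test.junit,
  Dependency.Test.scalatest, Dependency.Test.scalacheck)

lazy val root = Project("root", file("."),
  settings = Defaults.defaultSettings ++ buildSettings ++ Seq(
    libraryDependencies ++= coreDeps ++ testDeps))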


The Importance of Patterns in Messaging

Organizations that dealt successfully with scalability challenges, such as Google, eBay, Amazon and Yahoo, went through architecture changes and eventually arrived at a similar pattern: asynchronous, event-driven design.

Claim Check – Handling large file sizes

Messaging is most applicable to fast, frequent transfers of data and notifications; it is not suited to transporting large files. The Claim Check Enterprise Integration Pattern (EIP), however, is very well suited to this.

How can we reduce the data volume of message sent across the system without sacrificing information content?

  • Store message data in a persistent store that has HA and is highly scalable
    • A NOSQL store is more appropriate than a relational database
      • Document Store: If you choose to store the file itself
        • CouchDB – REST protocol, JSON API
      • Key Value: If you choose to store the location URI of the file
        • Riak – REST protocol, JSON API
  • Pass a Claim Check (unique key) to subsequent components
  • These components use the Claim Check to retrieve the stored information

The Claim Check key allows you to replace message content with a claim check for later retrieval. This pattern is very useful when message content is very large and would be too expensive to send around. The key here is that not all components require all of the information – some just need the key, while others need the actual data in the file.

It can also be useful in situations where you cannot trust the information with an outside party; in this case, you can use the Claim Check to hide the sensitive portions of data.
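A minimal sketch of the pattern – the store API and names here are illustrative, not from any particular library:

import java.util.UUID
import scala.collection.concurrent.TrieMap

// Stand-in for the HA, scalable persistent store (e.g. CouchDB or Riak above)
final class ClaimStore {
  private val store = TrieMap.empty[String, Array[Byte]]
  def put(data: Array[Byte]): String = { val key = UUID.randomUUID.toString; store(key) = data; key }
  def get(key: String): Array[Byte] = store(key)
}

// The message carries only the claim check, never the large payload
final case class Message(claimCheck: String, metadata: Map[String, String])

def send(largePayload: Array[Byte], store: ClaimStore): Message =
  Message(store.put(largePayload), Map("content-type" -> "application/octet-stream"))

// Only components that need the payload redeem the claim check
def receive(msg: Message, store: ClaimStore): Array[Byte] = store.get(msg.claimCheck)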


An Agile Software Development Lifecycle – Increasing Efficiency and Throughput, Decreasing Bottlenecks

 

This post is in progress. It comes from years of finding things that work, as well as slamming into the pain without the authority to change the process (that’s called using power for good, people), or saying, “Good luck with that”. One thing I have learned across the orgs I have frequented is that most orgs with problems recognize them and sincerely want, and are trying, to improve the process. Getting the process right is an art.


SDLC – Increasing Efficiency and Throughput, Decreasing Bottlenecks


Whether you are new to Agile or seasoned, you should read The Agile Manifesto – poetry for the weary.

 

Software development lifecycle improvements can increase throughput, manageability and maintainability, vastly increase overall efficiency, and decrease bugs. A major factor is improving the agile process for the organization (from planning to deploys), for which you need to not only educate staff but also get employee buy-in to the process. Jumping into this with a system that needs re-architecting could create more bottlenecks and technical debt, so the timing and rollout of new processes must be planned well.

 

Planning And Requirements


Ensure that requirements are broken down into what can be completed and demoed correctly within each sprint. Scoping each unit of work for a feature, enhancement, bug fix or task must be done in such a way that dependencies are never a bottleneck and goals are achievable and testable. The business should set requirements in a scope that is achievable, sometimes partitioning work over multiple sprints.

 

Tooling


Do not overlook a very important part of the process that lends itself to success: your process tooling. Tools like JIRA are often underutilized. Leveraged properly, they make the work and the process transparent, improving not only efficiency but the accuracy of implementations, testing, and deploys. If you don’t already have GreenHopper for JIRA, get it. JIRA is not just a bug tool, it is an SDLC tool.

 

  • I would not do a project without JIRA (with GreenHopper)
  • I would not do a project without Git as the repo. Anything else and you are asking for pain and incredible inefficiency

Turning Requirements Into Stories


This may be the starting directive: Build in real-time asynchronous stock ticker updates to the client. In Agile, the Story should look like this:

 

As a User I want to receive real-time updates of stocks I have selected to watch, so that I can take immediate, accurate advantage of market fluctuations

We cannot proceed without covering dependencies. A functional dependency is something that requires another thing to be done first. Example: to tweet, I depend on an internet connection to send the tweet in order to accomplish that task.

This assumes or highlights many things – for instance, what could be done in parallel as tasks of the same story? What may be more appropriately completed in a prior sprint?

  • Is there a UI dependency to create a form where the user can select stocks to watch, submit the form?
  • Does a new db table need to be created for persisting stock selections by user?
  • What in the messaging system needs to be added to do this?
  • What could be done in parallel as tasks of the same story?
  • How can this be architected to allow for consecutive sprint development?
  • How can QA best test this over each sprint until the final task is committed?
  • Do we put this in a feature branch or roll each part in by sprint so QA can test?

A business dependency, which may also be called a requirement, is something engineering needs from the business in order to complete a Story or Task. For example:

As a business owner I want to authorize all incoming user requests from the client, to ensure that a user has been granted rights to do what they are requesting to do in the system.

Say the Product Manager brings this up to the team in a Sprint Planning meeting and reviews the requirements, and one of the engineers says, “I see a requirement missing: there is no failure scenario.” If the client is simply making REST API calls to the server, then any client can call what it has not been granted authority to do. A failure plan is needed for what is generated on the server and sent to the caller/client and, if the client is a UI, for how this is displayed. This could require a new UI task, and time.

 

Story Points And Time Boxing


Possibly the most highly-debated part of any SDLC process methodology, whether agile, lean or other.
Planning Poker is an interesting read.

Planning And Effective Use Of Tooling


First, a Project Manager should identify each team member’s availability in hours per day for a sprint, to get the team’s total hours for the sprint. I should note here that I have fought with QA many times over the years and may be biased.

 

  • Product Manager creates a Story ticket that will have child tasks
    • The business ensures that business requirements are accurately and granularly described in the story’s description
    • Team (Dev/QA) assesses the business requirements and identifies questions, or information the business has left out
    • Project Manager assigns the Story ticket to the business/Product Manager to complete – and everyone can see where in the process this ticket is. When complete, the business/Product Manager assigns the ticket back to the Project Manager
    • Product Manager and Project Manager review with the team
      • If the team deems the requirements adequate to proceed, the Project Manager can schedule the Story
    • Team (Engineer(s) – Dev) votes on the number of story points to assign to the Story – how many days will it take to complete? What is the difficulty of the story?
    • Does this story have dependencies?
      • If so, are they completed or scheduled? How long will they take?
      • Does this story require an architect before developers come on?
      • Do we need to prototype or vet a new technology decision or strategy first?
      • Is there a functional or UI dependency?
      • Dependencies in the same sprint must be scheduled in such a way that none become blockers
    • Team (Engineer(s) – Dev) creates child tasks under the Story – if the team is junior, the Team Lead ensures accuracy and that nothing is left out
      • Developers task out what needs to be done to complete a scoped story
      • Developers pick task tickets and assign them to themselves
      • Developer / QA timeboxes each task – how many hours will it take?
    • Team (Engineer(s) – QA) creates child tasks under the Story
      • Create a task ticket to create a test plan for the story
      • Review the test plan with the business and developers
      • Attach the approved test plan to the Story’s JIRA ticket
    • more coming

Now ensure that the hours it will take to complete the work are achievable given the availability of the team.

 

Development – and Effective Use Of Git


  • TDD – Test-Driven Development
    • 1:1 correlation between requirements from the business and unit/integration test methods
    • Unit test: testing a method or small unit of code
    • Integration test: testing a Service method that covers multiple components (DAO, Util, Messaging, etc.)
    • Tests must be written in such a way that they are OS- and environment-independent
    • Tests must be written in such a way that they can be run by an automated process – Continuous Integration (CI)
    • If CI tests fail, the team should be notified by email that tests have failed, and the failures attended to quickly
    • Before code check-in
      • The latest changes in the current branch should be pulled in
      • The new code must compile with the latest code in the branch
      • Tests should be run by the developer and pass
        • This means that every developer is responsible for all of their tests passing when a teammate checks out the latest code
      • New code must be deployed successfully by the developer
  • Coding Standards
    • Every org needs coding standards and a means to ensure those standards are being implemented
      • Code reviews
      • Developer training
      • Best-practices instruction
      • Lunch-and-learn presentations for the team
      • Team education on the system, tools, and processes – the more everyone knows, the faster and less error-prone things will be
      • How to properly test code
    • Architecture enforcement with AOP in the codebase
      • Automate catching poor architecture or code implementations at dev time
        • No engineer has time to stay on top of static code-standard documentation

Technical Debt

If an org is accumulating technical debt partly because of the business – sales and marketing pressure for new features and bug fixes faster than engineering can both address existing issues and do the work properly the first time – then risk management of the system is put on the back burner and debt accumulates. This will eventually blow up.

Time for technical debt must be built into R&D sprints, and it is often handled as dedicated sprints of its own.

 

QA, Staging, Deploys, and Rollback Strategies


Not a free-for-all – I have not written this part yet. However, deploys should be automated. If you need to do them manually, there is something wrong with your application architecture and/or deployment architecture.


Routing Topologies for Performance and Scalability with RabbitMQ

Designing a good routing topology for a highly-scalable system can be like mapping a graph. Many things need to be considered, for instance the problem, constraints of the environment, those of the messaging implementation, and performance strategies. What we often run up against is a lack of flexibility and expressivity in fitting routing to our needs. Here is where RabbitMQ stands out.

Basic Concepts

Anyone familiar with messaging knows the concept of routing messages from A to B. Routing can be simplistic or quite complex, and when designing a routing topology for a scalable, complex system it must be elegant. Kept clean and decoupled, components throttle nicely under varying loads. A topology can be expressed as a simple map or a complex graph; in its simplest form it can be expressed as nodes, for instance hierarchical nodes:

For those new to RabbitMQ or AMQP (note that Rabbit works with many protocols including STOMP, HTTP, HTTPS, XMPP, and SMTP), here are some basic component descriptions:

  • Exchange The entity within the server which receives messages from producer applications and optionally routes these to message queues within the server
  • Exchange type The algorithm and implementation of a particular model of exchange. In contrast to the “exchange instance”, which is the entity that receives and routes messages within the server
  • Message queue A named entity that holds messages and forwards them to consumer applications
  • Binding An entity that creates a relationship between a message queue and an exchange
  • Routing key A virtual address that an exchange may use to decide how to route a specific message

For point-to-point routing, the routing key is usually the name of a message queue. For topic pub-sub routing the routing key is usually hierarchical in nature:

api.agents.agent-{id}.operations.{operationName}

In more complex cases the routing key may be combined with routing on message header fields and/or content. An exchange examines a message’s properties, header fields, body content, and possibly data from other sources, then decides how to route the message. A binding pattern derived from the above routing key might look like api.agents.*.operations.*: bind exchange E1 to queue Q1 with that pattern, and any message sent to E1 routes to Q1 if its routing key matches.

A Rabbit broker is structured differently from a JMS broker. Each RabbitMQ server comprises at least one node (broker), or more typically nodes in a cluster. Each node has a default virtual host, “/”, and further virtual hosts such as “/development” can be created. Rabbit virtual hosts are like Tomcat virtual hosts: they partition broker data into subsets. Within these virtual hosts are exchanges and queues. When a user connects with their credentials, they are connecting to a virtual host on a Rabbit node.
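For example, connecting to a specific virtual host with the client looks like this – a minimal sketch; the hostname and credentials are illustrative:

val factory = new com.rabbitmq.client.ConnectionFactory()
factory.setHost("rabbit-node-1")        // illustrative hostname
factory.setVirtualHost("/development")  // the partition of broker data described above
factory.setUsername("guest")
factory.setPassword("guest")
val connection = factory.newConnection()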

Here we connect to a Rabbit node, declare an exchange to publish to, a queue to consume from, a binding pattern, then publish a few messages, using the RabbitMQ java client api:

package org.demo.simple.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public final class RocketSender {

 public void sendRockets() throws IOException {
     List<String> rocketsWithRoutings = new RocketRouter().build();

     Connection connection = new ConnectionFactory().newConnection();
     Channel channel = connection.createChannel();

     String rocketExchange = "rockets.launched";
     channel.exchangeDeclare(rocketExchange, "topic");
     String rocketQueue = channel.queueDeclare().getQueue();
     channel.queueBind(rocketQueue, rocketExchange, "galaxies.*.planets.*");

     for (String rocketTo : rocketsWithRoutings) {
         // Publish with a concrete routing key; wildcards belong only in binding patterns
         // ("milkyway" is an illustrative galaxy name matching the * in the binding above).
         channel.basicPublish(rocketExchange, "galaxies.milkyway.planets." + rocketTo, null, rocketTo.getBytes());
     }

     channel.close();
     connection.close();
 }
}

A simple consume of “landed” rockets could look like:

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(rocketQueue, false, queueingConsumer);

int landed = 0;
while (landed < launched) { // 'launched' is the number of rockets sent above
    QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    String rocketLanded = new String(delivery.getBody());

    if (rocketLanded.equalsIgnoreCase("Alderaan")) {
        System.out.println("That's no moon, that's a space station.");
    }
    landed++;
}

The Problem

In considering which routing strategies perform best in scalable environments, where performance itself can also be improved, there are many options. One of the great things about messaging in general is the variety of configurations available, and figuring out the right ones to solve both current and growing requirements.

To keep things simple let's consider two strategies:

  1. Highly-partitioned routing with hierarchical routing keys, fewer topic exchanges
  2. A larger number of direct exchanges and queues with far less routing partitions

Each scenario follows this use case: each application that must scale is both producer and consumer:

[Diagram: each application acting as both producer and consumer]

Where To Start

It is a good idea to take stock of your environment and its components before delving into a routing solution that will scale cleanly and efficiently over time. For example, what lends itself to scaling? Generally: decoupling, distribution, asynchrony, parallelism, and levels of abstraction and indirection, to name a few. Then consider which elements are current or potential bottlenecks. It is a basic principle that high-traffic, high-volume pathways require more efficient throughput, or you risk bottlenecks in your distribution. One exercise is to rank these in terms of traffic, or as a heat map. Next, can you classify your traffic – are there overarching patterns, topics or similar message types, and what are the relationships? Now start to consider consolidation, how and where efficiency might be improved, and apply tested patterns that resolve those heat points, decouple for scale, and increase performance.

General Routing Considerations

All exchange types behave differently. Here are a few general rules:

  • If you have a finite domain of routing keys in an application's graph, then many fanout exchanges might be the right fit (a 1:1 mapping of exchange per routing key)
  • If you have a potentially infinite number of routing keys, consider topic exchanges
  • For topic routing, performance decreases as the number of bindings increases
  • Fanout exchanges are very fast because they have no routing to process, yet if bound to a large number of queues that changes
  • Direct exchanges are a faster form of topic exchanges, provided you do not need the wildcard
  • Troubleshooting problems across 100,000+ queues could be tedious compared with a topology of more bindings but fewer exchanges and queues
  • A very high number of exchanges and queues takes up more memory, which may be significant, but this really depends

As of RabbitMQ 2.4.0, released March 23, 2011, a new topic routing algorithm optimization is available that is 60 times faster at peak than the previous algorithm. Because of this, one recommendation is to go with fewer exchanges and queues and more routing, since the time cost is now minimal.

Performance

What is Cheap?

In terms of memory cost: exchanges and bindings. In Erlang, on which RabbitMQ is built, each node (broker) is a process, as is each queue. By default the Erlang VM process limit is set to 1M, which can be raised. An exchange, however, is not a process, for scalability reasons; it is simply a row in RabbitMQ's built-in Mnesia database. In a cluster, declaring an exchange causes it to appear on all nodes of the cluster, while declaring a queue creates it on only one of the nodes. This explains why exchanges survive node restarts or adding a node to a cluster, yet queues do not.

Be wary of binding churn. In strategy number 2, if you create many new queues and their bindings whenever consumers attach, you might run into problems. For instance, given exchanges E1...En to which many messages are being published, whenever consumer Cm connects it creates bindings from its own queue to all of E1...En, which may cause problems depending on the rate of connections.

To alleviate binding churn, consider exchange-to-exchange bindings, new as of version 2.3.1. Each consumer could have its own secondary exchange Ym which must not be auto-delete. Then bind all of E1...En to Ym. In this way these bindings always exist. In this scenario, whenever consumer Cm connects it simply has to declare its queue and bind that queue to Ym. If Ym is a fanout exchange, it will be very fast and reduce the binding churn rate to 1 per connection, rather than potentially n per connection.
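A minimal sketch of that setup, using the client's exchangeBind; exchange names here are illustrative, and E1..En are assumed to be topic exchanges:

import com.rabbitmq.client.ConnectionFactory

val connection = new ConnectionFactory().newConnection()
val channel = connection.createChannel()

// Long-lived primary exchanges E1..En, declared by the publishing side
val primaryExchanges = Seq("E1", "E2", "E3")

// Consumer m's secondary exchange Ym: fanout, durable, and crucially not auto-delete
channel.exchangeDeclare("Y1", "fanout", true)

// Bind Y1 to every primary exchange once ('#' matches all routing keys);
// these bindings then always exist, independent of consumer connections
primaryExchanges.foreach(e => channel.exchangeBind("Y1", e, "#"))

// On each (re)connect the consumer only declares its queue and binds it to Y1
val queue = channel.queueDeclare().getQueue
channel.queueBind(queue, "Y1", "")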

[Diagram: exchange-to-exchange binding]

Use Cases

Exchange-to-Exchange Scalable Use Case

Consider a server application with autonomous agents. Each agent runs on a virtual machine that is part of an elastically-scaled system. As each agent starts up, it sends a message to the server that it is online, followed by many other messages such as authentication and data transfer. If we have 1,000 agents, each declaring 50 direct exchanges, queues and bindings, then each agent must know the server's queues in order to fulfill the binding contract on queue.declare operations. That is not a scalable solution.

Now consider creating shared topic exchanges: one exchange for the agent-to-server pathway, another for the server-to-agent pathway, and a third to handle unauthenticated agents, which routes only to those queues that do not require security. We partition with binding patterns and message routing keys, and bring one set of these up for each server, to be shared by all agents that connect to it. Then, in its simplest form, as each agent comes online it declares a private exchange and queue, and binds its exchange to the shared topic exchanges.

Our relationships are now expressed by exchange-to-exchange mappings, which reduces the churn rate and decouples agents from having to 'know' the server queues. Using this pattern the system is clean, decoupled, and scalable.

[Diagram: basic routing topology]

Elastic-Scaling Use Case

Let's take the previous scenario a step further. We are already using topic pub-sub routing over scenario 2: many direct routings. Now let's say the system requirement bumps up to scale clusters of our server application in a data center with 50,000 or more agents. How can we throttle varying loads?

The authenticated client exchange routes messages from agent to server. It handles all operations publishing messages to single-consumer queues, including those producing the highest frequency of messages. This is a potential bottleneck under the current topology: roughly 60,000 messages per minute for 10,000 clients, or 86,400,000 messages per day. This is easily resolvable – RabbitMQ can handle over 1 billion messages per day depending on your configuration, for example whether or not you are persisting messages.

Our server applications are running a RabbitMQ cluster. Remember that in a cluster, declaring an exchange causes it to appear on all nodes, while declaring a queue creates it on only one of the nodes, so we have to configure a solution.

Load-Balancing Between Producers and Consumers

To efficiently handle these potentially very high loads as more client applications (agents) come online, we can modify this topology in several ways. First, an optimization of the above configuration to load-balance messages across a Rabbit cluster: create one queue per node in the cluster. If we have four nodes, for each high-traffic queue we create hfq.{0,1,2,3} for that operation. Each agent can then pick a node randomly – by a number between zero and three – or via a more sophisticated round-robin implementation, and publish to it. With RabbitMQ there are RPC calls, or you can use the Rabbit management plugin, to GET the number of nodes for use in your round-robin algorithm. A sketch follows below.
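A sketch of the publishing side – the hfq naming and node count follow the description above, and in practice the connection would be pooled rather than created inline:

import com.rabbitmq.client.ConnectionFactory
import scala.util.Random

val connection = new ConnectionFactory().newConnection()
val channel = connection.createChannel()

val nodeCount = 4 // assumption: one hfq.{n} queue per cluster node

def publishToHfq(payload: Array[Byte]): Unit = {
  // Random node selection; a round-robin counter works equally well
  val queue = "hfq." + Random.nextInt(nodeCount)
  channel.basicPublish("", queue, null, payload) // default exchange routes by queue name
}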

Worker Queues With Round-Robin Dispatching

Worker Queues, or Task Queues, are generally used to distribute time-consuming tasks efficiently among multiple workers and to parallelise work easily. This topology also eliminates the need to execute resource-intensive tasks in-line and block until they complete: running several worker queues allows those tasks to be distributed among them.

[Diagram: AMQP work queues with round-robin load balancing]

With Work Queues, Rabbit uses a round-robin distribution method by default, sending each message to the next consumer in sequence, so each consumer receives roughly the same number of messages. If you declare a queue, bind it to the exchange, spin up three competing consumers, and send 20,000 messages, message zero routes to the first consumer, message one to the second, message two to the third, and so on. If we begin building up a backlog of tasks, we can simply add more workers, allowing the system to scale easily – see the sketch below.
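A sketch of competing consumers on a single work queue; the queue name and worker count are illustrative:

import com.rabbitmq.client.{ConnectionFactory, QueueingConsumer}

val connection = new ConnectionFactory().newConnection()

def startWorker(id: Int): Thread = new Thread(new Runnable {
  def run(): Unit = {
    val channel = connection.createChannel()
    channel.queueDeclare("tasks", true, false, false, null)
    channel.basicQos(1) // fair dispatch: at most one unacked message per worker
    val consumer = new QueueingConsumer(channel)
    channel.basicConsume("tasks", false, consumer)
    while (true) {
      val delivery = consumer.nextDelivery()
      println("worker " + id + " processed " + new String(delivery.getBody))
      channel.basicAck(delivery.getEnvelope.getDeliveryTag, false)
    }
  }
})

// Three competing consumers; add more workers to absorb a growing backlog
(1 to 3).foreach(i => startWorker(i).start())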

Performance

Memory

Neither option above will necessarily induce high load in RabbitMQ. There are no hard limits on the number of exchanges and queues one can create, and running 100,000 queues on one broker is fine. With the right tuning and enough RAM you can run well over a million queues.

RabbitMQ dynamically pushes messages to disk to free up RAM, so the memory footprint of a queue does not depend on its contents. After a queue is idle for 10 seconds or more it will "hibernate", which triggers GC on that queue; as a result, the amount of memory the queue requires can shrink dramatically. For example, 1,000 empty, idle queues might take up 10MB of RAM, but when they are all active (even if empty) they might consume much more memory, depending on memory fragmentation. Forcing them back into hibernation to test this behavior is difficult because the Erlang VM does not hand memory back to the OS immediately.

You can, however, observe a large process that hibernates and has very fragmented memory, because the amount reclaimed can be sufficient to force the VM to hand memory back to the OS. If you run a test that steadily increases Rabbit's memory footprint, you can observe the effect of hibernation on idle processes as it reduces the rate of increase of memory use.

Erlang is a multi-threaded VM that takes advantage of multiple cores. It presents green threads to the developer, called 'processes' because, unlike threads, they conceptually do not share an address space.

Transactions

Transactions on 10,000 messages can take as long as four minutes to publish. A newer RabbitMQ feature called Publisher Confirms is more than 100 times faster than the same, but transactional, code. If you are not explicitly required to implement transactions but do need the verification, consider this option.
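A sketch of the confirms flow – assuming a client version with confirm support; the queue name is illustrative and the connection comes from earlier examples:

val channel = connection.createChannel()
channel.confirmSelect() // put the channel into confirm mode

for (i <- 0 until 10000)
  channel.basicPublish("", "tasks", null, ("msg-" + i).getBytes)

// Blocks until the broker confirms all outstanding publishes
channel.waitForConfirms()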

The Take-Away

Here are some final takeaways to help you squeeze the greatest performance gains out of your implementation:

  • The new topic routing algorithm optimization is 60 times faster at peak
  • Topic binding patterns using the wildcard '*', which matches a single word, are much faster than those using '#', which matches zero or more words; '#' takes longer to process in the routing table than '*'
  • Exchange-to-exchange bindings improve decoupling, increase topology flexibility, reduce binding churn, and help increase performance
  • Exchanges and bindings are very light weight
  • RabbitMQ Publisher Confirms are more than 100 times faster than AMQP transactions
  • After a queue is idle for >=10 seconds it will "hibernate", inducing GC on the queue, resulting in a dramatic decrease in memory required for that queue
  • Worker Queues help parallelize and distribute workloads
  • Distributing worker queues in the Rabbit cluster helps scale
  • Load-balance your topology

This is by no means a thesis on the subject; there are indeed many more patterns, topologies and performance details to consider. A strategy, as always, depends on many factors, but I hope this encapsulates enough to help, or at least get one thinking in the right directions.

Get It

RabbitMQ Source on GitHub
RabbitMQ Binary Downloads and Plugins
Erlang Downloads
Spring AMQP API for RabbitMQ in Java and .NET
Hyperic to monitor RabbitMQ
Maven


<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>${rabbitmq.version}</version>
  <exclusions>
    <exclusion>
      <groupId>commons-cli</groupId>
      <artifactId>commons-cli</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>commons-io</groupId>
  <artifactId>commons-io</artifactId>
</dependency>
