Yet Another Distributed Systems Reading List

I started putting together a reading list for engineers who are fairly new to distributed systems and theory, with little hands-on experience yet. Having previously worked solely with/on teams of all senior engineers, I am now running a team that is mostly very senior but also includes one or two relatively recent grads. In compiling this list to cherry-pick from, with input from many Twitter friends who responded to my plea for suggestions, I realized there are some I have not yet read myself, so this is super useful. Many thanks to everyone who replied.

Some pages make this a list of lists, or a post of posts :) Ping me on Twitter if you have more gems to add: https://twitter.com/helenaedelson. I hope to make more sense of this at some point, for example a ‘start here’ chunk of readings that progresses to the others.

In No Order:


Comparing and Choosing Technologies

Unable to sleep at 2:30 AM, I just read an interesting case study from a large company on how they have started to apply ad tech to TV households versus mobile and web, with frequency capping for viewers, and so on. Not surprisingly, they use Kafka. It made me think of a situation I was in where someone brought up Solace versus Kafka.

When I listen to people talk about any technology, I try to really listen for:

  • Whether they are speaking from a place of personal experience, or from what they have read and surmised from blogs by users who may or may not know what they are doing with that technology, may or may not have applied it properly, and so forth
  • Whether they have merely played around with that technology, or have actual production experience
  • And, if they have production experience with that technology, whether it was actually at scale, under real-world load

Or, are they talking about a solution where required features are not yet completed – i.e. currently vaporware in a sense? These sorts of situations must be treated with care, as opposed to proven ones.

For example, trying to compare Solace to Kafka is comparing apples to oranges. I have seen this sort of comparison attempted many times over the last decade that I have been in messaging, and it is (pun intended) fruitless :)

They each serve a different purpose in an architecture, and while they overlap in some functionality, Solace seems to fit more particular use cases – and handles those use cases very well – while some features come out of the box with Kafka and are proven in production at extremely high load and scale. No technology is a one-size-fits-all solution. For example, I know a production system that uses both an Aerospike cluster to solve some problems and a Cassandra cluster to solve others – both NoSQL stores serving different applications in different areas of the architecture. Taking this a step further, a deployment architecture for one technology, such as Cassandra, would have multiple clusters: one cluster for analytics, one for time series, and so forth.

The concept I want to highlight is event-driven architecture – nothing new, and I have been working with it for years – but it is quite different from more conventional ETL data flows. Leveraging this architecture, done right, can both decouple components cleanly and improve speed. In ad tech, if you do not respond in time, or correctly, you lose the bid and thus lose money. But this is only one part of the system. It is not the high-frequency trading use case, but it shares some similar requirements.

If you are promised features or an MVP by a team whose customer you may become, including as an internal customer, consider whether that team is motivated enough to have your team/company as a customer, and willing as well as able to do the work quickly to support it. For example, I would dig further into this with their product manager, project manager and engineering lead to find out whether they have the a) roadmap, b) priorities, c) resources, and d) QA, test automation and performance engineers to do this. I wouldn’t want to sign up for it only to find they can’t deliver, or can’t deliver in the necessary time frame.

Things like Hadoop or Teradata are very nice, but you can’t scale with them here, and they are not applicable for streaming, event-driven architectures and workloads. You can, however, scale with Kafka, and leverage Kafka to handle the ingestion stream, run analytics and respond back in-line – for example, a system run on Kafka that can do distributed RPC, leveraging Akka (or Storm, but Akka is easier). You can do a lot in 20 ms. For example, you can write a system where the consumer sees the produced data in 2 ms at most. A response via Akka remoting, for example, is another millisecond hop back: a 1 ms response time for that leg. There are many ways to solve this, of course, and here again you can use Kafka with Storm, or Kafka with Akka, and feed into Spark if you can do micro-batching and do not need very low latency analytics responses. It all depends on your requirements: what you are solving for, your competition and your constraints.
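To make the Kafka-plus-Akka shape a little more concrete, here is a minimal sketch of just the in-line request/response leg, using the Akka ask pattern and the older Akka APIs of this era. The message types, actor and object names, and the 0.42 bid are all made up for illustration, and the Kafka ingestion side is omitted.

import akka.actor.{ Actor, ActorSystem, Props }
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical request/response messages for an ad-bid style round trip.
case class BidRequest(id: String)
case class BidResponse(id: String, bid: Double)

// In a real system this actor would be fed by the Kafka ingestion stream; here it
// is called directly to show only the in-line request/response leg.
class BidResponder extends Actor {
  def receive = {
    case BidRequest(id) => sender ! BidResponse(id, bid = 0.42) // fast, in-memory decision
  }
}

object LowLatencySketch extends App {
  val system    = ActorSystem("drpc-sketch")
  val responder = system.actorOf(Props[BidResponder], "responder")

  implicit val timeout = Timeout(20.millis) // the latency budget discussed above

  val response = Await.result((responder ? BidRequest("req-1")).mapTo[BidResponse], 1.second)
  println(response)
  system.shutdown()
}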

 


Changes

It seems the word is out that I left DataStax. I made so many friends there, even recruited an old friend, and know many of us will continue to cross paths and collaborate. Thanks to everyone for the great memories.

Over the last year I have spoken at several Spark, Scala and machine learning meetups, and internationally at Scala conferences like Scala Days Amsterdam and at many big data conferences such as Spark Summit, Strata and emerging tech conferences – all of which I hope to continue doing.

  • Spark Summit East 2015
  • Scala Days Amsterdam, 2015
  • Philly ETE – Emerging Tech 2015

In October 2015 I’ll be speaking at Spark Summit EU and on the Spark panel at Grace Hopper. I’ll also be at Strata NYC again, this year hosting the Spark track on Thursday. I’m adding some cool stuff, and new code, to my Big Data Lambda talk.

What’s next – I’m humbled to have been contacted by many engineers I know and highly respect about working at their companies, where in some cases they are the founders. This is making for a very difficult decision because they are all doing really interesting and exciting work with open source technologies I like. In the meantime I am taking a month or so to enjoy the last part of summer, something I haven’t had time to do for years :)


Spark Streaming with Kafka and Cassandra

This is a lightning post, ahead of a future, more in-depth post, on how to integrate Spark Streaming, Kafka and Cassandra using the Spark Cassandra Connector.

First, here are some of the dependencies used below:

import com.datastax.spark.connector.streaming._
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkEnv, SparkConf, Logging}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import com.datastax.spark.connector.demo.streaming.embedded._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.demo.Assertions

Start by configuring Spark:

  val sc = new SparkConf(true)
    .set("spark.cassandra.connection.host", SparkMaster)
    .set("spark.cleaner.ttl", "3600")
    .setMaster("local[12]")
    .setAppName("Streaming Kafka App")

Where ‘SparkMaster’ (despite its name, it is used here as the value of spark.cassandra.connection.host) can be a comma-separated list of Cassandra seed nodes.
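For example, something like the following, where the addresses are just placeholders:

val SparkMaster = "10.0.0.1,10.0.0.2,10.0.0.3"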

Using the SparkConf object, you can easily configure Cassandra as well as populate it with a keyspace, table and data for quick prototyping. This creates the keyspace and table in Cassandra:

  CassandraConnector(sc).withSessionDo { session =>
    session.execute(s"CREATE KEYSPACE IF NOT EXISTS streaming_test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 3 }")
    session.execute(s"CREATE TABLE IF NOT EXISTS streaming_test.key_value (key VARCHAR PRIMARY KEY, value INT)")
    session.execute(s"TRUNCATE streaming_test.key_value")
  }

A custom embedded class starts an embedded local ZooKeeper server and an embedded Kafka server and client (all very basic here):

val kafka = new EmbeddedKafka

Create the Spark Streaming context.

val ssc = new StreamingContext(sc, Seconds(2))

One of several ways to ensure shutdown of ZooKeeper and Kafka for this basic prototype:

SparkEnv.get.actorSystem.registerOnTermination(kafka.shutdown())

This assumes you already have, or are creating, a topic in Kafka and are sending messages to it – but here is a very basic way to do it if not:

import java.util.Properties
import kafka.admin.CreateTopicCommand
import kafka.producer.{Producer, ProducerConfig}
import kafka.serializer.StringEncoder

// 'client' is the ZkClient backing the embedded ZooKeeper/Kafka instances above.
def createTopic(topic: String): Unit =
  CreateTopicCommand.createTopic(client, topic, 1, 1, "0")

def produceAndSendMessage(topic: String, sent: Map[String, Int]): Unit = {
  val p = new Properties()
  p.put("metadata.broker.list", kafkaConfig.hostName + ":" + kafkaConfig.port)
  p.put("serializer.class", classOf[StringEncoder].getName)

  val producer = new Producer[String, String](new ProducerConfig(p))
  producer.send(createTestMessage(topic, sent): _*)
  producer.close()
}

Where the createTestMessage function simply generates test data for these purposes.
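That helper is not shown above, so here is a minimal sketch of what createTestMessage might look like; the exact shape is an assumption. It expands each entry of the sent map into that many Kafka messages:

import kafka.producer.KeyedMessage

// Hypothetical helper: expands each (value, count) pair in 'sent' into 'count'
// Kafka messages carrying that value, so the streaming job can count them back.
def createTestMessage(topic: String, sent: Map[String, Int]): Seq[KeyedMessage[String, String]] =
  for ((value, count) <- sent.toSeq; _ <- 1 to count)
    yield new KeyedMessage[String, String](topic, value)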

Create an input stream that pulls messages from a Kafka broker:

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafka.kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)

Import the implicits enabling the DStream’s ‘saveToCassandra’ functions:

import com.datastax.spark.connector.streaming._

This defines the work to do in the stream. Configure the stream computations (here we do a very rudimentary computation) and write the DStream output to Cassandra:

stream.map { case (_, v) => v }
  .map(x => (x, 1))
  .reduceByKey(_ + _)
  .saveToCassandra("streaming_test", "key_value", SomeColumns("key", "value"), 1)

ssc.start()

Now we can query our Cassandra keyspace and table to see if the expected data has been persisted:

import scala.concurrent.duration._

val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value")
// awaitCond comes from the demo Assertions imported at the top
awaitCond(rdd.collect.size == sent.size, 5.seconds)
val rows = rdd.collect
sent.forall { rows.contains(_) }

And finally, our assertions being successful, we can explicitly do a graceful shutdown of the Spark Streaming context with:

ssc.stop(stopSparkContext = true, stopGracefully = true)


Orderly Load-Time Provisioning And Graceful Shutdown of Akka Nodes

A simple (and truncated) example of the CloudExtension’s load-time ordered provisioning and ordered graceful shutdown. Unfortunately this had to be written in an older version of Scala and Akka – for now. MyNodeGuardian.scala is started in CloudExtension.register() and is a sample of using ProvisioningGuardian, which extends OrderedGracefulShutdown: View Gist
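Since the full code lives in the gist, here is a minimal, self-contained sketch of the idea only. The OrderedGuardian and Shutdown names below are made up and are not the actual CloudExtension or ProvisioningGuardian code: children are provisioned in a declared order at load time and, on shutdown, stopped one at a time in reverse order.

import akka.actor.{ Actor, ActorRef, Props, Terminated }

case object Shutdown

class OrderedGuardian(provision: Seq[Props]) extends Actor {

  // Load-time ordered provisioning: create and watch each child in declared order.
  val children: Seq[ActorRef] = provision.zipWithIndex map {
    case (props, i) => context.watch(context.actorOf(props, s"node-$i"))
  }

  def receive = running

  def running: Actor.Receive = {
    case Shutdown =>
      children.lastOption foreach context.stop // stop the last-provisioned node first
      context.become(stopping(children.reverse))
  }

  // Stop the remaining children one by one as each Terminated arrives.
  def stopping(remaining: Seq[ActorRef]): Actor.Receive = {
    case Terminated(_) => remaining.drop(1) match {
      case Seq()        => context.stop(self) // all children are down, stop the guardian
      case next +: rest => context.stop(next); context.become(stopping(next +: rest))
    }
  }
}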


Closing Over An Akka Actor Sender In The Receive

Accidentally closing over the sender is easy to do, but causes mayhem if you do it. The rule is simple: never close over the sender method in a block of code that is potentially executed in another thread, such as a scheduled task or a Future. The trick is to capture the current sender in a val, as illustrated below:

Unsafe


Here are three examples of what not to do. It all depends on what is actually done in the receive. For example, if Futures or other threads are involved and a call to forward or sender is executed in a callback (such as Future.onSuccess), then you have a problem:

1. Calling sender on another thread than the one processing the original message in Actor.receive

def receive = {
  case SomeMessage =>
    // requires an implicit ExecutionContext in scope, e.g. import context.dispatcher
    context.system.scheduler.scheduleOnce(5 seconds) {
      sender ! DelayedResponse  // Danger!
    }
}

The thing to watch out for here is { sender ! DelayedResponse }, because it is run on another thread than the one processing the original message. The sender is associated with the current message being processed, and in the Scheduler case illustrated above several other messages might have been processed during the 5 seconds before the call to the sender method, so the correct sender is no longer associated with the message that was being processed when the scheduled task was triggered.

2. Shared mutable state

class MyActor extends Actor {
  var state = State()
  def receive = {
    case _ =>
      Future { state = newState }                            // mutates actor state from another thread
      otherActor ? message onSuccess { case r => state = r } // and again from the ask callback
  }
}

The above code will break your application in weird ways: the actor’s internal state is mutated from other threads, outside the actor, which is a shared mutable state bug.

3. Calling sender in a Future during Actor.receive could return a null or be the wrong ActorRef

class MyActor extends Actor {
    def receive = {
      case _ =>  Future { expensiveCalculation(sender) } // painful, and you owe me a beer if you do it
    }
}

The return value of the sender method after the current message has been processed might be null until the next message is received, but that doesn’t matter: simply use a val if you close over sender in a block of code that might be executed by another thread.

Safe


Here are a few examples that are safe, the first because we simply freeze the sender reference:

class MyActor extends Actor {
  def receive = {
    case SomeMessage =>
      val requestor = sender // freeze the sender reference to a val
      context.system.scheduler.scheduleOnce(5 seconds) {
        requestor ! DelayedResponse
      }
  }
}

Or, better yet, keep a clean Actor.receive altogether – a best practice:

class MyActor extends Actor {
  def receive = {
    case msg: SomeMessage => receiveSomeMessage(sender, msg)
  }

  def receiveSomeMessage(requestor: ActorRef, msg: SomeMessage): Unit =
    context.system.scheduler.scheduleOnce(5 seconds) {
      requestor ! DelayedResponse
    }
}

It is safe to close over self, which is a thread-safe ActorRef:

class MyActor extends Actor {
  def receive = {
    case _ => Future { expensiveCalculation() } onComplete { result => self ! result } // send the completed Try to self
  }
}

It is safe to close over a fixed value and a thread-safe ActorRef

class MyActor extends Actor {
    def receive = {
      case _ =>
        val currentSender = sender // freeze the sender to a val
        Future { expensiveCalculation(currentSender) }
    }
}

Find Out More

Some examples were taken from “Akka Concurrency” by Derek Wyatt, others from the Akka Docs.


Scala 2.10.0 Introduction of Try

One of my favorite things about Scala 2.10, which I have been using since this summer, is encapsulated in Try.scala. You can remove lines and lines of try-catch blocks and the horrible (or nonexistent) error handling strategies I have seen strewn around applications over the years. Try gives you a clean, very often one-line, function with error handling built in, and it is incredibly flexible for how succinct it is.

The docs describe it as representing “a computation that may either result in an exception or return a success value. It is analogous to the Either type but encodes common idioms for handling exceptional cases (such as rescue/ensure which is analogous to try/finally)”.

Looking at this simplistically, here are a few samples: the first does the very Java-like pattern of a try-catch block riddled through the code, while the Risky class uses simple one-line handling with scala.util.Try.
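The original samples were embedded as a gist that is not shown here, so the following is only a minimal sketch; the names and exact operations are assumptions chosen to line up with the output below.

import scala.util.Try

object TrySamples extends App {

  // The very Java-like pattern: an explicit try/catch with a fallback value.
  def javaLike: Int =
    try "foo".toInt catch { case _: NumberFormatException => 1 }

  // The Risky equivalents: one-line handling with scala.util.Try.
  class Risky {
    val one   = Try("foo".toInt).toOption                    // None
    val two   = Try("foo".toInt).isSuccess                   // false
    val three = Try("foo".toInt) getOrElse 1                 // 1
    val four  = Try("foo".toInt).recover { case _ => 1 }.get // 1
    val five  = Try(1 / 0)                                   // Failure(ArithmeticException)
    val six   = Try(1 / 0).recover { case _ => 1 }           // Success(1)
    val seven = Try("foo".toInt) orElse Try(1)               // Success(1)
    val eight = Try(1)                                       // Success(1)
  }

  val risky = new Risky
  println(s"javaLike [$javaLike]")
  Seq("one" -> risky.one, "two" -> risky.two, "three" -> risky.three, "four" -> risky.four,
      "five" -> risky.five, "six" -> risky.six, "seven" -> risky.seven, "eight" -> risky.eight)
    .foreach { case (name, value) => println(s"$name [$value]") }
}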

If you run this you would see the following, where the entries showing a scala.util.Success and/or a plain value are the result of the fallback:
[info] javaLike [1]
[info] one [None]
[info] two [false]
[info] three [1]
[info] four [1]
[info] five [Failure(java.lang.ArithmeticException: / by zero)]
[info] six [Success(1)]
[info] seven [Success(1)]
[info] eight [Success(1)]


Functional Data Structure Resources

By request, and not exactly the ‘short list’, here are some excellent resources on functional data structures. Note that some cost money, but you can view them for 24 hours for a few USD:

These are good too but not directly related:


Build Configuration For Scala 2.10.0-RC1 with Akka 2.1.0-RC1 and Slick 0.11.1

Assuming you know the basics, this is just meant to be a quick, abridged shortcut ;) I created a sample project to spin up Scala 2.10.0-RC1 with Akka 2.1.0-RC1, using the latest PostgreSQL and Slick 0.11.1.

build.properties – I have not yet tried this with the just-released sbt 0.12.1:

sbt.version=0.12.0

In my Build.scala I added

lazy val buildSettings = Seq(
  organization := "com.helenaedelson",
  version      := "1.0-SNAPSHOT",
  scalaVersion := "2.10.0-RC1" // FIXME: use 2.10.0 for final
)

These resolvers

resolvers ++= Seq(
  "Sonatype Repo" at "https://oss.sonatype.org/content/repositories/releases/",
  "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases/",
  "Typesafe Snapshots" at "http://repo.typesafe.com/typesafe/snapshots/"
)

And

object Dependency {
  val akka       = "com.typesafe.akka"  % "akka-actor_2.10.0-RC1" % "2.1.0-RC1" withSources ()
  val slick      = "com.typesafe"       % "slick_2.10.0-M7"       % "0.11.1" withSources ()
  val postgresql = "postgresql"         % "postgresql"            % "9.1-901.jdbc4" % "runtime"
  val logback    = "ch.qos.logback"     % "logback-classic"       % "1.0.7" // EPL 1.0 / LGPL 2.1
  val slf4j      = "org.slf4j"          % "slf4j-api"             % "1.6.4" // MIT

  object Test {
    val akkaTestKit = "com.typesafe.akka" % "akka-testkit_2.10.0-RC1" % "2.1.0-RC1" withSources ()
    val junit       = "junit"             % "junit"                   % "4.10" % "test" // Common Public License 1.0
    val scalatest   = "org.scalatest"     % "scalatest"               % "1.8"  % "test" cross CrossVersion.full // ApacheV2
    // Note that this version gives the message "-oDF is deprecated, use stdout instead."
    // val scalatest = "org.scalatest"    % "scalatest"               % "2.0.M4-2.10.0-RC1-B1" % "test" cross CrossVersion.full // ApacheV2
    val scalacheck  = "org.scalacheck"    % "scalacheck"              % "1.10.0" % "test" cross CrossVersion.full // New BSD
  }
}
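To tie it together, the dependencies can then be wired into the project definition, roughly like this. It is a sketch that assumes the buildSettings, resolvers and Dependency snippets above live in the same Build.scala; SampleBuild and the module name are made up.

import sbt._
import Keys._

object SampleBuild extends Build {

  // Wires the Dependency object above into a single module.
  lazy val sample = Project(
    id = "sample",
    base = file("."),
    settings = Defaults.defaultSettings ++ buildSettings ++ Seq(
      libraryDependencies ++= Seq(
        Dependency.akka, Dependency.slick, Dependency.postgresql,
        Dependency.logback, Dependency.slf4j,
        Dependency.Test.akkaTestKit, Dependency.Test.junit,
        Dependency.Test.scalatest, Dependency.Test.scalacheck)))
}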


Upgraded ZeroMQ Scala Binding to work with ZeroMQ 3.2.0

I just pushed an experimental upgrade of zeromq-scala-binding to branch wip-scala-2.10.0-M7-zeromq-upgrade-3.2 here http://github.com/helena/zeromq-scala-binding.

Here is a summary of changes:

  • Compatible with:
    • ZeroMQ 3.2.0
    • SBT 0.12.0
    • Scala 2.10.0-M7
  • Tested against 2.2.0 and 3.2.0 on MacOSX
  • See Changes.md for full list of additions, removals, modifications based on the ZeroMQ 3.2 API and Upgrading from libzmq 2.x to 3.2 docs
  • Added scalariform plugin
  • Added sbt-run test duration output
  • Removed build.sbt and replaced it with project/Build.scala
  • Removed the sbt plugin – that should live in the developer’s env vs the codebase
  • Removed sbt build file (see above)
  • Upgraded jnr
  • Upgraded jna

I call it experimental because it has been tested against ZeroMQ 3.2 and 2.2 only, and on only one OS – Mac. The test coverage in the API was originally sparse, and it still needs further coverage, particularly around socket options. If anyone feels like trying it out, let me know what you think. I will try to chip away at test coverage, and at a few other OS VMs, over the next few months.


The Importance of Patterns in Messaging

Organizations that dealt successfully with scalability challenges, such as Google, eBay, Amazon and Yahoo, went through architecture changes and eventually arrived at a similar pattern: asynchronous event-driven design.

Claim Check – Handling large file sizes

Messaging is most applicable to fast, frequent use cases of data transfer and notifications. Transport of large files is not; however, the Claim Check Enterprise Integration Pattern (EIP) is very well suited to this.

How can we reduce the data volume of message sent across the system without sacrificing information content?

  • Store message data in a persistent store that has HA and is highly scalable
    • A NOSQL store is more appropriate than a relational database
      • Document Store: If you choose to store the file itself
        • CouchDB – REST protocol, JSON API
      • Key Value: If you choose to store the location URI of the file
        • Riak – REST protocol, JSON API
  • Pass a Claim Check (unique key) to subsequent components
  • These components use the Claim Check to retrieve the stored information

The Claim Check key allows you to replace message content with a claim check for later retrieval. This pattern is very useful when the message content is very large and would be too expensive to send around. The key point here is that not all components require all of the information: some just need the key, while others need the actual data in the file.

It can also be useful in situations where you cannot trust the information with an outside party; in this case, you can use the Claim Check to hide the sensitive portions of data.
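Here is a minimal, in-memory sketch of the idea. The names are made up, the TrieMap just stands in for a scalable NoSQL store such as Riak or CouchDB, and the broker is omitted.

import java.util.UUID
import scala.collection.concurrent.TrieMap

object ClaimCheckSketch extends App {

  final case class ClaimCheck(key: String)
  final case class Message(claimCheck: ClaimCheck, metadata: Map[String, String])

  // Stands in for the highly available, scalable persistent store.
  val store = TrieMap.empty[String, Array[Byte]]

  // Producer side: check in the large payload, send only the claim check plus light metadata.
  def checkIn(payload: Array[Byte]): ClaimCheck = {
    val key = UUID.randomUUID().toString
    store.put(key, payload)
    ClaimCheck(key)
  }

  // Consumer side: only the components that need the full data redeem the claim check.
  def checkOut(claim: ClaimCheck): Option[Array[Byte]] = store.get(claim.key)

  val largeFile = Array.fill[Byte](10 * 1024 * 1024)(0)
  val msg = Message(checkIn(largeFile), Map("contentType" -> "video/mp2t"))

  // Downstream, a component that needs the actual data retrieves it by key.
  println(checkOut(msg.claimCheck).map(_.length)) // Some(10485760)
}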
