Category Archives: Scala

Yet Another Distributed Systems Reading List

I started putting together a reading list for engineers who are fairly new to distributed systems and theory, with little hands-on experience yet. Having previously worked only on teams of all senior engineers, I now run a team that is mostly very senior but also includes one or two relatively recent grads. In compiling this list to cherry-pick from, with input from many Twitter friends who responded to my plea, I realized there are some items I have not yet read myself, so this has been super useful; many thanks to everyone who replied.

Some pages make this a list of lists or a post of posts :) Ping me on Twitter if you have more gems to add: https://twitter.com/helenaedelson. I hope to make some sense of this at some point, perhaps with a 'start here' chunk of readings that progresses to the others.

In No Order:


Changes

It seems the word is out that I left DataStax. I made so many friends there, even recruited an old friend, and I know many of us will continue to cross paths and collaborate. Thanks to everyone for the great memories.

Over the last year I have spoken at several Spark, Scala, and Machine Learning meetups, and internationally at Scala conferences like Scala Days Amsterdam and at big data conferences such as Spark Summit, Strata, and Emerging Tech conferences, all of which I hope to continue doing.

Spark Summit East 2015

Scala Days Amsterdam, 2015



Philly ETE – Emerging Tech 2015

In October 2015 I'll be speaking at Spark Summit EU and on the Spark panel at Grace Hopper. I'll also be at Strata NYC again, this year hosting the Spark track on Thursday. I'm adding some cool new material and code to my Big Data Lambda talk.

What’s next – I’m humbled to have been contacted by many engineers I know and highly respect about working at their companies, where in some cases they are the founders. This is making for a very difficult decision because they are all doing really interesting and exciting work with open source technologies I like. In the meantime I am taking a month or so to enjoy the last part of summer, something I haven’t had time to do for years :)


Spark Streaming with Kafka and Cassandra

This is a lightning post previewing a future, more in-depth post on how to integrate Spark Streaming, Kafka, and Cassandra using the Spark Cassandra Connector.

First, here are some of the dependencies used below:

import com.datastax.spark.connector.streaming._
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkEnv, SparkConf, Logging}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import com.datastax.spark.connector.demo.streaming.embedded._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.demo.Assertions

Start by configuring Spark:

  val sc = new SparkConf(true)
    .set("spark.cassandra.connection.host", SparkMaster)
    .set("spark.cleaner.ttl", "3600")
    .setMaster("local[12]")
    .setAppName("Streaming Kafka App")

Here 'SparkMaster' is the value passed to spark.cassandra.connection.host; it can be a single Cassandra host or a comma-separated list of Cassandra seed nodes.
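For local prototyping it can simply point at a local Cassandra node; a minimal sketch (the value below is an assumption, not from the original post):

// Hypothetical value for local prototyping; in a real deployment this would be
// one or more Cassandra seed node addresses, e.g. "10.0.0.1,10.0.0.2".
val SparkMaster = "127.0.0.1"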

Using the SparkConf object you can easily configure the connection as well as populate Cassandra with a keyspace, table, and data for quick prototyping. This creates the keyspace and table in Cassandra:

  CassandraConnector(sc).withSessionDo { session =>
    session.execute(s"CREATE KEYSPACE IF NOT EXISTS streaming_test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 3 }")
    session.execute(s"CREATE TABLE IF NOT EXISTS streaming_test.key_value (key VARCHAR PRIMARY KEY, value INT)")
    session.execute(s"TRUNCATE streaming_test.key_value")
  }

A custom embedded class starts an embedded local ZooKeeper server and an embedded Kafka server and client (all very basic here):

val kafka = new EmbeddedKafka

Create the Spark Streaming context:

val ssc = new StreamingContext(sc, Seconds(2))

One of several ways to ensure shutdown of ZooKeeper and Kafka in this basic prototype:

SparkEnv.get.actorSystem.registerOnTermination(kafka.shutdown())

This assumes you already have, or are creating, a topic in Kafka and are sending messages to it, but here is a very basic way to do it if not:

// Kafka 0.8-era imports assumed by the snippets below; 'client' is the
// ZkClient from the embedded ZooKeeper/Kafka setup above.
import java.util.Properties
import kafka.admin.CreateTopicCommand
import kafka.producer.{Producer, ProducerConfig}
import kafka.serializer.StringEncoder

def createTopic(topic: String): Unit =
  CreateTopicCommand.createTopic(client, topic, 1, 1, "0")

def produceAndSendMessage(topic: String, sent: Map[String, Int]): Unit = {
  val p = new Properties()
  p.put("metadata.broker.list", kafkaConfig.hostName + ":" + kafkaConfig.port)
  p.put("serializer.class", classOf[StringEncoder].getName)

  val producer = new Producer[String, String](new ProducerConfig(p))
  producer.send(createTestMessage(topic, sent): _*)
  producer.close()
}

The createTestMessage function simply generates test data for these purposes; a sketch is shown below.
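The original post does not show createTestMessage; a minimal sketch of what it might look like, assuming the Kafka 0.8 KeyedMessage producer API (the implementation below is an assumption, not the original code):

import kafka.producer.KeyedMessage

// Hypothetical helper: emit 'count' messages per key so the streaming count
// below aggregates back to the expected (key, count) pairs.
def createTestMessage(topic: String, sent: Map[String, Int]): Seq[KeyedMessage[String, String]] =
  (for ((key, count) <- sent.toSeq; _ <- 1 to count)
    yield new KeyedMessage[String, String](topic, key, key))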

Create an input stream that pulls messages from a Kafka broker:

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafka.kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)

Import the implicits enabling the DStream’s ‘saveToCassandra’ functions:

import com.datastax.spark.connector.streaming._

This defines the work to do in the stream: configure the stream computations (here a very rudimentary count) and write the DStream output to Cassandra:

stream.map { case (_, v) => v }
  .map(x => (x, 1))
  .reduceByKey(_ + _)
  .saveToCassandra("streaming_test", "key_value", SomeColumns("key", "value"), 1)

ssc.start()

Now we can query our Cassandra keyspace and table to see if the expected data has been persisted:

import scala.concurrent.duration._

val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value")
awaitCond(rdd.collect.size == sent.size, 5.seconds)
// Compare the persisted rows against the (key, count) pairs that were sent.
val rows = rdd.collect.map(row => (row.getString("key"), row.getInt("value"))).toSet
sent.forall { rows.contains(_) }

And finally, with our assertions successful, we can explicitly do a graceful shutdown of the Spark Streaming context with:

ssc.stop(stopSparkContext = true, stopGracefully = true)


Orderly Load-Time Provisioning And Graceful Shutdown of Akka Nodes

A simple (and truncated) example of the CloudExtension's load-time ordered provisioning and ordered graceful shutdown. Unfortunately this had to be written against an older version of Scala and Akka, for now. MyNodeGuardian.scala is started in CloudExtension.register() and is a sample of using ProvisioningGuardian, which extends OrderedGracefulShutdown: View Gist
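The gist itself isn't reproduced here. As a rough sketch of the general idea only (the names and structure below are assumptions, not the actual CloudExtension code), components can be registered in the order they are provisioned and then stopped in reverse order when the ActorSystem terminates:

import akka.actor.ActorSystem
import scala.collection.mutable

// Hypothetical sketch of ordered graceful shutdown: push components in start
// order; the stack's LIFO iteration stops them in reverse order on termination.
class OrderedShutdown(system: ActorSystem) {
  private val components = mutable.Stack.empty[(String, () => Unit)]

  def register(name: String)(shutdownHook: => Unit): Unit =
    components.push(name -> (() => shutdownHook))

  // Called once at load time, after all components have been provisioned.
  def install(): Unit =
    system.registerOnTermination {
      components.foreach { case (name, stop) =>
        system.log.info("Stopping {}", name)
        stop()
      }
    }
}

For example, registering ZooKeeper before Kafka means Kafka is stopped before ZooKeeper on shutdown.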


Closing Over An Akka Actor Sender In The Receive

Accidentally closing over the sender is easy to do, and it causes mayhem when you do. The rule is simple: never close over the sender method in a block of code that may be executed on another thread, such as a scheduled task or a Future. The trick is to capture the current sender in a val, as illustrated below:

Unsafe


Here are three examples of what not to do. It all depends on what is actually done in the receive: if Futures or other threads are involved and a call to forward or sender is executed in a callback (such as Future.onSuccess), then you have a problem:

1. Calling sender on a different thread than the one processing the original message in Actor.receive

def receive = {
  case SomeMessage =>
    context.system.scheduler.scheduleOnce(5 seconds) {
      sender ! DelayedResponse  // Danger!
    }
}

Here the thing to watch out for is { sender ! DelayedResponse }, because it runs on another thread than the one that processed the original message. The sender is associated with the current message being processed, and in the scheduler case illustrated above, several other messages might be processed during the 5 seconds before the scheduled task calls the sender method, so the correct sender is no longer associated with the message that was being processed when the task was scheduled.

2. Shared mutable state

class MyActor extends Actor {
  var state = State()

  def receive = {
    case _ =>
      // Danger: both closures mutate the actor's internal state from other threads.
      Future { state = newState }
      otherActor ? message onSuccess { r => state = r }
  }
}

The above code will break your application in weird ways: the actor's internal state is mutated from threads other than the actor's own, which causes a shared mutable state bug.

3. Calling sender in a Future during Actor.receive could return null or the wrong ActorRef

class MyActor extends Actor {
    def receive = {
      case _ =>  Future { expensiveCalculation(sender) } // painful, and you owe me a beer if you do it
    }
}

The return value of the sender method after the current message has been processed might be null until the next message is received, but that doesn't matter: simply use a val if you close over sender in a block of code that might be executed by another thread.

Safe


Here are a few examples that are safe, the first because we simply freeze the sender reference:

class MyActor extends Actor {
  def receive = {
    case SomeMessage =>
      val requestor = sender
      context.system.scheduler.scheduleOnce(5 seconds) {
        requestor ! DelayedResponse
      }
  }
}

Or better yet, keep a clean Actor.receive altogether, which is a best practice:

class MyActor extends Actor {
  def receive = {
    case msg: SomeMessage => receiveSomeMessage(sender, msg)
  }

  def receiveSomeMessage(requestor: ActorRef, msg: SomeMessage): Unit =
    context.system.scheduler.scheduleOnce(5 seconds) {
      requestor ! DelayedResponse
    }
}

It is safe to close over self and a thread-safe ActorRef

class MyActor extends Actor {
  def receive = {
    // Safe: 'self' is an ActorRef, which is thread-safe to close over.
    case _ => Future { expensiveCalculation() } onComplete { result => self ! result.get }
  }
}

It is safe to close over a fixed value and a thread-safe ActorRef

class MyActor extends Actor {
    def receive = {
      case _ =>
        val currentSender = sender // freeze the sender to a val
        Future { expensiveCalculation(currentSender) }
    }
}

Find Out More

Some examples were taken from “Akka Concurrency” by Derek Wyatt, others from the Akka Docs.


Scala 2.10.0 Introduction of Try

One of my favorite things about Scala 2.10, which I've been using since this summer, is encapsulated in Try.scala. You can remove lines and lines of try/catch blocks and the horrible (or nonexistent) error handling strategies I have seen strewn around applications over the years. Try gives you a clean, very often one-line, function with error handling built in, and it is incredibly flexible for how succinct it is.

The docs describe it as representing “a computation that may either result in an exception or return a success value. It is analogous to the Either type but encodes common idioms for handling exceptional cases (such as rescue/ensure which is analogous to try/finally)”.

Looking at this simplistically, here are a few samples. The first follows the very Java-like pattern of try/catch blocks riddled through code, while the Risky class uses simple one-line handling with scala.util.Try.
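The original sample was embedded as a gist and isn't reproduced here; a minimal sketch of the pattern it describes might look like this (the Risky class and its member names are assumptions, not the original code):

import scala.util.Try

// Java-like: an explicit try/catch block just to fall back to a default value.
def javaLike(a: Int, b: Int): Int =
  try a / b catch { case _: ArithmeticException => 1 }

// Hypothetical Risky class: the same fallbacks as one-liners with scala.util.Try.
class Risky(a: Int, b: Int) {
  def one: Option[Int] = Try(a / b).toOption                                     // None on failure
  def two: Boolean     = Try(a / b).isSuccess                                    // false on failure
  def three: Int       = Try(a / b).getOrElse(1)                                 // fall back to a default
  def four: Try[Int]   = Try(a / b).recover { case _: ArithmeticException => 1 } // Success(1) on failure
  def five: Try[Int]   = Try(a / b)                                              // Failure(ArithmeticException) when b == 0
}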

If you run the original sample you would see output like the following, where the entries showing a scala.util.Success and/or a plain value are the result of the fallback handling:
[info] javaLike [1]
[info] one [None]
[info] two [false]
[info] three [1]
[info] four [1]
[info] five [Failure(java.lang.ArithmeticException: / by zero)]
[info] six [Success(1)]
[info] seven [Success(1)]
[info] eight [Success(1)]


Functional Data Structure Resources

By request, and not exactly a 'short list', here are some excellent resources on functional data structures. Note that some of them cost money, but you can view them for 24 hours for a few USD:

These are good too but not directly related:


Build Configuration For Scala 2.10.0-RC1 with Akka 2.1.0-RC1 and Slick 0.11.1

Assuming you know the basics, this is just meant to be a quick, abridged shortcut ;) I created a sample project to spin up Scala 2.10.0-RC1 with Akka 2.1.0-RC1, using the latest PostgreSQL driver and Slick 0.11.1.

build.properties (I have not yet tried this with the just-released sbt 0.12.1):

sbt.version=0.12.0

In my Build.scala I added:

lazy val buildSettings = Seq(
  organization := "com.helenaedelson",
  version      := "1.0-SNAPSHOT",
  scalaVersion := "2.10.0-RC1" // FIXME: use 2.10.0 for final
)

These resolvers:

resolvers ++= Seq(
  "Sonatype Repo"      at "https://oss.sonatype.org/content/repositories/releases/",
  "Typesafe Releases"  at "http://repo.typesafe.com/typesafe/releases/",
  "Typesafe Snapshots" at "http://repo.typesafe.com/typesafe/snapshots/"
)

And these dependencies:

object Dependency {
  val akka       = "com.typesafe.akka"  % "akka-actor_2.10.0-RC1" % "2.1.0-RC1" withSources ()
  val slick      = "com.typesafe"       % "slick_2.10.0-M7"       % "0.11.1" withSources ()
  val postgresql = "postgresql"         % "postgresql"            % "9.1-901.jdbc4" % "runtime"
  val logback    = "ch.qos.logback"     % "logback-classic"       % "1.0.7" // EPL 1.0 / LGPL 2.1
  val slf4j      = "org.slf4j"          % "slf4j-api"             % "1.6.4" // MIT

  object Test {
    val akkaTestKit = "com.typesafe.akka" % "akka-testkit_2.10.0-RC1" % "2.1.0-RC1" withSources ()
    val junit       = "junit"             % "junit"                   % "4.10"   % "test" // Common Public License 1.0
    val scalatest   = "org.scalatest"     % "scalatest"               % "1.8"    % "test" cross CrossVersion.full // ApacheV2
    // Note that this version gives the message "-oDF is deprecated, use stdout instead."
    // val scalatest = "org.scalatest"    % "scalatest"               % "2.0.M4-2.10.0-RC1-B1" % "test" cross CrossVersion.full // ApacheV2
    val scalacheck  = "org.scalacheck"    % "scalacheck"              % "1.10.0" % "test" cross CrossVersion.full // New BSD
  }
}
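The snippet above only defines the dependency coordinates; a minimal sketch of how they might be wired into a project in the same Build.scala (the module grouping and project name are assumptions, not the original build):

object Dependencies {
  import Dependency._

  // Group everything into one sequence for the single sample module.
  val core = Seq(akka, slick, postgresql, logback, slf4j,
    Test.akkaTestKit, Test.junit, Test.scalatest, Test.scalacheck)
}

lazy val root = Project(
  id = "root",
  base = file("."),
  settings = Project.defaultSettings ++ buildSettings ++ Seq(
    libraryDependencies ++= Dependencies.core
  )
)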
