Category Archives: Akka

Comparing and Choosing Technologies

Unable to sleep at 2:30 AM, I just read an interesting case study from a large company on how they have started to apply ad tech to TV households versus mobile and web, with frequency capping for viewers, and so on. Not surprisingly, they use Kafka. It made me think of a situation I was in where someone brought up Solace versus Kafka.

When I listen to people talk about any technology, I try to listen for:

  • Whether they are speaking from personal experience, or from what they have read and surmised from blogs by users who may or may not know what they are doing with that technology, may or may not have applied it properly, and so forth
  • Whether they have merely played around with that technology, or have actual production experience with it
  • And if they do have production experience, whether it was actually at scale, under real-world load

Or are they talking about a solution whose required features are not yet complete – i.e., currently vaporware in a sense. These situations must be treated with care, as opposed to proven ones.

For example, trying to compare Solace to Kafka is comparing apples to oranges. I have seen this sort of comparison attempted many times over the last decade that I have been in messaging, and it is (pun intended) fruitless :)

They serve different purposes in an architecture, and while they overlap in some functionality, Solace fits more particular use cases – and handles those use cases very well – while Kafka provides some of the same features out of the box, proven in production at extremely high load and scale. No technology is a one-size-fits-all solution. For example, I know a production system that uses both an Aerospike cluster to solve some problems and a Cassandra cluster to solve others – both NoSQL stores, serving different applications in different areas of the architecture. Taking this a step further, a deployment architecture for one technology, such as Cassandra, would have multiple clusters: one for analytics, one for time series, and so forth.

The concept I want to highlight is event-driven architecture – nothing new, and I have been working in it for years – but it is quite different from more conventional ETL data flows. Leveraging this architecture can both decouple components cleanly and improve speed, if done right. In ad tech, if you do not respond in time, or correctly, you lose the bid and thus lose money. But this is only one part of the system. It is not the high-frequency trading use case, but it shares some similar requirements.

If you are promised features or an MVP by a team you may become a customer of, including as an internal customer, consider whether that team is motivated enough to have your team/company as a customer, and willing as well as able to do the work quickly to support it. For example, I would dig further into this with their product manager, project manager and engineering lead to find out whether they have the a) roadmap, b) priorities, c) resources, and d) QA, test automation and performance engineers to do this. I wouldn't want to sign up for it only to find they can't deliver, or can't deliver in the necessary time frame.

Things like Hadoop or Teradata are very nice, but you can't scale with them, and they are not applicable for streaming, event-driven architectures and workloads. You can, however, scale with Kafka and leverage it to handle the ingestion stream, run analytics, and respond back in-line – for example, a system run on Kafka that can do distributed RPC, leveraging Akka (or Storm, but Akka is easier). You can do a lot in 20 ms. For example, you can write a system where the consumer sees the produced data in 2 ms tops, and a response via Akka remoting is another 1 ms hop back. There are many ways to solve this, of course, and here again you can use Kafka with Storm, or Kafka with Akka feeding Spark, if you can do micro-batching and do not need very low-latency analytics responses. It all depends on your requirements – what you are solving for, your competition and your constraints.
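The request/response leg of such a system can be sketched, at its simplest, as an Akka ask-pattern round trip. This is only a sketch: the `BidRequest`/`BidResponse` types and the `RpcSketch` object are hypothetical names of mine, and the Kafka ingestion side is omitted entirely.

```scala
import akka.actor.{ Actor, ActorSystem, Props }
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical message types for illustration only
case class BidRequest(id: Long)
case class BidResponse(id: Long, priceCpm: BigDecimal)

// In a real system this actor would sit behind the Kafka consumer and
// reply over Akka remoting; here it just prices the bid in-line.
class Bidder extends Actor {
  def receive = {
    case BidRequest(id) => sender ! BidResponse(id, BigDecimal("1.25"))
  }
}

object RpcSketch {
  val system = ActorSystem("rpc-sketch")
  val bidder = system.actorOf(Props[Bidder], "bidder")

  // In production this would be sized to the bid deadline budget
  implicit val timeout = Timeout(1.second)

  def priceBid(id: Long): BidResponse =
    Await.result((bidder ? BidRequest(id)).mapTo[BidResponse], timeout.duration)

  def main(args: Array[String]): Unit = {
    println(priceBid(42L))
    system.shutdown()
  }
}
```

The point is not the ask pattern itself (tell/forward is cheaper in the hot path) but that the entire round trip stays in-process or over remoting, well inside a 20 ms budget.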

 


Orderly Load-Time Provisioning And Graceful Shutdown of Akka Nodes

Simple (and truncated) example of the CloudExtension's load-time ordered provisioning and ordered graceful shutdown. Unfortunately this had to be written against an older version of Scala and Akka – for now. MyNodeGuardian.scala is started in CloudExtension.register() and is a sample of using ProvisioningGuardian, which extends OrderedGracefulShutdown: View Gist


Closing Over An Akka Actor Sender In The Receive

Accidentally closing over the sender is easy to do, and causes mayhem when it happens. The rule is simple: never close over the sender method in a block of code that may be executed on another thread, such as a scheduled task or a Future. The trick is to capture the current sender in a val, as illustrated below:

Unsafe


Here are three examples of what not to do. Whether they break depends on what is actually done in the receive: if Futures or other threads are involved and forward or sender is called in a callback (such as Future.onSuccess), you have a problem:

1. Calling sender from a thread other than the one processing the original message in Actor.receive

def receive = {
  case SomeMessage =>
    context.system.scheduler.scheduleOnce(5 seconds) {
      sender ! DelayedResponse  // Danger!
    }
}

The thing to watch out for here is { sender ! DelayedResponse }, because it runs on another thread than the one that processed the original message. The sender is associated with the message currently being processed, and in the scheduler case illustrated above, several other messages may have been processed during the 5 seconds before the task runs – so by the time sender is called, it no longer refers to the sender of the message that was being processed when the task was scheduled.

2. Shared mutable state

class MyActor extends Actor {
  import context.dispatcher

  var state = State()

  def receive = {
    case _ =>
      Future { state = newState }                           // mutates state from another thread
      otherActor ? message onSuccess { case r => state = r } // and again from a callback
  }
}

The above code will break your application in weird ways: the actor's internal state is read and modified from threads other than the one processing its messages, which is a shared mutable state bug waiting to happen.

3. Calling sender in a Future during Actor.receive can return null or the wrong ActorRef

class MyActor extends Actor {
  import context.dispatcher

  def receive = {
    case _ => Future { expensiveCalculation(sender) } // painful, and you owe me a beer if you do it
  }
}

By the time the Future body runs, the sender method may return null (until the next message is received) or, worse, the ActorRef of a completely different sender. Simply use a val if you close over sender in a block of code that might be executed by another thread.

Safe


Here are a few examples that are safe, the first because we simply freeze the sender reference:

class MyActor extends Actor {
  import context.dispatcher

  def receive = {
    case SomeMessage =>
      val requestor = sender // freeze the sender reference
      context.system.scheduler.scheduleOnce(5 seconds) {
        requestor ! DelayedResponse
      }
  }
}

Or better yet, keep a clean Actor.receive altogether – a best practice:

class MyActor extends Actor {
  import context.dispatcher

  def receive = {
    case msg: SomeMessage => receiveSomeMessage(sender, msg)
  }

  def receiveSomeMessage(requestor: ActorRef, msg: SomeMessage): Unit =
    context.system.scheduler.scheduleOnce(5 seconds) {
      requestor ! DelayedResponse
    }
}

It is safe to close over self – an ActorRef is thread-safe:

class MyActor extends Actor {
  import context.dispatcher

  def receive = {
    case _ => Future { expensiveCalculation() } onComplete { f => self ! f.get } // f is a Try
  }
}
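An equivalent, and arguably cleaner, way to reply from a Future is Akka's pipe pattern: freeze the sender in a val, then pipe the Future's result back to it. A sketch only – `expensiveCalculation` here is a hypothetical placeholder:

```scala
import akka.actor.Actor
import akka.pattern.pipe
import scala.concurrent.Future

class MyActor extends Actor {
  import context.dispatcher // ExecutionContext for the Future and the pipe

  def receive = {
    case _ =>
      val replyTo = sender // freeze the sender reference first
      Future { expensiveCalculation() } pipeTo replyTo
  }

  // hypothetical placeholder for the real work
  def expensiveCalculation(): Int = 42
}
```

pipeTo delivers the Future's value as a message on success and a Status.Failure on failure, so nothing in the callback ever touches sender or actor state.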

It is safe to close over a fixed value and a thread-safe ActorRef

class MyActor extends Actor {
  import context.dispatcher

  def receive = {
    case _ =>
      val currentSender = sender // freeze the sender to a val
      Future { expensiveCalculation(currentSender) }
  }
}

Find Out More

Some examples were taken from “Akka Concurrency” by Derek Wyatt, others from the Akka Docs.


Scala 2.10.0 Introduction of Try

One of my favorite things about Scala 2.10, which I have been using since this summer, is encapsulated in Try.scala. You could remove lines and lines of try/catch blocks and the horrible (or absent) error handling strategies I have seen strewn around applications over the years. Try gives you a clean, very often one-line, function with error handling built in that is incredibly flexible for the succinctness you can glean from it.

The docs describe it as representing “a computation that may either result in an exception or return a success value. It is analogous to the Either type but encodes common idioms for handling exceptional cases (such as rescue/ensure which is analogous to try/finally)”.

Looking at this simplistically, here are a few samples, the first of which follows the Java-like pattern of try/catch blocks riddled through code, while the Risky class uses simple one-line handling with scala.util.Try.
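The original sample is embedded as a gist that isn't reproduced here, so as a sketch only – with hypothetical names of mine chosen to illustrate the contrast, not the actual gist code – it might look like:

```scala
import scala.util.Try

object TrySamples {
  // The Java-like way: a try/catch block and a sentinel fallback value
  def javaLike(a: Int, b: Int): Int =
    try a / b
    catch { case _: ArithmeticException => 1 }

  // The scala.util.Try way: one-liners with error handling built in
  class Risky(a: Int, b: Int) {
    def one: Option[Int] = Try(a / b).toOption     // None on failure
    def two: Boolean     = Try(a / b).isSuccess    // false on failure
    def three: Int       = Try(a / b).getOrElse(1) // fallback value
    def four: Int        = Try(a / b).recover { case _: ArithmeticException => 1 }.get
    def five: Try[Int]   = Try(a / b)              // Failure(ArithmeticException) when b == 0
    def six: Try[Int]    = Try(a / b).recover { case _: ArithmeticException => 1 }
  }
}
```

Each one-liner picks a different recovery strategy – drop to Option, test for success, substitute a default, or stay in Try and recover – without a single try/catch block.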

If you run this you would see the following, where the entries showing a scala.util.Success and/or a plain value got that result from the fallback:
[info] javaLike [1]
[info] one [None]
[info] two [false]
[info] three [1]
[info] four [1]
[info] five [Failure(java.lang.ArithmeticException: / by zero)]
[info] six [Success(1)]
[info] seven [Success(1)]
[info] eight [Success(1)]


Build Configuration For Scala 2.10.0-RC1 with Akka 2.1.0-RC1 and Slick 0.11.1

Assuming you know the basics, this is just meant to be a quick, abridged shortcut ;) I created a sample project to spin up Scala 2.10.0-RC1 with Akka 2.1.0-RC1, using the latest PostgreSQL and Slick 0.11.1.

build.properties – I have not yet tried with sbt 0.12.1, which was just released

sbt.version=0.12.0

In my Build.scala I added

lazy val buildSettings = Seq(
  organization := "com.helenaedelson",
  version      := "1.0-SNAPSHOT",
  scalaVersion := "2.10.0-RC1" // FIXME: use 2.10.0 for final
)

These resolvers

resolvers ++= Seq(
  "Sonatype Repo"      at "https://oss.sonatype.org/content/repositories/releases/",
  "Typesafe Releases"  at "http://repo.typesafe.com/typesafe/releases/",
  "Typesafe Snapshots" at "http://repo.typesafe.com/typesafe/snapshots/"
)

And

object Dependency {
  val akka       = "com.typesafe.akka"  % "akka-actor_2.10.0-RC1" % "2.1.0-RC1" withSources ()
  val slick      = "com.typesafe"       % "slick_2.10.0-M7"       % "0.11.1" withSources ()
  val postgresql = "postgresql"         % "postgresql"            % "9.1-901.jdbc4" % "runtime"
  val logback    = "ch.qos.logback"     % "logback-classic"       % "1.0.7" // EPL 1.0 / LGPL 2.1
  val slf4j      = "org.slf4j"          % "slf4j-api"             % "1.6.4" // MIT

  object Test {
    val akkaTestKit = "com.typesafe.akka" % "akka-testkit_2.10.0-RC1" % "2.1.0-RC1" withSources ()
    val junit       = "junit"             % "junit"      % "4.10"   % "test" // Common Public License 1.0
    val scalatest   = "org.scalatest"     % "scalatest"  % "1.8"    % "test" cross CrossVersion.full // ApacheV2
    // Note that this version gives the message "-oDF is deprecated, use stdout instead."
    // val scalatest = "org.scalatest"    % "scalatest"  % "2.0.M4-2.10.0-RC1-B1" % "test" cross CrossVersion.full // ApacheV2
    val scalacheck  = "org.scalacheck"    % "scalacheck" % "1.10.0" % "test" cross CrossVersion.full // New BSD
  }
}
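The post doesn't show how these modules get wired into the project; under sbt 0.12 it would look something like the following sketch (the project id and layout are assumptions of mine):

```scala
import sbt._
import Keys._

object MyBuild extends Build {
  lazy val buildSettings = Seq(
    organization := "com.helenaedelson",
    version      := "1.0-SNAPSHOT",
    scalaVersion := "2.10.0-RC1"
  )

  // Hypothetical root project pulling in the Dependency module lists above
  lazy val root = Project(
    id = "root",
    base = file("."),
    settings = Project.defaultSettings ++ buildSettings ++ Seq(
      libraryDependencies ++= Seq(
        Dependency.akka, Dependency.slick, Dependency.postgresql,
        Dependency.logback, Dependency.slf4j,
        Dependency.Test.akkaTestKit, Dependency.Test.junit,
        Dependency.Test.scalatest, Dependency.Test.scalacheck
      )
    )
  )
}
```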
