Helena Edelson

Akka, Spark, Scala, Cassandra, Big Data, Data Science, Cloud Computing, Machine Learning, Distributed Architecture



Archive for the 'Concurrency' Category

Recent Gists on Akka Cluster Metrics-Aware Adaptive Load-Balancing Routers

Posted by Helena Edelson on 11th May 2013

These are some of the Gists I wrote for the letitcrash.com posts “The New Cluster Metrics-Aware Adaptive Load-Balancing Routers” and “The Cluster Metrics API”, which I recycled for my last Scala user group presentation on Akka.

Posted in Akka, Big Data, Cloud, Concurrency, Scala, Scalability

Closing Over An Akka Actor Sender In The Receive

Posted by Helena Edelson on 24th March 2013

Accidentally closing over the sender is an easy mistake to make, and it causes mayhem when you do. The rule is simple: never close over the sender method in a block of code that may be executed on another thread, such as a scheduled task or a Future. The trick is to capture the current sender in a val, as illustrated below:

Unsafe


Here are three examples of what not to do. Whether a receive is safe depends on what it actually does: if Futures or other threads are involved and forward or sender is called inside a callback (such as Future.onSuccess), you have a problem:

1. Calling sender on a thread other than the one processing the original message in Actor.receive

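import scala.concurrent.duration._

def receive = {
  case SomeMessage =>
    import context.dispatcher // the scheduler needs an implicit ExecutionContext
    context.system.scheduler.scheduleOnce(5.seconds) {
      sender ! DelayedResponse  // Danger! sender is evaluated when the task runs, not now
    }
}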

The thing to watch out for here is { sender ! DelayedResponse }, because it runs on a different thread from the one that processed the original message. sender refers to the sender of the message currently being processed, and in the scheduler case above several other messages may have been processed during the 5 seconds before sender is called, so by the time the scheduled task fires, sender no longer refers to whoever sent SomeMessage.

2. Shared mutable state

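import akka.actor.Actor
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

class MyActor extends Actor {
    import context.dispatcher                 // ExecutionContext for the Future and the callback
    implicit val timeout = Timeout(5.seconds) // required by ask (?)
    var state = State()
    def receive = {
      case _ =>
        Future { state = newState }               // mutates actor state from another thread
        val reply = (otherActor ? message).mapTo[State]
        reply onSuccess { case r => state = r }   // and so does this callback
    }
}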

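The code above will break your application in weird ways. The Future body and the onSuccess callback run on other threads, so they mutate state concurrently with the actor processing its next messages: a shared mutable state bug. sender is the same kind of trap, because it changes with every message.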

3. Calling sender in a Future during Actor.receive could return null or the wrong ActorRef

class MyActor extends Actor {
    import context.dispatcher // ExecutionContext for the Future
    def receive = {
      case _ => Future { expensiveCalculation(sender) } // painful, and you owe me a beer if you do it
    }
}

After the current message has been processed, the sender method might return null until the next message is received, but the exact value doesn't matter: either way it is not the sender you meant. Simply use a val if you close over sender in a block of code that might be executed by another thread.
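To see the mechanism outside Akka, here is a plain-Java analogy (a sketch, not Akka code): a hypothetical ToyActor keeps a currentSender field that is overwritten for every message, much like Akka's sender, and a list of deferred tasks stands in for the scheduler. A task that reads the field when it fires sees whichever sender came last; a task that froze the value in a local when its message arrived keeps the right one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an actor: currentSender changes with every message.
class ToyActor {
    private String currentSender;
    final List<Runnable> deferred = new ArrayList<>();  // stands in for the scheduler
    final List<String> repliesTo = new ArrayList<>();

    // Unsafe: the deferred task reads the field when it runs, not when the message arrived.
    void receiveUnsafe(String sender) {
        currentSender = sender;
        deferred.add(() -> repliesTo.add(currentSender));
    }

    // Safe: freeze the sender in a local, the Java analogue of `val requestor = sender`.
    void receiveSafe(String sender) {
        currentSender = sender;
        final String requestor = currentSender;
        deferred.add(() -> repliesTo.add(requestor));
    }

    // "5 seconds later": run everything that was scheduled.
    void fireScheduledTasks() {
        for (Runnable task : deferred) {
            task.run();
        }
        deferred.clear();
    }
}

class StaleSenderDemo {
    public static void main(String[] args) {
        ToyActor unsafe = new ToyActor();
        unsafe.receiveUnsafe("alice");
        unsafe.receiveUnsafe("bob");
        unsafe.fireScheduledTasks();
        System.out.println("unsafe: " + unsafe.repliesTo); // [bob, bob]

        ToyActor safe = new ToyActor();
        safe.receiveSafe("alice");
        safe.receiveSafe("bob");
        safe.fireScheduledTasks();
        System.out.println("safe:   " + safe.repliesTo);   // [alice, bob]
    }
}
```

The demo runs the deferred tasks on the calling thread to keep the output deterministic; in a real actor the scheduled block runs on another thread, which adds memory-visibility problems on top of the stale value.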

Safe


Here are a few safe examples. The first is safe because we simply freeze the sender reference in a val:

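import scala.concurrent.duration._

class MyActor extends Actor {
    import context.dispatcher // ExecutionContext for the scheduler
    def receive = {
        case SomeMessage =>
            val requestor = sender // freeze the sender while this message is being processed
            context.system.scheduler.scheduleOnce(5.seconds) {
                requestor ! DelayedResponse
            }
    }
}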

Better yet, keep Actor.receive clean altogether, which is a best practice:

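import scala.concurrent.duration._

class MyActor extends Actor {
    import context.dispatcher // ExecutionContext for the scheduler
    def receive = {
        case msg: SomeMessage => receiveSomeMessage(sender, msg) // sender is read here, on the actor's thread
    }

    def receiveSomeMessage(requestor: ActorRef, msg: SomeMessage): Unit =
        context.system.scheduler.scheduleOnce(5.seconds) {
            requestor ! DelayedResponse
        }
}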

It is safe to close over self, which is a thread-safe ActorRef that does not change between messages:

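import scala.util.Try

class MyActor extends Actor {
    import context.dispatcher // ExecutionContext for the Future
    def receive = {
      case result: Try[_] => // the calculation's outcome, handled back on the actor's own thread
      case _ => Future { expensiveCalculation() } onComplete { result => self ! result }
    }
}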
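Sending the result to self is safe because it comes back through the mailbox and is handled one message at a time, so only the actor touches its state. A rough plain-Java analogy, not Akka's implementation: a hypothetical MiniActor processes messages on a single-threaded executor that plays the mailbox, and the asynchronous result is posted back to that executor instead of being assigned to state from the Future's thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical mini "actor": every state change runs on its single mailbox thread.
class MiniActor {
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private int state = 0; // only read or written on the mailbox thread

    // Rough equivalent of `self ! msg`: enqueue work for the mailbox thread.
    void tell(Runnable msg) {
        mailbox.execute(msg);
    }

    // Run the expensive calculation elsewhere, then send the result back to self.
    CompletableFuture<Void> onRequest(int input) {
        return CompletableFuture
                .supplyAsync(() -> expensiveCalculation(input))
                .thenAccept(result -> tell(() -> state += result));
    }

    private static int expensiveCalculation(int x) {
        return x * x;
    }

    // Reads go through the mailbox too, so they are ordered after earlier messages.
    int readState() {
        try {
            return mailbox.submit(() -> state).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    void stop() {
        mailbox.shutdown();
    }
}

class MiniActorDemo {
    public static void main(String[] args) {
        MiniActor actor = new MiniActor();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            pending.add(actor.onRequest(i));
        }
        pending.forEach(CompletableFuture::join); // every result has now been sent to self
        System.out.println("state = " + actor.readState()); // 1 + 4 + ... + 100 = 385
        actor.stop();
    }
}
```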

It is safe to close over a fixed value holding a thread-safe ActorRef:

class MyActor extends Actor {
    import context.dispatcher // ExecutionContext for the Future
    def receive = {
      case _ =>
        val currentSender = sender // freeze the sender to a val
        Future { expensiveCalculation(currentSender) }
    }
}

Find Out More

Some examples were taken from “Akka Concurrency” by Derek Wyatt, others from the Akka Docs.

Posted in Akka, Cloud, Concurrency, Java, Scala

Why RabbitMQ for Cloud?

Posted by Helena Edelson on 2nd February 2011

We know that messaging itself lets us move from a SQL -> Database -> File System model into the network layer itself, with:

  • Logical decoupling
  • Physical decoupling
  • Temporal decoupling

But I’ve been asked why I would choose to implement RabbitMQ in a cloud solution. This is a big topic, and there is a great deal more I’d say in conversation, but for a relatively quick overview let’s start from the ground up.

Why AMQP?

AMQP is an open, wire-level binary protocol for Message Oriented Middleware that was created by, and continues to be developed collaboratively by, users with mission-critical real-world scenarios. AMQP solutions are deployed for high-performance clustering and grid computing problems, and AMQP is designed for typical business scenarios in which a system:

1. distributes data to hundreds of clients in real time across a global network
2. accepts workloads to process from those clients and handles them reliably in a clustered transaction processing engine
3. connects to other partners over the Internet to exchange data
4. carries out all these functions in a secure manner which would pass an audit
5. monitors application traffic to build up a real-time activity

AMQP implementations offer fire-and-forget messaging where you can trust that your messages will reach their destination; extremely fast delivery of data to a large number of consumers (publish/subscribe event notification); and secure, firewall-friendly file transfer.

“AMQP is a good fit for cloud MaaS and other wide-area, Internet-based messaging purposes, because it defines the wire protocol and communication semantics without restricting the details of the messaging system implementation or the API. AMQP’s goal is to enable interoperability among disparate vendors’ platforms.”
- Roy Schulte (Gartner), Review of AMQP and other protocol options.

More: amqp.org

Why Erlang?

Each of the major AMQP implementations, such as OpenAMQ, Qpid and Red Hat MRG, is built on a foundational technology: all but one of them are built on C++, while RabbitMQ is built on Erlang. Erlang offers concise binary pattern matching, has built-in support for concurrency, distribution and fault tolerance, has a built-in distributed database, is open source, and runs on many platforms. Erlang is also a runtime environment, similar to the Java virtual machine, allowing code compiled on one architecture to run anywhere. The runtime system also allows code in a running system to be updated without interrupting the program.

More: erlang.org

Why RabbitMQ?

Many large-scale projects use RabbitMQ for complex, scalable cloud topologies, such as the Ocean Observatories Initiative (OOI), NASA Nebula, Heroku, the Unique Identification Authority of India (200 million messages per day at peak), and CERN, because:

  • It’s wicked fast and reliable
  • Supports security and transactions, persistence, client acks, and flow control
  • Widely proven and used heavily on Amazon EC2
  • It is open source with support services and several very active user lists
  • Hugely interoperable
    • RabbitMQ brokers run on many platforms and offer several clients (Java, .NET and others), all of which speak the AMQP protocol to each other. Anyone can write a RabbitMQ client in any language: Ruby, Java, Spring, Python, C, C#, Perl, Erlang, Lisp, Haskell, PHP, _your_language_here.
  • Easily clusterable
  • Works with several protocols: AMQP, STOMP, XMPP, 0MQ
  • Highly-flexible topologies
  • Can work with Pacemaker and associated tools to provide High Availability
  • Supports several extension points via plug-ins, e.g. exchange types and backing queue storage
  • Provides management and monitoring of brokers
    • vFabric Hyperic
    • RabbitMQ command line tool and plugin for real-time activity and management
  • Messaging services hosted in the cloud with RabbitMQ as part of overall cloud infrastructure services are highly-supported

An open source RabbitMQ solution stack might look like this: Spring, Tomcat, Hibernate, MySQL, Hadoop, MapReduce, Chef, Apache httpd, RabbitMQ.

More: rabbitmq.com

Resource List And Use Cases

Posted in AMQP, Cloud, Cloud Ops, Concurrency, Erlang, Messaging, Open Source, RabbitMQ

Future Posts: Esper, RabbitMQ for Cloud Messaging

Posted by Helena Edelson on 9th May 2010

If I had more time I would add a few new posts on Esper and RabbitMQ for messaging in the cloud, but I just got home from two weeks of coast-to-coast consulting and technology presentations. Pretty exhausting. I hope to have time to post on these and a few other fun technologies. I’ve got some code samples and tests I might post with them, plus some cool stuff on Esper queries with Spring Integration.

Posted in Concurrency, Erlang, Java, Messaging, RabbitMQ

Simple Asynchronous Processing with Spring’s TaskExecutor

Posted by Helena Edelson on 27th January 2010

This post is merely meant as a starting point for tinkering with a lightweight way to hand off a task for asynchronous processing, without the overhead of Spring Batch, Spring JMS and message brokers, or other middleware solutions.

1. I have a simplistic JUnit test that merely kicks off the service method so we can follow the path of execution:

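import org.apache.log4j.Logger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * TaskTests
 *
 * @author Helena Edelson
 * @since v 1.0
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:infrastructure-context.xml"})
public class TaskTests extends BaseTest {
    protected static final Logger logger = Logger.getLogger(TaskTests.class);
    @Autowired private OrderService orderService;

    @Test
    public void testExecution() {
        logger.debug("Starting execution thread...");
        // dispatch() returns immediately; the order is persisted on a taskExecutor thread
        orderService.dispatch(new Order());
    }
}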

2. A simple context config:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">

<!-- Assumed: component scanning so that @Service("orderService") and @Autowired are picked up -->
<context:component-scan base-package="org.springsource.oms"/>

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" p:corePoolSize="5" p:maxPoolSize="25"/>

<!-- OR alternately, use the task namespace instead of the bean above (not both, since both declare
     the id "taskExecutor"). This also creates an org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor:
<task:executor id="taskExecutor" pool-size="5-25" queue-capacity="100" rejection-policy="CALLER_RUNS"/>
-->

</beans>

3. OrderService that delegates to the ThreadPoolTaskExecutor:

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Service;

/**
 * OrderServiceImpl is used for both orders and returns
 *
 * @author Helena Edelson
 * @MessageEndpoint which is a @Component
 * @since Dec 29, 2009
 */
@Service("orderService")
public class OrderServiceImpl implements OrderService {
    private static final Logger logger = Logger.getLogger(OrderServiceImpl.class);

    private OrderDao orderDao;
    private MerchantService merchantService;
    private ReceivingService receivingService;
    @Autowired private TaskExecutor taskExecutor;

    // Assumed: one DAO per datasource, wired elsewhere (not defined in the original listing)
    private OrderDao daoDatasourceOne;
    private OrderDao daoDatasourceTwo;

    @Autowired
    public OrderServiceImpl(OrderDao orderDao, ReceivingService receivingService, MerchantService merchantService) {
        this.orderDao = orderDao;
        this.receivingService = receivingService;
        this.merchantService = merchantService;
    }

    /** Hands the order off to the task executor and returns without waiting for it. */
    public final void dispatch(final Order order) {
        logger.debug("Starting dispatch execution...");

        if (this.taskExecutor != null) {
            this.taskExecutor.execute(new Runnable() {
                public void run() {
                    executorAsync(order);
                }
            });
        }

        logger.debug("Completed dispatch execution...");
    }

    /** Runs on a taskExecutor thread. */
    private void executorAsync(final Order order) {
        logger.debug("Starting Async execution...");

        daoDatasourceOne.createOrder(order);
        daoDatasourceTwo.createOrder(order);

        logger.debug("Completed Async execution...");
    }

    /* Where the output will be: Note the dispatch method returns control to its caller before the async method begins:
    2010-01-27 13:23:27,546 [main] DEBUG org.springsource.oms.infrastructure.TaskTests  - Starting execution thread...
    2010-01-27 13:23:27,546 [main] DEBUG org.springsource.oms.domain.services.OrderServiceImpl  - Starting dispatch execution...
    2010-01-27 13:23:27,546 [main] DEBUG org.springsource.oms.domain.services.OrderServiceImpl  - Completed dispatch execution...
    2010-01-27 13:23:27,546 [taskExecutor-1] DEBUG org.springsource.oms.domain.services.OrderServiceImpl  - Starting Async execution...
    persisting org.springsource.oms.domain.entities.Order@1f10a67
    */

    /**
     * Alternately, for a different scenario you can play around with this: both writes run
     * concurrently and this method blocks until both have completed.
     */
    public void withExecutor(final Order order) {
        // Spring 3's TaskExecutor extends java.util.concurrent.Executor, so it can back a CompletionService
        CompletionService<Object> completionService = new ExecutorCompletionService<Object>(taskExecutor);

        completionService.submit(new Callable<Object>() {
            public Object call() {
                return daoDatasourceOne.createOrder(order);
            }
        });
        completionService.submit(new Callable<Object>() {
            public Object call() {
                return daoDatasourceTwo.createOrder(order);
            }
        });

        try {
            // take() returns futures in completion order; get() rethrows any failure
            completionService.take().get();
            completionService.take().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
        } catch (ExecutionException e) {
            logger.error("Creating the order failed", e.getCause());
        }
    }
}
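withExecutor relies on ExecutorCompletionService, whose take() returns futures in the order the tasks finish rather than the order they were submitted. The self-contained sketch below uses a fixed two-thread pool instead of Spring's TaskExecutor; the two tasks are hypothetical stand-ins for the two datasource writes, with a 500 ms sleep making the first one slower.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {
    // Submits a slow and a fast task and collects their results in completion order.
    static List<String> runBoth() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletionService<String> completionService = new ExecutorCompletionService<>(pool);
            completionService.submit(() -> {
                Thread.sleep(500);                          // simulated slow write
                return "datasourceOne";
            });
            completionService.submit(() -> "datasourceTwo"); // simulated fast write
            List<String> completionOrder = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                // take() blocks until the next task finishes; get() rethrows its failure, if any
                completionOrder.add(completionService.take().get());
            }
            return completionOrder;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runBoth()); // the fast task is normally listed first
    }
}
```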

I recommend looking into the @Async annotation, which I will post on shortly. In the meantime here is the ref page for Spring Task and Scheduling: http://static.springsource.org/spring/docs/3.0.x/reference/html/scheduling.html
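For comparison with the XML config in step 2, here is roughly what the task:executor attributes correspond to in plain java.util.concurrent terms. It is a sketch under assumptions: pool-size="5-25" read as 5 core and 25 maximum threads, queue-capacity="100" as a bounded queue, CALLER_RUNS as ThreadPoolExecutor.CallerRunsPolicy, and a 60-second keep-alive picked for illustration. The dispatch method plays the role of OrderServiceImpl.dispatch: it hands the work to the pool and returns without waiting.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PlainTaskExecutor {
    // 5 core / 25 max threads, up to 100 queued tasks; when full, the caller runs the task itself.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                5, 25,
                60, TimeUnit.SECONDS,                      // keep-alive for threads above the core size (assumed)
                new ArrayBlockingQueue<Runnable>(100),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Like dispatch(): hand the work to the pool and return immediately.
    static void dispatch(ThreadPoolExecutor pool, Runnable work) {
        pool.execute(work);
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newPool();
        CountDownLatch done = new CountDownLatch(1);
        System.out.println("[" + Thread.currentThread().getName() + "] dispatching");
        dispatch(pool, () -> {
            System.out.println("[" + Thread.currentThread().getName() + "] async work");
            done.countDown();
        });
        System.out.println("[" + Thread.currentThread().getName() + "] dispatch returned");
        done.await();   // only so the demo does not exit before the work runs
        pool.shutdown();
    }
}
```

One ThreadPoolExecutor detail worth knowing: threads beyond the 5 core threads are only created once the 100-slot queue is full, so this pool stays at 5 threads until more than 100 tasks are waiting. Also, the log output in step 3 happens to show dispatch completing before the async work starts, but that ordering is not guaranteed, here or there.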

Posted in Annotations, Concurrency, Java, Spring, Spring Task

The Basics of Multithreaded Code for Java Applications

Posted by Helena Edelson on 20th September 2008

First, you can count on very little in Java threading. There are two ways to define and instantiate a thread, but in my opinion only the second is a real option for application code. Option one: you can create a class that extends Thread and override Thread’s run method:

public class SomeObject extends Thread {
    public void run() { /* ... */ }
}

Then all you need to do in your caller code is:

SomeObject so = new SomeObject();
so.start(); // start() creates the new thread of execution, which then calls run()

The problem with this option for applications is in the design. Java has single inheritance: a class can extend only one superclass. So if SomeObject needed to extend a superclass for design purposes, but you were relying on extending Thread for your threading implementation, you’re stuck. In a good design you extend a class because the child class embodies a more specialized behavior of its parent. Within that construct, SomeObject would only offer more specialized Thread behavior, which is a rare use case in practice.

Option two: you can implement the Runnable interface. Following the best practice of programming to interfaces, this is the strategy to choose.

public class SomeObject implements Runnable {
    public void run() { /* ... */ }
}

To instantiate and start using SomeObject now, you create your object the same way, but you need to instantiate a Thread and pass your job to the thread to run it.

SomeObject runnable = new SomeObject();
Thread thread = new Thread(runnable); // the job is handed to the thread, which has not been started yet

What this does is give the new thread your job to run: you are simply using another Thread constructor and passing in a Runnable target. The Thread class itself implements Runnable, so a Thread is a Runnable. That means that with a method like the one below, callers can pass in a list of Runnables, Threads, or a mix of both (declaring the parameter as List<? extends Runnable> also accepts a List<Thread>, which a plain List<Runnable> parameter would not):

public void doSomething(List<? extends Runnable> jobs) { /* ... */ }

So far we have a Thread in the “new” state. Now you want to actually start the thread of execution to move it to the next state:

thread.start();

Now the thread is alive: calling thread.isAlive() returns true. Once a thread starts to run it gets its own call stack, since every thread has exactly one stack of its own. A thread runs through its full execution and, once completed, is considered “terminated”, and thread.isAlive() returns false. But the Thread object is still an object, not null; it is just no longer a thread of execution. However, if you call start() on it again, it throws an IllegalThreadStateException, whether or not run() has completed.

Putting it all together with the second version of SomeObject (the one that implements Runnable) gives the code below. Note that run() is never called explicitly: it is the method Thread knows to invoke once the thread starts:

SomeObject runnable = new SomeObject();
Thread t = new Thread(runnable);
t.start();

If I instead invoke t.run() on the thread above, no new call stack is created and no new thread starts: run() simply executes on the thread that is currently executing, like any other method call.
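A quick way to see the difference is to record which thread actually executes the job. This sketch names the thread "worker" (an arbitrary name chosen for illustration) and compares calling run() directly with calling start().

```java
import java.util.concurrent.atomic.AtomicReference;

class RunVersusStart {
    // Returns the name of the thread that actually executed the Runnable.
    static String executedBy(boolean useStart) {
        AtomicReference<String> executor = new AtomicReference<>();
        Thread t = new Thread(() -> executor.set(Thread.currentThread().getName()), "worker");
        if (useStart) {
            t.start();            // new thread, new call stack
            try {
                t.join();         // wait so the recorded name is ready to read
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } else {
            t.run();              // plain method call on the current thread
        }
        return executor.get();
    }

    public static void main(String[] args) {
        System.out.println("run():   " + executedBy(false)); // the caller's name, e.g. "main"
        System.out.println("start(): " + executedBy(true));  // "worker"
    }
}
```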

So now let’s create multiple threads and name them so we can see which threads are running:

// SomeObject.java
public class SomeObject implements Runnable {
    public void run() {
        for (int i = 1; i <= 3; i++) {
            System.out.println(this.getClass().getName() + " is being run by "
                    + Thread.currentThread().getName() + " and i = " + i);
        }
    }
}

// Service.java
import java.util.ArrayList;
import java.util.List;

public class Service {
    public void foo() {
        SomeObject runnable = new SomeObject();
        Thread t1 = new Thread(runnable);
        // you could set a name, or a unique one is autogenerated
        //t1.setName(IdGeneratorUtil.createUniqueStringId());
        Thread t2 = new Thread(runnable);
        Thread t3 = new Thread(runnable);

        List<Thread> threads = new ArrayList<Thread>();
        threads.add(t1);
        threads.add(t2);
        threads.add(t3);

        JobRunner jobRunner = new JobRunner();
        jobRunner.runJobs(threads);
    }
}

// JobRunner.java
import java.util.List;

public class JobRunner {

    public void runJobs(List<Thread> jobs) {
        for (Thread job : jobs) {
            // Stack A: this runs on the calling thread
            System.out.println(this.getClass().getName() + " is running " + Thread.currentThread().getName());
            // Creates stack B: start() begins a new thread of execution
            job.start();
        }
    }
}

What you see printed if you run Service’s foo() method multiple times, or on multiple servers, will not be consistent. You can count on very little in Java threading. Threads are run by the scheduler, which, like garbage collection, can’t be controlled. So even though we start threads 1, 2 and 3 in that order, nothing guarantees they run in that order. Queue-like behavior is typical but not guaranteed; the runnable threads are really more a pool than a queue.

Thread Scheduling

The thread scheduler decides which runnable thread runs, so a thread must be in the runnable state to be executed. You can’t control when it runs, but you can influence the scheduler with these methods in the Thread class:

The sleep() and yield() methods are static: they always act on the currently executing thread, never on the Thread instance you might call them through.

static: sleep(long millis) - has overloaded methods
static: yield()

join() - has overloaded methods
setPriority(int priority)

as well as these from the Object class, inherited by all:
wait() - has overloaded methods
notify()
notifyAll()

States and Transitions
There are five thread states: New, Runnable, Running, Waiting/blocked/sleeping, and Dead.

  • New: a thread is instantiated but start() has not been called yet. The thread is not alive.
  • Runnable: the thread is eligible to run because start() has been invoked, but the scheduler has not selected it as the running thread. The thread is alive.
  • Running: the scheduler selected the thread from the runnable pool and it is executing.
  • Waiting/blocked/sleeping: the thread is alive but not eligible to run, and it can return to the runnable state later: it may be blocked waiting for a resource or for an object’s lock to become available, waiting for a notification, or sleeping.
  • Dead: the thread’s run method has completed. The thread is not alive (shocker…) but is still a valid object on the heap.
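These five states are the classic conceptual model. The JDK’s own Thread.State enum (Java 5 and later) is finer grained, reporting NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING and TERMINATED, with “running” folded into RUNNABLE. This sketch observes a thread before start() and after it dies, checks isAlive(), and shows that a second start() throws IllegalThreadStateException.

```java
class LifecycleDemo {
    // Records the observable lifecycle of one short-lived thread.
    static String[] observe() {
        Thread t = new Thread(() -> { /* trivial job */ });
        String beforeStart = t.getState().name();              // NEW: created, not started
        t.start();
        try {
            t.join();                                          // wait until run() has completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        String afterJoin = t.getState().name();                // TERMINATED, i.e. "dead"
        String aliveAfterJoin = String.valueOf(t.isAlive());   // false, yet t is still an object
        String restart;
        try {
            t.start();                                         // a thread can only be started once
            restart = "started again";
        } catch (IllegalThreadStateException e) {
            restart = "IllegalThreadStateException";
        }
        return new String[] { beforeStart, afterJoin, aliveAfterJoin, restart };
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", observe()));
    }
}
```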

Sleep

Use the sleep() method to slow the currently executing thread down. The thread goes to sleep for the given duration and then returns to the runnable pool. The sleep may be interrupted before the duration has elapsed, which is why the call sits in a try/catch for InterruptedException; and when the sleep ends, the thread is not guaranteed to be Running right away, only Runnable:

try {
    Thread.sleep(5 * 60 * 1000); // 5 minutes
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // restore the interrupt status rather than swallowing it
}
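To see the interruption case, this sketch starts a hypothetical “sleeper” thread that asks for a 10-second sleep and interrupts it straight away. The sleep ends early with InterruptedException; the handler records that and restores the interrupt status, the usual alternative to an empty catch block.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class SleepInterruptDemo {
    // Returns true if the sleeping thread was woken early by an interrupt.
    static boolean interruptSleeper() {
        AtomicBoolean interrupted = new AtomicBoolean(false);
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(10_000);                   // 10 seconds
            } catch (InterruptedException e) {
                interrupted.set(true);
                Thread.currentThread().interrupt();     // keep the flag set for any code further up
            }
        });
        sleeper.start();
        sleeper.interrupt();  // if it lands before sleep() begins, sleep() throws immediately
        try {
            sleeper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("interrupted early: " + interruptSleeper());
    }
}
```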

Priority and Yield
Every thread has a priority, but how the scheduler uses it is never guaranteed; setting priorities is really just an attempt to gain efficiency. A new thread’s default priority is the priority of the thread that creates it. To override it, you simply code:

thread.setPriority(9); // must be between Thread.MIN_PRIORITY (1) and Thread.MAX_PRIORITY (10)
thread.start();

There are 3 constants in Thread you can use as well:
  • Thread.MIN_PRIORITY (1)
  • Thread.NORM_PRIORITY (5)
  • Thread.MAX_PRIORITY (10)
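Two of these details can be checked directly: a new thread inherits the priority of the thread that creates it, and setPriority rejects values outside MIN_PRIORITY..MAX_PRIORITY with an IllegalArgumentException. The sketch runs a parent thread at priority 7 (an arbitrary choice) and reads the priority of a child created inside it. What it cannot show is any effect on scheduling, since that is up to the JVM and the operating system.

```java
class PriorityDemo {
    // Priority a child thread inherits from a parent running at parentPriority.
    static int inheritedPriority(int parentPriority) {
        int[] child = new int[1];
        Thread parent = new Thread(() -> child[0] = new Thread(() -> { }).getPriority());
        parent.setPriority(parentPriority);
        parent.start();
        try {
            parent.join();    // join also makes child[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return child[0];
    }

    // True if setPriority refuses the given value.
    static boolean rejects(int priority) {
        try {
            new Thread(() -> { }).setPriority(priority);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("child of a priority-7 thread: " + inheritedPriority(7));
        System.out.println("setPriority(11) rejected: " + rejects(11));
    }
}
```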

yield() moves the running thread back to the runnable pool to give other threads of the same priority a chance to run. It is only a hint to the scheduler, though: the thread may go from running to runnable, or the call may have no effect at all.

Join
Use the join() method when one thread must not continue until another has finished. Say threads A and B are started, but B should not do its job until A has finished: B joins A by calling a.join(), and B is not runnable again until A is dead:

Thread t = new Thread();
t.start();
t.join(); // the currently executing thread waits until t is dead (join() throws InterruptedException)

Alternatively, you can pass a timeout, e.g. t.join(1000), so that B waits for A to finish but for at most that many milliseconds. Without a timeout, join() guarantees that B does not continue until A has completed (unless B is interrupted).
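A small sketch of both forms. The calling thread plays B and a hypothetical worker plays A: a timed join(50) gives up while A is still blocked on a latch, then a plain join() waits until A has finished. As a bonus, everything A wrote before terminating is visible to the joining thread once join() returns.

```java
import java.util.concurrent.CountDownLatch;

class JoinDemo {
    private static int result; // written by thread A, read by the thread that joins it

    static String demo() {
        CountDownLatch release = new CountDownLatch(1);
        Thread a = new Thread(() -> {
            try {
                release.await();               // A cannot finish until released
            } catch (InterruptedException e) {
                return;
            }
            result = 42;
        });
        a.start();

        boolean aliveAfterTimedJoin = false;
        boolean aliveAfterJoin = true;
        try {
            a.join(50);                        // wait at most 50 ms
            aliveAfterTimedJoin = a.isAlive(); // true: A is still blocked
            release.countDown();               // now let A finish
            a.join();                          // wait for as long as it takes
            aliveAfterJoin = a.isAlive();      // false: A is dead
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // join() guarantees that A's write to result is visible here
        return "alive after join(50)=" + aliveAfterTimedJoin
                + ", alive after join()=" + aliveAfterJoin
                + ", result=" + result;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```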

Code Synchronization

What we are doing is synchronizing access to mutable data. Say two threads have a reference to the same AccountService instance and both call accountService.withdraw(), or both share an Account object and call account.setDeposit(..). Without synchronization, the object’s state can become inconsistent and its data corrupted, which is less than awesome. There are two ways to avoid that. The first is synchronizing a method:

public synchronized void withdraw(int amount) {
    if (account.getBalance() >= amount) {
        account.withdrawAmount(amount);
    }
}

public synchronized int getBalance() { /* ... */ }

or synchronizing a block of code:

public void doSomething(){
doingStuff();
synchronized(this) { moreStuff(); }
}
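To see the method-level version hold up under contention, this sketch uses a hypothetical SafeAccount whose synchronized withdraw does the same check-then-withdraw as above. Two threads each try to withdraw 1 a thousand times from a balance of 1000; because the check and the update happen under one lock, exactly 1000 withdrawals succeed and the balance never goes negative. Without the synchronized keywords that outcome is no longer guaranteed.

```java
class SafeAccount {
    private int balance;

    SafeAccount(int openingBalance) {
        this.balance = openingBalance;
    }

    // Check and update under the same lock, so no other thread can slip in between.
    synchronized boolean withdraw(int amount) {
        if (balance >= amount) {
            balance -= amount;
            return true;
        }
        return false;
    }

    synchronized int getBalance() {   // reads are synchronized too
        return balance;
    }
}

class WithdrawDemo {
    // Two threads make 1000 withdrawal attempts each; returns how many succeeded in total.
    static int race(SafeAccount account) {
        int[] successes = new int[2];        // one slot per thread, read only after join()
        Thread[] threads = new Thread[2];
        for (int t = 0; t < 2; t++) {
            final int id = t;
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    if (account.withdraw(1)) {
                        successes[id]++;
                    }
                }
            });
            threads[t].start();
        }
        for (Thread th : threads) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return successes[0] + successes[1];
    }

    public static void main(String[] args) {
        SafeAccount account = new SafeAccount(1000);
        System.out.println("successful withdrawals: " + race(account) + ", balance: " + account.getBalance());
    }
}
```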

It is not enough to synchronize the writes; you must synchronize the reads too, or other threads may never see the updated values. Using the volatile modifier on a field (a boolean flag, for example) guarantees that a thread reading that field sees the most recently written value, but it does not make compound operations atomic. Here is a cool example from Josh Bloch: you could make nextSerialNum volatile, but it is not a good idea:

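private static volatile int nextSerialNum = 0;
// Broken: ++ is a read-modify-write, so two threads can read the same value and return duplicates
public static int getSerialNumber() { return nextSerialNum++; }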

Synchronizing the method fixes it, but even better is to use AtomicLong from the java.util.concurrent.atomic package, which does the atomic increment for you:

private static final AtomicLong nextSerialNum = new AtomicLong();
public static long generateSerialNumber() { return nextSerialNum.getAndIncrement(); }
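A quick check of the AtomicLong version: several threads draw serial numbers from a generator built the same way, and every number comes out unique. The thread and iteration counts are arbitrary. With the volatile int++ version, duplicates become possible because two threads can interleave the read and the write.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class SerialNumbers {
    private static final AtomicLong nextSerialNum = new AtomicLong();

    public static long generateSerialNumber() {
        return nextSerialNum.getAndIncrement(); // atomic read-modify-write
    }
}

class SerialNumberDemo {
    // Draws serials from several threads and returns the distinct values seen.
    static Set<Long> drawConcurrently(int threads, int perThread) {
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    seen.add(SerialNumbers.generateSerialNumber());
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        // 4 x 10,000 draws; a duplicate would make the set smaller than 40,000
        System.out.println("distinct serials: " + drawConcurrently(4, 10_000).size());
    }
}
```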

Object Locks and Synchronization

  • Every Java object has exactly one built-in lock (its monitor), which only comes into play when code synchronizes on that object
  • Execution entering a non-static synchronized method automatically acquires the lock of the current object instance; a static synchronized method acquires the lock of the class’s Class object
  • Only methods or blocks can be synchronized
  • Multiple threads can still call an object’s non-synchronized methods at the same time, so you only need to synchronize the code that touches critical data, not every method
  • If a thread holds a lock on an object and goes to sleep, it keeps the lock
  • A thread can hold more than one lock, for example by entering a synchronized method of AccountService and then a synchronized method of Account
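Thread.holdsLock(Object) makes these rules visible. In this sketch, hypothetical LockOwner and Ledger classes report which locks the current thread holds inside an instance synchronized method, a static synchronized method, a nested call that takes a second object’s lock, and an unsynchronized method. It also shows that built-in locks are reentrant: a synchronized method can call another synchronized method on the same object without blocking.

```java
class Ledger {
    // Called while the caller already holds another object's lock: report both.
    synchronized boolean recordWhileHolding(Object outer) {
        return Thread.holdsLock(this) && Thread.holdsLock(outer);
    }
}

class LockOwner {
    private final Ledger ledger = new Ledger();

    // Instance method: the lock is this object's.
    synchronized boolean instanceLockHeld() {
        return Thread.holdsLock(this);
    }

    // Static method: the lock is the LockOwner.class object's, not any instance's.
    static synchronized boolean classLockHeld() {
        return Thread.holdsLock(LockOwner.class);
    }

    // Two locks at once: this object's, then the ledger's.
    synchronized boolean holdsBoth() {
        return ledger.recordWhileHolding(this);
    }

    // Reentrancy: calling another synchronized method on the same object does not block.
    synchronized boolean reentrant() {
        return instanceLockHeld();
    }

    // No synchronized keyword, so no lock is taken.
    boolean unsynchronized() {
        return Thread.holdsLock(this);
    }

    public static void main(String[] args) {
        LockOwner owner = new LockOwner();
        System.out.println("instance: " + owner.instanceLockHeld()
                + ", class: " + classLockHeld()
                + ", both: " + owner.holdsBoth()
                + ", reentrant: " + owner.reentrant()
                + ", unsynchronized: " + owner.unsynchronized());
    }
}
```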

Generally, synchronization can get hairy. If possible, I’d suggest creating stateless services so you don’t have to worry about it.

Keeping Things Thread-Safe

Where applicable:

  • Methods accessing writable fields need to be synchronized
  • Static fields should be accessed from static synchronized methods
  • Non-static fields should be accessed from non-static synchronized methods

Even if I write something like:

public class MyClass {
private List<Stuff> things = Collections.synchronizedList(new ArrayList<Stuff>());
public void addStuff(Stuff s) { things.add(s); }
public Object removeFirstItem() { return things.size() > 0 ? things.remove(0) : Collections.EMPTY_LIST; }
}

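Without synchronizing the methods, I can still get multithreaded mayhem. Each individual call on the synchronized list is thread-safe, but removeFirstItem() makes two calls, size() and then remove(0), and another thread can empty the list in between, so remove(0) can throw an IndexOutOfBoundsException. So don’t rely on the Collections.synchronized* static methods alone for compound actions.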
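One way to make removeFirstItem() safe is to hold the list’s own lock across the check and the removal. The wrapper returned by Collections.synchronizedList locks on itself, and its documentation asks callers to synchronize on it for iteration, so synchronizing on that same object makes the compound action atomic. In this sketch, SafeStuffList and DrainDemo are hypothetical names, Stuff is replaced by String to keep it self-contained, and an empty list returns null. Four threads drain 10,000 items without a single exception.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class SafeStuffList {
    private final List<String> things = Collections.synchronizedList(new ArrayList<>());

    public void addStuff(String s) {
        things.add(s); // a single call is already atomic on the synchronized wrapper
    }

    // Check-then-act must be atomic as a whole, so hold the list's lock across both calls.
    public String removeFirstItem() {
        synchronized (things) {
            return things.isEmpty() ? null : things.remove(0);
        }
    }
}

class DrainDemo {
    // Fills a list, drains it from several threads; returns the count removed, or -1 on any exception.
    static int drain(int items, int threads) {
        SafeStuffList list = new SafeStuffList();
        for (int i = 0; i < items; i++) {
            list.addStuff("item-" + i);
        }
        AtomicInteger removed = new AtomicInteger();
        AtomicInteger failures = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                try {
                    while (list.removeFirstItem() != null) {
                        removed.incrementAndGet();
                    }
                } catch (RuntimeException e) {
                    failures.incrementAndGet(); // an unsynchronized version could land here
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return failures.get() == 0 ? removed.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println("removed: " + drain(10_000, 4));
    }
}
```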

I think the key here is to read the concurrency chapter of Josh Bloch’s “Effective Java”, then code and test the samples yourself. I won’t repeat it here, but it’s really good and leads you down the right road for designing and writing concurrent code properly. I’m sure there are many other good resources as well.

Deadlocking

Just don’t write code that can deadlock, such as nested synchronized blocks that acquire the same locks in different orders in different threads. And if you do, use AOP to get out of it.
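Nesting synchronized blocks is only dangerous when two threads can take the same locks in opposite orders. A common remedy, sketched here with a hypothetical OrderedAccount, is to always acquire the two locks in one fixed global order (here, by a numeric id), so a transfer from x to y and one from y to x can never each hold the lock the other needs. The demo runs opposing transfers concurrently on daemon threads, so a failure cannot hang the JVM, and checks that they all finish with the total conserved.

```java
class OrderedAccount {
    final int id;          // defines the global lock order
    private int balance;

    OrderedAccount(int id, int balance) {
        this.id = id;
        this.balance = balance;
    }

    synchronized int getBalance() {
        return balance;
    }

    // Always lock the account with the lower id first, whichever way the money moves.
    static void transfer(OrderedAccount from, OrderedAccount to, int amount) {
        OrderedAccount first = from.id < to.id ? from : to;
        OrderedAccount second = first == from ? to : from;
        synchronized (first) {
            synchronized (second) {
                if (from.balance >= amount) {
                    from.balance -= amount;
                    to.balance += amount;
                }
            }
        }
    }
}

class TransferDemo {
    // Runs opposing transfers concurrently; true if both threads finished within the timeout.
    static boolean run(OrderedAccount x, OrderedAccount y) {
        Thread t1 = new Thread(() -> { for (int i = 0; i < 10_000; i++) OrderedAccount.transfer(x, y, 1); });
        Thread t2 = new Thread(() -> { for (int i = 0; i < 10_000; i++) OrderedAccount.transfer(y, x, 1); });
        t1.setDaemon(true);
        t2.setDaemon(true);
        t1.start();
        t2.start();
        try {
            t1.join(10_000);   // a deadlock would leave the threads alive
            t2.join(10_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !t1.isAlive() && !t2.isAlive();
    }

    public static void main(String[] args) {
        OrderedAccount x = new OrderedAccount(1, 500);
        OrderedAccount y = new OrderedAccount(2, 500);
        System.out.println("finished: " + run(x, y) + ", total: " + (x.getBalance() + y.getBalance()));
    }
}
```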

Thread Messaging

As noted above, Object has the wait(), notify(), and notifyAll() methods, which any POJO can call to help threads message each other. Say one object has to keep checking for received stock updates so it can go off, do some analysis, and send an update to the client. It could keep polling, or it could wait for a notification, saving resources until work is ready to be done. The important thing here is this: synchronization is required for communication between threads. wait(), notify(), and notifyAll() MUST be called from synchronized code, because a thread can only call them on an object whose lock it owns; otherwise they throw an IllegalMonitorStateException.

final Object lock = new Object();

// In the waiting thread: it must own the lock on the object it waits on
synchronized (lock) {
    try {
        lock.wait(); // releases the lock and suspends the *current* thread until notified
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

// In another thread: it must own the same lock to notify
synchronized (lock) {
    lock.notify(); // wakes one thread waiting on lock
}

This makes a bit more sense:


public class Executor extends Thread {
    private final JobRunner runner;

    public Executor(JobRunner runner) {
        this.runner = runner;
    }

    public void run() {
        while (true) {
            runner.addJob(new Job(nextJobInfo())); // nextJobInfo() is a hypothetical source of work
        }
    }
}


public class JobRunner extends Thread {
    private final List<Job> jobs = new ArrayList<Job>();

    public void addJob(Job job) {
        synchronized (jobs) {
            jobs.add(job);
            jobs.notify(); // notify on the object whose lock we hold, not on the job
        }
    }

    public void run() {
        while (true) {
            Job job;
            synchronized (jobs) {
                // wait until at least one job is available
                while (jobs.isEmpty()) {
                    try {
                        jobs.wait();
                    } catch (InterruptedException e) {
                        return; // treat interruption as a request to stop
                    }
                }
                // if we get here, jobs is not empty
                job = jobs.remove(0);
            }
            // process job here, outside the lock
        }
    }
}

So the key here, if you use wait() and notify():

  • Always call wait() inside a loop that checks whether the condition you are waiting for is true, and keep waiting if it is not
  • Always call wait() and notify() inside a synchronized block that holds the lock on the object you call them on
  • Always wrap wait() in a try/catch to handle thread interruption
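Putting those rules together, here is a minimal, self-contained version of the job queue. It is a sketch rather than the code above: the JobQueue name, the String jobs, and the use of notifyAll() instead of notify() are choices made for this example (notifyAll() wakes every waiter, the safer default when more than one thread may wait on the same object). One consumer takes 100 jobs and receives them in the order they were added.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

class JobQueue {
    private final LinkedList<String> jobs = new LinkedList<>();

    // Producer side: add a job and wake any waiting consumer.
    void addJob(String job) {
        synchronized (jobs) {          // must own the lock of the object we notify on
            jobs.add(job);
            jobs.notifyAll();
        }
    }

    // Consumer side: block until a job is available, then take it.
    String takeJob() throws InterruptedException {
        synchronized (jobs) {          // must own the lock of the object we wait on
            while (jobs.isEmpty()) {   // re-check the condition after every wake-up
                jobs.wait();           // releases the lock while waiting
            }
            return jobs.removeFirst();
        }
    }
}

class JobQueueDemo {
    // A consumer thread takes `count` jobs that the calling thread produces.
    static List<String> produceAndConsume(int count) {
        JobQueue queue = new JobQueue();
        List<String> consumed = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    consumed.add(queue.takeJob());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop consuming if interrupted
            }
        });
        consumer.start();
        for (int i = 0; i < count; i++) {
            queue.addJob("job-" + i);
        }
        try {
            consumer.join();           // after join, the consumer's list is safely visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println("consumed " + produceAndConsume(100).size() + " jobs");
    }
}
```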

Posted in Concurrency, Java