
Closing Over An Akka Actor Sender In The Receive

Accidentally closing over the sender is easy to do, and it causes mayhem when you do it. The rule is simple: never close over the sender method in a block of code that is potentially executed in another thread, such as a scheduled task or a Future. The trick is to capture the current sender in a val, as illustrated below:

Unsafe


Here are 3 examples of what not to do. It all depends on what is actually done in the receive. For example, if Futures or other threads are involved and a call to forward or sender is executed in callbacks (such as Future.onSuccess) then you have a problem:

1. Calling sender on a different thread from the one that processed the original message in Actor.receive

def receive = {
  case SomeMessage =>
    context.system.scheduler.scheduleOnce(5 seconds) {
      sender ! DelayedResponse  // Danger!
    }
}

The thing to watch out for here is { sender ! DelayedResponse }, because it runs on a different thread from the one that processed the original message. The sender is associated with the message currently being processed, and in the Scheduler case illustrated above, several other messages might have been processed during the 5 seconds before the call to the sender method. By the time the scheduled task fires, sender may therefore no longer refer to the actor that sent the original message.

2. Shared mutable state

class MyActor extends Actor {
    var state = State()
    def receive = {
      case _ =>
        Future { state = newState }
        otherActor ? message onSuccess { r => state = r }
    }
}

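The above code will break your application in weird ways. Both the Future body and the onSuccess callback run on other threads, yet they assign to the actor's state var, so state becomes shared mutable state that is no longer protected by the actor processing one message at a time.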
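For readers who think in Java, here is a minimal sketch of the safer alternative. It is not Akka code: the single-threaded executor named `mailbox` is my own hypothetical stand-in for an actor's mailbox, and `CounterActorSketch` is an invented name. The expensive work runs on a worker pool, but the result is handed back as a "message", so `state` is only ever touched by the mailbox thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for an actor: every read and write of `state` happens on the mailbox thread.
class CounterActorSketch {
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    private int state = 0; // only touched from the mailbox thread

    // Do the expensive part on a worker thread, then hand the result back to the mailbox.
    CompletableFuture<Void> handle(int input) {
        return CompletableFuture
                .supplyAsync(() -> input * 2, workers)                 // runs on a worker thread
                .thenAcceptAsync(result -> state += result, mailbox);  // runs on the mailbox thread
    }

    // Reads go through the mailbox too, so they are ordered after earlier updates.
    int currentState() {
        return CompletableFuture.supplyAsync(() -> state, mailbox).join();
    }

    void shutdown() {
        workers.shutdown();
        mailbox.shutdown();
    }

    // Sends the inputs 1..messages and returns the final state.
    static int run(int messages) {
        CounterActorSketch actor = new CounterActorSketch();
        try {
            CompletableFuture<?>[] pending = new CompletableFuture<?>[messages];
            for (int i = 0; i < messages; i++) {
                pending[i] = actor.handle(i + 1);
            }
            CompletableFuture.allOf(pending).join(); // wait until every update has been applied
            return actor.currentState();
        } finally {
            actor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(100)); // prints 10100
    }
}
```

In Akka terms this corresponds to sending the result to self instead of assigning to a var inside the callback.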

3. Calling sender in a Future during Actor.receive could return a null or be the wrong ActorRef

class MyActor extends Actor {
    def receive = {
      case _ =>  Future { expensiveCalculation(sender) } // painful, and you owe me a beer if you do it
    }
}

After the current message has been processed, the sender method might return null until the next message is received, but that doesn’t matter if you follow the rule: simply use a val whenever you close over sender in a block of code that might be executed by another thread.

Safe


Here are a few examples that are safe, the first because we simply freeze the sender reference:

class MyActor extends Actor {
    def receive = {
        case SomeMessage =>
            val requestor = sender // freeze the sender while still on the actor's thread
            context.system.scheduler.scheduleOnce(5 seconds) {
                requestor ! DelayedResponse
            }
    }
}

Or better yet, keep a clean Actor.receive altogether, which is a best practice:

class MyActor extends Actor {
    def receive = {
        case msg: SomeMessage => receiveSomeMessage(sender, msg)
    }

    // sender is evaluated in receive and passed in as a plain value
    def receiveSomeMessage(requestor: ActorRef, msg: SomeMessage): Unit = {
        context.system.scheduler.scheduleOnce(5 seconds) {
            requestor ! DelayedResponse
        }
    }
}

It is safe to close over self and a thread-safe ActorRef

class MyActor extends Actor {
    def receive = {
      case _ => Future { expensiveCalculation() } onComplete { f => self ! f.value.get }
    }
}

It is safe to close over a fixed value and a thread-safe ActorRef

class MyActor extends Actor {
    def receive = {
      case _ =>
        val currentSender = sender // freeze the sender to a val
        Future { expensiveCalculation(currentSender) }
    }
}
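
The same trap exists outside of Akka. Below is a small Java sketch of it, under loudly stated assumptions: `SenderCaptureSketch`, its `currentSender` field and its `scheduled` list are names I made up to model an actor's sender and a scheduler, and callbacks are simply run later on the same thread so the outcome is deterministic. The unsafe lambda reads the field when it finally runs; the safe one captures a local copy first, just like `val requestor = sender`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an actor whose "sender" changes with every message.
class SenderCaptureSketch {
    private String currentSender = "nobody";                      // plays the role of Akka's sender
    private final List<Runnable> scheduled = new ArrayList<>();   // stands in for the scheduler
    final List<String> unsafeReplies = new ArrayList<>();
    final List<String> safeReplies = new ArrayList<>();

    void receive(String sender) {
        currentSender = sender;

        // Unsafe: the lambda reads the field later, when it may belong to another message.
        scheduled.add(() -> unsafeReplies.add(currentSender));

        // Safe: freeze the value now, while this message is still being processed.
        final String requestor = currentSender;
        scheduled.add(() -> safeReplies.add(requestor));
    }

    // Runs all delayed callbacks, as a scheduler would some time after receive returned.
    void runScheduled() {
        scheduled.forEach(Runnable::run);
        scheduled.clear();
    }

    public static void main(String[] args) {
        SenderCaptureSketch actor = new SenderCaptureSketch();
        actor.receive("alice");
        actor.receive("bob");
        actor.runScheduled();
        System.out.println(actor.unsafeReplies); // prints [bob, bob]
        System.out.println(actor.safeReplies);   // prints [alice, bob]
    }
}
```

Alice never gets her reply in the unsafe variant, because both callbacks see whoever sent the most recent message.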

Find Out More

Some examples were taken from “Akka Concurrency” by Derek Wyatt, others from the Akka Docs.


AMQP and RabbitMQ basic.ack, basic.reject and requeue

Exposing basic.ack and basic.reject behavior in an AMQP-based messaging abstraction can be critical.

There are some common misunderstandings about ack and reject. The first is that either one is an ‘extra’ operation. An AMQP ack, an AMQP reject, or a Rabbit nack sent to a broker is not equivalent to publishing a message to a broker. Nor is it extraneous: when a consumer receives a message (removing it from a queue), the Rabbit server is aware of the delivery and needs an acknowledgement to confirm it. I do not agree with automatically acking all received messages, no matter what abstraction you provide, and it is not good practice. Acknowledgement should be configurable, and the abstraction should allow for error handling when the application fails to process a message.

basic.ack
Using the RabbitMQ Java client, where false means the ack does not cover multiple deliveries:

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

basic.reject

channel.basicReject(delivery.getEnvelope().getDeliveryTag(), requeue);

Why would a consumer reject a message? Maybe the application is in a post-stop/pre-destroy phase. Or perhaps the message content was inappropriate (an error in the sender that composed the message). These causes have nothing whatsoever to do with routing, AMQP, or Rabbit; they are user or programmer errors, lifecycle issues, and so forth.

basic.reject and requeue
basic.reject can either requeue the message for redelivery or dead-letter it:

a) If you supply requeue=false, the broker will discard the message. This is useful from an error handling point of view: if your application cannot process a particular message, you can get rid of it.

b) If you supply requeue=true, the broker will release the message back onto the queue to be delivered again. This is useful when a consumer decides not to deal with a delivered message and wants to place it back on the queue.

Rabbit re-enqueues the message and treats it as though it were completely new. This means a consumer can once again receive a message it has rejected. Consider a consumer for a ‘vcaf instance queue’ that is dealing with a fatal error and is about to be restarted: it should send a basic.reject with requeue=true so that, once it is restarted and initialized, it can receive the message again and process it successfully.

AsyncConsumer consumer = new AbstractQueueingConsumer(connectionFactory, queue, autoAck) {
    @Override
    public void handle(Channel channel, QueueingConsumer.Delivery delivery) {
        ...application logic...
        if (some application case...) reject(channel, delivery, true); // boolean requeue
        else acknowledge(channel, delivery);
    }
};
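
To make the requeue behavior concrete, here is a toy in-memory model of a single queue. It is emphatically not the RabbitMQ client API: `ToyQueueSketch` and all of its methods are hypothetical names of mine, and the model simplifies things by placing a requeued message at the tail of the queue and by simply dropping a message rejected with requeue=false.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model of one queue; NOT the RabbitMQ client API.
class ToyQueueSketch {
    private final Deque<String> ready = new ArrayDeque<>();     // messages waiting for delivery
    private final Map<Long, String> unacked = new HashMap<>();  // delivered, awaiting ack or reject
    private long nextTag = 1;

    void publish(String body) {
        ready.addLast(body);
    }

    // Delivers the next message and returns its delivery tag, or -1 if nothing is ready.
    long deliver() {
        String body = ready.pollFirst();
        if (body == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, body);
        return tag;
    }

    String bodyOf(long tag) {
        return unacked.get(tag);
    }

    // Models basic.ack: the broker can forget the message.
    void ack(long tag) {
        unacked.remove(tag);
    }

    // Models basic.reject: requeue=true makes the message deliverable again, requeue=false drops it.
    void reject(long tag, boolean requeue) {
        String body = unacked.remove(tag);
        if (body != null && requeue) {
            ready.addLast(body); // simplification: requeued messages go to the tail
        }
    }

    int readyCount() { return ready.size(); }

    int unackedCount() { return unacked.size(); }

    public static void main(String[] args) {
        ToyQueueSketch queue = new ToyQueueSketch();
        queue.publish("order-1");
        long first = queue.deliver();
        queue.reject(first, true);                      // consumer is about to restart
        long second = queue.deliver();                  // the same body is delivered again
        System.out.println(queue.bodyOf(second));       // prints order-1
        queue.ack(second);
        System.out.println(queue.readyCount() + " " + queue.unackedCount()); // prints 0 0
    }
}
```

Note that the redelivery gets a fresh delivery tag in this model, which matches the idea that the message is treated as though it were new.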


Building RabbitMQ Java Client Source from GitHub on Mac

For anyone who thinks of Python as a large snake, here are the steps I took to build the RabbitMQ Java client on my Mac from the GitHub sources.

First, Macs now come with Python installed out of the box, so you’re good there unless you wish to update it. Simply type ‘python’ at the command line to see what you have. Then go to http://pypi.python.org/pypi/simplejson#downloads to get the URI of the simplejson source archive.

hedelson$ wget http://pypi.python.org/packages/source/s/simplejson/simplejson-{latest.version}.tar.gz#md5={autogenerated-hash}
hedelson$ tar -xzf simplejson-{latest.version}.tar.gz
hedelson$ cd /path/to/simplejson-2.1.2/
hedelson$ sudo python ez_setup.py
hedelson$ python setup.py build
hedelson$ python setup.py install

If you don’t have this already, create a local repo for rabbitmq-codegen:

hedelson$ git clone git://github.com/rabbitmq/rabbitmq-codegen.git
hedelson$ cd rabbitmq-codegen

To set up a fresh RabbitMQ Java client repo from GitHub and build from source (this is a read-only clone; working from a fork is a bit different):

hedelson$ git clone git://github.com/rabbitmq/rabbitmq-java-client.git
hedelson$ cd /path/to/rabbitmq-java-client
hedelson$ ant dist


Configuring @Configuration ApplicationContext for Spring Test Framework @ContextConfiguration

Here is a @Configuration class, RabbitTestConfiguration, truncated for the sake of a simple example. Bootstrapping this for testing using Spring’s Test Framework is simple. First make sure you have an @ImportResource mapping, via the classpath, to your XML, which here has 2 simple declarations:


<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:beans="http://www.springframework.org/schema/beans"
xmlns="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

<component-scan base-package="org.hyperic.hq.plugin.rabbitmq"/>

<property-placeholder location="/etc/test.properties"/>

</beans:beans>

Next, make sure you remove the @Configuration annotation declared at the class level. We will be bootstrapping this a different way.

@ImportResource("classpath:/org/hyperic/hq/plugin/rabbitmq/*-context.xml")
public class RabbitTestConfiguration {

    private @Value("${hostname}") String hostname;

    private @Value("${username}") String username;

    private @Value("${password}") String password;

    @Bean
    public SingleConnectionFactory singleConnectionFactory() {
        SingleConnectionFactory connectionFactory = new SingleConnectionFactory(hostname);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }
// ... shortened for brevity
}

Now let’s build an abstract Spring base test

/**
 * AbstractSpringTest
 * @author Helena Edelson
 */
@ContextConfiguration(loader = TestContextLoader.class)
@RunWith(SpringJUnit4ClassRunner.class)
public abstract class AbstractSpringTest {
    /** Inheritable logger */
    protected final Log logger = LogFactory.getLog(this.getClass().getName());

    /** Now we can autowire our beans that all child tests will need. Note that they are protected. */
    @Autowired
    protected org.springframework.amqp.rabbit.connection.SingleConnectionFactory singleConnectionFactory;

    @Before
    public void before() {
        assertNotNull("singleConnectionFactory should not be null", singleConnectionFactory);
        //... more assertion checks for other beans, removed for brevity.
    }
}

And finally, build a test context loader, override customizeContext(), and bootstrap your annotation-based config class. Since the config class bootstraps the minimal context XML config, we’re all set.

/**
 * TestContextLoader
 * @author Helena Edelson
 */
public class TestContextLoader extends GenericXmlContextLoader {

    @Override
    protected void customizeContext(GenericApplicationContext context) {
        AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
        ctx.register(RabbitTestConfiguration.class);
        ctx.refresh();
        /** This is really the key */
        context.setParent(ctx);
        assertTrue(ctx.isRunning());
    }
}

Now all of my Spring test classes can simply extend the base class and freely use the inherited shared beans or declare any @Autowired dependency:

public class RabbitGatewayTest extends AbstractSpringTest {
     @Autowired protected Queue marketDataQueue;

    @Test
    public void getConnections() throws Exception {
        com.rabbitmq.client.Connection conn = singleConnectionFactory.createConnection();
        // ... etc
    }
}


Open Source Artificial Intelligence for the Cloud

The idea of introducing intelligent computing so that software can make decisions autonomously and in real time is not new, but it is relatively new to the cloud. Microsoft seems to be the strongest force in this realm currently, but its work is not open source. The idea of clouds that manage themselves steps way beyond Cloud Ops as we know it, and is the only logical, and necessary, next step.

I’ve been researching various open source AI platforms and learning algorithms to start building a prototype for cloud-based learning and action for massive distributed Cloud Ops systems and applications. One could offer an AI cloud appliance eventually, but I will start with a prototype and build on that using RabbitMQ (an AMQP messaging implementation), CEP, Spring, and Java as a base. I’ve been looking into OpenStack, the open source cloud computing software being developed by NASA, RackSpace, and many others. Here is the future GitHub repo: http://github.com/helena/OCAI.

Generally, to do this sort of project you need a large amount of funding and engineers with PhDs, neither of which I have. So my only alternative is to see how we might take this concept and create a lightweight open source solution.

Imagine having some of these (this being not a comprehensive list) available to your systems:

AI Technology

  • Planning and Scheduling
  • Heuristic Search
  • Temporal Reasoning
  • Learning Models
  • Intelligent Virtual Agents
  • Autonomic and Adaptive Clustering of Distributed Agents
  • Clustering Autonomous Entities
  • Constraint Predictability and Resolution
  • Automated Intelligent Provisioning
  • Real-time learning
  • Real-world action
  • Uncertainty and Probabilistic Reasoning
  • Decision Making
  • Knowledge Representation

I will go into these topics further in future posts.

About me: I am currently an engineer at SpringSource/VMware on the vFabric Cloud Application Platform side of things. This project is wholly outside of that, however; I feel the need to pursue something I’m interested in. I work with the RabbitMQ team and am a member of the AMQP Working Group.


Future Posts: Esper, RabbitMQ for Cloud Messaging

If I had more time I would add a few new posts on Esper and RabbitMQ for messaging in the cloud, but I just got home from 2 weeks of coast-to-coast consulting and technology presentations. Pretty exhausting. I hope to have time to post on these and a few other fun technologies. I’ve got some code samples and tests I might post with that, plus some cool stuff on Esper queries with Spring Integration.


Spring Sample App

I’ve been traveling a lot as a Senior Consultant with SpringSource, and many of the engineers who have attended my private and public engagements on core Spring and enterprise integration with Spring have requested a copy of the sample app that I use to demonstrate configurations, run demos and tests, etc.

This week I’m finally getting it finished and available to check out from svn. I’ve had a project set up in one of our public-facing repositories; the code will be available next week. Shoot me an email via my SpringSource address (attendees have that email), or find me on linkedin.com.

May 10 I officially move into engineering!


Simple Asynchronous Processing with Spring’s TaskExecutor

This post is merely meant as a starting guide to tinkering with a lightweight solution for handing off execution of a task for async processing, without the overhead of Spring Batch, Spring JMS and message brokers, or other middleware solutions.

1. I have a simplistic JUnit test that merely kicks off the service method to view the path of execution:

/**
 * TaskTests
 *
 * @author Helena Edelson
 * @since v 1.0
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:infrastructure-context.xml"})
public class TaskTests extends BaseTest {
    protected static final Logger logger = Logger.getLogger(TaskTests.class);
    @Autowired private OrderService orderService;

    @Test
    public void testExecution(){
        logger.debug("Starting execution thread...");
        orderService.dispatch(new Order());
    }
}

2. A simple context config:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" p:corePoolSize="5" p:maxPoolSize="25"/>

<!-- OR alternately: Creates a org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor -->
<task:executor id="taskExecutor" pool-size="5-25" queue-capacity="100" rejection-policy="CALLER_RUNS"/>

</beans>

3. OrderService that delegates to the ThreadPoolTaskExecutor:

/**
 * OrderServiceImpl is used for both orders and returns
 *
 * @author Helena Edelson
 * @MessageEndpoint which is a @Component
 * @since Dec 29, 2009
 */
@Service("orderService")
public class OrderServiceImpl implements OrderService {
    private static final Logger logger = Logger.getLogger(OrderServiceImpl.class);
    private OrderDao orderDao;
    private MerchantService merchantService;
    private ReceivingService receivingService;
    @Autowired private TaskExecutor taskExecutor;

    @Autowired
    public OrderServiceImpl(OrderDao orderDao, ReceivingService receivingService, MerchantService merchantService) {
        this.orderDao = orderDao;
        this.receivingService = receivingService;
        this.merchantService = merchantService;
    }

    public final void dispatch(final Order order) {
        logger.debug("Starting dispatch execution...");

        if (this.taskExecutor != null) {
            this.taskExecutor.execute(new Runnable() {
                public void run() {
                    executorAsync(order);
                }
            });
        }

        logger.debug("Completed dispatch execution...");
    }

    private final void executorAsync(final Order order) {
        logger.debug("Starting Async execution...");

        daoDatasourceOne.createOrder(order);
        daoDatasourceTwo.createOrder(order);

        logger.debug("Completed Async execution...");
    }

/* Where the output will be: Note the dispatch method returns control to its caller before the async method begins:
2010-01-27 13:23:27,546 [main] DEBUG org.springsource.oms.infrastructure.TaskTests  - Starting execution thread...
2010-01-27 13:23:27,546 [main] DEBUG org.springsource.oms.domain.services.OrderServiceImpl  - Starting dispatch execution...
2010-01-27 13:23:27,546 [main] DEBUG org.springsource.oms.domain.services.OrderServiceImpl  - Completed dispatch execution...
2010-01-27 13:23:27,546 [taskExecutor-1] DEBUG org.springsource.oms.domain.services.OrderServiceImpl  - Starting Async execution...
persisting org.springsource.oms.domain.entities.Order@1f10a67
*/

/**
* Alternately for a different scenario you can play around with this:
*/
public void withExecutor(final Order order) {
        try {
            // TaskExecutor extends java.util.concurrent.Executor, so it can back a CompletionService
            CompletionService<Object> completionService = new ExecutorCompletionService<Object>(taskExecutor);

            completionService.submit(new Callable<Object>() {
                public Object call() {
                    return daoDatasourceOne.createOrder(order);
                }
            });
            completionService.submit(new Callable<Object>() {
                public Object call() {
                    return daoDatasourceTwo.createOrder(order);
                }
            });

            // take() blocks until the next task completes; get() rethrows any failure from the task
            completionService.take().get();
            completionService.take().get();

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }
}
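
If you want to tinker with the completion-service idea without a Spring context, here is a minimal plain java.util.concurrent sketch. The assumptions are mine: `CompletionServiceSketch` and `saveToAll` are invented names, the datasource names are just strings, and the lambda that builds a string stands in for a real `createOrder` call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionServiceSketch {

    // Submits one task per (hypothetical) datasource name and collects the results as they complete.
    static List<String> saveToAll(List<String> datasources, String order) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletionService<String> completion = new ExecutorCompletionService<>(pool);
            for (String datasource : datasources) {
                // Stand-in for a DAO call such as createOrder(order).
                completion.submit(() -> datasource + " saved " + order);
            }
            List<String> results = new ArrayList<>();
            for (int i = 0; i < datasources.size(); i++) {
                results.add(completion.take().get()); // blocks until the next task finishes
            }
            Collections.sort(results); // completion order is not deterministic, so sort for display
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(saveToAll(Arrays.asList("one", "two"), "order-42"));
        // prints [one saved order-42, two saved order-42]
    }
}
```

The point of the CompletionService is that take() hands back whichever task finishes first, so a slow datasource does not hold up handling of a fast one.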

I recommend looking into the @Async annotation, which I will post on shortly. In the meantime here is the ref page for Spring Task and Scheduling: http://static.springsource.org/spring/docs/3.0.x/reference/html/scheduling.html


Getting Started with Spring Integration

If you have Maven installed:

  1. Download the samples
  2. Unzip them
  3. At the command line, cd into the samples dir and enter: mvn install
  4. In your IDE of choice, set up the project from the root pom

If you are familiar with Enterprise Integration Patterns, here are some of the patterns implemented in each sample:

Pattern / Sample: Event Driven Consumer, Polling Consumer, Message Filter, Message Translator, Content Based Router, Splitter, Aggregator, Channel Adapter, Messaging Gateway, Service Activator, Request/Reply
cafe X X X X X X X X
filecopy X X X X
errorhandling X X X X
helloworld X X
jms X X X X X
oddeven X X X X X
quote X X X
ws X X X X
xml X X X X X X

From Mark Fisher’s Post:

*NOTE: All of the samples feature certain common patterns that are essential to the underlying Spring Integration core:

  • Message: Spring Integration Messages encapsulate a POJO payload and a header Map (Reference).
  • Message Channel: Spring Integration includes many Message Channel options for both point-to-point and publish-subscribe. Some include queues for buffering while others dispatch directly to subscribers (Reference).
  • Message Endpoint: At a high level, this includes all components that connect to channels for input and/or output.
  • Messaging Mapper: Spring Integration binds inbound Messages to method arguments and method return values to Message payloads and/or headers.
  • Message Dispatcher: In Spring Integration, channels that do not have a queue use Message Dispatchers to invoke their subscribers.
  • Pipes and Filters: This is the most general pattern describing Message-processing components connected in a loosely-coupled way via channels.
  • Message Bus: Spring Integration essentially turns a Spring ApplicationContext into a lightweight Message Bus within which all of these other components are hosted.
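
To give a feel for the Message and Pipes and Filters ideas in the list above, here is a tiny plain-Java sketch. None of these types are the Spring Integration API: `PipesAndFiltersSketch`, `Msg` and `process` are hypothetical names of mine, and the "type" header is an invented example. A message carries a payload plus a header map, a filter drops messages that do not match, and a translator converts the payload.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Illustrative only: these types are NOT the Spring Integration API.
class PipesAndFiltersSketch {

    // A message is an immutable payload plus a read-only header map.
    static final class Msg<T> {
        final T payload;
        final Map<String, Object> headers;

        Msg(T payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        }
    }

    // Chains a Message Filter and a Message Translator; an empty result means the message was dropped.
    static Optional<Msg<Integer>> process(Msg<String> in) {
        // Message Filter: only let "order" messages through.
        Predicate<Msg<String>> filter = m -> "order".equals(m.headers.get("type"));
        // Message Translator: convert the String payload to an Integer, keeping the headers.
        Function<Msg<String>, Msg<Integer>> translator =
                m -> new Msg<>(Integer.parseInt(m.payload.trim()), m.headers);
        return Optional.of(in).filter(filter).map(translator);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("type", "order");
        Optional<Msg<Integer>> out = process(new Msg<>(" 42 ", headers));
        System.out.println(out.map(m -> m.payload).orElse(-1)); // prints 42
    }
}
```

In Spring Integration itself the stages are separate endpoints connected by channels; chaining them in one method here just keeps the sketch short.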
