
Changes

It seems the word is out that I left DataStax. I made so many friends there, even recruited an old friend, and I know many of us will continue to cross paths and collaborate. Thanks to everyone for the great memories.

Over the last year I have spoken at several Spark, Scala, and Machine Learning meetups, as well as internationally at Scala conferences like Scala Days Amsterdam and at many Big Data conferences such as Spark Summit, Strata, and Emerging Tech, all of which I hope to continue doing.

Spark Summit East 2015

Scala Days Amsterdam, 2015

Philly ETE – Emerging Tech 2015

In October 2015 I’ll be speaking at Spark Summit EU and on the Spark panel at Grace Hopper. I’ll also be at Strata NYC again, this year hosting the Spark track on Thursday, and I’m adding some cool new material and code to my Big Data Lambda talk.

What’s next? I’m humbled to have been contacted by many engineers I know and highly respect about working at their companies, in some cases as founders. This is making for a very difficult decision because they are all doing really interesting and exciting work with open source technologies I like. In the meantime I am taking a month or so to enjoy the last part of summer, something I haven’t had time to do for years :)


Spark Streaming with Kafka and Cassandra

This is a lightning post previewing a future, more in-depth post on how to integrate Spark Streaming, Kafka, and Cassandra using the Spark Cassandra Connector.

First, here are some of the imports used below:

import com.datastax.spark.connector.streaming._
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkEnv, SparkConf, Logging}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import com.datastax.spark.connector.demo.streaming.embedded._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.demo.Assertions
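
These imports come from Spark, Kafka, and the Spark Cassandra Connector (plus the connector’s demo helper packages). As a rough sketch of the corresponding build.sbt dependencies; the versions are assumptions from the Spark 1.x / connector 1.x era, not taken from the original post:

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-streaming"            % "1.1.0",
  "org.apache.spark"   %% "spark-streaming-kafka"      % "1.1.0",
  "com.datastax.spark" %% "spark-cassandra-connector"  % "1.1.0"
)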

Start by configuring Spark:

  val sc = new SparkConf(true)
    .set("spark.cassandra.connection.host", SparkMaster)
    .set("spark.cleaner.ttl", "3600")
    .setMaster("local[12]")
    .setAppName("Streaming Kafka App")

Where ‘SparkMaster’ can be a comma-separated list of Cassandra seed nodes.
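
For example, to point the connector at multiple seed nodes (a fragment of the SparkConf builder above; the addresses are placeholders):

    .set("spark.cassandra.connection.host", "10.0.0.1,10.0.0.2,10.0.0.3")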

Using the SparkConf object you can easily configure the connection as well as populate Cassandra with a keyspace, table, and data for quick prototyping. This creates the keyspace and table in Cassandra:

  CassandraConnector(sc).withSessionDo { session =>
    session.execute(s"CREATE KEYSPACE IF NOT EXISTS streaming_test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 3 }")
    session.execute(s"CREATE TABLE IF NOT EXISTS streaming_test.key_value (key VARCHAR PRIMARY KEY, value INT)")
    session.execute(s"TRUNCATE streaming_test.key_value")
  }

A custom embedded class starts an embedded local ZooKeeper server and an embedded Kafka server and client (all very basic here):

val kafka = new EmbeddedKafka
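
The helper itself is not shown in this post; below is a minimal sketch of what such an EmbeddedKafka class might look like against Kafka 0.8.x. The member names, ports, and directories here are assumptions, chosen only so that the later snippets (kafka.kafkaParams, kafkaConfig, client, kafka.shutdown()) have something concrete to refer to:

import java.io.File
import java.net.InetSocketAddress
import java.util.Properties
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}

class EmbeddedKafka {
  val zkConnect = "127.0.0.1:2181"

  // Embedded ZooKeeper server (snapshot and log dirs are throwaway local paths)
  private val zookeeper = new ZooKeeperServer(
    new File("./tmp/zk-snapshots"), new File("./tmp/zk-logs"), 500)
  private val factory = new NIOServerCnxnFactory()
  factory.configure(new InetSocketAddress("127.0.0.1", 2181), 16)
  factory.startup(zookeeper)

  // ZooKeeper client, used later by CreateTopicCommand
  val client = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)

  // Embedded Kafka broker
  private val brokerProps = new Properties()
  brokerProps.put("broker.id", "0")
  brokerProps.put("host.name", "127.0.0.1")
  brokerProps.put("port", "9092")
  brokerProps.put("log.dir", "./tmp/kafka-logs")
  brokerProps.put("zookeeper.connect", zkConnect)

  val kafkaConfig = new KafkaConfig(brokerProps)
  private val server = new KafkaServer(kafkaConfig)
  server.startup()

  // Consumer parameters handed to KafkaUtils.createStream further below
  val kafkaParams = Map(
    "zookeeper.connect" -> zkConnect,
    "group.id" -> "streaming.test.group",
    "auto.offset.reset" -> "smallest")

  def shutdown(): Unit = {
    server.shutdown()
    client.close()
    factory.shutdown()
  }
}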

Create the Spark Streaming context:

val ssc =  new StreamingContext(sc, Seconds(2))

One of several ways to ensure shutdown of ZooKeeper and Kafka for this basic prototype:

SparkEnv.get.actorSystem.registerOnTermination(kafka.shutdown())
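
The hook above relies on Spark’s internal Akka actor system. An alternative sketch that does not depend on Spark internals is a plain JVM shutdown hook:

sys.addShutdownHook {
  kafka.shutdown()
}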

This assumes you already have or are creating a new topic in Kafka and are sending messages to it, but here is a very basic way to do it if not:

// 'client' and 'kafkaConfig' come from the embedded Kafka helper above
def createTopic(topic: String): Unit =
  CreateTopicCommand.createTopic(client, topic, 1, 1, "0")

def produceAndSendMessage(topic: String, sent: Map[String, Int]): Unit = {
  val p = new Properties()
  p.put("metadata.broker.list", kafkaConfig.hostName + ":" + kafkaConfig.port)
  p.put("serializer.class", classOf[StringEncoder].getName)

  val producer = new Producer[String, String](new ProducerConfig(p))
  producer.send(createTestMessage(topic, sent): _*)
  producer.close()
}

Here the createTestMessage function simply generates test data for these purposes.
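
It is not shown in the post; as a minimal sketch (an assumption about its shape, using the old Kafka 0.8 producer API), it could expand the sent map into KeyedMessage instances, repeating each key count times so the word count can be verified later:

import kafka.producer.KeyedMessage

def createTestMessage(topic: String, sent: Map[String, Int]): Seq[KeyedMessage[String, String]] =
  for ((key, count) <- sent.toSeq; _ <- 0 until count)
    yield new KeyedMessage[String, String](topic, key, key)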

Create an input stream that pulls messages from a Kafka broker:

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafka.kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)

Import the implicits enabling the DStream’s ‘saveToCassandra’ functions:

import com.datastax.spark.connector.streaming._

This defines the work to do in the stream: configure the stream computations (here a very rudimentary count by key) and write the DStream output to Cassandra:

stream.map { case (_, v) => v }
  .map(x => (x, 1))
  .reduceByKey(_ + _)
  .saveToCassandra("streaming_test", "key_value", SomeColumns("key", "value"), 1)

ssc.start()
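
In a standalone application you would typically block the driver after ssc.start() rather than stopping the context explicitly as this prototype does below:

ssc.awaitTermination()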

Now we can query our Cassandra keyspace and table to see if the expected data has been persisted:

import scala.concurrent.duration._

val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value")
awaitCond(rdd.collect.size == sent.size, 5.seconds)
val rows = rdd.collect
sent.forall { case (k, v) =>
  rows.exists(row => row.getString("key") == k && row.getInt("value") == v)
}

And finally, with our assertions successful, we can explicitly do a graceful shutdown of the Spark Streaming context with:

ssc.stop(stopSparkContext = true, stopGracefully = true)


Functional Data Structure Resources

By request, and not exactly the ‘short list’, here are some excellent resources on functional data structures. Note that some cost money, but you can view them for 24 hours for a few USD:

These are good too but not directly related:
