
Spark Streaming with Kafka and Cassandra

This is a lightning post previewing a future, more in-depth post on how to integrate Spark Streaming, Kafka and Cassandra, using the Spark Cassandra Connector.

First, here are some of the dependencies used below:

import com.datastax.spark.connector.streaming._
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkEnv, SparkConf, Logging}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import com.datastax.spark.connector.demo.streaming.embedded._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.demo.Assertions

Start by configuring Spark:

  val sc = new SparkConf(true)
    .set("spark.cassandra.connection.host", SparkMaster)
    .set("spark.cleaner.ttl", "3600")
    .setMaster("local[12]")
    .setAppName("Streaming Kafka App")

Where ‘SparkMaster’ can be a comma-separated list of Cassandra seed nodes.
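
For illustration, a hypothetical value for that setting in a local prototype (in the real code, ‘SparkMaster’ would be defined before building the SparkConf above):

// Illustrative assumption: a single local Cassandra node; multiple seed
// nodes would be comma-separated.
val SparkMaster = "127.0.0.1"  // e.g. "10.0.0.1,10.0.0.2,10.0.0.3"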

Using the SparkConf object you can not only configure Spark but also quickly populate Cassandra with a keyspace, table and data for prototyping. This creates the keyspace and table in Cassandra:

  CassandraConnector(sc).withSessionDo { session =>
    session.execute(s"CREATE KEYSPACE IF NOT EXISTS streaming_test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 3 }")
    session.execute(s"CREATE TABLE IF NOT EXISTS streaming_test.key_value (key VARCHAR PRIMARY KEY, value INT)")
    session.execute(s"TRUNCATE streaming_test.key_value")
  }

A custom embedded class starts an embedded local ZooKeeper server and an embedded Kafka server and client (all very basic here):

val kafka = new EmbeddedKafka
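
The real EmbeddedKafka class lives in the connector’s demo package (imported at the top). Here is a rough sketch of what such a helper could look like, assuming Kafka 0.8 and ZooKeeper 3.4 APIs; the port, group id and temp directories are illustrative assumptions:

import java.net.InetSocketAddress
import java.nio.file.Files
import java.util.Properties
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
import org.I0Itec.zkclient.ZkClient
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.ZKStringSerializer

class EmbeddedKafka {
  val zkConnect = "127.0.0.1:2181"

  // Embedded ZooKeeper: one standalone server over temp snapshot/log dirs.
  private val zkServer = new ZooKeeperServer(
    Files.createTempDirectory("zk-snapshot").toFile,
    Files.createTempDirectory("zk-log").toFile,
    500)
  private val zkFactory = new NIOServerCnxnFactory
  zkFactory.configure(new InetSocketAddress("127.0.0.1", 2181), 16)
  zkFactory.startup(zkServer)

  // ZkClient handle, used by CreateTopicCommand when creating topics.
  val client = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)

  // Embedded Kafka broker pointed at the embedded ZooKeeper.
  val kafkaConfig: KafkaConfig = {
    val props = new Properties()
    props.put("broker.id", "0")
    props.put("host.name", "127.0.0.1")
    props.put("port", "9092")
    props.put("log.dir", Files.createTempDirectory("kafka-log").toFile.getAbsolutePath)
    props.put("zookeeper.connect", zkConnect)
    new KafkaConfig(props)
  }
  private val server = new KafkaServer(kafkaConfig)
  server.startup()

  // Consumer parameters handed to KafkaUtils.createStream further down.
  val kafkaParams = Map(
    "zookeeper.connect" -> zkConnect,
    "group.id" -> "streaming.test",
    "auto.offset.reset" -> "smallest")

  def shutdown(): Unit = {
    server.shutdown()
    client.close()
    zkFactory.shutdown()
  }
}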

Create the Spark Streaming context, here with a 2-second batch interval:

val ssc = new StreamingContext(sc, Seconds(2))

One of several ways to ensure shutdown of ZooKeeper and Kafka for this basic prototype:

SparkEnv.get.actorSystem.registerOnTermination(kafka.shutdown())
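
If you would rather not depend on Spark’s internal actor system (SparkEnv.get.actorSystem is internal API, and was removed entirely in Spark 2.0), a plain JVM shutdown hook is one alternative:

// Alternative sketch: a JVM shutdown hook, with no dependency on
// Spark internals.
sys.addShutdownHook {
  kafka.shutdown()
}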

This assumes you already have, or are creating, a Kafka topic and are sending messages to it. If not, here is a very basic way to do so:

import java.util.Properties
import kafka.admin.CreateTopicCommand
import kafka.producer.{Producer, ProducerConfig}
import kafka.serializer.StringEncoder

// Create the topic with 1 partition, replication factor 1, on broker 0.
// `client` is the ZkClient from the embedded Kafka helper.
def createTopic(topic: String): Unit =
  CreateTopicCommand.createTopic(client, topic, 1, 1, "0")

// Send the generated test messages to the embedded broker;
// `kafkaConfig` also comes from the embedded Kafka helper.
def produceAndSendMessage(topic: String, sent: Map[String, Int]): Unit = {
  val p = new Properties()
  p.put("metadata.broker.list", kafkaConfig.hostName + ":" + kafkaConfig.port)
  p.put("serializer.class", classOf[StringEncoder].getName)

  val producer = new Producer[String, String](new ProducerConfig(p))
  producer.send(createTestMessage(topic, sent): _*)
  producer.close()
}
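
Putting these helpers to work for the prototype might look like this; the topic name and message counts are arbitrary test values (‘sent’ is referenced again in the assertions at the end):

// Hypothetical fixture: create the topic, then publish a known
// distribution of messages the assertions below can check against.
val topic = "streaming_test_topic"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
createTopic(topic)
produceAndSendMessage(topic, sent)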

Here the createTestMessage function simply generates test data for these purposes.
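
A minimal sketch of such a generator, assuming Kafka 0.8’s KeyedMessage API: each (key, count) pair fans out into ‘count’ messages whose payload is the key, so the stream’s counts should reproduce ‘sent’.

import kafka.producer.KeyedMessage

// Expand each (key, count) pair into `count` messages with the key as
// both the message key and the message body.
def createTestMessage(topic: String, sent: Map[String, Int]): Seq[KeyedMessage[String, String]] =
  for ((key, count) <- sent.toSeq; _ <- 0 until count)
    yield new KeyedMessage[String, String](topic, key, key)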

Create an input stream that pulls messages from the Kafka broker:

// One receiver with a single consumer thread for our topic, stored in memory.
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafka.kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)

Import the implicits enabling the DStream’s ‘saveToCassandra’ functions:

import com.datastax.spark.connector.streaming._

This defines the work to do in the stream: configure the stream computations (here, a very rudimentary per-batch count) and write the DStream output to Cassandra. Note that reduceByKey counts within each batch, so the assertions below assume all test messages arrive within a single batch interval:

// Count occurrences of each message value within the batch and write
// the resulting (key, count) pairs to Cassandra.
stream.map { case (_, v) => v }
  .map(x => (x, 1))
  .reduceByKey(_ + _)
  .saveToCassandra("streaming_test", "key_value", SomeColumns("key", "value"), 1)

ssc.start()

Now we can query our Cassandra keyspace and table to see if the expected data has been persisted:

import scala.concurrent.duration._

// Read the table back as (key, value) tuples and wait up to 5 seconds
// for one row per distinct key sent, then check each pair was persisted.
val rdd = ssc.cassandraTable[(String, Int)]("streaming_test", "key_value").select("key", "value")
awaitCond(rdd.collect.size == sent.size, 5.seconds)
val rows = rdd.collect
assert(sent.forall(rows.contains(_)))
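
The awaitCond helper comes from the demo’s Assertions trait (imported at the top). If you are not using that trait, a minimal stand-in, in the spirit of Akka TestKit’s helper of the same name, could be:

import scala.annotation.tailrec
import scala.concurrent.duration._

// Poll the predicate until it holds or the deadline expires.
def awaitCond(p: => Boolean, max: FiniteDuration, interval: FiniteDuration = 100.millis): Unit = {
  val deadline = max.fromNow
  @tailrec def poll(): Unit =
    if (!p) {
      if (deadline.isOverdue()) throw new AssertionError(s"condition not met within $max")
      Thread.sleep(interval.toMillis)
      poll()
    }
  poll()
}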

And finally, once our assertions succeed, we can explicitly and gracefully shut down the Spark Streaming context with:

ssc.stop(stopSparkContext = true, stopGracefully = true)
