Spark Streaming Integration

The Couchbase Spark connector integrates with Spark Streaming through DCP, the Couchbase Server replication protocol. The connector receives mutations from the server as they happen and exposes them to you as a DStream, the primary abstraction used by Spark Streaming. You can both create DStreams from Couchbase and persist DStreams to it.

DStream Creation

Creating DStreams from DCP through Couchbase is still experimental and rough around the edges. We appreciate any feedback while we stabilize this module.

The following example counts all mutations that have already happened in the bucket, as well as all mutations that happen from then on, and prints the count for each batch interval. Instead of counting, you can process the messages with any of the common DStream methods.

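  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  // Create the Spark config and instruct the connector to use the
  // travel-sample bucket with no password.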
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("StreamingExample")
    .set("com.couchbase.bucket.travel-sample", "")

  // Initialize StreamingContext with a Batch interval of 5 seconds
  val ssc = new StreamingContext(conf, Seconds(5))

  // Consume the DCP Stream from the beginning and never stop.
  // This counts the messages per interval and prints their count.
  ssc
    .couchbaseStream(from = FromBeginning, to = ToInfinity)
    .count()
    .print()

  // Start the Stream and await termination
  ssc.start()
  ssc.awaitTermination()

Don’t forget to add the implicit import:

import com.couchbase.spark.streaming._

In the previous example the connector issues a backfill: all data already stored in the bucket is streamed first, before any new mutations. To avoid a long initial stream, try the example on an empty bucket or pass from = FromNow so that the stream starts at the current state of the bucket.
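
To make the FromNow option concrete, here is a minimal sketch of the same job without the backfill. It also shows two common DStream methods, filter and map, applied to the stream. The object name FromNowExample is hypothetical, and the sketch assumes that the connector represents document changes as a Mutation message with a content field holding the raw document bytes; check both against the connector version you use.

```scala
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.couchbase.spark.streaming._

// Hypothetical example object; the name is not part of the connector.
object FromNowExample {
  def main(args: Array[String]): Unit = {
    // Same configuration as above: travel-sample bucket, no password.
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("FromNowExample")
      .set("com.couchbase.bucket.travel-sample", "")

    val ssc = new StreamingContext(conf, Seconds(5))

    ssc
      // Skip the backfill and start at the current state of the bucket.
      .couchbaseStream(from = FromNow, to = ToInfinity)
      // Assumption: document changes arrive as Mutation messages.
      .filter(_.isInstanceOf[Mutation])
      // Assumption: Mutation.content holds the document body as bytes.
      .map(m => new String(m.asInstanceOf[Mutation].content, StandardCharsets.UTF_8))
      // Print the first few document bodies of every batch.
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this variant a batch prints nothing until a document in the bucket is created or changed, which makes it easier to try out on a bucket that already holds a lot of data.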

DStream Persistence

Since a DStream is a continuous sequence of RDDs, the same persistence mechanisms apply to each RDD in the stream (see Working with RDDs for more information). Here is an example that streams tweets from Twitter and persists them in a Couchbase bucket:

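import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.spark._

// Configure Spark and instruct the connector to use the twitter bucket
// with no password.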
val cfg = new SparkConf()
  .setAppName("TwitterFeed")
  .setMaster("local[*]")
  .set("com.couchbase.bucket.twitter", "")

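// Read the Twitter OAuth credentials from the first four program arguments
// and hand them to twitter4j through system properties.
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)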
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

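// Initialize the StreamingContext with a batch interval of 2 seconds
// and open an unfiltered stream of tweets.
val ssc = new StreamingContext(cfg, Seconds(2))
val stream = TwitterUtils.createStream(ssc, None, Seq())

// Convert every tweet of each batch into a JSON document keyed by the
// tweet ID and store it in the twitter bucket.
stream.foreachRDD(rdd => {
  rdd
    .map(status => JsonDocument.create(status.getId.toString, JsonObject.create().put("text", status.getText)))
    .saveToCouchbase()
})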

// Start the stream and await termination
ssc.start()
ssc.awaitTermination()