Structured Streaming Integration

Couchbase can be integrated with Spark Structured Streaming both as a Source and as a Sink, making it possible to query incoming data in a structured and efficient manner.

Couchbase as a Structured Streaming Source

Since Spark 2.0 it is possible to combine Spark Streaming and Spark SQL into what is called "structured streaming". You can think of it as a way to operate on batches of a DataFrame where each incoming row is appended to an ever-growing, append-only table. You can use it for all kinds of analysis, including aggregations.

The first thing you need to define is the SparkSession as usual:

import org.apache.spark.sql.SparkSession

// The SparkSession is the main entry point into Spark
val spark = SparkSession
  .builder()
  .appName("N1QLExample")
  .master("local[*]") // run Spark locally with as many worker threads as cores, great for testing
  .config("spark.couchbase.nodes", "127.0.0.1") // connect to Couchbase on localhost
  .config("spark.couchbase.bucket.travel-sample", "") // open the travel-sample bucket with an empty password
  .getOrCreate()

Next up you need to define a schema. Note that incoming records don’t need to fit the schema exactly: attributes not present in the schema are ignored, and attributes in the schema that do not exist in a record are represented as null:

import org.apache.spark.sql.types._

// Very simple schema, feel free to add more properties here. Properties that do not
// exist in a streamed document show up as null.
val schema = StructType(
  StructField("META_ID", StringType) ::
  StructField("type", StringType) ::
  StructField("name", StringType) :: Nil
)

Now we define the format and start the stream from Couchbase:

// Define the Structured Stream from Couchbase with the given Schema
val records = spark.readStream
  .format("com.couchbase.spark.sql")
  .schema(schema)
  .load()

Since the stream is lazy we also need to consume it. In this simple example we use the built-in console sink, which dumps every result batch to the screen. The query groups every incoming record by the type field and counts the records in each group:

// Count per type and print to screen
records
  .groupBy("type")
  .count()
  .writeStream
  .outputMode("complete") // re-emit the full result table after each batch
  .format("console")
  .start()
  .awaitTermination()

If all goes well you’ll see output like this on the console:

+--------+-----+
|    type|count|
+--------+-----+
|   hotel|  917|
|    null|    1|
|landmark| 4495|
| airline|  187|
| airport| 1968|
|   route|24024|
+--------+-----+

Since the query keeps running totals, if you then modify a document in the UI, for example an airport, you’ll see the airport type count increase by one.

Nearly all of the implementation details are hidden, so please consult the Spark documentation on Structured Streaming for more information. By default, if you specify a META_ID field in your schema it will be populated with the document ID. You can customize this field name via the idField param. Also, if you want to start streaming at the current point in time, set the streamFrom param to now; by default, streaming starts at the very beginning of the bucket.
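As a sketch, here is how those two params could be supplied, assuming they are passed as reader options in the same way idField is passed to the sink below (the variable name recentRecords is just for illustration):

// Stream that starts at the current point in time rather than the
// beginning of the bucket (option names as described above)
val recentRecords = spark.readStream
  .format("com.couchbase.spark.sql")
  .schema(schema)
  .option("idField", "META_ID") // explicit, matches the default field name
  .option("streamFrom", "now")  // skip documents already in the bucket
  .load()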

Couchbase as a Structured Streaming Sink

In addition to streaming from Couchbase you can also use it as a Sink and store the results of your structured streaming transformation inside Couchbase.

Note that since it is a fault-tolerant sink, you need to provide it with a checkpoint location, ideally in an HDFS-compatible file system (see the checkpointLocation option in the example below).

The following example builds on the Spark structured network word count sample: it streams lines from a network socket, performs a word count and then stores the result in Couchbase.

Define your SparkSession and create the socket stream:

val spark = SparkSession
  .builder
  .master("local[*]")
  .appName("StructuredWordCount")
  .config("spark.couchbase.nodes", "127.0.0.1") // connect to Couchbase on localhost
  .config("spark.couchbase.bucket.travel-sample", "") // open the bucket the results are written to
  .getOrCreate()

import spark.implicits._

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 5050)
  .load()
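To feed this stream during testing you can, as in the original Spark sample, run a simple netcat server on the same port and type lines into it:

nc -lk 5050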

Now perform the word count on the split words:

// Split each line into words; the resulting Dataset has a single column named "value"
val words = lines.as[String].flatMap(_.split(" "))

// Count the occurrences of each word
val wordCounts = words.groupBy("value").count()

Write the result into Couchbase. Here the idField option defines that the document ID is taken from the value column of the DataFrame:

val query = wordCounts.writeStream
  .outputMode("complete")
  .option("checkpointLocation", "mycheckpointlocation") // required for a fault-tolerant sink
  .option("idField", "value") // use the "value" column as the document ID
  .format("com.couchbase.spark.sql")
  .start()

query.awaitTermination()