Getting Started

To get started with the Couchbase Spark connector quickly, learn how to add the connector to your Spark project and run simple queries.

Quickstart

Because most people using Spark are in the Scala ecosystem, the following examples use Scala and its sbt dependency manager.

Create a new sbt project and add the following content to the build.sbt file. This code includes all the Spark dependencies, as well as the Couchbase Spark connector. Just enough to get you started!

Here is a reference to the Scala docs.

name := "my-first-couchbase-spark-project"

organization := "my.organization"

version := "1.0.0-SNAPSHOT"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.0",
  "org.apache.spark" %% "spark-streaming" % "2.2.0",
  "org.apache.spark" %% "spark-sql" % "2.2.0",
  "com.couchbase.client" %% "spark-connector" % "2.2.0"
)

Now, under src/main/scala/, create a Quickstart.scala file with the following skeleton:

object Quickstart {

	def main(args: Array[String]): Unit = {

	}

}

If you are not familiar with Scala, this is more or less the equivalent of Java’s public static void main(String[] args) method. Because empty methods make the compiler sad, fill it with proper code now.

When it comes to Spark, you always need to set up a configuration and initialize the SparkSession object (or, in pre-2.0 versions, the SparkContext directly). The following snippet does that and also instructs the Couchbase Spark connector to open a bucket in the background.

import org.apache.spark.sql.SparkSession

// Configure Spark
val spark = SparkSession
      .builder()
      .appName("KeyValueExample")
      .master("local[*]") // use the JVM as the master, great for testing
      .config("spark.couchbase.nodes", "127.0.0.1") // connect to Couchbase Server on localhost
      .config("spark.couchbase.username", "Administrator") // with given credentials
      .config("spark.couchbase.password", "password")
      .config("spark.couchbase.bucket.travel-sample", "") // open the travel-sample bucket
      .getOrCreate()

// The SparkContext for easy access
val sc = spark.sparkContext

The above example assumes Couchbase Server 5.X+ is being used, and hence a username and password are required. If using an earlier version of Couchbase Server, the username and password lines should be omitted and a non-empty bucket password may be required.

By default, the Spark Connector connects to the default bucket. However, this example uses actual data from the travel-sample bucket that ships with Couchbase Server, so the code connects to that bucket. The Couchbase Spark connector also supports opening more buckets in parallel.
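
Opening more than one bucket follows the same key pattern as the travel-sample line above: one spark.couchbase.bucket.<name> entry per bucket. The sketch below collects the settings in a plain Scala map so they can be inspected before being handed to the session builder. The helper name couchbaseSettings and the second bucket beer-sample are assumptions made for illustration; they are not part of the connector.

```scala
// Hypothetical helper, not part of the connector: builds the Spark
// configuration entries needed to open several buckets at once.
object CouchbaseSettings {

  def couchbaseSettings(
      nodes: String,
      username: String,
      password: String,
      buckets: Seq[String]): Map[String, String] = {
    // Connection settings, using the same keys as the Quickstart snippet
    val connection = Map(
      "spark.couchbase.nodes" -> nodes,
      "spark.couchbase.username" -> username,
      "spark.couchbase.password" -> password
    )
    // One entry per bucket; the empty value mirrors the Quickstart snippet
    val bucketEntries = buckets.map(name => s"spark.couchbase.bucket.$name" -> "")
    connection ++ bucketEntries
  }

  def main(args: Array[String]): Unit = {
    val settings = couchbaseSettings(
      "127.0.0.1", "Administrator", "password",
      Seq("travel-sample", "beer-sample")) // beer-sample is an assumed second bucket
    // Print the entries in a stable order for inspection
    settings.toSeq.sortBy(_._1).foreach { case (key, value) => println(s"$key=$value") }
  }
}
```

Each entry can then be applied with .config(key, value) on SparkSession.builder(). With several buckets open, the connector cannot infer the target bucket on its own, so operations generally need to name the bucket explicitly; check the connector's API reference for the exact parameter.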

Creating a JAR for use with spark-submit

Once the application is compiled into a JAR file, it can be run with the standard Spark tool spark-submit.

In this case, the Spark dependencies can be declared as provided, as they will be supplied by spark-submit:

name := "my-first-couchbase-spark-project"

organization := "my.organization"

version := "1.0.0-SNAPSHOT"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.2.0" % "provided",
  "com.couchbase.client" %% "spark-connector" % "2.2.0"
)

spark-submit allows the Spark session’s configuration to be supplied on the command line, so the Quickstart example above can be simplified to:

// Configure Spark
val spark = SparkSession
      .builder()
      .appName("KeyValueExample")
      .getOrCreate()

The next step is to build a “fat JAR” containing all the application’s dependencies so it can easily be run with spark-submit. One good option is to use the sbt-assembly plugin.
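
As a minimal sketch, enabling sbt-assembly takes a single line in project/plugins.sbt. The version number below is an assumption; pick the release that matches your sbt version.

```scala
// project/plugins.sbt
// The plugin version is an assumption; use the release matching your sbt version
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
```

Running sbt assembly then writes the fat JAR under target/scala-2.11/. Dependencies marked as provided, such as the Spark modules above, are left out of the JAR.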

Finally, the created JAR can be run, supplying the required Couchbase configuration options:

spark-submit --master "local[*]" \
                     --conf spark.couchbase.username=Administrator \
                     --conf spark.couchbase.password=password \
                     --conf spark.couchbase.bucket.travel-sample="" \
                     --class Quickstart target/scala-2.11/quickstart-assembly-0.1.jar

Creating and saving RDDs

Now that the SparkContext is created, you can perform operations against Couchbase. A common scenario is to create resilient distributed datasets (RDDs) out of documents stored in Couchbase. The easiest way is probably to pass in document IDs as strings.

Before getting further into the actual code, make sure to have the following items imported, because otherwise the implicit methods and the JsonDocument type won’t be available.

import com.couchbase.client.java.document.JsonDocument
import com.couchbase.spark._

Use the couchbaseGet method on the SparkContext to fetch documents from Couchbase and create an RDD.

sc
	.couchbaseGet[JsonDocument](Seq("airline_10123", "airline_10748"))
	.collect()
	.foreach(println)

Make sure to specify which document type you want (in this case, a JsonDocument). If you forget to specify the document type, an exception is thrown because the connector does not know what format to use for the results. Then call collect() to aggregate the results and print them out to the command line.

Spark outputs lots of information, so you’ll find the output somewhere in the logs:

JsonDocument{id='airline_10748', cas=314538279043072, expiry=0, content={"country":"United States","iata":"ZQ","name":"Locair","callsign":"LOCAIR","icao":"LOC","id":10748,"type":"airline"}}
JsonDocument{id='airline_10123', cas=314538278125568, expiry=0, content={"country":"United States","iata":"TQ","name":"Texas Wings","callsign":"TXW","icao":"TXW","id":10123,"type":"airline"}}

But since just loading data is only half the fun, the connector also provides a convenient way to save documents. The following code loads documents like before, but then modifies their contents and IDs before saving them back. You can imagine taking any kind of data source, mapping its records to documents, and storing them in Couchbase.

import com.couchbase.client.java.document.json.JsonObject

sc
  .couchbaseGet[JsonDocument](Seq("airline_10123", "airline_10748"))
  .map(oldDoc => {
    val id = "my_" + oldDoc.id()
    val content = JsonObject.create().put("name", oldDoc.content().getString("name"))
    JsonDocument.create(id, content)
  })
  .saveToCouchbase()

The saveToCouchbase() method available on the RDD stores the modified version of each original JsonDocument. Go find the modified documents in the Couchbase Server UI! Look for "my_airline_10123", which contains just the name of the airline.
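
The same pattern applies when the data does not come from Couchbase in the first place. The sketch below, which assumes a small in-memory collection as the data source, maps each record to a JsonDocument and stores it through the connector. The object name, the document IDs, and the airline names are hypothetical, and the Couchbase connection settings are assumed to be supplied on the spark-submit command line.

```scala
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.spark._
import org.apache.spark.sql.SparkSession

object SaveFromCollection {

  // Hypothetical in-memory records standing in for an external data source
  val records: Seq[(String, String)] = Seq(
    "demo_airline_1" -> "Example Air",
    "demo_airline_2" -> "Sample Wings"
  )

  // Map one (id, name) record to a document with a single "name" field
  def toDocument(record: (String, String)): JsonDocument = {
    val (id, name) = record
    JsonDocument.create(id, JsonObject.create().put("name", name))
  }

  def main(args: Array[String]): Unit = {
    // Master and Couchbase settings are expected on the spark-submit command line
    val spark = SparkSession.builder().appName("SaveFromCollection").getOrCreate()

    spark.sparkContext
      .parallelize(records) // turn the local records into an RDD
      .map(toDocument)      // RDD[(String, String)] => RDD[JsonDocument]
      .saveToCouchbase()    // store the documents through the connector

    spark.stop()
  }
}
```

Keeping the mapping in its own function, toDocument, makes it possible to check the document shape without a running cluster.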

Congratulations! You’ve successfully performed your first ETL job (extract-transform-load) using Couchbase and Spark. Next up is a whirlwind tour of N1QL and Spark DataFrames.

Working with DataFrames

DataFrames were introduced in Spark 1.3 and have matured even further in Spark 1.4. The nature of the queries fits very well with what Couchbase N1QL provides.

To try this, you need Couchbase Server version 4.0 or later.
You need to have at least a primary index created on the travel-sample bucket to make the following examples work. If you haven’t done so already, perform a CREATE PRIMARY INDEX ON `travel-sample` query.

In older Spark versions you had to create a SQLContext like this:

val sql = new SQLContext(sc)

But if you are using the SparkSession you can access most of the methods directly from it. Note that you can also always get the SQLContext out of the session via:

val sql = spark.sqlContext

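Also, don’t forget the Couchbase imports again for all the automatic method goodness, along with Spark’s EqualTo filter used in the examples that follow:

import com.couchbase.spark.sql._
import org.apache.spark.sql.sources.EqualTo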

Because a DataFrame is like an RDD but with a schema and Couchbase is a schemaless database at its heart, you need a way to either define or infer a schema. The connector has built-in schema inference, but if you have a large or diverse data set, you need to give it some clues on filtering.

Suppose you want a DataFrame for all airlines, and you know that the JSON content has a type field with the value airline. You can pass this information to the connector for automatic schema inference:

// Create a DataFrame with Schema Inference
val airlines = sql.read.couchbase(schemaFilter = EqualTo("type", "airline"))

// Print The Schema
airlines.printSchema()

The code automatically infers the schema and prints it in this format:

root
 |-- META_ID: string (nullable = true)
 |-- callsign: string (nullable = true)
 |-- country: string (nullable = true)
 |-- iata: string (nullable = true)
 |-- icao: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)

Next you can perform an actual query where you are interested only in the name and callsign. This example sorts the rows by callsign in descending order and shows only the first 10 rows.

airlines
  .select("name", "callsign")
  .sort(airlines("callsign").desc)
  .show(10)

The code prints the results on the console like this:

+-------+--------------------+
|   name|            callsign|
+-------+--------------------+
|   EASY|             easyJet|
|   BABY|             bmibaby|
|MIDLAND|                 bmi|
|   null|          Yellowtail|
|   null|               XOJET|
|STARWAY|   XL Airways France|
|   XAIR|            XAIR USA|
|  WORLD|       World Airways|
|WESTERN|    Western Airlines|
|   RUBY|Vision Airlines (V2)|
+-------+--------------------+
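
The same select, sort, and limit can also be written as a Spark SQL statement against a temporary view. The sketch below uses a small stand-in DataFrame built from the two documents fetched earlier, so it runs without a Couchbase connection; the object and function names are hypothetical. With Couchbase available, the airlines DataFrame from the previous example can be passed in instead.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SqlViewSketch {

  // Register the DataFrame as a temporary view and run the same
  // select/sort/limit query through Spark SQL
  def topByCallsign(spark: SparkSession, airlines: DataFrame, n: Int): DataFrame = {
    airlines.createOrReplaceTempView("airlines")
    spark.sql(s"SELECT name, callsign FROM airlines ORDER BY callsign DESC LIMIT $n")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SqlViewSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Stand-in rows taken from the documents fetched earlier
    val airlines = Seq(
      ("Locair", "LOCAIR"),
      ("Texas Wings", "TXW")
    ).toDF("name", "callsign")

    topByCallsign(spark, airlines, 10).show()
    spark.stop()
  }
}
```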

Working with Datasets

Spark 1.6 introduced Datasets, a typesafe way to work on top of Spark SQL. Since they are built on top of DataFrames, using them with Couchbase is easy.

The following example creates a Dataset out of a DataFrame and maps it to a case class. It then uses the case class to extract fields out of the result set in a typesafe way:

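// Case class describing the fields of interest (names taken from the inferred
// schema shown earlier); define it at the top level, outside of main
case class Airline(name: String, iata: String, icao: String, country: String)

// Spark SQL Setup
val spark: SparkSession = .... /*setup your spark session as usual*/
import spark.implicits._

val airlines = spark.read.couchbase(schemaFilter = EqualTo("type", "airline")).as[Airline]

// Print schema
airlines.printSchema()

// Print airlines that start with A
airlines.map(_.name).filter(_.toLowerCase.startsWith("a")).foreach(println(_))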