Java API

In addition to the primary Scala API, the connector provides convenience APIs when accessed from Java.

Couchbase from the SparkContext

To use the Java API in spark, you need to initialize a JavaSparkContext:

SparkConf conf = new SparkConf()
    .setAppName("javaSample")
    .setMaster("local[*]")
    .set("com.couchbase.bucket.travel-sample", "");

JavaSparkContext sc = new JavaSparkContext(conf);

Since Spark 2.0 you can also use the SparkSession:

SparkSession spark = SparkSession
		.builder()
		.appName("JavaExample")
		.master("local[*]") // use the JVM as the master, great for testing
		.config("spark.couchbase.nodes", "127.0.0.1") // connect to couchbase on localhost
		.config("spark.couchbase.bucket.travel-sample", "") // open the travel-sample bucket with empty password
		.getOrCreate();

// The Java wrapper around the SparkContext
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

Since Java doesn’t have the implicit imports like Scala, the connector provides a helper class to achieve similar functionality:

// The Couchbase-Enabled spark context
CouchbaseSparkContext csc = couchbaseContext(sc);

The context is a static import. In general you want to statically import the following:

import static com.couchbase.spark.japi.CouchbaseDocumentRDD.couchbaseDocumentRDD;
import static com.couchbase.spark.japi.CouchbaseSparkContext.couchbaseContext;

Now you can create RDDs through Key/Value, Views or N1QL:

// Load docs through K/V
List<JsonDocument> docs = csc
    .couchbaseGet(Arrays.asList("airline_10226", "airline_10748"))
    .collect();

System.out.println(docs);
// Perform a N1QL query
List<CouchbaseQueryRow> results = csc
    .couchbaseQuery(N1qlQuery.simple("SELECT * FROM `travel-sample` LIMIT 10"))
    .collect();

System.out.println(results);

Mapping RDDs to Couchbase APIs

An RDD can be wrapped with the couchbaseRDD static method to expose all the functions available. So instead of fetching the documents right from the SparkContext it can also be done like this:

import static com.couchbase.spark.japi.CouchbaseRDD.couchbaseRDD;
//...
JavaRDD<String> ids = sc.parallelize(Arrays.asList("airline_10226", "airline_10748"));
docs = couchbaseRDD(ids).couchbaseGet().collect();
System.out.println(docs);

The CouchbaseRDD exposes the following methods:

  • couchbaseGet: Fetch documents via their unique Document ID.

  • couchbaseSubdocLookup: Fetch fragments of a document.

  • couchbaseView: Query a Couchbase View.

  • couchbaseSpatialView: Query a Couchbase Spatial View.

  • couchbaseQuery: Perform a N1QL Query.

Using SparkSQL with Couchbase from Java

Using SparkSQL from Java is possible because the Java API provides wrappers for both the DataFrameReader and DataFrameWriter APIs. All you need to do is wrap the ones that are returned by Spark and wrap them like in the following example to get access to all couchbase specific methods:

import static com.couchbase.spark.japi.CouchbaseDataFrameReader.couchbaseReader;
//...

// Use SparkSQL from Java
SQLContext sql = new SQLContext(sc);

// Wrap the Reader and create the DataFrame from Couchbase
Dataset<Row> airlines = couchbaseReader(sql.read()).couchbase(new EqualTo("type", "airline"));

// Print the number of airline
System.out.println("Number of Airlines: " + airlines.count());

Using Datasets with Couchbase

Since Datasets work with actual Java objects, first create one:

import java.io.Serializable;

public class Airport implements Serializable {
    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

Next, you can convert a DataFrame to a Dataset through the .as() API in Spark 1.6:

Dataset<Airport> airports = couchbaseReader(sql.read())
	.couchbase(new EqualTo("type", "airport"))
	.select(new Column("airportname").as("name"))
	.as(Encoders.bean(Airport.class));

	List<Airport> allAirports = airports.collectAsList();
	System.out.println(allAirports.size());

Writing to Couchbase

If you want to store Documents in Couchbase, use the couchbaseDocumentRDD method:

couchbaseDocumentRDD(
    sc.parallelize(Arrays.asList(JsonDocument.create("doc1", JsonObject.empty())))
).saveToCouchbase();