Apache Spark-HBase Connector

Spark needs a connector library to access HBase.
Two connectors are available; which one should you use?

Configuration and setup

Client-side (Spark) configuration:

  • You need the HBase client configuration file hbase-site.xml

  • It points to the HBase cluster you want to connect to

  • Copy hbase-site.xml to SPARK_CONF_DIR (the default is $SPARK_HOME/conf)

Server-side (HBase region servers) configuration:

  • When using the Apache HBase-Spark connector, there is also a server-side configuration step

  • This requires a few jars in the HBase region servers' CLASSPATH (for example, copy them to /usr/hdp/hbase-2.3/lib):

    • scala-library

    • hbase-spark

    • hbase-spark-protocol-shaded.

  • Build the connector from GitHub as explained below (see Spark 3.x section). In this example we use pre-built jars JAR1 and JAR2.

      # Download connector jars to HBase region servers $HBASE_HOME/lib
    
      # From CERN network
      # wget http://artifactory.cern.ch/beco-thirdparty-local/org/apache/hbase/connectors/spark/hbase-spark/1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1/hbase-spark-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar
      # wget http://artifactory.cern.ch/beco-thirdparty-local/org/apache/hbase/connectors/spark/hbase-spark-protocol-shaded/1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1/hbase-spark-protocol-shaded-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar
    
      # stopgap copy
      wget http://canali.web.cern.ch/res/hbase-spark-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar
      wget http://canali.web.cern.ch/res/hbase-spark-protocol-shaded-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar
    
      # Scala library, match the Scala version used for building
      wget https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.12.15/scala-library-2.12.15.jar
    

Spark 2.x

Spark 2.4 and the Spark-HBase Hortonworks connector

      # Example of how to use the connector with Spark (note: you need hbase-site.xml in SPARK_CONF_DIR)
      bin/spark-shell --master yarn --num-executors 1 --executor-cores 2 \
      --repositories http://repo.hortonworks.com/content/groups/public/ \
      --packages com.hortonworks.shc:shc-core:1.1.0.3.1.2.2-1

      // Example of how to use the Hortonworks connector to read into a DataFrame
      import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

      val catalog = s"""{
        |"table":{"namespace":"default", "name":"testspark"},
        |"rowkey":"key",
        |"columns":{
        |"id":{"col":"key", "type":"int"},
        |"name":{"cf":"cf", "col":"name", "type":"string"}
        |}
        |}""".stripMargin

      val df = spark.read.options(Map(HBaseTableCatalog.tableCatalog->catalog)).format("org.apache.spark.sql.execution.datasources.hbase").option("hbase.spark.use.hbasecontext", false).load()

      df.show()

Spark 2.x and the Apache Spark HBase connector

  • The connector for Spark 2.x is available in Maven Central

  • You need the configuration steps described in "Configuration and setup" above

    • Note that you need to configure the server-side too

      # Example of how to use the connector with Spark
      # Note: you need hbase-site.xml in SPARK_CONF_DIR
      # Note: you need to set up the server-side component too for filter pushdown
      bin/spark-shell --master yarn --num-executors 1 --executor-cores 2 \
      --repositories https://repository.cloudera.com/artifactory/libs-release-local \
      --packages org.apache.hbase.connectors.spark:hbase-spark-protocol-shaded:1.0.0.7.2.2.2-1,org.apache.hbase.connectors.spark:hbase-spark:1.0.0.7.2.2.2-1,org.apache.hbase:hbase-shaded-mapreduce:2.2.4

      // Example of how to use the Apache HBase-Spark connector to read into a DataFrame
      import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog

      val catalog = s"""{
        |"table":{"namespace":"default", "name":"testspark"},
        |"rowkey":"key",
        |"columns":{
        |"id":{"col":"key", "type":"int"},
        |"name":{"cf":"cf", "col":"name", "type":"string"}
        |}
        |}""".stripMargin

      val df = spark.read.options(Map(HBaseTableCatalog.tableCatalog->catalog)).format("org.apache.hadoop.hbase.spark").option("hbase.spark.use.hbasecontext", false).load()
      // alternative, more compact syntax for the HBase catalog
      // val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", false).load()

      df.show()

Spark 3.x

Use the Apache Spark HBase connector with Spark 3.x

  • Build the connector from the GitHub repo Apache HBase Connectors

  • Build as in this example (customize HBase, Spark and Hadoop versions, as needed):

      mvn -Dspark.version=3.1.2 -Dscala.version=2.12.10 -Dscala.binary.version=2.12 -Dhbase.version=2.4.9 -Dhadoop-three.version=3.2.0 -DskipTests clean package
    
  • Deploy using Spark 3.x, as in this example:

      # Customize the JARs path to your filesystem location
      # For convenience I have also uploaded the jars on a web server
    
      # From CERN network
      #JAR1=http://artifactory.cern.ch/beco-thirdparty-local/org/apache/hbase/connectors/spark/hbase-spark/1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1/hbase-spark-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar
      #JAR2=http://artifactory.cern.ch/beco-thirdparty-local/org/apache/hbase/connectors/spark/hbase-spark-protocol-shaded/1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1/hbase-spark-protocol-shaded-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar
    
      # stopgap copy
      JAR1=http://canali.web.cern.ch/res/hbase-spark-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar
      JAR2=http://canali.web.cern.ch/res/hbase-spark-protocol-shaded-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar
    
      bin/spark-shell --master yarn --num-executors 1 --executor-cores 2 \
      --jars $JAR1,$JAR2 --packages org.apache.hbase:hbase-shaded-mapreduce:2.4.9
    
  • Option for CERN users: pre-built jars are available from the CERN Artifactory, see the JAR1 and JAR2 URLs in the comments above

SQL Filter pushdown and server-side library configuration

  • This allows filter predicates to be pushed down to HBase

    • It is configured with .option("hbase.spark.pushdown.columnfilter", true), which is the default.

    • This requires additional configuration on the HBase server side: in particular, the scala-library, hbase-spark and hbase-spark-protocol-shaded jars need to be in the HBase region servers' CLASSPATH.

    • See "Configuration and setup" section for details

    • If the filter pushdown jars are not installed on the region servers, you will get an error: java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder$

      • See: HBASE-22769

      • Example of how to use SQL filter pushdown

```plaintext
val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", false).option("hbase.spark.pushdown.columnfilter", true).load()
df.filter("id<10").show()
```

Apache HBase-Spark connector tunables

  • There are several tunables in the Apache HBase-Spark connector, for example (see the sketch after this list):

    • hbase.spark.query.batchsize - Set the maximum number of values to return for each call to next() in scan.

    • hbase.spark.query.cachedrows - The number of rows for caching that will be passed to scan.

    • See the Apache HBase Connectors documentation for the full list of available configuration parameters
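
    • For example, here is a minimal sketch of a read that sets both tunables. It reuses the testspark table and column mapping from the examples below; the values 100 and 1000 are purely illustrative, and it assumes the tunables can be passed as DataFrame reader options like the other hbase.spark.* options used in this note:

      // Sketch: set scan batching and row caching when reading the testspark table
      val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", false).option("hbase.spark.query.batchsize", "100").option("hbase.spark.query.cachedrows", "1000").load()
      df.show()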


Spark-HBase connector: write and read example

  • On HBase, create the test table and grant the related privileges to your user (use hbase shell):

      create 'testspark', 'cf'
      grant '<your_username_here>', 'XRW', 'testspark'
    
    • Note: this may also be needed: grant '<your_username_here>', 'X', 'hbase:meta'

  • On Spark:

    • Start Spark 2.x or 3.x as detailed above

    • Write:

```plaintext
val df = sql("select id, 'myline_'||id  name from range(10)")
df.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.namespace", "default").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", false).save()
```
  • Read the HBase table from Spark

      val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", false).load()
      df.show()
    
  • Read back from Spark using a filter, without the server-side installation for SQL filter pushdown (see HBASE-22769); in this case disable pushdown explicitly:

      val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", false).option("hbase.spark.pushdown.columnfilter", false).load()
      df.show()
    

Another option for writing and reading: explicitly specify the catalog in JSON format

  • See also HBaseTableCatalog

  • On HBase, grant create table privilege to your user:

      grant '<your_username_here>', 'C'
    
  • On Spark:

    • Start Spark 2.x or 3.x as detailed above

    • Write:

```plaintext
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog

val catalog = s"""{
  |"table":{"namespace":"default", "name":"testspark"},
  |"rowkey":"key",
  |"columns":{
  |"id":{"col":"key", "type":"int"},
  |"name":{"cf":"cf", "col":"name", "type":"string"}
  |}
  |}""".stripMargin

val df = sql("select id, 'myline_'||id  name from range(10)")

// HBaseTableCatalog.newTable -> If defined and larger than 3, a new table will be created with the specified number of regions.
df.write.options(Map(HBaseTableCatalog.tableCatalog->catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.hadoop.hbase.spark").option("hbase.spark.use.hbasecontext", false).save()
```
  • Read HBase from Spark

      import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
    
      val catalog = s"""{
        |"table":{"namespace":"default", "name":"testspark"},
        |"rowkey":"key",
        |"columns":{
        |"id":{"col":"key", "type":"int"},
        |"name":{"cf":"cf", "col":"name", "type":"string"}
        |}
        |}""".stripMargin
    
      val df = spark.read.options(Map(HBaseTableCatalog.tableCatalog->catalog)).format("org.apache.hadoop.hbase.spark").option("hbase.spark.use.hbasecontext", false).load()
      df.show()
    

PySpark

  • Note: this is almost the same as the Scala examples above; the important change is to start Spark with pyspark instead of spark-shell. See the "Configuration and setup" section above for how to configure the HBase client and server side.

  • Write to HBase with Spark

      df = spark.sql("select id, 'myline_'||id  name from range(10)")
      df.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.namespace", "default").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", False).save()
    
  • Read from HBase with Spark

      df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", False).load()
      df.show()
      df.filter("id<10").show() # see above note on SQL filter pushdown server-side configuration
    
  • Another option for reading: explicitly specify the catalog in JSON format:

      import json
    
      catalog = json.dumps(
        {
            "table":{"namespace":"default", "name":"testspark"},
            "rowkey":"key",
            "columns":{
                "id":{"col":"key", "type":"int"},
                "name":{"cf":"cf", "col":"name", "type":"string"}
            }
        })
    
      df = spark.read.options(catalog=catalog).format("org.apache.hadoop.hbase.spark").option("hbase.spark.use.hbasecontext", False).load()
    
      df.show()