Spark needs a connector library to access HBase.
Two connectors are available, which one should you use?
-
- This is part of the Apache HBase-Spark
Hortonworks Spark-HBase connector
- The Hortonworks connector has been quite popular over the years, with Spark 2.x. However, it appears to be no more supported nor updated?
Configuration and setup
Client-side (Spark) configuration:
You need the HBase client configuration file
hbase-site.xml
This points to the HBase you want to connect to
Copy
hbase-site.xml
toSPARK_CONF_DIR
(default is $SPARK_HOME/conf`)
Server-side (HBase region servers) configuration:
When using the Apache Hbase-Spark connector there is also a server-side configuration
This requires additional configuration on the HBase server side, in particular one needs to have a few jars in the HBase region servers CLASSPATH (for example copy it to /usr/hdp/hbase-2.3/lib:
scala-library
hbase-spark
hbase-spark-protocol-shaded.
Build the connector from GitHub as explained below (see Spark 3.x section). In this example we use pre-built jars JAR1 and JAR2.
# Download connector jars to HBase region servers $HBASE_HOME/lib # From CERN network # wget http://artifactory.cern.ch/beco-thirdparty-local/org/apache/hbase/connectors/spark/hbase-spark/1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1/hbase-spark-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar # wget http://artifactory.cern.ch/beco-thirdparty-local/org/apache/hbase/connectors/spark/hbase-spark-protocol-shaded/1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1/hbase-spark-protocol-shaded-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar # stopgap copy wget http://canali.web.cern.ch/res/hbase-spark-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar wget http://canali.web.cern.ch/res/hbase-spark-protocol-shaded-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar # Scala library, match the Scala version used for building wget https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.12.15/scala-library-2.12.15.jar
Spark 2.x
Spark 2.4 and the Spark-HBase Hortonworks connector
The connector for Spark 2.4 is available in Maven Central
You need also the configuration steps, see above "Configuration and setup"
See additional details:
# Example of how to use the connector with Spark (note: need hbase-site.xml in SPARK_CONF_DIR)
bin/spark-shell --master yarn --num-executors 1 --executor-cores 2 \
--repositories http://repo.hortonworks.com/content/groups/public/ \
--packages com.hortonworks.shc:shc-core:1.1.0.3.1.2.2-1
// Example of how to use the Hortonworks connector to read into a DataFrame
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
val catalog = s"""{
|"table":{"namespace":"default", "name":"testspark"},
|"rowkey":"key",
|"columns":{
|"id":{"col":"key", "type":"int"},
|"name":{"cf":"cf", "col":"name", "type":"string"}
|}
|}""".stripMargin
val df = spark.read.options(Map(HBaseTableCatalog.tableCatalog->catalog)).format("org.apache.spark.sql.execution.datasources.hbase").option("hbase.spark.use.hbasecontext", false).load()
df.show()
Spark 2.x and the Apache Spark HBase connector
The connector for Spark 2.x is available in Maven Central
You need the configuration steps, see above "Configuration and setup"
- Note that you need to configure the server-side too
# Example of how to use the connector with Spark
# Note: you need hbase-site.xml in SPARK_CONF_DIR
# Note: you need to set up the server-side component too for filter pushdown
bin/spark-shell --master yarn --num-executors 1 --executor-cores 2 \
--repositories https://repository.cloudera.com/artifactory/libs-release-local \
--packages org.apache.hbase.connectors.spark:hbase-spark-protocol-shaded:1.0.0.7.2.2.2-1,org.apache.hbase.connectors.spark:hbase-spark:1.0.0.7.2.2.2-1,org.apache.hbase:hbase-shaded-mapreduce:2.2.4
// Example of how to use the Apache Hbase-Spark connector to read into a DataFrame
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
val catalog = s"""{
|"table":{"namespace":"default", "name":"testspark"},
|"rowkey":"key",
|"columns":{
|"id":{"col":"key", "type":"int"},
|"name":{"cf":"cf", "col":"name", "type":"string"}
|}
|}""".stripMargin
val df = spark.read.options(Map(HBaseTableCatalog.tableCatalog->catalog)).format("org.apache.hadoop.hbase.spark").option("hbase.spark.use.hbasecontext", false).load()
// alternative, more compact, syntax for HBase catalog
// val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", false).load()
df.show()
Spark 3.x
Use the Apache Spark HBase connector with Spark 3.x
Build the connector from the Github repo Apache HBase Connectors
As of December 2021, the release of hbase-connectors in maven central is only available for Scala 2.11 and cannot be used with Spark 3
The connector has to be compiled from source for Spark 3.x, see also HBASE-25326 Allow hbase-connector to be used with Apache Spark 3.0
Build as in this example (customize HBase, Spark and Hadoop versions, as needed):
mvn -Dspark.version=3.1.2 -Dscala.version=2.12.10 -Dscala.binary.version=2.12 -Dhbase.version=2.4.9 -Dhadoop-three.version=3.2.0 -DskipTests clean package
Deploy using Spark 3.x, as in this example:
# Customize the JARs path to your filesystem location # For convenience I have also uploaded the jars on a web server # From CERN network #JAR1=http://artifactory.cern.ch/beco-thirdparty-local/org/apache/hbase/connectors/spark/hbase-spark/1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1/hbase-spark-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar #JAR2=http://artifactory.cern.ch/beco-thirdparty-local/org/apache/hbase/connectors/spark/hbase-spark-protocol-shaded/1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1/hbase-spark-protocol-shaded-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar # stopgap copy JAR1 = http://canali.web.cern.ch/res/hbase-spark-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar JAR2 = http://canali.web.cern.ch/res/hbase-spark-protocol-shaded-1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1.jar bin/spark-shell --master yarn --num-executors 1 --executor-cores 2 \ --jars $JAR1,$JAR2 --packages org.apache.hbase:hbase-shaded-mapreduce:2.4.9
Option for CERN users:
deploy from artifactory.cern.ch (only visible from the CERN network):
bin/spark-shell --master yarn --num-executors 1 --executor-memory 8g --repositories
https://artifactory.cern.ch/beco-thirdparty-local
--packages org.apache.hbase.connectors.spark:hbase-spark:1.0.1_spark-3.2.0-hbase-2.4.9-cern1_1
SQL Filter pushdown and server-side library configuration
This allows to push down filter predicates to HBase
It is configured with
.option("hbase.spark.pushdown.columnfilter", true)
which is the default.This requires additional configuration on the HBase server side, in particular one needs to have a few jars in the HBase region servers CLASSPATH: scala-library, hbase-spark and hbase-spark-protocol-shaded.
See "Configuration and setup" section for details
If filter pushdown jars are not configured you will get an error:
java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder$
See: HBASE-22769
Example of how to use SQL filter pushdown
```plaintext
val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", false).option("hbase.spark.pushdown.columnfilter", true).load()
df.filter("id<10").show()
```
Apache HBase-Spark connector tunables
There are several tunable in the Apache Hbase-Spark connector, for example:
hbase.spark.query.batchsize
- Set the maximum number of values to return for each call to next() in scan.hbase.spark.query.cachedrows
- The number of rows for caching that will be passed to scan.Details of the available configuration at this link
Spark-HBase connector: write and read example
On HBase, create the test table and grant the related privileges to your user (use
hbase shell
):create 'testspark', 'cf' grant '<your_username_here>', 'XRW', 'testspark'
- Note this may be too needed:
grant '<your_username_here>', 'X', 'hbase:meta'
- Note this may be too needed:
On Spark:
Start Spark 2.x or 3.x as detailed above
Write:
```plaintext
val df = sql("select id, 'myline_'||id name from range(10)")
df.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.namespace", "default").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", false).save()
```
Read the HBase table from Spark
val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", false).load() df.show()
Read back from Spark using a filter, without server-side installation for SQL filter pushdown HBASE-22769
val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", false).option("hbase.spark.pushdown.columnfilter", false).load() df.show()
Another option for writing and reading by explicitly specifying the catalog in JSON format
see also HbaseTableCatalog
On HBase, grant create table privilege to your user:
grant '<your_username_here>', 'C'
On Spark:
Start Spark 2.x or 3.x as detailed above
Write:
```plaintext
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
val catalog = s"""{
|"table":{"namespace":"default", "name":"testspark"},
|"rowkey":"key",
|"columns":{
|"id":{"col":"key", "type":"int"},
|"name":{"cf":"cf", "col":"name", "type":"string"}
|}
|}""".stripMargin
val df = sql("select id, 'myline_'||id name from range(10)")
// HBaseTableCatalog.newTable -> If defined and larger than 3, a new table will be created with the number of region specified.
df.write.options(Map(HBaseTableCatalog.tableCatalog->catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.hadoop.hbase.spark").option("hbase.spark.use.hbasecontext", false).save()
```
Read HBase from Spark
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog val catalog = s"""{ |"table":{"namespace":"default", "name":"testspark"}, |"rowkey":"key", |"columns":{ |"id":{"col":"key", "type":"int"}, |"name":{"cf":"cf", "col":"name", "type":"string"} |} |}""".stripMargin val df = spark.read.options(Map(HBaseTableCatalog.tableCatalog->catalog)).format("org.apache.hadoop.hbase.spark").option("hbase.spark.use.hbasecontext", false).load() df.show()
PySpark
Note: this is almost the same as with Scala examples above, with the important change when running Spark of substituting spark-shell with pyspark. See also above for "Configuration and setup" how to configure Hbase client and server.
Write to HBase with Spark
df = spark.sql("select id, 'myline_'||id name from range(10)") df.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.namespace", "default").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", False).save()
Read from HBase with Spark
df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping","id INT :key, name STRING cf:name").option("hbase.table", "testspark").option("hbase.spark.use.hbasecontext", False).load() df.show() df.filter("id<10").show() # see above note on SQL filter pushdown server-side configuration
Another option for reading: explicitly specify the catalog in JSON format:
import json catalog = json.dumps( { "table":{"namespace":"default", "name":"testspark"}, "rowkey":"key", "columns":{ "id":{"col":"key", "type":"int"}, "name":{"cf":"cf", "col":"name", "type":"string"} } }) df = spark.read.options(catalog=catalog).format("org.apache.hadoop.hbase.spark").option("hbase.spark.use.hbasecontext", False).load() df.show()