Spark Structured Streaming + Hive Warehouse Connector (HWC) integration

1. Set up a Kafka topic

cd /usr/hdp/current/kafka-broker/bin/

./kafka-topics.sh --create --zookeeper c420-node2.coelab.cloudera.com:2181 --replication-factor 2 --partitions 3 --topic ss_input
./kafka-topics.sh --list --zookeeper c420-node2.coelab.cloudera.com:2181
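The ZooKeeper, broker, and quorum addresses used throughout this walkthrough are comma-separated host:port lists, and a typo in one entry is a common source of connection failures. A purely illustrative, cluster-free sanity check (the helper name is mine, not part of any Kafka/Spark API):

```python
# Illustrative helper (not part of the walkthrough): split a
# comma-separated host:port connect string and verify each entry
# has a hostname and a numeric port.
def check_connect_string(connect):
    """Return the parsed (host, port) pairs, raising on a malformed entry."""
    pairs = []
    for entry in connect.split(","):
        host, sep, port = entry.partition(":")
        if not (host and sep and port.isdigit()):
            raise ValueError("malformed entry: %r" % entry)
        pairs.append((host, int(port)))
    return pairs

# The ZooKeeper quorum string used by the spark-shell/pyspark commands below:
quorum = ("c420-node2.coelab.cloudera.com:2181,"
          "c420-node3.coelab.cloudera.com:2181,"
          "c420-node4.coelab.cloudera.com:2181")
```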

2. Create a Hive transactional table

beeline> 
CREATE DATABASE streamdb;
USE streamdb;

CREATE TABLE `hivestreamtable`(
  `word` string)
STORED AS ORC TBLPROPERTIES ('transactional' = 'true');

3. Start Spark Structured Streaming (two options follow)

3.1. Option 1: Scala => start spark-shell with spark-sql-kafka package support + HWC

spark-shell --master yarn \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 \
  --conf spark.datasource.hive.warehouse.load.staging.dir=/tmp \
  --conf spark.datasource.hive.warehouse.metastoreUri=thrift://c420-node3.coelab.cloudera.com:9083 \
  --conf spark.hadoop.hive.llap.daemon.service.hosts=@llap0 \
  --conf spark.jars=/usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.1.4.0-315.jar \
  --conf spark.submit.pyFiles=/usr/hdp/current/hive_warehouse_connector/pyspark_hwc-1.0.0.3.1.4.0-315.zip \
  --conf spark.security.credentials.hiveserver2.enabled=false \
  --conf spark.sql.hive.hiveserver2.jdbc.url="jdbc:hive2://c420-node2.coelab.cloudera.com:2181,c420-node3.coelab.cloudera.com:2181,c420-node4.coelab.cloudera.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-interactive" \
  --conf spark.sql.hive.zookeeper.quorum="c420-node2.coelab.cloudera.com:2181,c420-node3.coelab.cloudera.com:2181,c420-node4.coelab.cloudera.com:2181"

Paste the following Scala code into the spark-shell prompt:

import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.hwc.HiveWarehouseSession._

// Build the Hive Warehouse Connector session
val hive = HiveWarehouseSession.session(spark).build()

// Read from the Kafka topic, decode each record value to a string,
// and stream the rows into the transactional Hive table.
// Each chained call below ends with a '.', so the lines can be
// pasted one after another into the spark-shell REPL.
spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "c420-node2.coelab.cloudera.com:6667").
  option("subscribe", "ss_input").
  load().
  selectExpr("CAST(value AS STRING)").
  as[String].
  writeStream.
  format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").
  option("database", "streamdb").
  option("table", "hivestreamtable").
  option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).
  option("checkpointLocation", "/tmp/checkpoint").
  start()

3.2. Option 2: Python => start pyspark with spark-sql-kafka package support + HWC

pyspark --master yarn \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 \
  --conf spark.datasource.hive.warehouse.load.staging.dir=/tmp \
  --conf spark.datasource.hive.warehouse.metastoreUri=thrift://c420-node3.coelab.cloudera.com:9083 \
  --conf spark.hadoop.hive.llap.daemon.service.hosts=@llap0 \
  --conf spark.jars=/usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.1.4.0-315.jar \
  --conf spark.submit.pyFiles=/usr/hdp/current/hive_warehouse_connector/pyspark_hwc-1.0.0.3.1.4.0-315.zip \
  --conf spark.security.credentials.hiveserver2.enabled=false \
  --conf spark.sql.hive.hiveserver2.jdbc.url="jdbc:hive2://c420-node2.coelab.cloudera.com:2181,c420-node3.coelab.cloudera.com:2181,c420-node4.coelab.cloudera.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-interactive" \
  --conf spark.sql.hive.zookeeper.quorum="c420-node2.coelab.cloudera.com:2181,c420-node3.coelab.cloudera.com:2181,c420-node4.coelab.cloudera.com:2181"

Paste the following Python code into the pyspark prompt:

from pyspark_llap import HiveWarehouseSession

# Build the Hive Warehouse Connector session
hive = HiveWarehouseSession.session(spark).build()

# Read from the Kafka topic, decode each record value to a string,
# and stream the rows into the transactional Hive table
(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "c420-node2.coelab.cloudera.com:6667")
    .option("subscribe", "ss_input")
    .load()
    .selectExpr("CAST(value AS STRING)")
    .writeStream
    .format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource")
    .option("database", "streamdb")
    .option("table", "hivestreamtable")
    .option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))
    .option("checkpointLocation", "/tmp/checkpoint")
    .start())

Note: Replace the kafka.bootstrap.servers value (and the other hostnames in these commands) with those of your own cluster.
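Since the reader and writer options are the only cluster-specific pieces of the streaming snippets above, one way to keep them in a single place is to collect them into plain dicts (the stream_options helper here is mine, purely illustrative; the option keys mirror the snippets above):

```python
# Illustrative only: gather the reader/writer options used in the
# streaming snippets so swapping in your own broker list, metastore
# URI, or checkpoint path is a one-line change.
def stream_options(bootstrap_servers, topic, database, table,
                   metastore_uri, checkpoint="/tmp/checkpoint"):
    """Return (reader_options, writer_options) dicts for the snippets above."""
    reader = {
        "kafka.bootstrap.servers": bootstrap_servers,  # replace with your brokers
        "subscribe": topic,
    }
    writer = {
        "database": database,
        "table": table,
        "metastoreUri": metastore_uri,
        "checkpointLocation": checkpoint,
    }
    return reader, writer

reader, writer = stream_options(
    "c420-node2.coelab.cloudera.com:6667", "ss_input",
    "streamdb", "hivestreamtable",
    "thrift://c420-node3.coelab.cloudera.com:9083")
```

In pyspark these dicts could then be applied with .options(**reader) on the stream reader and .options(**writer) on the stream writer instead of repeating each .option(...) call.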

4. Produce some data to the Kafka topic ss_input

./kafka-console-producer.sh --broker-list c420-node2.coelab.cloudera.com:6667 --topic ss_input
>hello team
>hello world
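The console producer sends each typed line as a Kafka record whose value is raw bytes; the CAST(value AS STRING) step in the streaming snippets above is what decodes those bytes back to text before the rows land in Hive. A cluster-free sketch of that decoding:

```python
# What the console producer sends: each line becomes a record value
# stored as raw bytes in the topic.
records = [b"hello team", b"hello world"]

# What CAST(value AS STRING) yields for each record: the UTF-8 text
# that ends up in the `word` column of the Hive table.
words = [value.decode("utf-8") for value in records]
```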

5. Run a test query on the Hive table

beeline> select * from streamdb.hivestreamtable;
+-----------------------+
| hivestreamtable.word  |
+-----------------------+
| hello team            |
| hello world           |
+-----------------------+