Spark Kafka Streaming to Cassandra
Download Spark Kafka Streaming jar
cd /home/hadoop/Spark/spark-2.2.1-bin-hadoop2.7
wget -O spark-streaming-kafka-0-8-assembly_2.11-2.3.0.jar "https://search.maven.org/remotecontent?filepath=org/apache/spark/spark-streaming-kafka-0-8-assembly_2.11/2.3.0/spark-streaming-kafka-0-8-assembly_2.11-2.3.0.jar"
Download Kafka
mkdir -p /home/hadoop/Kafka/
cd /home/hadoop/Kafka/
wget http://www.eu.apache.org/dist/kafka/1.0.1/kafka_2.11-1.0.1.tgz
tar -xzf kafka_2.11-1.0.1.tgz
echo 'export PATH="/home/hadoop/Kafka/kafka_2.11-1.0.1/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc
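Confirm the Kafka scripts are now on the PATH:
which kafka-topics.sh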
Start ZooKeeper and the Kafka broker
Run each command in its own terminal, since both block:
zookeeper-server-start.sh /home/hadoop/Kafka/kafka_2.11-1.0.1/config/zookeeper.properties
kafka-server-start.sh /home/hadoop/Kafka/kafka_2.11-1.0.1/config/server.properties
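Alternatively, both scripts accept a -daemon flag to run in the background:
zookeeper-server-start.sh -daemon /home/hadoop/Kafka/kafka_2.11-1.0.1/config/zookeeper.properties
kafka-server-start.sh -daemon /home/hadoop/Kafka/kafka_2.11-1.0.1/config/server.properties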
Create and test a test topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
kafka-topics.sh --list --zookeeper localhost:2181
kafka-console-producer.sh --broker-list localhost:9092 --topic test
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
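Anything typed at the producer prompt (>) should appear in the consumer terminal, for example:
>hello kafka
hello kafka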
Create a minimal Scala Seed project
mkdir -p /home/hadoop/Spark/Streaming/
cd /home/hadoop/Spark/Streaming/
sbt new scala/scala-seed.g8
[info] Set current project to streaming (in build file:/home/hadoop/Spark/Streaming/)
A minimal Scala project.
name [Scala Seed Project]: kafka_cassandra
Template applied in /home/hadoop/Spark/Streaming/./kafka_cassandra
cd kafka_cassandra/
Add Spark and Cassandra connector dependencies
vim build.sbt
import Dependencies._

lazy val root = (project in file(".")).
  settings(
    inThisBuild(List(
      organization := "com.example",
      scalaVersion := "2.11.12",
      version      := "0.1.0-SNAPSHOT"
    )),
    name := "kafka_cassandra",
    libraryDependencies += scalaTest % Test,
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0",
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0",
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.0",
    libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.7",
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0"
  )
Kafka Spark Streaming to Cassandra
vim src/main/scala/example/KafkaCassandraWordCount.scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._
import com.datastax.spark.connector.streaming._

object KafkaCassandraWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: KafkaCassandraWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more Kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // Create context with a 2 second batch interval
    val sparkConf = new SparkConf().setAppName("KafkaCassandraWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.sparkContext.setLogLevel("ERROR")

    // Create a direct Kafka stream from the given brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Split the message values into words, count the words per batch,
    // then write each batch of counts to Cassandra and print it
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.saveToCassandra("streaming_test", "words")
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
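Create the Cassandra keyspace and table
saveToCassandra expects the streaming_test keyspace and words table to exist before the job runs. A minimal sketch of a matching schema, assuming value is a counter column so that per-batch counts accumulate instead of overwriting each other:
cqlsh
-- assumed schema: key holds the word, value is a counter the connector increments per batch
CREATE KEYSPACE IF NOT EXISTS streaming_test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE IF NOT EXISTS streaming_test.words (key text PRIMARY KEY, value counter);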
Run the App
sbt package
spark-submit \
  --conf spark.cassandra.connection.host=127.0.0.1 \
  --packages datastax:spark-cassandra-connector:2.0.7-s_2.11 \
  --jars /home/hadoop/Spark/spark-2.2.1-bin-hadoop2.7/spark-streaming-kafka-0-8-assembly_2.11-2.3.0.jar \
  --master local[4] \
  --class KafkaCassandraWordCount \
  target/scala-2.11/kafka_cassandra_2.11-0.1.0-SNAPSHOT.jar localhost:9092 test
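The two trailing arguments fill <brokers> and <topics>: localhost:9092 is the local Kafka broker and test is the topic created earlier.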
kafka-console-producer.sh --broker-list localhost:9092 --topic test
>god is great god is great
Verify in cqlsh
cqlsh
SELECT * FROM streaming_test.words;
key | value
--------+-------
great | 2
god | 2
is | 2
(3 rows)
Produce a big text file to the topic
cd /home/hadoop/Kafka
wget https://norvig.com/big.txt --no-check-certificate
kafka-console-producer.sh --broker-list localhost:9092 --topic test < big.txt
Verify in cqlsh
cqlsh
SELECT * FROM streaming_test.words WHERE value > 500 LIMIT 10 ALLOW FILTERING;
key | value
----------+-------
time | 663
being | 523
seemed | 572
a | 10939
up | 1348
could | 1092
He | 1667
long | 525
if | 1063
Princess | 543
(10 rows)
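Note that ALLOW FILTERING forces a full table scan; that is fine for a small demo table like this one, but it should be avoided on large tables.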