Spark netcat Streaming to Cassandra
Create a minimal Scala Seed project
mkdir -p /home/hadoop/AdithyaJ/Spark/Streaming/
cd /home/hadoop/AdithyaJ/Spark/Streaming/
sbt new scala/scala-seed.g8
...
A minimal Scala project.
name [Scala Seed Project]: cassandra-streaming
Template applied in /home/hadoop/AdithyaJ/Spark/Streaming/./cassandra-streaming
cd cassandra-streaming/
Add Spark and Cassandra connector dependencies
vim build.sbt
import Dependencies._
lazy val root = (project in file(".")).
  settings(
    inThisBuild(List(
      organization := "com.example",
      scalaVersion := "2.11.12",
      version := "0.1.0-SNAPSHOT"
    )),
    name := "Windowing Netcat",
    libraryDependencies += scalaTest % Test,
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0",
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0",
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.0",
    libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.7"
  )
Spark Streaming to Cassandra
vim src/main/scala/example/CassandraStreaming.scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.datastax.spark.connector.streaming._

object CassandraStreaming {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: CassandraStreaming <hostname> <port>")
      System.exit(1)
    }

    // Streaming context with one-second batches
    val sparkConf = new SparkConf().setAppName("CassandraStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // setLogLevel returns Unit, so there is nothing useful to assign
    ssc.sparkContext.setLogLevel("ERROR")

    // Read lines from the netcat socket and count words within each batch
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    // Write each batch's counts to streaming_test.words and echo them to the console
    wordCounts.saveToCassandra("streaming_test", "words")
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
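The job writes to streaming_test.words, so that keyspace and table have to exist before the app is started; these notes do not show how they were created. A minimal sketch of the two CQL statements, held as Scala strings so they can be printed and pasted into cqlsh. The replication settings (SimpleStrategy, factor 1, for a single local node) and the column types (text, int, chosen to match the (String, Int) pairs and the key/value columns seen in cqlsh) are assumptions, and the object name and the `execute` hook are hypothetical, not part of the connector.

```scala
// Sketch only: schema assumed for saveToCassandra("streaming_test", "words").
object StreamingTestSchema {
  val keyspace = "streaming_test"
  val table = "words"

  // Assumption: single-node local cluster, hence SimpleStrategy with factor 1.
  val createKeyspace: String =
    s"CREATE KEYSPACE IF NOT EXISTS $keyspace " +
      "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}"

  // Assumption: key is text and value is int, matching the (word, count) tuples.
  val createTable: String =
    s"CREATE TABLE IF NOT EXISTS $keyspace.$table (key text PRIMARY KEY, value int)"

  // `execute` is a hypothetical hook: pass anything that runs one CQL statement.
  def create(execute: String => Unit): Unit =
    Seq(createKeyspace, createTable).foreach(execute)

  // Print the statements (each needs a trailing ';' when pasted into cqlsh).
  def main(args: Array[String]): Unit =
    create(stmt => println(stmt))
}
```

With the connector on the classpath, the same statements could probably be run from the driver through CassandraConnector(sparkConf).withSessionDo, passing session.execute as the hook; that variant is untested here.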
Run the App
sbt package
spark-submit --conf spark.cassandra.connection.host=127.0.0.1 --packages datastax:spark-cassandra-connector:2.0.7-s_2.11 --class "CassandraStreaming" --master local[4] target/scala-2.11/windowing-netcat_2.11-0.1.0-SNAPSHOT.jar localhost 9999
nc -lk 9999
god
is
great
Verify in cqlsh
cqlsh
SELECT * FROM streaming_test.words;
 key   | value
-------+-------
 great |     1
   god |     1
    is |     1

(3 rows)
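Note that the values are per-batch counts, not running totals: reduceByKey on a DStream works within each one-second batch, and a Cassandra write with an existing key overwrites the row. A plain-Scala sketch of that behaviour, with no Spark or Cassandra involved; the object and function names are hypothetical and the Map only stands in for the table.

```scala
// Sketch only: models one batch of the job and the upsert into the table.
object BatchWordCount {
  // Same steps as the streaming job, applied to the lines of a single batch.
  def countBatch(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .groupBy(_._1)
      .map { case (word, ones) => (word, ones.map(_._2).sum) }

  // A write replaces the stored value for each key present in the batch.
  def upsert(table: Map[String, Int], batch: Map[String, Int]): Map[String, Int] =
    table ++ batch

  def main(args: Array[String]): Unit = {
    val batch1 = countBatch(Seq("god is great"))
    val batch2 = countBatch(Seq("god god"))
    // "god" ends at 2 (the latest batch's count), not 3.
    val table = upsert(upsert(Map.empty, batch1), batch2)
    println(table)
  }
}
```

For totals across batches, the counts would have to be accumulated before the write, for example with a stateful DStream operation or a Cassandra counter column; neither is covered in these notes.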