Spark netcat Streaming to Cassandra

Create a minimal Scala Seed project

mkdir -p /home/hadoop/AdithyaJ/Spark/Streaming/
cd /home/hadoop/AdithyaJ/Spark/Streaming/

sbt new scala/scala-seed.g8
...
A minimal Scala project.

name [Scala Seed Project]: cassandra-streaming

Template applied in /home/hadoop/AdithyaJ/Spark/Streaming/./cassandra-streaming
cd cassandra-streaming/

Add Spark and Cassandra connector dependencies

vim build.sbt
import Dependencies._

// Single-project build for the netcat -> Spark Streaming -> Cassandra example.
lazy val root = (project in file("."))
  .settings(
    // Build-wide settings shared by every project in this build.
    inThisBuild(
      List(
        organization := "com.example",
        scalaVersion := "2.11.12",
        version := "0.1.0-SNAPSHOT"
      )
    ),
    // The jar path passed to spark-submit (windowing-netcat_2.11-...) derives from this name.
    name := "Windowing Netcat",
    // scalaTest comes from the imported Dependencies object; the rest are
    // Spark 2.2.0 modules plus the DataStax Cassandra connector.
    libraryDependencies ++= Seq(
      scalaTest % Test,
      "org.apache.spark" %% "spark-core" % "2.2.0",
      "org.apache.spark" %% "spark-sql" % "2.2.0",
      "org.apache.spark" %% "spark-streaming" % "2.2.0",
      "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.7"
    )
  )

Spark Streaming to Cassandra

vim src/main/scala/example/CassandraStreaming.scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

import com.datastax.spark.connector.streaming._

/**
 * Streaming word count that persists its results to Cassandra.
 *
 * Reads lines of text from a TCP socket (for example one served by
 * `nc -lk <port>`), splits each line on single spaces, counts the words seen
 * in each one-second batch and writes the `(word, count)` pairs to the
 * `streaming_test.words` table, also printing them to stdout.
 *
 * Neither the Spark master nor the Cassandra host is configured here; both are
 * expected to be supplied by the launcher (e.g. `spark-submit --master ...
 * --conf spark.cassandra.connection.host=...`).
 */
object CassandraStreaming {

  /**
   * Program entry point.
   *
   * @param args `args(0)` is the host to read from and `args(1)` the port.
   *             The port is parsed with `toInt`, so a non-numeric value
   *             raises `NumberFormatException`.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: CassandraStreaming <hostname> <port>")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("CassandraStreaming")
    // Micro-batches are cut every second.
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // setLogLevel returns Unit, so its result is deliberately not bound to a name.
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    // reduceByKey on a DStream aggregates within each batch only, so the
    // counts are per batch rather than running totals.
    val wordCounts = pairs.reduceByKey(_ + _)

    // NOTE(review): assumes keyspace `streaming_test` and table `words`
    // already exist with columns matching the (String, Int) tuple - confirm.
    wordCounts.saveToCassandra("streaming_test", "words")
    wordCounts.print()

    // Nothing runs until start(); awaitTermination() then blocks the main thread.
    ssc.start()
    ssc.awaitTermination()
  }
}

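Run the App

The keyspace and table written to by the job must already exist in Cassandra. If they do not, create them in cqlsh first:
CREATE KEYSPACE IF NOT EXISTS streaming_test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE IF NOT EXISTS streaming_test.words (key text PRIMARY KEY, value int);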

sbt package
spark-submit --conf spark.cassandra.connection.host=127.0.0.1 --packages datastax:spark-cassandra-connector:2.0.7-s_2.11 --class "CassandraStreaming" --master local[4] target/scala-2.11/windowing-netcat_2.11-0.1.0-SNAPSHOT.jar localhost 9999
nc -lk 9999
god
is
great

Verify in cqlsh

cqlsh

SELECT * FROM streaming_test.words;

 key   | value
-------+-------
 great |     1
   god |     1
    is |     1

(3 rows)

results matching ""

    No results matching ""