Spark Kafka Streaming to Cassandra

Download Spark Kafka Streaming jar

cd /home/hadoop/Spark/spark-2.2.1-bin-hadoop2.7

wget -O spark-streaming-kafka-0-8-assembly_2.11-2.3.0.jar 'https://search.maven.org/remotecontent?filepath=org/apache/spark/spark-streaming-kafka-0-8-assembly_2.11/2.3.0/spark-streaming-kafka-0-8-assembly_2.11-2.3.0.jar'

Download Kafka

mkdir -p /home/hadoop/Kafka/
cd /home/hadoop/Kafka/

wget http://www.eu.apache.org/dist/kafka/1.0.1/kafka_2.11-1.0.1.tgz
tar -xzf kafka_2.11-1.0.1.tgz

echo 'export PATH="/home/hadoop/Kafka/kafka_2.11-1.0.1/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc
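
To confirm the PATH change took effect, an optional check; it should print the script's location under /home/hadoop/Kafka/kafka_2.11-1.0.1/bin:

which kafka-topics.sh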

Start Kafka Server

ZooKeeper and the Kafka broker each run in the foreground, so start them in separate terminals.

zookeeper-server-start.sh /home/hadoop/Kafka/kafka_2.11-1.0.1/config/zookeeper.properties

kafka-server-start.sh /home/hadoop/Kafka/kafka_2.11-1.0.1/config/server.properties
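
Alternatively, both launcher scripts accept a -daemon flag that sends each process to the background:

zookeeper-server-start.sh -daemon /home/hadoop/Kafka/kafka_2.11-1.0.1/config/zookeeper.properties
kafka-server-start.sh -daemon /home/hadoop/Kafka/kafka_2.11-1.0.1/config/server.properties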

Create and test the test topic

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
kafka-topics.sh --list --zookeeper localhost:2181
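
Optionally, inspect the new topic's partition and replica assignment:

kafka-topics.sh --describe --zookeeper localhost:2181 --topic test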

Run the producer and the consumer in separate terminals; anything typed at the producer's > prompt should be echoed by the consumer.

kafka-console-producer.sh --broker-list localhost:9092 --topic test

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Create a minimal Scala Seed project

mkdir -p /home/hadoop/Spark/Streaming/
cd /home/hadoop/Spark/Streaming/

sbt new scala/scala-seed.g8
[info] Set current project to streaming (in build file:/home/hadoop/Spark/Streaming/)

A minimal Scala project.

name [Scala Seed Project]: kafka_cassandra

Template applied in /home/hadoop/Spark/Streaming/./kafka_cassandra
cd kafka_cassandra/

Add Spark and Cassandra connector dependencies

vim build.sbt
import Dependencies._

lazy val root = (project in file(".")).
  settings(
    inThisBuild(List(
      organization := "com.example",
      scalaVersion := "2.11.12",
      version      := "0.1.0-SNAPSHOT"
    )),
    name := "Windowing Netcat",
    libraryDependencies += scalaTest % Test,
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0",
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0",
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.0",
    libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.7",
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
  )
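
An optional sanity check that the new dependencies resolve before writing any code:

sbt update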

Kafka Spark Streaming to Cassandra

vim src/main/scala/example/KafkaCassandraWordCount.scala
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._

import com.datastax.spark.connector.streaming._

object KafkaCassandraWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: KafkaCassandraWordCount <brokers> <topics>
        |  <brokers> is a comma-separated list of one or more Kafka brokers
        |  <topics> is a comma-separated list of one or more Kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // Create context with a 2 second batch interval
    val sparkConf = new SparkConf().setAppName("KafkaCassandraWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.sparkContext.setLogLevel("ERROR")

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
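    // Note: saveToCassandra upserts by primary key, so each batch overwrites
    // a word's previous count rather than accumulating a running total.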
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.saveToCassandra("streaming_test", "words")
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
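
saveToCassandra writes into an existing table; it does not create one. Before the first run, create the target keyspace and table in cqlsh. A minimal sketch (the column names match the queries below; the SimpleStrategy replication settings are an assumption for a single-node setup):

CREATE KEYSPACE IF NOT EXISTS streaming_test
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE IF NOT EXISTS streaming_test.words (
  key text PRIMARY KEY,
  value bigint
);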

Run the App

sbt package
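
sbt package writes the application classes (without dependencies) to a jar under target/scala-2.11/; an optional check:

ls target/scala-2.11/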

spark-submit \
  --conf spark.cassandra.connection.host=127.0.0.1 \
  --packages datastax:spark-cassandra-connector:2.0.7-s_2.11 \
  --jars /home/hadoop/Spark/spark-2.2.1-bin-hadoop2.7/spark-streaming-kafka-0-8-assembly_2.11-2.3.0.jar \
  --master local[4] \
  --class KafkaCassandraWordCount \
  target/scala-2.11/kafka_cassandra_2.11-0.1.0-SNAPSHOT.jar \
  localhost:9092 test

In another terminal, feed the topic and watch the word counts appear in the app's output:

kafka-console-producer.sh --broker-list localhost:9092 --topic test
>god is great god is great

Verify in cqlsh

cqlsh

SELECT * FROM streaming_test.words;

 key    | value
--------+-------
  great |     2
    god |     2
     is |     2

(3 rows)
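
Individual words can also be looked up by primary key:

SELECT * FROM streaming_test.words WHERE key = 'god';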

Produce a big text file to the topic

cd /home/hadoop/Kafka
wget https://norvig.com/big.txt --no-check-certificate

kafka-console-producer.sh --broker-list localhost:9092 --topic test < big.txt
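
To confirm the file's lines reached the broker, print the topic's end offset (an optional check; --time -1 requests the latest offset):

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1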

Verify in cqlsh

Because each 2-second batch overwrites a word's previous count, the values below reflect the last batch in which each word appeared.

cqlsh

SELECT * FROM streaming_test.words WHERE value > 500 LIMIT 10 ALLOW FILTERING;

 key      | value
----------+-------
     time |   663
    being |   523
   seemed |   572
        a | 10939
       up |  1348
    could |  1092
       He |  1667
     long |   525
       if |  1063
 Princess |   543

(10 rows)
