Software versions used:
Spark 2.3.0
IDEA 2019.1
kafka_2.11-0.10.2.2
spark-streaming-kafka-0-10_2.11-2.3.0
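For reference, with an sbt build the Spark side of this list maps to roughly the following dependencies (a sketch; the coordinates are the standard Spark artifacts for the versions above):

// build.sbt (sketch)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.3.0",
  "org.apache.spark" %% "spark-streaming"            % "2.3.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"
)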
Now the code: a minimal 0-10 direct-stream consumer. The broker address and topic name below are placeholders for a local test setup.
package com.bd.spark

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream

object kafkaSparkStreaming_10version {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-spark-demo").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(5)) // 5-second micro-batches
    // Consumer config; broker address, group id and topic name are demo placeholders
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "kafka-spark-demo",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array("test"), kafkaParams))
    stream.map(record => (record.key, record.value)).print() // dump each batch to the console
    ssc.start()
    ssc.awaitTermination()
  }
}
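To feed the test topic you can use the console producer that ships with Kafka, or a small standalone producer. The sketch below assumes the same local broker and topic as the consumer above; DemoProducer and its key/value scheme are made up for illustration.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object DemoProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // Send a few records; each 5-second batch on the consumer side should print them as (key, value) pairs
    for (i <- 1 to 10) producer.send(new ProducerRecord[String, String]("test", s"key-$i", s"value-$i"))
    producer.close()
  }
}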