
Fixing the _corrupt_record and name errors when Spark Streaming reads JSON data from Kafka and saves it to MySQL


Software versions used:

spark2.3.0

IDEA2019.1

kafka_2.11-0.10.2.2

spark-streaming-kafka-0-10_2.11-2.3.0
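
For reference, a minimal build.sbt matching these versions might look as follows. The MySQL connector artifact and version are assumptions, since the original post does not list them:

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.3.0",
  "org.apache.spark" %% "spark-sql"                  % "2.3.0",
  "org.apache.spark" %% "spark-streaming"            % "2.3.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0",
  "mysql"            %  "mysql-connector-java"       % "5.1.47" // assumed; any recent 5.1.x JDBC driver works here
)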

Here is the code:

package com.bd.spark

import java.util.Properties

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.InputDStream



object kafkaSparkStreaming_10version {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-spark-demo").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(5)) // batch interval not shown in the original; 5 seconds is an assumption
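    // ---- The original post is cut off above. What follows is a minimal sketch
    // ---- of the rest of the pipeline, not the author's exact code: the broker
    // ---- address, topic, group id, JSON fields, and the MySQL URL, table and
    // ---- credentials are all assumptions.

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "localhost:9092",      // assumed broker address
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "kafka-spark-demo",    // assumed consumer group
      "auto.offset.reset"  -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc, PreferConsistent, Subscribe[String, String](Array("test"), kafkaParams)) // assumed topic name

    // An explicit schema stops Spark from inferring one per batch. Without it,
    // an empty or malformed batch yields a DataFrame whose only column is
    // _corrupt_record, and selecting "name" then fails with "cannot resolve 'name'".
    val schema = StructType(Seq(
      StructField("name", StringType),   // assumed JSON fields
      StructField("age",  StringType)))

    val jdbcUrl = "jdbc:mysql://localhost:3306/test?useSSL=false" // assumed database
    val props = new Properties()
    props.setProperty("user", "root")          // assumed credentials
    props.setProperty("password", "123456")
    props.setProperty("driver", "com.mysql.jdbc.Driver")

    stream.map(_.value).foreachRDD { rdd =>
      // Skipping empty batches matters: reading an empty RDD as JSON is one
      // trigger for the _corrupt_record-only DataFrame.
      if (!rdd.isEmpty()) {
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
        import spark.implicits._
        val df = spark.read.schema(schema).json(spark.createDataset(rdd))
        df.filter($"name".isNotNull)             // drop rows that failed to parse
          .write.mode(SaveMode.Append)
          .jdbc(jdbcUrl, "person", props)        // assumed table name
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

The two points that make the errors in the title go away: skip empty batches before calling spark.read.json, and pass an explicit schema instead of letting Spark infer one per batch. With inference, a batch that is empty or contains invalid JSON produces a DataFrame whose only column is _corrupt_record, so any reference to a field such as name fails.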

