Hudi
Utility class
package com.zhen.hudi.didi

import org.apache.spark.sql.SparkSession

/**
 * @Author FengZhen
 * @Date 3/1/22 9:34 PM
 * @Description Utility class for working with Spark SQL (loading/reading and saving/writing data),
 *              e.g. obtaining a SparkSession instance.
 */
object SparkUtils {

  /**
   * Build a SparkSession instance; by default it runs in local mode.
   * @return
   */
  def createSparkSession(clazz: Class[_], master: String = "local[4]", partitions: Int = 4): SparkSession = {
    SparkSession.builder()
      .appName(clazz.getSimpleName.stripSuffix("$"))
      .master(master)
      // Kryo serialization is required by Hudi
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.shuffle.partitions", partitions)
      .getOrCreate()
  }

  def main(args: Array[String]): Unit = {
    val spark = createSparkSession(this.getClass)
    println(spark)
    Thread.sleep(1000 * 100)
    spark.stop()
  }
}
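SparkUtils only builds the session; the Hudi data source itself has to be on the classpath. A minimal build dependency sketch, assuming Spark 3.1 with Scala 2.12 and a Hudi 0.10.x bundle (the exact versions are assumptions and should be aligned with your cluster):

// build.sbt -- dependency sketch; versions are assumptions, align them with your environment
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.1.2" % "provided",
  // Hudi bundle matching the Spark/Scala version in use
  "org.apache.hudi" % "hudi-spark3.1-bundle_2.12" % "0.10.1"
)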
Loading a CSV file into a Hudi table
package com.zhen.hudi.didi

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._

/**
 * @Author FengZhen
 * @Date 3/1/22 9:29 PM
 * @Description Analysis of Didi Haikou ride-hailing data with Spark SQL: read the CSV file, then save it into a Hudi table.
 * 1. Build the SparkSession instance (integrated with Hudi and HDFS)
 * 2. Load the local CSV file with the Didi trip data
 * 3. ETL the Didi trip data
 * 4. Save the transformed data into the Hudi table
 * 5. Close resources when the application ends
 */
object DidiStorageSpark {

  val datasPath: String = "file:///Users/FengZhen/Desktop/accumulate/0_project/hudi-learning/datas/didi/dwv_order_make_haikou_1.txt"
  val hudiTableName: String = "tbl_didi_haikou"
  val hudiTablePath: String = "/hudi-warehouse/tbl_didi_haikou"

  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession instance (integrated with Hudi and HDFS)
    val spark: SparkSession = SparkUtils.createSparkSession(this.getClass)

    // 2. Load the local CSV file with the Didi trip data
    val didiDF = readCsvFile(spark, datasPath)
    // didiDF.printSchema()
    // didiDF.show(10, false)

    // 3. ETL the Didi trip data
    val etlDF: DataFrame = process(didiDF)
    // etlDF.printSchema()
    // etlDF.show(10, false)

    // 4. Save the transformed data into the Hudi table
    saveToHudi(etlDF, hudiTableName, hudiTablePath)

    // 5. Close resources when the application ends
    spark.stop()
  }

  /**
   * Read a CSV text file into a DataFrame.
   * @param spark
   * @param path
   * @return
   */
  def readCsvFile(spark: SparkSession, path: String): DataFrame = {
    spark.read
      // Tab-separated values
      .option("sep", "\\t")
      // First line contains the column names
      .option("header", "true")
      // Infer column types from the values
      .option("inferSchema", "true")
      // File path
      .csv(path)
  }

  /**
   * ETL of the Didi Haikou data: derive the ts and partitionpath columns.
   * @param dataFrame
   * @return
   */
  def process(dataFrame: DataFrame): DataFrame = {
    dataFrame
      // Add the Hudi partition column, three levels combined -> yyyy-MM-dd
      .withColumn("partitionpath", concat_ws("-", col("year"), col("month"), col("day")))
      // Drop the original date columns
      .drop("year", "month", "day")
      // Add the timestamp column used by Hudi for record pre-combining, based on the departure time
      .withColumn("ts", unix_timestamp(col("departure_time"), "yyyy-MM-dd HH:mm:ss"))
  }

  /**
   * Save the DataFrame into a Hudi table. The table type is COW: batch writes, write-less/read-more workload.
   * @param dataFrame
   * @param table
   * @param path
   */
  def saveToHudi(dataFrame: DataFrame, table: String, path: String): Unit = {
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._

    // Save the data
    dataFrame.write
      .mode(SaveMode.Overwrite)
      .format("hudi")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      // Hudi table properties
      // Record key
      .option(RECORDKEY_FIELD.key(), "order_id")
      // Pre-combine field
      .option(PRECOMBINE_FIELD.key(), "ts")
      // Partition path field
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      // Table name
      .option(TBL_NAME.key(), table)
      .save(path)
  }
}
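Once the job has run, the write can be spot-checked by reading the table back through the Hudi data source and inspecting the metadata columns Hudi adds on write. A minimal verification sketch (the helper object DidiStorageCheck is not part of the original project, only illustrative):

package com.zhen.hudi.didi

object DidiStorageCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkUtils.createSparkSession(this.getClass)
    // Read the Hudi table back from the same path used by DidiStorageSpark
    val df = spark.read.format("hudi").load("/hudi-warehouse/tbl_didi_haikou")
    // Hudi metadata columns added on write, next to the record key and pre-combine field
    df.select("_hoodie_commit_time", "_hoodie_record_key", "_hoodie_partition_path", "order_id", "ts")
      .show(10, false)
    spark.stop()
  }
}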
Business analysis
package com.zhen.hudi.didi

import java.util.{Calendar, Date}

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._

/**
 * @Author FengZhen
 * @Date 3/1/22 9:31 PM
 * @Description Analysis of Didi Haikou ride-hailing data with Spark SQL: load the Hudi table and compute the business metrics.
 * Spark DataSource API:
 *   spark.read.format("hudi")...
 *   dataframe.write.format("hudi")...
 * Since Spark 2.x, SparkSession is the program entry point.
 */
object DidiAnalysisSpark {

  // HDFS path where the Hudi table data is stored
  val hudiTablePath: String = "/hudi-warehouse/tbl_didi_haikou"

  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession instance (integrated with Hudi and HDFS)
    val spark: SparkSession = SparkUtils.createSparkSession(this.getClass, partitions = 8)

    // 2. Load the Hudi table data, selecting only the needed columns
    val hudiDF: DataFrame = readFromHudi(spark, hudiTablePath)
    hudiDF.printSchema()
    hudiDF.show(10, false)

    // The data is used multiple times, so cache it
    hudiDF.persist(StorageLevel.MEMORY_AND_DISK)

    // 3. Compute the business metrics
    // Metric 1: orders per product line (product_id)
    //reportProduct(hudiDF)
    // Metric 2: orders per booking type (real-time vs. reservation)
    //reportType(hudiDF)
    // Metric 3: orders per traffic type
    //reportTraffic(hudiDF)
    // Metric 4: orders per price range
    //reportPrice(hudiDF)
    // Metric 5: orders per distance range
    //reportDistance(hudiDF)
    // Metric 6: orders per day of week
    reportWeek(hudiDF)

    // Release the cache once the data is no longer needed
    hudiDF.unpersist()

    // 4. Close resources when the application ends
    spark.stop()
  }

  /**
   * Load the Hudi table data into a DataFrame.
   * @param spark
   * @param path
   * @return
   */
  def readFromHudi(spark: SparkSession, path: String): DataFrame = {
    val didiDF: DataFrame = spark.read.format("hudi").load(path)
    // Select the columns used by the reports
    didiDF.select("product_id", "type", "traffic_type", "pre_total_fee", "start_dest_distance", "departure_time")
  }

  /**
   * Orders per product line, field: product_id
   * @param dataFrame
   */
  def reportProduct(dataFrame: DataFrame): Unit = {
    // 1 滴滴专车, 2 滴滴企业专车, 3 滴滴快车, 4 滴滴企业快车
    // a. Group by product line id and count
    val reportDF: DataFrame = dataFrame.groupBy("product_id").count()

    // b. UDF mapping the id to its name
    val to_name = udf((productId: Int) => {
      productId match {
        case 1 => "滴滴专车"
        case 2 => "滴滴企业专车"
        case 3 => "滴滴快车"
        case 4 => "滴滴企业快车"
      }
    })

    // c. Replace the id with the name
    val resultDF: DataFrame = reportDF.select(
      to_name(col("product_id")).as("order_type"),
      col("count").as("total")
    )
    resultDF.printSchema()
    resultDF.show(10, false)
  }

  /**
   * Orders per booking type, field: type
   * @param dataFrame
   */
  def reportType(dataFrame: DataFrame): Unit = {
    // a. Group by booking type id and count
    val reportDF: DataFrame = dataFrame.groupBy("type").count()

    // b. UDF mapping the id to its name
    val to_name = udf((realtimeType: Int) => {
      realtimeType match {
        case 0 => "实时"
        case 1 => "预约"
      }
    })

    // c. Replace the id with the name
    val resultDF: DataFrame = reportDF.select(
      to_name(col("type")).as("order_realtime"),
      col("count").as("total")
    )
    resultDF.printSchema()
    resultDF.show(10, false)
  }

  /**
   * Orders per traffic type, field: traffic_type
   * @param dataFrame
   */
  def reportTraffic(dataFrame: DataFrame): Unit = {
    // a. Group by traffic type and count
    val reportDF: DataFrame = dataFrame.groupBy("traffic_type").count()

    // b. UDF mapping the id to its name
    val to_name = udf((trafficType: Int) => {
      trafficType match {
        case 0 => "普通散客"
        case 1 => "企业时租"
        case 2 => "企业接机套餐"
        case 3 => "企业送机套餐"
        case 4 => "拼车"
        case 5 => "接机"
        case 6 => "送机"
        case 302 => "跨域拼车"
        case _ => "未知"
      }
    })

    // c. Replace the id with the name
    val resultDF: DataFrame = reportDF.select(
      to_name(col("traffic_type")).as("traffic_type"),
      col("count").as("total")
    )
    resultDF.printSchema()
    resultDF.show(10, false)
  }

  /**
   * Orders per price range: bucket the order price first, then count each bucket, field: pre_total_fee
   * @param dataFrame
   */
  def reportPrice(dataFrame: DataFrame): Unit = {
    val resultDF: DataFrame = dataFrame.agg(
      // price 0-15
      sum(when(col("pre_total_fee").between(0, 15), 1).otherwise(0)).as("0-15"),
      // price 16-30
      sum(when(col("pre_total_fee").between(16, 30), 1).otherwise(0)).as("16-30"),
      // price 31-50
      sum(when(col("pre_total_fee").between(31, 50), 1).otherwise(0)).as("31-50"),
      // price 51-100
      sum(when(col("pre_total_fee").between(51, 100), 1).otherwise(0)).as("51-100"),
      // price 100+
      sum(when(col("pre_total_fee").gt(100), 1).otherwise(0)).as("100+")
    )
    resultDF.printSchema()
    resultDF.show(10, false)
  }

  /**
   * Orders per distance range: bucket the trip distance first, then count each bucket, field: start_dest_distance (meters)
   * @param dataFrame
   */
  def reportDistance(dataFrame: DataFrame): Unit = {
    val resultDF: DataFrame = dataFrame.agg(
      // distance 0-10km
      sum(when(col("start_dest_distance").between(0, 10000), 1).otherwise(0)).as("0-10km"),
      // distance 10-20km
      sum(when(col("start_dest_distance").between(10001, 20000), 1).otherwise(0)).as("10-20km"),
      // distance 20-30km
      sum(when(col("start_dest_distance").between(20001, 30000), 1).otherwise(0)).as("20-30km"),
      // distance 30-50km
      sum(when(col("start_dest_distance").between(30001, 50000), 1).otherwise(0)).as("30-50km"),
      // distance 50km+
      sum(when(col("start_dest_distance").gt(50000), 1).otherwise(0)).as("50km+")
    )
    resultDF.printSchema()
    resultDF.show(10, false)
  }

  /**
   * Convert the departure time to a day of week and count per day, field: departure_time
   * @param dataFrame
   */
  def reportWeek(dataFrame: DataFrame): Unit = {
    // UDF converting the date string to a day of week
    val to_week = udf((departureTime: String) => {
      val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd")
      val calendar: Calendar = Calendar.getInstance()
      val date: Date = format.parse(departureTime)
      calendar.setTime(date)
      val dayWeek = calendar.get(Calendar.DAY_OF_WEEK) match {
        case 1 => "星期日"
        case 2 => "星期一"
        case 3 => "星期二"
        case 4 => "星期三"
        case 5 => "星期四"
        case 6 => "星期五"
        case 7 => "星期六"
      }
      dayWeek
    })

    val resultDF: DataFrame = dataFrame
      .select(to_week(col("departure_time")).as("week"))
      .groupBy(col("week")).count()
      .select(col("week"), col("count").as("total"))
      .orderBy(col("total").desc)
    resultDF.printSchema()
    resultDF.show(10, false)
  }
}
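The reports above use the DataFrame API; the same metrics can also be expressed in Spark SQL by registering the loaded Hudi table as a temporary view. A minimal sketch for metric 1 (the object name DidiAnalysisSQL and the view name tmp_didi are illustrative assumptions, not part of the original project):

package com.zhen.hudi.didi

object DidiAnalysisSQL {
  def main(args: Array[String]): Unit = {
    val spark = SparkUtils.createSparkSession(this.getClass)
    // Register the Hudi table as a temporary view
    spark.read.format("hudi").load("/hudi-warehouse/tbl_didi_haikou").createOrReplaceTempView("tmp_didi")
    // Metric 1 (orders per product line) expressed in SQL
    spark.sql(
      """
        |SELECT product_id, COUNT(1) AS total
        |FROM tmp_didi
        |GROUP BY product_id
        |ORDER BY total DESC
        |""".stripMargin).show(10, false)
    spark.stop()
  }
}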