/*
 * 一、公司代码,年度,1月-12月的收入金额
 * (Company code, year, and the income amount for each month, January through December.)
 *
 * 数据 (data):
 * burk,year,tsl01,tsl02,tsl03,tsl04,tsl05,tsl06,tsl07,tsl08,tsl09,tsl10,tsl11,tsl12
 * 853101,2010,100200,25002,19440,20550,14990,17227,40990,28778,19088,29889,10990,20990
 * 853101,2011,19446,20556,14996,17233,40996,28784,19094,28779,19089,29890,10991,20991
 * 853101,2012,19447,20557,14997,17234,20560,15000,17237,28780,19090,29891,10992,20992
 * 853101,2013,20560,15000,17237,41000,17234,20560,15000,17237,41000,29892,10993,20993
 * 853101,2014,19449,20559,14999,17236,41000,28788,28786,19096,29897,41000,28788,20994
 * 853101,2015,100205,25007,19445,20555,17236,40999,28787,19097,29898,29894,10995,20995
 * 853101,2016,100206,25008,19446,20556,17237,41000,28788,19098,29899,29895,10996,20996
 * 853101,2017,100207,25009,17234,20560,15000,17237,41000,15000,17237,41000,28788,20997
 * 853101,2018,100208,25010,41000,28788,28786,19096,29897,28786,19096,29897,10998,20998
 * 853101,2019,100209,25011,17236,40999,28787,19097,29898,28787,19097,29898,10999,20999
 * 846271,2010,100210,25012,17237,41000,28788,19098,29899,28788,19098,29899,11000,21000
 * 846271,2011,100211,25013,19451,20561,15001,17238,41001,28789,19099,29900,11001,21001
 * 846271,2012,100212,100213,20190,6484,46495,86506,126518,166529,206540,246551,286562,326573
 * 846271,2013,100213,100214,21297,5008,44466,83924,123382,162839,202297,241755,281213,320671
 * 846271,2014,100214,100215,22405,3531,42436,81341,120245,159150,198055,236959,275864,314769
 * 846271,2015,100215,100216,23512,2055,19096,29897,28786,19096,29897,41000,29892,308866
 * 846271,2016,100216,100217,24620,579,38377,76175,28788,28786,19096,29897,41000,302964
 * 846271,2017,100217,100218,25727,898,36347,73592,40999,28787,19097,29898,29894,297062
 * 846271,2018,100218,100219,26835,2374,34318,71009,41000,28788,19098,29899,29895,291159
 * 846271,2019,100219,100220,27942,3850,32288,68427,17237,41000,15000,17237,41000,285257
 */
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
/**
 * Monthly income analysis over per-company, per-year records.
 *
 * Each input row holds a company code (`burk`), a year, and twelve monthly
 * income columns (`tsl01` .. `tsl12`). Two reports are produced, first with
 * the DataFrame DSL and then again with Spark SQL:
 *
 *  1. running (cumulative) income per company and year, month by month;
 *  2. growth rate of each month against the same month of the preceding
 *     row in year order: `month_income / previous_income - 1`.
 *
 * All results are printed with `show`; nothing is written back.
 */
object Cade01 {

  /**
   * Entry point: builds a local SparkSession, runs both reports and always
   * stops the session afterwards, even when a report fails.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("burk")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    try runReports(spark)
    finally spark.stop()
  }

  /**
   * Loads the income file and prints the DSL and SQL variants of both reports.
   *
   * @param spark the session used for reading, querying and the temp view
   */
  private def runReports(spark: SparkSession): Unit = {
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // NOTE(review): no `header` option is set. If the file starts with a
    // column-name line, that line is parsed as a data row (INT fields that
    // cannot be parsed become null) -- confirm the actual file layout.
    val burkDF: DataFrame = spark
      .read
      .format("csv")
      .option("sep", ",")
      .schema("burk INT,year STRING,tsl01 INT,tsl02 INT,tsl03 INT,tsl04 INT,tsl05 INT,tsl06 INT,tsl07 INT,tsl08 INT,tsl09 INT,tsl10 INT,tsl11 INT,tsl12 INT")
      .load("data/1126/company_data.txt")
    burkDF.show()

    // Map of month number (1..12) -> that month's income column. Exploding it
    // turns the twelve columns of a row into twelve (month, month_income) rows.
    val monthToIncome: Column =
      map((1 to 12).flatMap(m => Seq(lit(m), col(f"tsl$m%02d"))): _*)

    // Long format shared by both DSL reports: burk, year, month, month_income.
    val monthlyIncome: DataFrame = burkDF
      .select($"burk", $"year", explode(monthToIncome) as Array("month", "month_income"))

    // 1. Cumulative income per company and year, by month.
    //    Output: company code, year, month, income of the month, cumulative income.
    val withinYear = Window.partitionBy($"burk", $"year").orderBy($"month")
    monthlyIncome
      .withColumn("count_income", sum($"month_income") over withinYear)
      .show(1000)

    // 2. Growth rate of each month versus the same month one row earlier in
    //    year order. This is the previous calendar year only if a company has
    //    no gaps between years; the earliest year has no predecessor, so its
    //    growth rate is null.
    //    Output: company code, year, month, growth rate.
    val sameMonthAcrossYears = Window.partitionBy($"burk", $"month").orderBy("year")
    monthlyIncome
      .withColumn("last_year_monthIncome", lag($"month_income", 1) over sameMonthAcrossYears)
      .withColumn("growth_rate", round($"month_income" / $"last_year_monthIncome" - 1, 6))
      .select($"burk", $"year", $"month", $"growth_rate")
      .show()

    // ---- Spark SQL variants of the same two reports ----

    // Register the raw data as a temporary view. createOrReplaceTempView is
    // used so that a session obtained via getOrCreate which already holds a
    // view of this name does not make the call fail.
    burkDF.createOrReplaceTempView("burk_data")

    // 1. Cumulative income per company and year, by month.
    //    Output: company code, year, month, income of the month, cumulative income.
    spark.sql(
      """
        |SELECT T1.burk,T1.year,T1.month,T1.month_income
        |,sum(T1.month_income) over (partition by T1.burk,T1.year order by T1.month) as count_income
        |FROM
        |(
        |SELECT burk,year,month,month_income
        |FROM burk_data lateral view explode (map(1,tsl01,2,tsl02,3,tsl03,4,tsl04,5,tsl05,6,tsl06,7,tsl07,8,tsl08,9,tsl09,10,tsl10,11,tsl11,12,tsl12)) v1 as month,month_income
        |) T1
        |""".stripMargin)
      .show()

    // 2. Growth rate of each month versus the same month of the preceding year row.
    //    Output: company code, year, month, growth rate.
    // NOTE(review): this query rounds to 5 decimal places while the DSL
    // version above rounds to 6 -- confirm which precision is intended.
    spark.sql(
      """
        |SELECT TT1.burk,TT1.year,TT1.month,ROUND(TT1.month_income / TT1.last_year_monthIncome - 1, 5) as growth_rate
        |FROM(
        | SELECT T1.burk,T1.year,T1.month,T1.month_income
        | ,LAG(T1.month_income) over(PARTITION BY T1.burk,T1.month ORDER BY year) AS last_year_monthIncome
        | FROM
        | (
        | SELECT burk,year,month,month_income
        | FROM burk_data lateral view explode (map(1,tsl01,2,tsl02,3,tsl03,4,tsl04,5,tsl05,6,tsl06,7,tsl07,8,tsl08,9,tsl09,10,tsl10,11,tsl11,12,tsl12)) v1 as month,month_income
        | ) T1
        |)TT1
        |""".stripMargin)
      .show()
  }
}
// 版权声明:本文为luckyhoney1224原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。