1. Company code, year, and income amounts for January through December
Data:
burk,year,tsl01,tsl02,tsl03,tsl04,tsl05,tsl06,tsl07,tsl08,tsl09,tsl10,tsl11,tsl12

853101,2010,100200,25002,19440,20550,14990,17227,40990,28778,19088,29889,10990,20990
853101,2011,19446,20556,14996,17233,40996,28784,19094,28779,19089,29890,10991,20991
853101,2012,19447,20557,14997,17234,20560,15000,17237,28780,19090,29891,10992,20992
853101,2013,20560,15000,17237,41000,17234,20560,15000,17237,41000,29892,10993,20993
853101,2014,19449,20559,14999,17236,41000,28788,28786,19096,29897,41000,28788,20994
853101,2015,100205,25007,19445,20555,17236,40999,28787,19097,29898,29894,10995,20995
853101,2016,100206,25008,19446,20556,17237,41000,28788,19098,29899,29895,10996,20996
853101,2017,100207,25009,17234,20560,15000,17237,41000,15000,17237,41000,28788,20997
853101,2018,100208,25010,41000,28788,28786,19096,29897,28786,19096,29897,10998,20998
853101,2019,100209,25011,17236,40999,28787,19097,29898,28787,19097,29898,10999,20999
846271,2010,100210,25012,17237,41000,28788,19098,29899,28788,19098,29899,11000,21000
846271,2011,100211,25013,19451,20561,15001,17238,41001,28789,19099,29900,11001,21001
846271,2012,100212,100213,20190,6484,46495,86506,126518,166529,206540,246551,286562,326573
846271,2013,100213,100214,21297,5008,44466,83924,123382,162839,202297,241755,281213,320671
846271,2014,100214,100215,22405,3531,42436,81341,120245,159150,198055,236959,275864,314769
846271,2015,100215,100216,23512,2055,19096,29897,28786,19096,29897,41000,29892,308866
846271,2016,100216,100217,24620,579,38377,76175,28788,28786,19096,29897,41000,302964
846271,2017,100217,100218,25727,898,36347,73592,40999,28787,19097,29898,29894,297062
846271,2018,100218,100219,26835,2374,34318,71009,41000,28788,19098,29899,29895,291159
846271,2019,100219,100220,27942,3850,32288,68427,17237,41000,15000,17237,41000,285257
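Note: the code below reads this sample with an explicit schema and no header option, so it assumes the rows above are saved as a plain CSV file (data rows only, without the burk,year,... header line) at data/1126/company_data.txt.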
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

object Cade01 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("burk")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()


    import org.apache.spark.sql.functions._
    import spark.implicits._


    val burkDF: DataFrame = spark
      .read
      .format("csv")
      .option("sep", ",")
      .schema("burk INT,year STRING,tsl01 INT,tsl02 INT,tsl03 INT,tsl04 INT,tsl05 INT,tsl06 INT,tsl07 INT,tsl08 INT,tsl09 INT,tsl10 INT,tsl11 INT,tsl12 INT")
      .load("data/1126/company_data.txt")

    burkDF.show()


    // 1. Cumulative monthly income per company, per year: unpivot the month columns to rows, then a sum window function
    //    (with ORDER BY in the window, sum() runs as a cumulative total over the default frame)
    // Output: company code, year, month, current month income, cumulative income
    val columns: Column = map(
      lit(1), $"tsl01",
      lit(2), $"tsl02",
      lit(3), $"tsl03",
      lit(4), $"tsl04",
      lit(5), $"tsl05",
      lit(6), $"tsl06",
      lit(7), $"tsl07",
      lit(8), $"tsl08",
      lit(9), $"tsl09",
      lit(10), $"tsl10",
      lit(11), $"tsl11",
      lit(12), $"tsl12"
    )
    burkDF
      .select($"burk",$"year",explode(columns) as Array("month","month_income"))
      .withColumn("count_income",sum($"month_income") over Window.partitionBy($"burk",$"year").orderBy($"month"))
      .show(1000)
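
    // Sanity check against the sample data: for burk 853101, year 2010, the cumulative income is
    // 100200 for month 1, 100200 + 25002 = 125202 for month 2, 125202 + 19440 = 144642 for month 3, and so on.
    //
    // Alternative sketch: the same columns-to-rows step can also be written with Spark's built-in
    // stack() table-generating function instead of building a map and exploding it;
    // stack(12, 1, tsl01, ..., 12, tsl12) emits twelve (month, month_income) rows per input row.
    burkDF
      .selectExpr("burk", "year",
        "stack(12, 1, tsl01, 2, tsl02, 3, tsl03, 4, tsl04, 5, tsl05, 6, tsl06, " +
          "7, tsl07, 8, tsl08, 9, tsl09, 10, tsl10, 11, tsl11, 12, tsl12) as (month, month_income)")
      .withColumn("count_income", sum($"month_income") over Window.partitionBy($"burk", $"year").orderBy($"month"))
      .show(1000)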


    // 2. Growth rate of each month versus the same month of the previous year, per company: unpivot, then a lag window function
    // Output: company code, year, month, growth rate (current month income / same month last year's income - 1)
    burkDF
      .select($"burk",$"year",explode(columns) as Array("month","month_income"))
      .withColumn("last_year_monthIncome",lag($"month_income",1) over Window.partitionBy($"burk",$"month").orderBy("year"))

      .withColumn("growth_rate",round($"month_income" / $"last_year_monthIncome" - 1,6))
      .select($"burk",$"year",$"month",$"growth_rate")
      .show()
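
    // Worked example from the sample data: for burk 853101, month 1, year 2011, last year's value is
    // 100200 (from 2010), so growth_rate = 19446 / 100200 - 1 ≈ -0.805928. For the first year (2010),
    // lag() has no preceding row, so last_year_monthIncome and growth_rate are null.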
      


    // The same two queries, written in Spark SQL:
    // Register the DataFrame as a temporary view
    burkDF.createTempView("burk_data")
    // 1. Cumulative monthly income per company, per year: unpivot with lateral view explode, then a sum window function
    // Output: company code, year, month, current month income, cumulative income
    spark.sql(
      """
        |SELECT T1.burk,T1.year,T1.month,T1.month_income
        |,sum(T1.month_income) over (partition by T1.burk,T1.year order by T1.month) as count_income
        |FROM
        |(
        |SELECT burk,year,month,month_income
        |FROM burk_data lateral view explode (map(1,tsl01,2,tsl02,3,tsl03,4,tsl04,5,tsl05,6,tsl06,7,tsl07,8,tsl08,9,tsl09,10,tsl10,11,tsl11,12,tsl12)) v1 as month,month_income
        |) T1
        |""".stripMargin)
      .show()


    // 2. Growth rate of each month versus the same month of the previous year, per company: unpivot with lateral view explode, then a lag window function
    // Output: company code, year, month, growth rate (current month income / same month last year's income - 1)
    spark.sql(
      """
        |SELECT TT1.burk,TT1.year,TT1.month,ROUND(TT1.month_income / TT1.last_year_monthIncome - 1, 5)  as growth_rate
        |FROM(
        |    SELECT T1.burk,T1.year,T1.month,T1.month_income
        |     ,LAG(T1.month_income) over(PARTITION BY T1.burk,T1.month ORDER BY year) AS last_year_monthIncome
        |     FROM
        |     (
        |       SELECT burk,year,month,month_income
        |       FROM burk_data lateral view explode (map(1,tsl01,2,tsl02,3,tsl03,4,tsl04,5,tsl05,6,tsl06,7,tsl07,8,tsl08,9,tsl09,10,tsl10,11,tsl11,12,tsl12)) v1 as month,month_income
        |     ) T1
        |)TT1
        |""".stripMargin)
      .show()

  }
}

Copyright notice: this is an original article by luckyhoney1224, licensed under CC 4.0 BY-SA. Please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/luckyhoney1224/article/details/128086810