Spending every day with you.

        Spark's groupBy function collects rows with the same values into groups on a DataFrame/Dataset and runs aggregate functions on the grouped data.

  1. count() – returns the number of rows in each group
  2. mean() – returns the mean of each group
  3. max() – returns the maximum value in each group
  4. min() – returns the minimum value in each group
  5. sum() – returns the sum of the values in each group
  6. avg() – returns the average of each group

        With the agg function, several aggregations can be computed in a single pass.

Creating a DataFrame

   import sparkSession.implicits._   // needed for toDF; assumes an active SparkSession named sparkSession

   val data = Seq(
     ("James", "Sales", "NY", 90000, 34, 10000),
     ("Michael", "Sales", "NY", 86000, 56, 20000),
     ("Robert", "Sales", "CA", 81000, 30, 23000),
     ("Maria", "Finance", "CA", 90000, 24, 23000),
     ("Raman", "Finance", "CA", 99000, 40, 24000),
     ("Scott", "Finance", "NY", 83000, 36, 19000),
     ("Jen", "Finance", "NY", 79000, 53, 15000),
     ("Jeff", "Marketing", "CA", 80000, 25, 18000),
     ("Kumar", "Marketing", "NY", 91000, 50, 21000)
   )
   val dataFrame = data.toDF("employee_name", "department", "state", "salary", "age", "bonus")

dataFrame.show()

groupBy aggregation on a DataFrame column

dataFrame.groupBy(dataFrame("department")).sum("salary").show(false)

        Expressed in SQL, this is equivalent to:

    select department, sum(salary) from table group by department
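        The same query can also be run through Spark SQL directly (a small sketch, not from the original post; it assumes the sparkSession used above and registers a hypothetical temp view named employees):

    dataFrame.createOrReplaceTempView("employees")
    sparkSession.sql("select department, sum(salary) from employees group by department").show(false)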

        In general, the sum, min, max, avg, and so on that follow groupBy can operate on columns other than the grouping column. If you use count, you get the number of times each grouping value appears, i.e. the row count per group; a short example of both follows below.
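        A quick sketch of those two cases, reusing the dataFrame defined above (the choice of the bonus column is just for illustration):

    dataFrame.groupBy("department").count().show(false)        // number of rows per department
    dataFrame.groupBy("department").max("bonus").show(false)   // aggregate a column other than the grouping column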

Now let's look at agg.

import org.apache.spark.sql.functions.{sum, avg, col}

dataFrame.groupBy("department")
  .agg(
    sum("salary").as("sum_salary"),
    avg("salary").as("avg_salary"),
    sum("bonus").as("sum_bonus"))
  .filter(col("sum_bonus") >= 50000)
  .show()

        You can perform multiple aggregations inside agg. Note that sum("salary") above was aliased as sum_salary. If you then try to filter with filter(dataFrame("sum_salary")), it throws an error: groupBy plus agg produces a new DataFrame, and the original dataFrame has no column named sum_salary, so referencing the alias through the original DataFrame fails.

        You can, however, filter on the alias with col, as shown above.
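        As an alternative (a sketch, not from the original post; the 170000 threshold is arbitrary), you can materialize the aggregated DataFrame first and then filter through its own column reference:

    val aggregated = dataFrame.groupBy("department").agg(sum("salary").as("sum_salary"))
    aggregated.filter(aggregated("sum_salary") >= 170000).show()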

Sorting with sort

        Create a DataFrame

val simpleData = Seq(
  ("James", "Sales", "NY", 90000, 34, 10000),
  ("Michael", "Sales", "NY", 86000, 56, 20000),
  ("Robert", "Sales", "CA", 81000, 30, 23000),
  ("Maria", "Finance", "CA", 90000, 24, 23000),
  ("Raman", "Finance", "CA", 99000, 40, 24000),
  ("Scott", "Finance", "NY", 83000, 36, 19000),
  ("Jen", "Finance", "NY", 79000, 53, 15000),
  ("Jeff", "Marketing", "CA", 80000, 25, 18000),
  ("Kumar", "Marketing", "NY", 91000, 50, 21000)
)
val dataFrame = simpleData.toDF("employee_name", "department", "state", "salary", "age", "bonus")
dataFrame.show()

        sort orders the rows; here is an example:

val value = dataFrame.sort(dataFrame("department").desc, dataFrame("salary").asc) 

value.show()

This sorts by department in descending order and, within each department, by salary in ascending order.
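        As a side note (a sketch, not from the original post), orderBy is an alias for sort, so the same ordering can also be written with column functions:

    import org.apache.spark.sql.functions.col

    dataFrame.orderBy(col("department").desc, col("salary").asc).show()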

Union in Spark SQL

        union does not remove duplicate rows, and it throws an error if the two schemas do not match.

        Create two DataFrames

val simpleData = Seq(
  ("James", "Sales", "NY", 90000, 34, 10000),
  ("michael", "Sales", "NY", 86000, 56, 20000),
  ("Robert", "Sales", "CA", 81000, 30, 23000),
  ("Maria", "Finance", "CA", 90000, 24, 23000)
)

val simpleData2 = Seq(
  ("James", "Sales", "NY", 90000, 34, 10000),
  ("Maria", "Finance", "CA", 90000, 24, 23000),
  ("Jen", "Finance", "NY", 79000, 53, 15000),
  ("Jeff", "Marketing", "CA", 80000, 25, 18000),
  ("Kumar", "Marketing", "NY", 91000, 50, 21000)
)

val dataFrame = simpleData.toDF("employee_name", "department", "state", "salary", "age", "bonus")
val dataFrame2 = simpleData2.toDF("employee_name", "department", "state", "salary", "age", "bonus")

        These are the two data sets; notice that two of the rows appear in both.

        The James and Maria rows are duplicated, so calling union directly will not remove them.

   val value = dataFrame.union(dataFrame2)
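        To confirm that the duplicates are kept, inspect the result (a small check, not in the original post):

    value.show()
    println(value.count())   // 9 rows: the James and Maria rows each appear twice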

Deduplicating the result

        val l = dataFrame.union(dataFrame2).distinct()
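        A related option (a sketch, not from the original post): dropDuplicates deduplicates on a chosen subset of columns rather than the full row; the column picked here is only for illustration.

    dataFrame.union(dataFrame2).dropDuplicates("employee_name").show()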

Examples of map and mapPartitions

        Spark's map and mapPartitions apply a function to every element of a DataFrame/Dataset.

        map() applies a function to each row of the DataFrame/Dataset and returns a new, transformed Dataset.

        mapPartitions() does the same thing as map, except that it lets you perform heavyweight initialization once per partition rather than once per row.

        Key point

        Both transformations return a Dataset[U], not a DataFrame.

        To see the difference between map and mapPartitions clearly, let's walk through an example.

        Create the DataFrame

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val structData = Seq(
  Row("James", "", "Smith", "36636", "NewYork", 3100),
  Row("Michael", "Rose", "", "40288", "California", 4300),
  Row("Robert", "", "Williams", "42114", "Florida", 1400),
  Row("Maria", "Anne", "Jones", "39192", "Florida", 5500),
  Row("Jen", "Mary", "Brown", "34561", "NewYork", 3000)
)

val schema = new StructType()
  .add("firstname", StringType)
  .add("middlename", StringType)
  .add("lastname", StringType)
  .add("id", StringType)
  .add("location", StringType)
  .add("salary", IntegerType)

val dataFrame = sparkSession.createDataFrame(
  sparkSession.sparkContext.parallelize(structData), schema)

dataFrame.printSchema()
dataFrame.show()

        First, create a Util class with a combine method that takes three parameters and joins them with commas. Here is the Util class:

class Util extends Serializable {
  def combine(fname: String, mname: String, lname: String): String = {
    fname + "," + mname + "," + lname
  }
}

        The idea: combine the first three columns of the DataFrame (firstname, middlename, lastname) into a single field, then return that combined field together with id and salary.

val value = dataFrame.map(row => {
  val util = new Util()
  val fullName = util.combine(row.getString(0),
    row.getString(1),
    row.getString(2))
  (fullName, row.getString(3), row.getInt(5))
})
val dataFrame1 = value.toDF("fullName", "id", "salary")
dataFrame1.printSchema()

        Inside map, a Util object is created and combine merges the three name fields.

        The lambda then returns a tuple (fullName, id, salary).

        Finally, the Dataset of tuples is converted back to a DataFrame with toDF.

How would you do the same thing with mapPartitions?

val value1 = dataFrame.mapPartitions(iter => {
  // The expensive setup (creating Util) runs once per partition, not once per row
  val util = new Util()
  val iterator = iter.map(row => {
    val str = util.combine(row.getString(0), row.getString(1), row.getString(2))
    (str, row.getString(3), row.getInt(5))
  })
  iterator
})

        mapPartitions hands you each partition's rows as an iterator; inside it, map is applied to the iterator to transform every row, while the per-partition setup happens only once.
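        As with map, the result is a Dataset of tuples, so it can be converted and inspected the same way (a small follow-up, not in the original post; the variable name is arbitrary):

    val dataFrameByPartition = value1.toDF("fullName", "id", "salary")
    dataFrameByPartition.show()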

Summary

        Sometimes I don't want to let myself get pulled in, but sometimes I still do; staying out of it takes a lot of courage.

        I hope she can live happily every day, and I hope I can bring her even more joy.

