hive中使用udf
首先如果在hive中使用udf函数的话,如果使用java编写udf,则需要导入的包:
import org.apache.hadoop.hive.ql.exec.UDF;
比如创建一个udf:
import com.yxwork.github.davidmoten.geo.GeoHash;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
public class UDFGeohashEncodeH extends UDF{
public String evaluate(String latitude, String longitude, Integer precision) throws Exception {
if (latitude == null || longitude == null || precision == null) {
return null;
}
return (new Text(GeoHash.encodeHash(new DoubleWritable(Double.parseDouble(latitude)).get(),
new DoubleWritable(Double.parseDouble(longitude)).get(),
new IntWritable(precision).get()))).toString();
}
}
先不用管GeoHash.encodeHash是干什么的,总之就是传入两个String类型的参数以及一个Integer类型的参数,会返回一个Sring类型的字符串。
接着在hive中使用:
//我是先将jar包上传到了hdfs的
//先要在hql中注册临时函数
beeline -e "create temporary function Encode as 'com.myudf.UDFGeohashEncodeH' using jar 'hdfs:///user/udf/firstHadoop-1.0-SNAPSHOT.jar'"
//在hql中使用
beeline -e "select Encode('32.23134','123.12312',9) from table1"
如果是打成jar包的udf要在spark-sql或者hive on spark中使用
前面导包以及上传的步骤不变,注册时用spark注册:
// spark-sql中注册并调用
spark.sql("create temporary function Encode as 'com.myudf.UDFGeohashEncodeH' using jar 'hdfs:///user/udf/firstHadoop-1.0-SNAPSHOT.jar'")
//注意此时table1应该以及注册了临时表才能查询到
spark.sql("select Encode('32.23134','123.12312',9) from table1")
//hive on spark中注册
//获取hive on spark对象
val hiveCtx = new HiveContext(sc)
hiveCtx.sql("create temporary function Encode as 'com.myudf.UDFGeohashEncodeH' using jar 'hdfs:///user/udf/firstHadoop-1.0-SNAPSHOT.jar'")
//如果table1是hive中的表应该指定那个database下的,不然也要注册临时表
hiveCtx.sql("select Encode('32.23134','123.12312',9) from table1")
但是这种方式得注意一点:在启动spark程序的时候也得要指定jar包,不然可能会报一个错误:
No handler for UDF/UDAF/UDTF 'com.******.***': java.lang.ClassNotFoundException: com.***; line 1 pos 7
org.apache.spark.sql.AnalysisException: No handler for UDF/UDAF/UDTF 'com.***': java.lang.ClassNotFoundException: com.***; line 1 pos 7
参考原文
解决方案就是启动spark的时候用 –jars指定一下jar包
spark-shell --jars hdfs://user/udf/firstHadoop-1.0-SNAPSHOT.jar --master yarn --executor-memory 12g --executor-cores 8 --num-executors 64 --driver-memory 16g -i $scala_file
spark-sql中直接使用udf
如果在spark中使用的udf只是简单的逻辑,则可以直接在scala代码中注册
需要导包:
import org.apache.spark.sql.functions.udf
接着直接定义一个方法然后注册就可以了
//定义方法
def myUdfFunc(inPram:String):String = {
val startNum = inPram.split(",")(0).toInt
val endNum = inPram.split(",")(1).toInt
val list = new util.ArrayList[Int]
for (i <- startNum to endNum-1) {
list.add(i)
}
StringUtils.join(list.toArray(), ",")
}
定义的方法需要注册成udf,在spark中有两种注册方式:
参考原文
//这种方式对sql和DataFrame API都可见
val myUdf= sqlContext.udf(myUdfFunc _)
//使用
spark.sql("select myUdf('1,10') from table1")//success
df.select($"c1",myUdf('1,10'))//success
//这种方式只能在sql()中可见,对DataFrame API不可见
sqlContext.udf.register("myUdf", myUdfFunc _)
//使用
spark.sql("select myUdf('1,10') from table1")//success
df.select($"c1",myUdf('1,10'))//fail
以上就是我整理的一些关于udf的知识啦
第一次写博客,如有不足之处或者有补充的欢迎评论区留言呀
版权声明:本文为Bayerngu原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。