Hive 内置了很多函数,可以参考Hive Built-In Functions。
但是有些情况下,这些内置函数还是不能满足我们的需求,这时候就需要UDF出场了。
前言
UDF(User Defined Function),即用户自定义函数。
Hive 自定义函数主要包含以下三种:
- UDF(user-defined function)
- UDAF(user-defined aggregate function)
- UDTF(user-defined table function)
准备
如果想要使用UDF,需要先引入一个名为hive-exec的jar包,我选择用Maven来引入。
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.0.0</version>
</dependency>
我使用的是3.0.0版本,你也可以以在这里找到其他版本的。
UDF
1. 简介
单独处理一行,输出也是以行输出。
许多Hive内置字符串,数学函数,时间函数都是这种类型。大多数情况下编写对应功能的处理函数就能满足需求。如:concat, split, length ,rand等。
这种UDF主要有两种写法:继承实现UDF类和继承GenericUDF类(通用UDF)。
不过在3.0中,UDF类已经弃用,所以我们还是使用org.apache.hadoop.hive.ql.udf.generic.GenericUDF。
2. 实例
1)准备文件
我现在有一个文件,内容如下:
034,1:2:3:4
035,5:6
036,7:8:9:10
2)创建一张表
CREATE EXTERNAL TABLE
class_test(name string, student_id_list array<INT>)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY ':'
location 'hdfs:/test/file/path/';
3)开始写UDF
创建一个类,名字叫做ListSizeUDF,它的功能是重新实现一下Hive内置函数size()的功能,只是它只接收list一种输入类型。
package yyy.test.hadoop.hive.udf;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
public class ListSizeUDF extends GenericUDF {
ListObjectInspector listOI;
@Override
public String getDisplayString(String[] children) {
return "listSize()";
}
@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentException {
if (arguments.length != 1) {
throw new UDFArgumentLengthException(
"listSize only takes 1 arguments: List<String>");
}
ObjectInspector a = arguments[0];
if (!(a instanceof ListObjectInspector)) {
throw new UDFArgumentException(
"first argument must be a list / array");
}
this.listOI = (ListObjectInspector) a;
return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
}
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
List<String> list = (List<String>) this.listOI.getList(arguments[0]
.get());
if (list != null) {
return list.size();
}
return 0;
}
}
继承GenericUDF类后,需要实现三个函数:
-
getDisplayString:
显示这个函数的名字
-
initialize:
检验输入类型是否是预期类型,同时创建一个对象检查员,在这个例子中就是“ListObjectInspector listOI”。 最后它返回一个对象检查员,这个对象检查员类型是和我们UDF返回值一致的。 在这个例子中,我们想得到list的size,中是个int值,所以返回javaIntObjectInspector。
-
evaluate:
主要逻辑都将在这里实现。它先使用对象检查器从延迟对象中获取输入对象。接着,实现内在逻辑,这里直接返回“list.size()”即可。
4)使用自己的UDF
- 将该类打包为jar包。
- 将jar包传入可执行hive的机器上,路径为“/path/udf/jar/in/”
-
进入hive shell中,执行:
add jar /path/udf/jar/in/;
需要注意的是,这里是本地地址,不是HDFS地址。
-
在hive shell中,执行:
create temporary function listSize as 'yyy.test.hadoop.hive.udf.ListSizeUDF';
as后面跟的是你UDF类所在的路径。
-
这时候,我们就拥有了一个名为 listSize 的UDF了。来试试看。
hive> select listSize(student_id_list) from class_test; OK 4 2 4 Time taken: 0.409 seconds, Fetched: 3 row(s)
此刻就大功告成了。
UDAF
1. 简介
用于处理多行数据并形成累加结果。一般配合group by使用。主要用于累加操作,常见的函数有max, min, count, sum,collect_set等。 这种UDF主要有两种写法:继承实现 UDAF类和继承实现AbstractGenericUDAFResolver类。
org.apache.hadoop.hive.ql.udf.generic 包下提供了很多的官方实现的UDAF,可以自己查看源码来参考实现。 这里是链接。
2. 实例
这次我参考的是GenericUDAFMax类。
来实现一个找出年纪最大的用户id。
1)创建一张表
CREATE EXTERNAL TABLE
class_test(id string, info string, class string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
location 'hdfs:/test/file/path/';
内容如下:
1,tom#21,1
2,jack#22,1
3,alan#24,1
2)创建 MaxAgeUDAF 类
MaxAgeUDAF 类需要继承 AbstractGenericUDAFResolver。
继承之后,同时要重写 AbstractGenericUDAFResolver 的 getEvaluator 方法。这个方法会返回一个自定义的Evaluator内部类,内部类继承GenericUDAFEvaluator。
在getEvaluator方法里,会对输出参数进行类型的检验。在Evaluator内部类中,来实现UDAF的逻辑。
public class MaxAgeUDAF extends AbstractGenericUDAFResolver {
@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
throws SemanticException {
// Type-checking goes here!
return new GenericUDAFMaxAgeEvaluator();
}
public static class GenericUDAFMaxAgeEvaluator extends GenericUDAFEvaluator {
// UDAF logic goes here!
}
}
3)完成getEvaluator函数
这个函数除了负责返回一个自定义的Evaluator之外,它还负责对输入类型进行检测。
函数如下:
@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
throws SemanticException {
if (parameters.length != 1) {
throw new UDFArgumentTypeException(parameters.length - 1,
"Exactly one argument is expected.");
}
ObjectInspector oi = TypeInfoUtils
.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);
if (!ObjectInspectorUtils.compareSupported(oi)) {
throw new UDFArgumentTypeException(parameters.length - 1,
"Cannot support comparison of map<> type or complex type containing map<>.");
}
return new GenericUDAFMaxAgeEvaluator();
}
4)完成Resolver类
参考 GenericUDAFMax 类, 可以发现,自定义的Resolver类需要继承了 GenericUDAFEvaluator,同时实现如下6个方法:
函数 | 目的 |
---|---|
init | 由Hive调用以初始化UDAF evaluator类的实例 |
getNewAggregationBuffer | 返回将存储临时aggregation结果的对象。 |
iterate | 将新的一行数据处理到aggregation buffer中 |
terminatePartial | 以可持久的方式返回当前聚合的内容。这里持久化意味着返回值只能根据Java基元,arrays, 原始包装(比如Double), Hadoop Writables, Lists, 和 Maps。不要使用你自己的类,即使它们实现了Serializable接口,不然会有奇怪的错误发生或者得到错误的结果。 |
merge | 将terminatePartial返回的部分aggregation合并到当前aggregation中 |
terminate | 将聚合的最终结果返回给Hive |
@UDFType(distinctLike = true)
@VectorizedUDAFs({ VectorUDAFMaxString.class })
public static class GenericUDAFMaxAgeEvaluator extends GenericUDAFEvaluator {
private transient ObjectInspector inputOI;
private transient ObjectInspector outputOI;
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters)
throws HiveException {
super.init(m, parameters);
assert (parameters.length == 1);
super.init(m, parameters);
inputOI = parameters[0];
outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
ObjectInspectorCopyOption.JAVA);
return outputOI;
}
@AggregationType(estimable = true)
static class MaxAgg extends AbstractAggregationBuffer {
Object o;
@Override
public int estimate() {
return JavaDataModel.PRIMITIVES2;
}
}
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
MaxAgg result = new MaxAgg();
return result;
}
@Override
public void iterate(AggregationBuffer agg, Object[] parameters)
throws HiveException {
assert (parameters.length == 1);
merge(agg, parameters[0]);
}
@Override
public Object terminatePartial(AggregationBuffer agg)
throws HiveException {
return terminate(agg);
}
@Override
public void merge(AggregationBuffer agg, Object partial)
throws HiveException {
if (partial != null) {
MaxAgg myagg = (MaxAgg) agg;
int r = compare(myagg.o, outputOI, partial, inputOI);
if (myagg.o == null || r < 0) {
myagg.o = ObjectInspectorUtils.copyToStandardObject(
partial, inputOI, ObjectInspectorCopyOption.JAVA);
}
}
}
private int compare(Object o1, ObjectInspector oi1, Object o2,
ObjectInspector oi2) {
PrimitiveObjectInspector poi1 = ((PrimitiveObjectInspector) oi1);
PrimitiveObjectInspector poi2 = ((PrimitiveObjectInspector) oi2);
String s1 = null;
String s2 = null;
if (poi1.preferWritable() || poi2.preferWritable()) {
Text t1 = (Text) poi1.getPrimitiveWritableObject(o1);
Text t2 = (Text) poi2.getPrimitiveWritableObject(o2);
s1 = t1 != null ? t1.toString() : null;
s2 = t2 != null ? t2.toString() : null;
} else {
s1 = (String) poi1.getPrimitiveJavaObject(o1);
s2 = (String) poi2.getPrimitiveJavaObject(o2);
}
int i = s1 == null ? (s2 == null ? 0 : -1) : (s2 == null ? 1
: comparePersonAge(s1, s2));
return 0;
}
private int comparePersonAge(String s1, String s2) {
int age1 = Integer.valueOf(s1.split("#")[1]);
int age2 = Integer.valueOf(s2.split("#")[1]);
int compare = age1 - age2;
return compare > 0 ? 1 : (compare == 0 ? 0 : -1);
}
@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
MaxAgg myagg = (MaxAgg) agg;
return myagg.o;
}
@Override
public void reset(AggregationBuffer agg) throws HiveException {
MaxAgg myagg = (MaxAgg) agg;
myagg.o = null;
}
}
5)使用UDAF
将自己的UDAF类放入 src/java/org/apache/hadoop/hive/ql/udf/generic 包下。
然后在 src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java 类中注册自己的UDAF类。
最后重新打包和编译hive。
详细可参考:GenericUDAFCaseStudy。
UDTF
1. 简介
处理一行数据产生多行数据或者将一列打成多列。 如explode, 通常配合Lateral View使用,实现列转行的功能。parse_url_tuple将一列转为多列。
2. 例子
可以参考官方文档: DeveloperGuide UDTF