Walt You - 行是知之始

Hive UDF

2018-05-30

Hive 内置了很多函数,可以参考Hive Built-In Functions

但是有些情况下,这些内置函数还是不能满足我们的需求,这时候就需要UDF出场了。



前言

UDF(User Defined Function),即用户自定义函数。

Hive 自定义函数主要包含以下三种:

  1. UDF(user-defined function)
  2. UDAF(user-defined aggregate function)
  3. UDTF(user-defined table function)

准备

如果想要使用UDF,需要先引入一个名为hive-exec的jar包,我选择用Maven来引入。

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.0.0</version>
</dependency>

我使用的是3.0.0版本,你也可以以在这里找到其他版本的。


UDF

1. 简介

单独处理一行,输出也是以行输出。

许多Hive内置字符串,数学函数,时间函数都是这种类型。大多数情况下编写对应功能的处理函数就能满足需求。如:concat, split, length ,rand等。

这种UDF主要有两种写法:继承实现UDF类和继承GenericUDF类(通用UDF)。

不过在3.0中,UDF类已经弃用,所以我们还是使用org.apache.hadoop.hive.ql.udf.generic.GenericUDF。

2. 实例

1)准备文件

我现在有一个文件,内容如下:

034,1:2:3:4
035,5:6
036,7:8:9:10

2)创建一张表

CREATE EXTERNAL TABLE
class_test(name string, student_id_list array<INT>)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY ':'
location 'hdfs:/test/file/path/';

3)开始写UDF

创建一个类,名字叫做ListSizeUDF,它的功能是重新实现一下Hive内置函数size()的功能,只是它只接收list一种输入类型。


package yyy.test.hadoop.hive.udf;

import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class ListSizeUDF extends GenericUDF {

	ListObjectInspector listOI;

	@Override
	public String getDisplayString(String[] children) {
		return "listSize()";
	}

	@Override
	public ObjectInspector initialize(ObjectInspector[] arguments)
			throws UDFArgumentException {
		if (arguments.length != 1) {
			throw new UDFArgumentLengthException(
					"listSize only takes 1 arguments: List<String>");
		}
		ObjectInspector a = arguments[0];
		if (!(a instanceof ListObjectInspector)) {
			throw new UDFArgumentException(
					"first argument must be a list / array");
		}
		this.listOI = (ListObjectInspector) a;
		return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
	}

	@Override
	public Object evaluate(DeferredObject[] arguments) throws HiveException {
		List<String> list = (List<String>) this.listOI.getList(arguments[0]
				.get());
		if (list != null) {
			return list.size();
		}
		return 0;
	}
}

继承GenericUDF类后,需要实现三个函数:

  1. getDisplayString:

    显示这个函数的名字

  2. initialize:

    检验输入类型是否是预期类型,同时创建一个对象检查员,在这个例子中就是“ListObjectInspector listOI”。 最后它返回一个对象检查员,这个对象检查员类型是和我们UDF返回值一致的。 在这个例子中,我们想得到list的size,中是个int值,所以返回javaIntObjectInspector。

  3. evaluate:

    主要逻辑都将在这里实现。它先使用对象检查器从延迟对象中获取输入对象。接着,实现内在逻辑,这里直接返回“list.size()”即可。

4)使用自己的UDF

  1. 将该类打包为jar包。
  2. 将jar包传入可执行hive的机器上,路径为“/path/udf/jar/in/”
  3. 进入hive shell中,执行:

     add jar /path/udf/jar/in/;
    

    需要注意的是,这里是本地地址,不是HDFS地址。

  4. 在hive shell中,执行:

     create temporary function listSize as 'yyy.test.hadoop.hive.udf.ListSizeUDF';
    

    as后面跟的是你UDF类所在的路径。

  5. 这时候,我们就拥有了一个名为 listSize 的UDF了。来试试看。

     hive> select listSize(student_id_list) from class_test;
     OK
     4
     2
     4
     Time taken: 0.409 seconds, Fetched: 3 row(s)
    

此刻就大功告成了。


UDAF

1. 简介

用于处理多行数据并形成累加结果。一般配合group by使用。主要用于累加操作,常见的函数有max, min, count, sum,collect_set等。 这种UDF主要有两种写法:继承实现 UDAF类和继承实现AbstractGenericUDAFResolver类。

org.apache.hadoop.hive.ql.udf.generic 包下提供了很多的官方实现的UDAF,可以自己查看源码来参考实现。 这里是链接

2. 实例

这次我参考的是GenericUDAFMax类。

来实现一个找出年纪最大的用户id。

1)创建一张表

CREATE EXTERNAL TABLE
class_test(id string, info string, class string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
location 'hdfs:/test/file/path/';

内容如下:

1,tom#21,1
2,jack#22,1
3,alan#24,1

2)创建 MaxAgeUDAF 类

MaxAgeUDAF 类需要继承 AbstractGenericUDAFResolver。

继承之后,同时要重写 AbstractGenericUDAFResolver 的 getEvaluator 方法。这个方法会返回一个自定义的Evaluator内部类,内部类继承GenericUDAFEvaluator。

在getEvaluator方法里,会对输出参数进行类型的检验。在Evaluator内部类中,来实现UDAF的逻辑。

public class MaxAgeUDAF extends AbstractGenericUDAFResolver {

	@Override
	public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
			throws SemanticException {
		// Type-checking goes here!
		return new GenericUDAFMaxAgeEvaluator();
	}

	public static class GenericUDAFMaxAgeEvaluator extends GenericUDAFEvaluator {
		// UDAF logic goes here!
	}
}

3)完成getEvaluator函数

这个函数除了负责返回一个自定义的Evaluator之外,它还负责对输入类型进行检测。

函数如下:

@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
        throws SemanticException {
    if (parameters.length != 1) {
        throw new UDFArgumentTypeException(parameters.length - 1,
                "Exactly one argument is expected.");
    }
    ObjectInspector oi = TypeInfoUtils
            .getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);
    if (!ObjectInspectorUtils.compareSupported(oi)) {
        throw new UDFArgumentTypeException(parameters.length - 1,
                "Cannot support comparison of map<> type or complex type containing map<>.");
    }
    return new GenericUDAFMaxAgeEvaluator();
}

4)完成Resolver类

参考 GenericUDAFMax 类, 可以发现,自定义的Resolver类需要继承了 GenericUDAFEvaluator,同时实现如下6个方法:

函数 目的
init 由Hive调用以初始化UDAF evaluator类的实例
getNewAggregationBuffer 返回将存储临时aggregation结果的对象。
iterate 将新的一行数据处理到aggregation buffer中
terminatePartial 以可持久的方式返回当前聚合的内容。这里持久化意味着返回值只能根据Java基元,arrays, 原始包装(比如Double), Hadoop Writables, Lists, 和 Maps。不要使用你自己的类,即使它们实现了Serializable接口,不然会有奇怪的错误发生或者得到错误的结果。
merge 将terminatePartial返回的部分aggregation合并到当前aggregation中
terminate 将聚合的最终结果返回给Hive
@UDFType(distinctLike = true)
@VectorizedUDAFs({ VectorUDAFMaxString.class })
public static class GenericUDAFMaxAgeEvaluator extends GenericUDAFEvaluator {

    private transient ObjectInspector inputOI;
    private transient ObjectInspector outputOI;

    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters)
            throws HiveException {
        super.init(m, parameters);
        assert (parameters.length == 1);
        super.init(m, parameters);
        inputOI = parameters[0];
        outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
                ObjectInspectorCopyOption.JAVA);
        return outputOI;
    }

    @AggregationType(estimable = true)
    static class MaxAgg extends AbstractAggregationBuffer {
        Object o;

        @Override
        public int estimate() {
            return JavaDataModel.PRIMITIVES2;
        }
    }

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
        MaxAgg result = new MaxAgg();
        return result;
    }

    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters)
            throws HiveException {
        assert (parameters.length == 1);
        merge(agg, parameters[0]);
    }

    @Override
    public Object terminatePartial(AggregationBuffer agg)
            throws HiveException {
        return terminate(agg);
    }

    @Override
    public void merge(AggregationBuffer agg, Object partial)
            throws HiveException {
        if (partial != null) {
            MaxAgg myagg = (MaxAgg) agg;
            int r = compare(myagg.o, outputOI, partial, inputOI);
            if (myagg.o == null || r < 0) {
                myagg.o = ObjectInspectorUtils.copyToStandardObject(
                        partial, inputOI, ObjectInspectorCopyOption.JAVA);
            }
        }
    }

    private int compare(Object o1, ObjectInspector oi1, Object o2,
            ObjectInspector oi2) {
        PrimitiveObjectInspector poi1 = ((PrimitiveObjectInspector) oi1);
        PrimitiveObjectInspector poi2 = ((PrimitiveObjectInspector) oi2);
        String s1 = null;
        String s2 = null;
        if (poi1.preferWritable() || poi2.preferWritable()) {
            Text t1 = (Text) poi1.getPrimitiveWritableObject(o1);
            Text t2 = (Text) poi2.getPrimitiveWritableObject(o2);
            s1 = t1 != null ? t1.toString() : null;
            s2 = t2 != null ? t2.toString() : null;
        } else {
            s1 = (String) poi1.getPrimitiveJavaObject(o1);
            s2 = (String) poi2.getPrimitiveJavaObject(o2);
        }
        int i = s1 == null ? (s2 == null ? 0 : -1) : (s2 == null ? 1
                : comparePersonAge(s1, s2));
        return 0;
    }

    private int comparePersonAge(String s1, String s2) {
        int age1 = Integer.valueOf(s1.split("#")[1]);
        int age2 = Integer.valueOf(s2.split("#")[1]);
        int compare = age1 - age2;
        return compare > 0 ? 1 : (compare == 0 ? 0 : -1);
    }

    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
        MaxAgg myagg = (MaxAgg) agg;
        return myagg.o;
    }

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
        MaxAgg myagg = (MaxAgg) agg;
        myagg.o = null;
    }
}

5)使用UDAF

将自己的UDAF类放入 src/java/org/apache/hadoop/hive/ql/udf/generic 包下。

然后在 src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java 类中注册自己的UDAF类。

最后重新打包和编译hive。

详细可参考:GenericUDAFCaseStudy


UDTF

1. 简介

处理一行数据产生多行数据或者将一列打成多列。 如explode, 通常配合Lateral View使用,实现列转行的功能。parse_url_tuple将一列转为多列。

2. 例子

可以参考官方文档: DeveloperGuide UDTF


参考链接

  1. Hive UDF
  2. hive array、map、struct使用

Similar Posts

Content