Hive UDTF Functions

In some scenarios you need to split one field into several fields. For example, when log records join their fields with a # sign, a UDTF comes in very handy when loading the data into a warehouse.
The Hive website describes UDTFs as follows:

GenericUDTF Interface

A custom UDTF can be created by extending the GenericUDTF abstract class and then implementing the initialize, process, and possibly close methods. The initialize method is called by Hive to notify the UDTF the argument types to expect. The UDTF must then return an object inspector corresponding to the row objects that the UDTF will generate. Once initialize() has been called, Hive will give rows to the UDTF using the process() method. While in process(), the UDTF can produce and forward rows to other operators by calling forward(). Lastly, Hive will call the close() method when all the rows have passed to the UDTF.

So a custom UDTF extends the GenericUDTF abstract class and implements the three methods initialize, process, and close.

In the initialize method, Hive tells the UDTF which argument types to expect, and the UDTF declares the columns of the rows it will output by returning a matching object inspector.

The process method does the actual work and calls forward to emit each result row to Hive.

Once all input rows have been processed, Hive calls the close method.

The code below takes a String and a separator as input, splits the string on the separator, and outputs each piece as a row.

package tech.mapan.hive.udtf;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class ExplodUDTF extends GenericUDTF {

    // Reused across rows; holds the single output column of the current row
    private ArrayList<String> outList = new ArrayList<>();

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {

        // 1. Define the names and types of the output columns
        List<String> fieldNames = new ArrayList<>();
        List<ObjectInspector> fieldOIs = new ArrayList<>();

        // 2. Add the output column name and its type
        fieldNames.add("lineToWord");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {

        // 1. Get the input string
        String arg = args[0].toString();

        // 2. The second argument is the separator
        String splitKey = args[1].toString();

        // 3. Split the input on the separator
        String[] fields = arg.split(splitKey);

        // 4. Emit one output row per piece
        for (String field : fields) {

            // The list is reused, so clear it first
            outList.clear();

            // Add the current piece as the row's only column
            outList.add(field);

            // Forward the row to Hive
            forward(outList);
        }
    }

    @Override
    public void close() throws HiveException {
        // Nothing to clean up
    }
}

After packaging the jar and placing it in the right location, create the function and try a simple statement. You can see that 1001#Jack#18#1999-01-02#Male has been split into rows as expected; linetoword is the column name defined in the UDTF (Hive lowercases the lineToWord name).
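
How the jar becomes visible to Hive depends on your setup; a common approach (the path below is hypothetical) is to add it to the current session before creating the function:

hive (default)> add jar /opt/hive/auxlib/mp-udtf.jar;

For a permanent function, create function ... using jar "hdfs:///path/to/mp-udtf.jar" can instead point Hive at a jar stored on HDFS.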

hive (default)> create function mp_explod as "tech.mapan.hive.udtf.ExplodUDTF";
OK
Time taken: 0.03 seconds

hive (default)> select mp_explod('1001#Jack#18#1999-01-02#Male','#');
OK
linetoword
1001
Jack
18
1999-01-02
Male
Time taken: 12.29 seconds, Fetched: 5 row(s)
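
One caveat: when a UDTF is called this way, Hive does not allow any other expressions in the same SELECT. To attach the generated rows to the other columns of a source table, use LATERAL VIEW; for example, with a hypothetical users table whose info column holds the #-joined string:

hive (default)> select u.id, t.linetoword from users u lateral view mp_explod(u.info, '#') t as linetoword;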

In fact, a UDTF can also be used in a more direct way: turning one field into two (or more) output fields, though this style is relatively rigid. All it takes is defining one (or more) extra fields in initialize; on output, the values are filled into the corresponding field positions in order. It looks like this:

package tech.mapan.hive.udtf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * @program: udtf
 * @description: UDTF that splits one field into two output fields
 * @author: MaPan
 * @create: 2020-03-22 22:40
 **/
public class ExplodUDTF2 extends GenericUDTF {

    // Reused across rows; holds the two output columns of the current row
    private ArrayList<String> outList = new ArrayList<>();

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // 1. Define the names and types of the output columns
        List<String> fieldNames = new ArrayList<>();

        // 2. Add the output column names
        fieldNames.add("word1");
        fieldNames.add("word2");

        // 3. Define the output column types
        List<ObjectInspector> fieldOIs = new ArrayList<>();
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        // 1. Get the input string
        String arg = args[0].toString();

        // 2. The second argument is the separator
        String splitKey = args[1].toString();

        // 3. Split the input on the separator
        String[] fields = arg.split(splitKey);

        // 4. Emit a single row with the two pieces as its columns
        // The list is reused, so clear it first
        outList.clear();
        outList.add(fields[0]);
        outList.add(fields[1]);
        forward(outList);
    }

    @Override
    public void close() throws HiveException {
        // Nothing to clean up
    }
}

The result is as follows: one field is split directly into two.

hive (prac)> create function mp_explod2 as "tech.mapan.hive.udtf.ExplodUDTF2";
OK
Time taken: 0.029 seconds

hive (prac)> select * from test03;
OK
test03.id test03.name
1001 jack#ma
1002 dong#liu
1003 poney#ma
Time taken: 0.522 seconds, Fetched: 3 row(s)

hive (prac)> select id,first_name,last_name from test03 lateral view mp_explod2(name,"#") temp as first_name,last_name;
OK
id first_name last_name
1001 jack ma
1002 dong liu
1003 poney ma
Time taken: 13.393 seconds, Fetched: 3 row(s)

One more note:

The Hive website describes Lateral View as follows:

Lateral View Syntax

lateralView: LATERAL VIEW udtf(expression) tableAlias AS columnAlias (',' columnAlias)*
fromClause: FROM baseTable (lateralView)*

Description

Lateral view is used in conjunction with user-defined table generating functions such as explode(). As mentioned in Built-in Table-Generating Functions, a UDTF generates zero or more output rows for each input row. A lateral view first applies the UDTF to each row of base table and then joins resulting output rows to the input rows to form a virtual table having the supplied table alias.

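To make this concrete, here is a sketch using the built-in explode() with a hypothetical orders table whose items column is a comma-separated list; split() turns the string into an array, and explode() turns the array into rows:

hive (default)> select o.id, t.item from orders o lateral view explode(split(o.items, ',')) t as item;

Each order row is joined to one output row per item, exactly as the description above says.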

Attachment: jar package