`
xiaofengxbf
  • 浏览: 18165 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

hive中UDF、UDAF和UDTF使用

    博客分类:
  • hive
阅读更多

Hive进行UDF开发十分简单,此处所说UDF为Temporary的function,所以需要hive版本在0.4.0以上才可以。

 

一、背景:Hive是基于Hadoop中的MapReduce,提供HQL查询的数据仓库。Hive是一个很开放的系统,很多内容都支持用户定制,包括:

a)文件格式:Text File,Sequence File

b)内存中的数据格式: Java Integer/String, Hadoop IntWritable/Text

c)用户提供的 map/reduce 脚本:不管什么语言,利用 stdin/stdout 传输数据

d)用户自定义函数: Substr, Trim, 1 – 1

e)用户自定义聚合函数: Sum, Average…… n – 1

 

2、定义:UDF(User-Defined-Function),用户自定义函数对数据进行处理。

 

二、用法

1、UDF函数可以直接应用于select语句,对查询结构做格式化处理后,再输出内容。

2、编写UDF函数的时候需要注意一下几点:

a)自定义UDF需要继承org.apache.hadoop.hive.ql.UDF。

b)需要实现evaluate函。

c)evaluate函数支持重载。

 

3、以下是两个数求和函数的UDF。evaluate函数代表两个整型数据相加,两个浮点型数据相加,可变长数据相加

    Hive的UDF开发只需要重构UDF类的evaluate函数即可。例:

package hive.connect; 

import org.apache.hadoop.hive.ql.exec.UDF; 

public final class Add extends UDF { 

public Integer evaluate(Integer a, Integer b) { 

               if (null == a || null == b) { 

                               return null; 

               } return a + b; 

} 

public Double evaluate(Double a, Double b) { 

               if (a == null || b == null) 

                               return null; 

                               return a + b; 

               } 

public Integer evaluate(Integer... a) { 

               int total = 0; 

               for (int i = 0; i < a.length; i++) 

                               if (a[i] != null) 

                                              total += a[i]; 

                                              return total; 

                               } 

}

 

4、步骤

 

a)把程序打包放到目标机器上去;

 

b)进入hive客户端,添加jar包:hive>add jar /run/jar/udf_test.jar;

 

c)创建临时函数:hive>CREATE TEMPORARY FUNCTION add_example AS 'hive.udf.Add';

 

d)查询HQL语句:

 

SELECT add_example(8, 9) FROM scores;

 

SELECT add_example(scores.math, scores.art) FROM scores;

 

SELECT add_example(6, 7, 8, 6.8) FROM scores;

 

e)销毁临时函数:hive> DROP TEMPORARY FUNCTION add_example;

 

5、细节在使用UDF的时候,会自动进行类型转换,例如:

 

SELECT add_example(8,9.1) FROM scores;

 

注:

 

1.   UDF只能实现一进一出的操作,如果需要实现多进一出,则需要实现UDAF

 

(二)、UDAF

 

1、Hive查询数据时,有些聚类函数在HQL没有自带,需要用户自定义实现。

 

2、用户自定义聚合函数: Sum, Average…… n – 1

 

UDAF(User- Defined Aggregation Funcation)

 

一、用法

 

1、一下两个包是必须的import org.apache.hadoop.hive.ql.exec.UDAF和 org.apache.hadoop.hive.ql.exec.UDAFEvaluator。

 

2、函数类需要继承UDAF类,内部类Evaluator实UDAFEvaluator接口。

 

3、Evaluator需要实现 init、iterate、terminatePartial、merge、terminate这几个函数。

 

a)init函数实现接口UDAFEvaluator的init函数。

 

b)iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean。

 

c)terminatePartial无参数,其为iterate函数轮转结束后,返回轮转数据,terminatePartial类似于hadoop的Combiner。

 

d)merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean。

 

e)terminate返回最终的聚集函数结果。

 

package hive.udaf; 

import org.apache.hadoop.hive.ql.exec.UDAF;

import org.apache.hadoop.hive.ql.exec.UDAFEvaluator; 

public class Avg extends UDAF { 

         public static class AvgState { 

         private long mCount; 

         private double mSum; 

} 

public static class AvgEvaluator implements UDAFEvaluator { 

         AvgState state; 

         public AvgEvaluator() { 

                   super(); 

                   state = new AvgState(); 

                   init(); 

} 

/** * init函数类似于构造函数,用于UDAF的初始化 */ 

public void init() { 

         state.mSum = 0; 

         state.mCount = 0;

} 

/** * iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean * * @param o * @return */ 

public boolean iterate(Double o) { 

         if (o != null) { 

                   state.mSum += o; 

                   state.mCount++; 

         } return true; 

} 

/** * terminatePartial无参数,其为iterate函数轮转结束后,返回轮转数据, * terminatePartial类似于hadoop的Combiner * * @return */ 

public AvgState terminatePartial() {

         // combiner 

         return state.mCount == 0 ? null : state; 

} 

/** * merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean * * @param o * @return */ 

public boolean terminatePartial(Double o) {                 

         if (o != null) { 

                   state.mCount += o.mCount; 

                   state.mSum += o.mSum; 

         } 

         return true; 

} 

/** * terminate返回最终的聚集函数结果 * * @return */ 

public Double terminate() { 

         return state.mCount == 0 ? null : Double.valueOf(state.mSum / state.mCount); 

} 

} 

 

 

5、执行求平均数函数的步骤

 

a)将java文件编译成Avg_test.jar。

 

b)进入hive客户端添加jar包:

 

hive>add jar /run/jar/Avg_test.jar。

 

c)创建临时函数:

 

hive>create temporary function avg_test 'hive.udaf.Avg';

 

d)查询语句:

 

hive>select avg_test(scores.math) from scores;

 

e)销毁临时函数:

 

hive>drop temporary function avg_test;

 

五、总结

 

1、重载evaluate函数。

 

2、UDF函数中参数类型可以为Writable,也可为java中的基本数据对象。

 

3、UDF支持变长的参数。

 

4、Hive支持隐式类型转换。

 

5、客户端退出时,创建的临时函数自动销毁。

 

6、evaluate函数必须要返回类型值,空的话返回null,不能为void类型。

 

7、UDF是基于单条记录的列进行的计算操作,而UDFA则是用户自定义的聚类函数,是基于表的所有记录进行的计算操作。

 

8、UDF和UDAF都可以重载。

 

9、查看函数

 

SHOW FUNCTIONS;

 

UDTF介绍

UDTF(User-Defined Table-Generating Functions)  用来解决 输入一行输出多行(On-to-many maping) 的需求。

 

2. 编写自己需要的UDTF

 


继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF。
实现initialize, process, close三个方法
UDTF首先会调用initialize方法,此方法返回UDTF的返回行的信息(返回个数,类型)。初始化完成后,会调用process方法,对传入的参数进行处理,可以通过forword()方法把结果返回。最后close()方法调用,对需要清理的方法进行清理。

 

下面是一个用来切分”key:value;key:value;”这种字符串,返回结果为key, value两个字段。供参考:

 

import java.util.ArrayList;
   
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
   import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
     public class ExplodeMap extends GenericUDTF{
         @Override
       public void close() throws HiveException {
           // TODO Auto-generated method stub    
       }
         @Override
       public StructObjectInspector initialize(ObjectInspector[] args)
               throws UDFArgumentException {
           if (args.length != 1) {
               throw new UDFArgumentLengthException("ExplodeMap takes only one argument");
           }
           if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
               throw new UDFArgumentException("ExplodeMap takes string as a parameter");
           }
             ArrayList<String> fieldNames = new ArrayList<String>();
           ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
           fieldNames.add("col1");
           fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
           fieldNames.add("col2");
           fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
             return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);
       }
  
      @Override
       public void process(Object[] args) throws HiveException {
           String input = args[0].toString();
           String[] test = input.split(";");
           for(int i=0; i<test.length; i++) {
               try {
                   String[] result = test[i].split(":");
                   forward(result);
               } catch (Exception e) {
                  continue;
              }
         }
       }
   }

 

 

3. 使用方法

 


 

 

UDTF有两种使用方法,一种直接放到select后面,一种和lateral view一起使用。

 

1:直接select中使用:select explode_map(properties) as (col1,col2) from src;

 


不可以添加其他字段使用:select a, explode_map(properties) as (col1,col2) from src
不可以嵌套调用:select explode_map(explode_map(properties)) from src
不可以和group by/cluster by/distribute by/sort by一起使用:select explode_map(properties) as (col1,col2) from src group by col1, col2

 

2:和lateral view一起使用:select src.id, mytable.col1, mytable.col2 from src lateral view explode_map(properties) mytable as col1, col2;

 


此方法更为方便日常使用。执行过程相当于单独执行了两次抽取,然后union到一个表里。

分享到:
评论

相关推荐

    spark-hive-udf:Spark Hive UDF示例

    Spark Hive UDF示例 建立项目 mvn clean package 将spark-hive-udf-1.0.0-SNAPSHOT.jar复制到边缘节点临时目录 spark-hive-udf]# cp target/spark-hive-udf-1.0.0-SNAPSHOT.jar /tmp 通过提供罐子来启动火花壳 spark...

    javasql笔试题-spark-hive-udf:展示如何在ApacheSpark中使用HiveUDF的示例项目

    和 DataFrames 一起使用。 用 Python 编写的原生 Spark UDF 很慢,因为它们必须在 Python 进程中执行,而不是基于 JVM 的 Spark Executor。 要让 Spark Executor 运行 Python UDF,它必须: 将数据从分区发送到与 ...

    mustached-hive-udfs:一些有用的 Hive UDF 和 UDAF

    这是一些有用的 Hive UDF 和 UDAF 的集合。 提供的功能 UDAF Mode ( de.frosner.hive.udaf.Mode ) - 计算组列的统计模式 从源头构建 git clone https://github.com/FRosner/mustached-hive-udfs.git cd mustached...

    自定义hive函数

    自定义 hive udf udaf 有url解析,获取网站主域名,根据ip获取区域码,有rownum,列聚合以及一些业务实现udf。

    hive-udf-tools:hive udf 部署工具,开发工具...

    hive-udf-hook UDF开发及发布过程 1 用户编写UDF实现类 2 编写完成后,在UDFHooks类中调用相关注册函数: 调用 FunctionRegistry.registerUDF 注册udf 调用 FunctionRegistry.registerUDAF 注册udaf 调用...

    hive:个人配置单元 UDAF

    个人 Hive UDAF 有一堆 Hive UDAF(用户定义的聚合函数)不在标准 Hive 分布中,因为它们可能会导致大型数据集的 OOM。 要使用它们,您需要加载 jar 文件,然后为每个要使用的函数创建一个临时函数: ADD JAR target...

    hive-udf:hive自定义函数

    hive-udfhive自定义函数主要实现hive3种自定义函数1,udf函数,主要用于处理一对一数据处理2,udtf函数,主要用于处理一对多数据处理2,udaf函数,主要用与处理多对一数据聚合处理

    datasketches-hive:Hive的草图适配器

    Hadoop Hive UDF / UDAF 请参阅Java Core文档中不同草图类型下的相关部分。制作说明注意:此组件访问资源文件以进行测试。 结果,目标安装目录的完整绝对路径的目录元素必须符合Java标识符的条件。 换句话说,目录...

    hive常用函数

    hive常用函数,包括时间、类型、udf、udaf等等的归纳。

    赵伟:HIVE在腾讯分布式数据仓库实践

    赵伟首先介绍了他们的TDW核心架构,HIVE,MapReduce,HDFS及PostgreSQL构成。赵伟分享了最核心的HIVE模块在TDW中的实践经验;HIVE是一个在Hadoop上构建数据仓库的软件,它...实现了基本的SQL功能,可扩充UDF/UDAF...

    hive

    Apache Hive(TM)数据仓库软件有助于查询和... HiveQL还可以使用自定义标量函数(UDF),聚合(UDAF)和表函数(UDTF)进行扩展。https://mirrors.tuna.tsinghua.edu.cn/apache/hive/hive-standalone-metastore-3.0.0/

    Hadoop权威指南(中文版)2015上传.rar

    在MapReduce中使用压缩 序列化 Writable接口 Writable类 实现定制的Writable类型 序列化框架 Avro 依据文件的数据结构 写入SequenceFile MapFile 第5章 MapReduce应用开发 配置API 合并多个源文件 可变的扩展 配置...

    Hadoop权威指南 第二版(中文版)

     在MapReduce中使用压缩  序列化  Writable接口  Writable类  实现定制的Writable类型  序列化框架  Avro  依据文件的数据结构  写入SequenceFile  MapFile 第5章 MapReduce应用开发  配置API  合并多个...

    蜂巢:Apache蜂巢

    HiveSQL也可以通过用户定义的函数(UDF),用户定义的集合(UDAF)和用户定义的表函数(UDTF)扩展为用户代码。 Hive用户在执行SQL查询时可以选择3种运行时。 用户可以选择Apache Hadoop MapReduce,Apache Tez或...

    大数据场景化解决方案.pdf

    数据存储与管理: ⼤数据利⽤分布式⽂件系统HDFS、HBase、Hive,实现对结构化、半结构化和⾮结构化数据的存储和管理。 数据处理与分析: 利⽤分布式并⾏编程模型和计算框架,结合机器学习和数据挖掘算法,实现对...

    Spark:Apache Spark是一个快速的内存数据处理引擎,具有优雅且富有表现力的开发API,可让数据工作者高效执行需要快速迭代访问数据集的流,机器学习或SQL工作负载。该项目将在Scala中提供Spark的示例程序语

    使用Spark-2.1实现自定义UDF,UDAF,Partitioner 使用数据框(ComplexSchema,DropDuplicates,DatasetConversion,GroupingAndAggregation) 使用数据集处理Parquet文件按特定列对数据进行分区并按分区进行存储使用...

Global site tag (gtag.js) - Google Analytics