Hive custom UDF, UDAF, and UDTF

UDF steps:
1. Extend org.apache.hadoop.hive.ql.exec.UDF.
2. Implement the evaluate method; evaluate supports overloading. (A registration and query sketch follows the class below.)
package cn.sina.stat.hive.udf;

import java.util.Arrays;

import org.apache.hadoop.hive.ql.exec.UDF;

public final class SortFieldContent extends UDF {

    // Splits str by delimiter, sorts the pieces ascending, and re-joins them.
    public String evaluate(final String str, String delimiter) {
        if (str == null) {
            return null;
        }
        if (delimiter == null) {
            delimiter = ",";
        }
        String[] strs = str.split(delimiter);
        Arrays.sort(strs);
        // String.concat returns a new string, so the result must be reassigned.
        String result = "";
        for (int i = 0; i < strs.length; i++) {
            if (result.length() > 0) {
                result = result.concat(delimiter);
            }
            result = result.concat(strs[i]);
        }
        return result;
    }

    // Overload: order may be "ASC" or "DESC"; any other value falls through to the descending branch.
    public String evaluate(final String str, String delimiter, String order) {
        if (str == null) {
            return null;
        }
        if (delimiter == null) {
            delimiter = ",";
        }
        if (order != null && order.equalsIgnoreCase("ASC")) {
            return evaluate(str, delimiter);
        } else {
            String[] strs = str.split(delimiter);
            Arrays.sort(strs);
            String result = "";
            for (int i = strs.length - 1; i >= 0; i--) {
                if (result.length() > 0) {
                    result = result.concat(delimiter);
                }
                result = result.concat(strs[i]);
            }
            return result;
        }
    }
}
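
A minimal registration and query sketch in Hive SQL, assuming the class above has been packaged into a jar; the jar path (/tmp/udf.jar), the function name sort_field, and the table/column names (t, tags) are all hypothetical:

ADD JAR /tmp/udf.jar;
CREATE TEMPORARY FUNCTION sort_field AS 'cn.sina.stat.hive.udf.SortFieldContent';
-- Two-argument form sorts ascending; the three-argument form accepts 'ASC' or 'DESC'.
SELECT sort_field(tags, ',') FROM t;          -- e.g. 'c,a,b' -> 'a,b,c'
SELECT sort_field(tags, ',', 'DESC') FROM t;  -- e.g. 'c,a,b' -> 'c,b,a'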

UDAF steps:
1. The function class extends org.apache.hadoop.hive.ql.exec.UDAF, and an inner class implements the interface org.apache.hadoop.hive.ql.exec.UDAFEvaluator.
2. The Evaluator must implement the init, iterate, terminatePartial, merge, and terminate methods. (A usage sketch follows the class below.)
The execution flow (figure omitted): init and iterate run per input row, terminatePartial hands a partial aggregate to Hive, merge combines partial aggregates, and terminate produces the final value for each group.
package cn.sina.stat.hive.udaf;
import java.util.Arrays;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class ConcatClumnGroupByKeyWithOrder extends UDAF {
     public static class ConcatUDAFEvaluator implements UDAFEvaluator {
          public static class PartialResult {
               String result;
               String delimiter;
               String order;
          }

          private PartialResult partial;

          // Resets evaluator state; Hive calls this before each new aggregation.
          public void init() {
               partial = null;
          }

          // Called for each input row; accumulates value into partial.result.
          public boolean iterate(String value, String delimiter, String order) {

               if (value == null) {
                    return true;
               }
               if (partial == null) {
                    partial = new PartialResult();
                    partial.result = new String("");
                    if (delimiter == null || delimiter.equals("")) {
                         partial.delimiter = new String(",");
                    } else {
                         partial.delimiter = new String(delimiter);
                    }
                    if (order != null
                              && (order.toUpperCase().equals("ASC") || order
                                        .toUpperCase().equals("DESC"))) {
                         partial.order = new String(order);
                    } else {
                         partial.order = new String("ASC");
                    }

               }
               if (partial.result.length() > 0) {
                    partial.result = partial.result.concat(partial.delimiter);
               }

               partial.result = partial.result.concat(value);

               return true;
          }

          // Returns the partial aggregation that Hive ships between map and reduce tasks.
          public PartialResult terminatePartial() {
               return partial;
          }

          // Combines a partial result produced by another task into this one.
          public boolean merge(PartialResult other) {
               if (other == null) {
                    return true;
               }
               if (partial == null) {
                    partial = new PartialResult();
                    partial.result = new String(other.result);
                    partial.delimiter = new String(other.delimiter);
                    partial.order = new String(other.order);
               } else {
                    if (partial.result.length() > 0) {
                         partial.result = partial.result.concat(partial.delimiter);
                    }
                    partial.result = partial.result.concat(other.result);
               }
               return true;
          }

          // Produces the final result for a group: re-split the concatenated
          // values, sort them, and re-join in the requested order.
          public String terminate() {
               if (partial == null) {
                    return null;
               }
               String[] strs = partial.result.split(partial.delimiter);
               Arrays.sort(strs);
               // String.concat returns a new string, so the result must be reassigned.
               String result = "";
               if (partial.order.equals("DESC")) {
                    for (int i = strs.length - 1; i >= 0; i--) {
                         if (result.length() > 0) {
                              result = result.concat(partial.delimiter);
                         }
                         result = result.concat(strs[i]);
                    }
               } else {
                    for (int i = 0; i < strs.length; i++) {
                         if (result.length() > 0) {
                              result = result.concat(partial.delimiter);
                         }
                         result = result.concat(strs[i]);
                    }
               }
               return result;
          }
     }
}
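
A usage sketch in Hive SQL for the UDAF above; the jar path, function name (concat_group), and table/column names (t, k, v) are hypothetical:

ADD JAR /tmp/udaf.jar;
CREATE TEMPORARY FUNCTION concat_group AS 'cn.sina.stat.hive.udaf.ConcatClumnGroupByKeyWithOrder';
-- For each key k, concatenate the group's v values with ',' sorted ascending.
SELECT k, concat_group(v, ',', 'ASC')
FROM t
GROUP BY k;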

UDTF steps:
1. Extend org.apache.hadoop.hive.ql.udf.generic.GenericUDTF.
2. Implement the initialize, process, and close methods:
a. initialize validates the arguments and returns the output field names and field types.
b. After initialization, Hive calls process for each input row; results are emitted via forward().
c. Finally, close() is called for cleanup. (A usage sketch with LATERAL VIEW follows the class below.)
package cn.sina.stat.hive.udtf;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class SortFieldExplodeToPair extends GenericUDTF {

     @Override
     public void close() throws HiveException {
          // No per-query state to release.
     }

     @Override
     public StructObjectInspector initialize(ObjectInspector[] args)
               throws UDFArgumentException {
          if (args.length != 3) {
               throw new UDFArgumentLengthException(
                         "SortFieldExplodeToPair takes exactly three arguments");
          }
          if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
               throw new UDFArgumentException(
                         "SortFieldExplodeToPair takes string as first parameter");
          }
          if (args[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {
               throw new UDFArgumentException(
                         "SortFieldExplodeToPair takes string as second parameter");
          }
          if (args[2].getCategory() != ObjectInspector.Category.PRIMITIVE) {
               throw new UDFArgumentException(
                         "SortFieldExplodeToPair takes string as third parameter");
          }
          // Note: args[] holds ObjectInspectors (type information), not the actual
          // argument values, so the ASC/DESC value check is deferred to process().

          ArrayList<String> fieldNames = new ArrayList<String>();
          ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
          fieldNames.add("col1");
          fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

          return ObjectInspectorFactory.getStandardStructObjectInspector(
                    fieldNames, fieldOIs);
     }

     private final String[] forwardStr = new String[1];

     @Override
     public void process(Object[] args) throws HiveException {
          String input = args[0].toString();
          String delimiter = args[1].toString();
          String order = args[2].toString();
          // Value-level check that cannot be done in initialize().
          if (!(order.equalsIgnoreCase("ASC") || order.equalsIgnoreCase("DESC"))) {
               throw new HiveException(
                         "SortFieldExplodeToPair third parameter must be \"ASC\" or \"DESC\"");
          }
          // Note: String.split treats the delimiter as a regular expression.
          String[] strList = input.split(delimiter);
          Arrays.sort(strList);
          if (strList.length > 1) {
               // Emit one row per adjacent pair of the sorted fields.
               if (order.equalsIgnoreCase("DESC")) {
                    for (int i = strList.length - 1; i > 0; i--) {
                         forwardStr[0] = strList[i].concat(delimiter).concat(strList[i - 1]);
                         forward(forwardStr);
                    }
               } else {
                    for (int i = 0; i < strList.length - 1; i++) {
                         forwardStr[0] = strList[i].concat(delimiter).concat(strList[i + 1]);
                         forward(forwardStr);
                    }
               }
          } else {
               // Only one field: emit it as-is.
               forward(strList);
          }
     }
}
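
A usage sketch in Hive SQL for the UDTF above; the jar path, function name (sort_explode_pair), and table/column names (t, id, tags) are hypothetical:

ADD JAR /tmp/udtf.jar;
CREATE TEMPORARY FUNCTION sort_explode_pair AS 'cn.sina.stat.hive.udtf.SortFieldExplodeToPair';
-- A UDTF can be used as the only expression in the SELECT list:
SELECT sort_explode_pair(tags, ',', 'ASC') AS col1 FROM t;
-- Or with LATERAL VIEW to keep other columns next to the generated rows:
SELECT t.id, pairs.col1
FROM t
LATERAL VIEW sort_explode_pair(t.tags, ',', 'ASC') pairs AS col1;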