欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

hive的实践部分

程序员文章站 2022-05-25 19:56:34
一.hive的事务 (1)什么是事务 要知道hive的事务,首先要知道什么是transaction(事务)?事务就是一组单元化操作,这些操作要么都执行,要么都不执行,是一个不可分割的工作单位。 事务有四大特性:A、C、I、D (原子性、一致性、隔离性、持久性) Atomicity: 不可再分割的工作 ......
 

一.hive的事务

(1)什么是事务

要知道hive的事务,首先要知道什么是transaction(事务)?事务就是一组单元化操作,这些操作要么都执行,要么都不执行,是一个不可分割的工作单位。

事务有四大特性:a、c、i、d (原子性、一致性、隔离性、持久性)

 atomicity: 不可再分割的工作单位,事务中的所有操作要么都发,要么都不发。

consistency: 事务开始之前和事务结束以后,数据库的完整性约束没有被破坏。这是说数据库事务不能破坏关系数据的完整性以及业务逻辑上的 一致性。 

isolation: 多个事务并发访问,事务之间是隔离的

durability: 意味着在事务完成以后,该事务锁对数据库所作的更改便持久的保存在数据库之中,并不会被回滚。 

(2)hive事务的特点与局限性

从hive的0.14版本开始支持低等级的事务

支持事务的增删改查,从hive的2.2版本开始,开始支持merge

不支持事务的begin、commit以及rollback(事务的回滚)

不支持使用update更新分桶列和分区列

想使用事务的话,文件格式必须是orc

需要压缩工作,需要时间,资源和空间

支持s(共享锁)和x(排它锁)

不允许从一个非acid连接写入/读取acid表

(3)hive的事务开启

hive的事务开启有三种方式:a.通过ambari ui-hive config 

               b.通过hive-xml 的配置文件添加如下内容

<property>  
<name>hive.support.concurrency</name>  
<value>true</value>  
</property>  
<property>  
<name>hive.txn.manager</name> 
<value>org.apache.hadoop.hive.ql.lockmgr.dbtxnmanager</value> 
</property>

  

                                     c.通过命令行,在beeline这种交互式环境下:

set hive.support.concurrency = true; 
set hive.enforce.bucketing = true; 
set hive.exec.dynamic.partition.mode = nonstrict; 
set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.dbtxnmanager; 
set hive.compactor.initiator.on = true; 
set hive.compactor.worker.threads = 1;

(4)hive的merge

merge的语法:

merge into <target table> as t using <source expression/table> as s on <boolean expression1>

when matched [and <boolean expression2>] then update set <set clause list>

when matched [and <boolean expression3>] then delete

when not matched [and <boolean expression4>] then insert values<value list>

merge的局限性:

最多三条when语句,只支持update/delete/insert。when not matched 必须在when语句的最后面。

如果出现update和delete的时候 ,两个条件是分开的,而且必须在条件前面加上and.像 [and <boolean expression>]

(5)例子

a.创建两个事务表

create table if not exists employee ( 
emp_id int,  
emp_name string,  
dept_name string, 
work_loc string )  
partitioned by (start_date string) 
clustered by (emp_id) into 2 buckets stored as orc tblproperties('transactional'='true'); 


create table employee_state(
emp_id int,
emp_name string,
dept_name string,
work_loc string,
start_date string,
state string
)
stored as orc;

b.开启事务(见上面的开启事务的c,一般有些默认的设置是开的,我这里就只开了自动分区和分桶)

c.插入数据

 

insert into table employee partition (start_date) values (1,'will','it','toronto','20100701'), 
(2,'wyne','it','toronto','20100701'), 
(3,'judy','hr','beijing','20100701'), 
(4,'lili','hr','beijing','20101201'), 
(5,'mike','sales','beijing','20101201'), (6,'bang','sales','toronto','20101201'), (7,'wendy','finance','beijing','20101201');

insert into table employee_state values
(2,’wyne’,’it’,’beijing’,’20100701’,’update’),
(4,’lili’,’hr’,’beijing’,’20101201’,’quit’),
(8,’james’,’it’,’toronto’,’20170101’,’new’)

d.检验数据是否被插入

hive的实践部分

hive的实践部分

 

e.这里通过merge操作,完成更新、删除、插入操作。

employe字段解释:id为2的员工之前的工作地在toronto,现在在beijing,state的状态为update。所以需要更新表employee中员工2的信息

id为4的员工的state状态为quit,说明目前员工已经离职,所以需要在employee表中删除关于id为4的员工的信息。

id为8的员工的state状态为new,说明是新员工,所以需要插入empoyee中。

merge into employee as t
using employee_state as s
on t.emp_id = s.emp_id and t.start_date = s.start_date
when matched and s.state = 'update' then update set dept_name = s.dept_name,work_loc = s.work_loc
when matched and s.state = 'quit' then delete
when not matched then insert values(s.emp_id,s.emp_name,s.dept_name,s.work_loc,s.start_date);
--这里目标表为employee,源表为employee_state
--这里新员工是属于第三中情况,未在目标表中匹配到,所以直接插入到目标表中。

  hive的实践部分

二.hive的udf

(1)什么是hive的udf?

user-defined function (udf): 这提供了一种使用外部函数(在java中)扩展功能的方法,可以在hql中进行评估

(2)hive的udf分类

hive的udf一般分为三种:

  a.udf:用户定义的简单函数,按行操作并为一行输出一个结果,例如大多数内置数学和字符串函数

  b.udaf: 用户定义的聚合函数,按行或按组操作,并为每个组输出一行或一行,例如max和count内置函数。

  c.udtf:用户定义的表生成函数也按行运行,但结果会生成多行/表,例如explode函数。 udtf可以在select之后或在lateral view语句之后使用。

 (3)hive的udf使用举例

  a.对于hive的udf,这里我写了一个把字符串的大写全部换成小写和一个判断字符串是否在一个array数组里面的函数

--将字符串的所有大写改成小写
import org.apache.hadoop.hive.ql.exec.udf;
import org.apache.hadoop.io.text;
 
public final class stringlower extends udf {
  public text evaluate(final text s) {
    if (s == null) { return null; }
    return new text(s.tostring().tolowercase());
  }
}
--判断当前字符串是否在数组里面
import org.apache.hadoop.hive.ql.exec.description;
import org.apache.hadoop.hive.ql.exec.udfargumentexception;
import org.apache.hadoop.hive.ql.exec.udfargumenttypeexception;
import org.apache.hadoop.hive.ql.metadata.hiveexception;
import org.apache.hadoop.hive.ql.udf.generic.genericudf;
import org.apache.hadoop.hive.serde.serdeconstants;
import org.apache.hadoop.hive.serde2.objectinspector.listobjectinspector;
import org.apache.hadoop.hive.serde2.objectinspector.objectinspector;
import org.apache.hadoop.hive.serde2.objectinspector.objectinspectorutils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.primitiveobjectinspectorfactory;
import org.apache.hadoop.io.booleanwritable;

@description(name = "arraycontains",
        value="_func_(array, value) - returns true if the array contains value.",
        extended="example:\n"
                + "  > select _func_(array(1, 2, 3), 2) from src limit 1;\n"
                + "  true")


public class arraycontains extends genericudf {
    private static final int array_idx = 0;
    private static final int value_idx = 1;
    private static final int arg_count = 2;//这个udf函数需要参数的个数
    private static final string func_name = "arraycontains";//外部名字

    private transient  objectinspector valueoi;
    private transient listobjectinspector arrayoi;
    private transient objectinspector arrayelementoi;
    private booleanwritable result;
    @override
    public objectinspector initialize(objectinspector[] arguments) throws udfargumentexception {
        //检查是否传入了两个参数
        if (arguments.length != arg_count) {
            throw new udfargumentexception("the function" + func_name + "accepts"
                    + arg_count + "arguments");
        }
        //检查参数是否是属于list类型
        if (!arguments[array_idx].getcategory().equals(objectinspector.category.list)) {
            throw new udfargumenttypeexception(array_idx, "\"" + serdeconstants.list_type_name + "\""
                    + "expected at function array_contains,but"
                    + "\"" + arguments[array_idx].gettypename() + "\""
                    + "is found");

        }

        arrayoi = (listobjectinspector) arguments[array_idx];
        arrayelementoi = arrayoi.getlistelementobjectinspector();

        valueoi = arguments[value_idx];

        //检查list的元素和传入的值是否属于同一个类型
        if (!objectinspectorutils.comparetypes(arrayelementoi, valueoi)) {
            throw new udfargumenttypeexception(value_idx, "\"" + arrayelementoi.gettypename() + "\""
                    + "expectd at function array_contains,but"
                    + "\"" + valueoi.gettypename() + "\""
                    + "is found");
        }

        //检查此类型是否支持比较
        if (!objectinspectorutils.comparesupported(valueoi)){
            throw new udfargumentexception("this function" + func_name
                    +"does not support comparison for"
                    +"\"" + valueoi.gettypename() + "\""
                    + "types");

        }
        result = new booleanwritable(false);
        return primitiveobjectinspectorfactory.writablebooleanobjectinspector;
    }
    @override
    public object evaluate(deferredobject[] arguments) throws hiveexception {
       result.set(false);
       object array = arguments[array_idx].get();
       object value = arguments[value_idx].get();

       int arraylength = arrayoi.getlistlength(array);

       //检查数组是否null还是空value是否为null
        if (value == null || arraylength <= 0)//判断value是否为空,若真则不判断右边,不然为假继续判断右边
        {
            return result;//满足条件直接返回result初始状态值
        }

        //将值与数组的每个元素进行比较,直到找到匹配项
        for (int i=0;i<arraylength;i++){
            object listelement = arrayoi.getlistelement(array,i);
            if(listelement != null){
                if (objectinspectorutils.compare(value,valueoi,listelement,arrayelementoi) == 0){
                        result.set(true);//找到匹配,直接将result置于真
                        break;
                }
            }
        }

        return  result;//返回真值result
    }

    @override
    public string getdisplaystring(string[] childeren) {
        assert (childeren.length == arg_count);
        return "array_contains(" + childeren[array_idx] + ","
                + childeren[value_idx] + ")";
    }
}

  b.然后通过编译器打包到hdfs文件系统上,通过执行hive命令构造函数

drop function if exists str_lower; 
drop function if exists array_contains; 
create function str_lower as 'com.data.hiveudf.udf.stringlower'  
using jar 'hdfs:////apps/hive/functions/df-hiveudf-1.0-snapshot.jar'; 
create function array_contains as 'com.data.hiveudf.gudf.arraycontains'  
using jar 'hdfs:////apps/hive/functions/df-hiveudf-1.0-snapshot.jar';

  c.使用自定义函数

这里使用了另一库里的一张employee表,里面使用了string类型、array类型...。表描述与内容如下:

hive的实践部分

hive的实践部分

然后使用str_lower的函数:

hive的实践部分

使用array_contains函数

hive的实践部分