HiveServer2 Java Thrift API usage example (CDH 5.4.0 and later)
Adapted from: https://my.oschina.net/u/2874009/blog/853633
If your Hive comes from CDH 5.3.10 or earlier, see instead: https://blog.csdn.net/cai5/article/details/81388489 (HiveServer2 Java Thrift API usage example, CDH 5.3.10 and earlier).
Starting with CDH 5.4.0 the Thrift API changed significantly, in particular the methods for fetching query logs and query results. The code below was tested against hive-1.1.0-cdh5.12.1.
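For orientation, the condensed sketch below shows the Thrift call sequence that the full classes in this article implement: open a session, submit the statement asynchronously, poll its state, fetch the operation log with fetchType 1, then fetch the result rows. Host, port, credentials, the table name and the class name FlowSketch are placeholders; error handling, result decoding and cleanup are left to the full classes.

import org.apache.hive.service.cli.thrift.*;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;

// Condensed sketch of the call sequence implemented by the classes below.
public class FlowSketch {
    public static void main(String[] args) throws Exception {
        TSocket socket = new TSocket("10.17.35.60", 10000);
        TCLIService.Client client = new TCLIService.Client(new TBinaryProtocol(socket));
        socket.open();

        // 1. open a session
        TOpenSessionReq openReq = new TOpenSessionReq();
        openReq.setUsername("hive");
        openReq.setPassword("hive");
        TSessionHandle session = client.OpenSession(openReq).getSessionHandle();

        // 2. submit the statement asynchronously
        TExecuteStatementReq execReq = new TExecuteStatementReq(session, "select count(*) from zxs_test");
        execReq.setRunAsync(true);
        TOperationHandle op = client.ExecuteStatement(execReq).getOperationHandle();

        // 3. poll the operation state while the statement is running (mirrors the Test class below)
        while (client.GetOperationStatus(new TGetOperationStatusReq(op)).getOperationState()
                == TOperationState.RUNNING_STATE) {
            Thread.sleep(1000);
        }

        // 4. fetchType 1 returns the operation log (CDH 5.4.0+); the default fetchType 0 returns result rows
        TFetchResultsReq logReq = new TFetchResultsReq(op, TFetchOrientation.FETCH_NEXT, 1000);
        logReq.setFetchType((short) 1);
        TRowSet logRows = client.FetchResults(logReq).getResults();

        // 5. fetch the result rows (column-oriented TRowSet)
        TFetchResultsReq dataReq = new TFetchResultsReq(op, TFetchOrientation.FETCH_NEXT, 1000);
        TRowSet dataRows = client.FetchResults(dataReq).getResults();
        System.out.println("log rows: " + logRows + ", data rows: " + dataRows);
    }
}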
QueryInstance.java
package com.cai;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hive.com.esotericsoftware.minlog.Log;
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.RowSetFactory;
import org.apache.hive.service.cli.thrift.TCLIService;
import org.apache.hive.service.cli.thrift.TCancelOperationReq;
import org.apache.hive.service.cli.thrift.TColumn;
import org.apache.hive.service.cli.thrift.TColumnDesc;
import org.apache.hive.service.cli.thrift.TExecuteStatementReq;
import org.apache.hive.service.cli.thrift.TExecuteStatementResp;
import org.apache.hive.service.cli.thrift.TFetchOrientation;
import org.apache.hive.service.cli.thrift.TFetchResultsReq;
import org.apache.hive.service.cli.thrift.TFetchResultsResp;
import org.apache.hive.service.cli.thrift.TGetOperationStatusReq;
import org.apache.hive.service.cli.thrift.TGetOperationStatusResp;
import org.apache.hive.service.cli.thrift.TGetResultSetMetadataReq;
import org.apache.hive.service.cli.thrift.TGetResultSetMetadataResp;
import org.apache.hive.service.cli.thrift.TOpenSessionResp;
import org.apache.hive.service.cli.thrift.TOperationHandle;
import org.apache.hive.service.cli.thrift.TOperationState;
import org.apache.hive.service.cli.thrift.TProtocolVersion;
import org.apache.hive.service.cli.thrift.TRowSet;
import org.apache.hive.service.cli.thrift.TSessionHandle;
import org.apache.hive.service.cli.thrift.TTableSchema;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
public class QueryInstance {
private static String host = "10.17.35.60";
private static int port = 10000;
private static String username = "hive";
private static String password = "hive";
private static TTransport transport;
private static TCLIService.Client client;
private TOperationState tOperationState = null;
private Map<String, Object> resultMap = new HashMap<String, Object>();
static {
try {
transport = QueryTool.getSocketInstance(host, port);
client = new TCLIService.Client(new TBinaryProtocol(transport));
transport.open();
} catch (TTransportException e) {
Log.error("hive connection error!");
}
}
/**
* Submit a query for asynchronous execution.
* @param command the HiveQL statement to run
* @return the operation handle of the submitted statement
* @throws Exception if HiveServer2 rejects the statement
*/
public TOperationHandle submitQuery(String command) throws Exception {
TOperationHandle tOperationHandle;
TExecuteStatementResp resp = null;
TOpenSessionResp sessionResp = QueryTool.openSession(client, username, password);
// TSessionHandle sessHandle = QueryTool.openSession(client).getSessionHandle();
TSessionHandle sessHandle = sessionResp.getSessionHandle();
TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, command);
// run the statement asynchronously
execReq.setRunAsync(true);
// submit the SQL
resp = client.ExecuteStatement(execReq);// execute the statement
tOperationHandle = resp.getOperationHandle();// handle of the running operation
if (tOperationHandle == null) {
// if the statement fails, the error details are carried in resp.getStatus()
throw new Exception(resp.getStatus().getErrorMessage());
}
return tOperationHandle;
}
/*
// Fetch the query log via GetLog. hive.version: 0.12.0-cdh5.0.1 (before CDH 5.3.10, use this method)
@Override
public String getQueryLog(TOperationHandle tOperationHandle) throws Exception {
if(tOperationHandle!=null){
TGetLogReq tGetLogReq = new TGetLogReq(tOperationHandle);
TGetLogResp logResp = client.GetLog(tGetLogReq);
log = logResp.getLog();
}
return log;
}*/
/**
* For CDH 5.4.0 and later, use this method to fetch the query log.
* @param tOperationHandle handle of the running operation
* @return the log text fetched so far
* @throws Exception on Thrift errors
*/
public String getQueryLog(TOperationHandle tOperationHandle) throws Exception {
String log = "";
if(tOperationHandle != null){
StringBuffer sbLog = new StringBuffer();
TFetchResultsReq fetchReq = new TFetchResultsReq(tOperationHandle, TFetchOrientation.FETCH_NEXT, 1000);
fetchReq.setFetchType((short) 1); // fetchType must be 1: it returns the operation log instead of result rows
TFetchResultsResp resp = client.FetchResults(fetchReq);
TRowSet rs = resp.getResults();
if (null != rs) {
RowSet rowSet = RowSetFactory.create(rs, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V7);
for (Object[] row : rowSet) {
sbLog.append(String.valueOf(row[0])).append("\n");
}
}
log = sbLog.toString();
}
return log;
}
/*
* Fetch the query status.
* The possible states are defined in TOperationState:
* INITIALIZED_STATE(0),
* RUNNING_STATE(1),
* FINISHED_STATE(2),
* CANCELED_STATE(3),
* CLOSED_STATE(4),
* ERROR_STATE(5),
* UKNOWN_STATE(6),
* PENDING_STATE(7);
*/
public TOperationState getQueryHandleStatus(
TOperationHandle tOperationHandle) throws Exception {
if (tOperationHandle != null) {
TGetOperationStatusReq statusReq = new TGetOperationStatusReq(
tOperationHandle);
TGetOperationStatusResp statusResp = client
.GetOperationStatus(statusReq);
tOperationState = statusResp.getOperationState();
}
return tOperationState;
}
/**
* Get the column names of the result set.
* @param tOperationHandle handle of the operation
* @return the list of column names, or null if no schema is available
* @throws Throwable on Thrift errors
*/
public List<String> getColumns(TOperationHandle tOperationHandle)
throws Throwable {
TGetResultSetMetadataResp metadataResp;
TGetResultSetMetadataReq metadataReq;
TTableSchema tableSchema;
metadataReq = new TGetResultSetMetadataReq(tOperationHandle);
metadataResp = client.GetResultSetMetadata(metadataReq);
List<TColumnDesc> columnDescs;
List<String> columns = null;
tableSchema = metadataResp.getSchema();
if (tableSchema != null) {
columnDescs = tableSchema.getColumns();
columns = new ArrayList<String>();
for (TColumnDesc tColumnDesc : columnDescs) {
columns.add(tColumnDesc.getColumnName());
}
}
return columns;
}
/**
* Fetch the results of a SELECT statement.
* The data comes back column-oriented: one entry per column, each holding that column's list of values.
*/
public List<Object> getResults(TOperationHandle tOperationHandle) throws Throwable{
TFetchResultsReq fetchReq = new TFetchResultsReq();
fetchReq.setOperationHandle(tOperationHandle);
fetchReq.setMaxRows(1000);
TFetchResultsResp re = client.FetchResults(fetchReq);
List<TColumn> list = re.getResults().getColumns();
List<Object> list_row = new ArrayList<Object>();
for(TColumn field:list){
if (field.isSetStringVal()) {
list_row.add(field.getStringVal().getValues());
} else if (field.isSetDoubleVal()) {
list_row.add(field.getDoubleVal().getValues());
} else if (field.isSetI16Val()) {
list_row.add(field.getI16Val().getValues());
} else if (field.isSetI32Val()) {
list_row.add(field.getI32Val().getValues());
} else if (field.isSetI64Val()) {
list_row.add(field.getI64Val().getValues());
} else if (field.isSetBoolVal()) {
list_row.add(field.getBoolVal().getValues());
} else if (field.isSetByteVal()) {
list_row.add(field.getByteVal().getValues());
}
}
/*for(Object obj:list_row){
System.out.println(obj);
}*/
return list_row;
}
/**
* Transpose the query results from column-oriented to row-oriented.
* @param objs column-oriented results as returned by getResults
* @return one list per row
*/
public List<Object> toResults(List<Object> objs){
List<Object> rets = new ArrayList<Object>() ;
if (objs != null){
List row = null ;
List list = (List) objs.get(0) ;
int rowCnt = list.size() ;
for(int i = 0; i < rowCnt ; i++){
rets.add(new ArrayList()) ;
}
for(int i = 0; i < objs.size() ; i++){
list = (List) objs.get(i) ;
for (int j = 0; j < rowCnt; j++){
((List)rets.get(j)).add(list.get(j)) ;
}
}
System.out.println("---------------------------------");
System.out.println(rets);
System.out.println("---------------------------------");
}
return rets ;
}
/**
* Cancel a query that has not finished yet.
* @param tOperationHandle handle of the operation to cancel
* @throws Throwable on Thrift errors
*/
public void cancelQuery(TOperationHandle tOperationHandle) throws Throwable {
if (tOperationState != TOperationState.FINISHED_STATE) {
TCancelOperationReq cancelOperationReq = new TCancelOperationReq();
cancelOperationReq.setOperationHandle(tOperationHandle);
client.CancelOperation(cancelOperationReq);
}
}
}
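Note that QueryInstance opens a new session per query and never closes the operation, the session, or the underlying transport. For anything longer-lived than a one-off test you would normally release them. A minimal sketch, assuming you keep the TOperationHandle, the TSessionHandle from the TOpenSessionResp, and the TTransport around (the QueryCleanup class name is hypothetical):

import org.apache.hive.service.cli.thrift.TCLIService;
import org.apache.hive.service.cli.thrift.TCloseOperationReq;
import org.apache.hive.service.cli.thrift.TCloseSessionReq;
import org.apache.hive.service.cli.thrift.TOperationHandle;
import org.apache.hive.service.cli.thrift.TSessionHandle;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;

// Hypothetical helper: release server-side and client-side resources after a query.
public class QueryCleanup {
    public static void close(TCLIService.Client client, TOperationHandle op,
            TSessionHandle session, TTransport transport) throws TException {
        if (op != null) {
            client.CloseOperation(new TCloseOperationReq(op)); // frees the operation on the server
        }
        if (session != null) {
            client.CloseSession(new TCloseSessionReq(session)); // ends the HiveServer2 session
        }
        if (transport != null && transport.isOpen()) {
            transport.close(); // closes the Thrift socket
        }
    }
}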
QueryTool.java
package com.cai;
import org.apache.hive.service.cli.thrift.TCLIService;
import org.apache.hive.service.cli.thrift.TOpenSessionReq;
import org.apache.hive.service.cli.thrift.TOpenSessionResp;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
public class QueryTool {
public static TTransport getSocketInstance(String host,int port) throws TTransportException{
TSocket tSocket = new TSocket(host, port);
tSocket.setTimeout(100000);
return tSocket;
}
/**
* Open a HiveServer2 session and return the TOpenSessionResp.
* To run queries as the user passed in here, HiveServer2 impersonation must be enabled:
* hive.server2.enable.impersonation / hive.server2.enable.doAs = true
* (in Cloudera Manager, check the option for the HiveServer2 Default Group).
* @return the TOpenSessionResp containing the session handle
* @throws TException on Thrift errors
*/
public static TOpenSessionResp openSession(TCLIService.Client client,String user,String pwd) throws TException{
TOpenSessionReq openSessionReq = new TOpenSessionReq();
openSessionReq.setUsername(user);
openSessionReq.setPassword(pwd);
openSessionReq.setUsernameIsSet(true);
return client.OpenSession(openSessionReq);
}
/* public static TOpenSessionResp openSession(TCLIService.Client client) throws TException{
TOpenSessionReq openSessionReq = new TOpenSessionReq();
return client.OpenSession(openSessionReq);
} */
}
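One caveat about getSocketInstance: a bare TSocket with TBinaryProtocol only connects successfully when HiveServer2 is configured with hive.server2.authentication=NOSASL. With the default NONE setting (SASL PLAIN), the socket has to be wrapped in a SASL transport before it is opened. A sketch of that variant, assuming the hive-service PlainSaslHelper class is on the classpath (the SaslQueryTool class name is hypothetical):

import javax.security.sasl.SaslException;
import org.apache.hive.service.auth.PlainSaslHelper;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

// Variant of QueryTool.getSocketInstance for HiveServer2 running with the
// default hive.server2.authentication=NONE (SASL PLAIN) setting.
public class SaslQueryTool {
    public static TTransport getSaslTransport(String host, int port, String user, String pwd)
            throws TTransportException, SaslException {
        TSocket socket = new TSocket(host, port);
        socket.setTimeout(100000);
        // wrap the raw socket in a SASL PLAIN transport; the caller's open() performs the handshake
        return PlainSaslHelper.getPlainTransport(user, pwd, socket);
    }
}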
Test.java
package com.cai.test;
import java.util.List;
import org.apache.hive.service.cli.thrift.TOperationHandle;
import org.apache.hive.service.cli.thrift.TOperationState;
import com.cai.QueryInstance;
public class Test {
public static void main(String[] args) {
try {
QueryInstance base = new QueryInstance();
TOperationHandle handle = base.submitQuery(" select count(*) from zxs_test limit 15");//show databases //select count(*) from cdm_test1
String log = base.getQueryLog(handle) ;
System.out.println("LOG : " + log);
while(base.getQueryHandleStatus(handle) == TOperationState.RUNNING_STATE){
Thread.sleep(5000);
System.out.println("LOG : " + base.getQueryLog(handle) );
}
System.out.println(base.getColumns(handle));
List<Object> listRets = base.getResults(handle);
base.toResults(listRets) ;
for (Object obj: listRets ){
System.out.println(obj);
}
} catch (Throwable e) {
e.printStackTrace();
}
}
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cai</groupId>
<artifactId>HiveLog</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>HiveLog</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hive.version>1.1.0-cdh5.12.1</hive.version><!-- 0.12.0-cdh5.0.1 -->
</properties>
<dependencies>
<!-- hive -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-ant</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-beeline</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-cli</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-common</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-contrib</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-hbase-handler</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-hwi</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-serde</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive.shims</groupId>
<artifactId>hive-shims</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive.shims</groupId>
<artifactId>hive-shims-0.23</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive.shims</groupId>
<artifactId>hive-shims-common</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
</project>