欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

使用JDBC执行impala SQL出现的问题

程序员文章站 2024-01-19 10:27:40
...
   impala版本:1.1.1
   hive版本:0.10

   最近在使用JDBC执行impala sql的时候遇到一个问题,使用JDBC执行insert overwrite/into table...select...语句的时候,执行的结果显示是成功,但是查看表中的数据的时候,发现数据并没有插入到目标表中。通过查看http://impala-node-hostname:25000/queries 发现刚才执行的SQL的状态是Exception.说明确实执行失败。

   出现这种情况的原因是由于hive驱动的bug造成的,因为sessions在执行的时候,impala将取消正在运行的查询。现在hive 0.10以及 0.11的驱动都存在这样的问题,估计能在0.12的版本中解决这个问题。

   解决方案: 我们到/hive/src/jdbc/src/java/org/apache/hive/jdbc这个目录下找到HiveStatement.java这个文件,在这个类中有个execute方法,我们提交的SQL就是通过这个方法来执行的,它的代码如下:
  public boolean execute(String sql) throws SQLException {
    if (isClosed) {
      throw new SQLException("Can't execute after statement has been closed");
    }

    try {
      closeClientOperation();
      TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql);
      execReq.setConfOverlay(sessConf);
      TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
      if (execResp.getStatus().getStatusCode().equals(TStatusCode.STILL_EXECUTING_STATUS)) {
        warningChain = Utils.addWarning(warningChain, new SQLWarning("Query execuing asynchronously"));
      } else {
        Utils.verifySuccessWithInfo(execResp.getStatus());
      }
      stmtHandle = execResp.getOperationHandle();
    } catch (SQLException eS) {
      throw eS;
    } catch (Exception ex) {
      throw new SQLException(ex.toString(), "08S01", ex);
    }

    if (!stmtHandle.isHasResultSet()) {
      return false;
    }
    resultSet =  new HiveQueryResultSet.Builder().setClient(client).setSessionHandle(sessHandle)
        .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize)
        .setScrollable(isScrollableResultset)
        .build();
    return true;
  }


修改上述代码的
    if (!stmtHandle.isHasResultSet()) {
      return false;
    }

部分,修改后的代码如下:
  public boolean execute(String sql) throws SQLException {
    if (isClosed) {
      throw new SQLException("Can't execute after statement has been closed");
    }

    try {
      closeClientOperation();
      TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql);
      execReq.setConfOverlay(sessConf);
      TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
      if (execResp.getStatus().getStatusCode().equals(TStatusCode.STILL_EXECUTING_STATUS)) {
        warningChain = Utils.addWarning(warningChain, new SQLWarning("Query execuing asynchronously"));
      } else {
        Utils.verifySuccessWithInfo(execResp.getStatus());
      }
      stmtHandle = execResp.getOperationHandle();
    } catch (SQLException eS) {
      throw eS;
    } catch (Exception ex) {
      throw new SQLException(ex.toString(), "08S01", ex);
    }

    if (!stmtHandle.isHasResultSet()) {
       // Poll until the query has completed one way or another. DML queries will not return a result
       // set, but we should not return from this method until the query has completed to avoid
       // racing with possible subsequent session shutdown, or queries that depend on the results
       // materialised here.
       TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle);
       boolean requestComplete = false;
       while (!requestComplete) {
       try {
       TGetOperationStatusResp statusResp = client.GetOperationStatus(statusReq);
       Utils.verifySuccessWithInfo(statusResp.getStatus());
       if (statusResp.isSetOperationState()) {
       switch (statusResp.getOperationState()) {
       case CLOSED_STATE:
       case FINISHED_STATE:
         return false;
       case CANCELED_STATE:
       // 01000 -> warning
       throw new SQLException("Query was cancelled", "01000");
       case ERROR_STATE:
       // HY000 -> general error
       throw new SQLException("Query failed", "HY000");
       case UKNOWN_STATE:
       throw new SQLException("Unknown query", "HY000");
       case INITIALIZED_STATE:
       case RUNNING_STATE:
        break;
         }
        }
       } catch (Exception ex) {
         throw new SQLException(ex.toString(), "08S01", ex);
       }
       try {
          Thread.sleep(100);
       } catch (InterruptedException ex) {
          // Ignore
         }
       }
      return false;
    }
    resultSet =  new HiveQueryResultSet.Builder().setClient(client).setSessionHandle(sessHandle)
        .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize)
        .setScrollable(isScrollableResultset)
        .build();
    return true;
  }

通过上面的修改,会一直等待查询结束。
然后,我们使用ant把hive重新编译一遍,替换掉其中的驱动包。

期望hive能在0.12版本中解决这个问题。
相关标签: impala hive