Analysis and Fix of a CarbonData Cross-Database Query Exception

0x0 Background

A colleague recently found that CarbonData throws an exception when running a multi-table join across databases.

The Java code is as follows:

carbon.sql("select * from event.student as s left join test.user as u on u.name=s.name").show();

The exception:

java.io.FileNotFoundException: File does not exist: /opt\test\user
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.getContentSummary(FSDirectory.java:2404)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getContentSummary(FSNamesystem.java:4575)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getContentSummary(NameNodeRpcServer.java:1087)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getContentSummary(AuthorizationProviderProxyClientProtocol.java:563)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getContentSummary(ClientNamenodeProtocolServerSideTranslatorPB.java:873)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)

As the message shows, the path separator is wrong, so the job cannot find the table directory on HDFS and the query fails:

File does not exist: /opt\test\user
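Why does HDFS report "does not exist"? On HDFS only / is a path separator; a backslash is just an ordinary character inside a path component, so the whole string is treated as a single (non-existent) entry under the root directory. A quick sketch with Hadoop's Path class illustrates this (the hdfs://nameservice1 authority is a placeholder, not a value from the original cluster):

import org.apache.hadoop.fs.Path;

public class SeparatorCheck {
    public static void main(String[] args) {
        Path bad = new Path("hdfs://nameservice1/opt\\test\\user");
        System.out.println(bad.depth());    // 1 -- one component named "opt\test\user" under "/"
        System.out.println(bad.getName());  // opt\test\user

        Path good = new Path("hdfs://nameservice1/opt/test/user");
        System.out.println(good.depth());   // 3 -- opt / test / user
    }
}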

0x1 Analysis

Looking at the full client-side stack trace:

at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSClient.getContentSummary(DFSClient.java:2778)
    at org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:656)
    at org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:652)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getContentSummary(DistributedFileSystem.java:652)
    at org.apache.carbondata.core.datastore.impl.FileFactory.getDirectorySize(FileFactory.java:534)
    at org.apache.spark.sql.hive.CarbonRelation.sizeInBytes(CarbonRelation.scala:207)
    at org.apache.spark.sql.CarbonDatasourceHadoopRelation.sizeInBytes(CarbonDatasourceHadoopRelation.scala:90)
    at org.apache.spark.sql.execution.datasources.LogicalRelation$$anonfun$statistics$2.apply(LogicalRelation.scala:77)
	at org.apache.spark.sql.execution.datasources.LogicalRelation$$anonfun$statistics$2.apply(LogicalRelation.scala:77)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.datasources.LogicalRelation.statistics$lzycompute(LogicalRelation.scala:76)
    at org.apache.spark.sql.execution.datasources.LogicalRelation.statistics(LogicalRelation.scala:75)
    at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.statistics(LogicalPlan.scala:319)
    at org.apache.spark.sql.execution.SparkStrategies$JoinSelection$.canBroadcast(SparkStrategies.scala:117)
    at org.apache.spark.sql.execution.SparkStrategies$JoinSelection$.apply(SparkStrategies.scala:159)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:62)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:62)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)

Two frames stand out:

    at org.apache.carbondata.core.datastore.impl.FileFactory.getDirectorySize(FileFactory.java:534)
    at org.apache.spark.sql.hive.CarbonRelation.sizeInBytes(CarbonRelation.scala:207)

This is where CarbonData starts computing the size of the table directory. The trace also shows that the call originates from Spark's JoinSelection.canBroadcast: when planning a join, Spark asks each relation for its estimated size to decide whether one side can be broadcast, which is why the problem only surfaces on multi-table joins. Stepping into org.apache.carbondata.core.datastore.impl.FileFactory.getDirectorySize:

public static long getDirectorySize(String filePath) throws IOException {
    FileFactory.FileType fileType = getFileType(filePath);
    switch (fileType) {
    case LOCAL:
    default:
        // Local file system: measure the directory with java.io.File
        filePath = getUpdatedFilePath(filePath, fileType);
        File file = new File(filePath);
        return FileUtils.sizeOfDirectory(file);
    case HDFS:
    case ALLUXIO:
    case VIEWFS:
        // Distributed file systems: ask the NameNode for the content summary
        Path path = new Path(filePath);
        FileSystem fs = path.getFileSystem(configuration);
        return fs.getContentSummary(path).getLength();
    }
}
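Since the store is on HDFS, the HDFS branch is taken and the malformed path goes straight to fs.getContentSummary. A minimal reproduction sketch that triggers the same FileNotFoundException (the hdfs://nameservice1 authority and the path are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GetContentSummaryRepro {
    public static void main(String[] args) throws Exception {
        // Because the scheme is hdfs, the backslashes are kept as-is, so the
        // NameNode is asked for a single child of "/" literally named "opt\test\user".
        Path bad = new Path("hdfs://nameservice1/opt\\test\\user");
        FileSystem fs = bad.getFileSystem(new Configuration());
        // Throws FileNotFoundException: File does not exist: /opt\test\user
        System.out.println(fs.getContentSummary(bad).getLength());
    }
}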

The filePath passed in here is obviously the /opt\test\user from the error above. So where does it come from? Let's step into org.apache.spark.sql.hive.CarbonRelation.sizeInBytes:

 def sizeInBytes: Long = {
    val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
      tableMeta.carbonTable.getAbsoluteTableIdentifier)

    if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
      val tablePath = CarbonStorePath.getCarbonTablePath(
        tableMeta.storePath,
        tableMeta.carbonTableIdentifier).getPath
      val fileType = FileFactory.getFileType(tablePath)
      if(FileFactory.isFileExist(tablePath, fileType)) {
        tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
        sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath)
      }
    }
    sizeInBytesLocalValue
  }

As you can see, tablePath is the argument passed to FileFactory.getDirectorySize, and it is built by this statement:

val tablePath = CarbonStorePath.getCarbonTablePath(
        tableMeta.storePath,
        tableMeta.carbonTableIdentifier).getPath

Following it one level deeper, into CarbonStorePath.getCarbonTablePath:

public static CarbonTablePath getCarbonTablePath(String storePath,
        CarbonTableIdentifier tableIdentifier) {
    return new CarbonTablePath(tableIdentifier,
        storePath + File.separator + tableIdentifier.getDatabaseName()
            + File.separator + tableIdentifier.getTableName());
}

Here is the problem:

storePath + File.separator + tableIdentifier.getDatabaseName() + File.separator + tableIdentifier.getTableName()

The table path is assembled with File.separator, which is \ on Windows and / on Linux. So when the driver code runs on Windows, the table path is built with backslashes; once the job is submitted to Spark and the path reaches HDFS on Linux, where only / is a valid separator, it cannot be resolved and the query fails with the FileNotFoundException above.
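The fix is simple in principle: build the store path with a literal / instead of the platform-dependent File.separator. Below is a minimal sketch of one possible patch to getCarbonTablePath (it keeps the signature shown above, but is not necessarily the official CarbonData fix):

public static CarbonTablePath getCarbonTablePath(String storePath,
        CarbonTableIdentifier tableIdentifier) {
    // Always use "/" -- HDFS, Alluxio and ViewFS only accept "/" as a separator,
    // while File.separator becomes "\" when the driver runs on Windows.
    String tablePath = storePath + "/"
            + tableIdentifier.getDatabaseName() + "/"
            + tableIdentifier.getTableName();
    return new CarbonTablePath(tableIdentifier, tablePath);
}

Until the library is patched, a workaround is to submit the query from a Linux machine, where File.separator is already /, so the generated table path stays valid.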