Analyzing and Fixing a CarbonData Cross-Database Query Exception
0x0 Background
A colleague recently found that CarbonData throws an exception when running a multi-table join across databases.
The Java code is as follows:
carbon.sql("select * from event.student as s left join test.user as u on u.name=s.name").show();
The exception is as follows:
java.io.FileNotFoundException: File does not exist: /opt\test\user
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.getContentSummary(FSDirectory.java:2404)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getContentSummary(FSNamesystem.java:4575)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getContentSummary(NameNodeRpcServer.java:1087)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getContentSummary(AuthorizationProviderProxyClientProtocol.java:563)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getContentSummary(ClientNamenodeProtocolServerSideTranslatorPB.java:873)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
As you can see, the root cause is a bad path separator: the job cannot find the table on HDFS, so it fails:
File does not exist: /opt\test\user
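The mixed separators suggest the path was built by string concatenation using the platform's default separator. As a hypothetical sketch (the values below are made up to mirror the failing query), this is what such concatenation produces when run on Windows:

import java.io.File;

public class SeparatorDemo {
    public static void main(String[] args) {
        // Hypothetical values mirroring the failing query.
        String storePath = "/opt";   // store location configured with forward slashes
        String database = "test";
        String table = "user";

        // On Windows, File.separator is "\", so concatenation yields
        // exactly the mixed path from the exception: /opt\test\user
        String tablePath = storePath + File.separator + database
                + File.separator + table;
        System.out.println(tablePath);
    }
}

On Linux the same code prints /opt/test/user, which is why the bug only surfaces when the client runs on Windows.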
0x1 Analysis
Examining the client-side exception stack:
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
at org.apache.hadoop.hdfs.DFSClient.getContentSummary(DFSClient.java:2778)
at org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:656)
at org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:652)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getContentSummary(DistributedFileSystem.java:652)
at org.apache.carbondata.core.datastore.impl.FileFactory.getDirectorySize(FileFactory.java:534)
at org.apache.spark.sql.hive.CarbonRelation.sizeInBytes(CarbonRelation.scala:207)
at org.apache.spark.sql.CarbonDatasourceHadoopRelation.sizeInBytes(CarbonDatasourceHadoopRelation.scala:90)
at org.apache.spark.sql.execution.datasources.LogicalRelation$$anonfun$statistics$2.apply(LogicalRelation.scala:77)
at org.apache.spark.sql.execution.datasources.LogicalRelation$$anonfun$statistics$2.apply(LogicalRelation.scala:77)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.datasources.LogicalRelation.statistics$lzycompute(LogicalRelation.scala:76)
at org.apache.spark.sql.execution.datasources.LogicalRelation.statistics(LogicalRelation.scala:75)
at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.statistics(LogicalPlan.scala:319)
at org.apache.spark.sql.execution.SparkStrategies$JoinSelection$.canBroadcast(SparkStrategies.scala:117)
at org.apache.spark.sql.execution.SparkStrategies$JoinSelection$.apply(SparkStrategies.scala:159)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:62)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:62)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
In it we can find these two frames:
at org.apache.carbondata.core.datastore.impl.FileFactory.getDirectorySize(FileFactory.java:534)
at org.apache.spark.sql.hive.CarbonRelation.sizeInBytes(CarbonRelation.scala:207)
This is where CarbonData starts computing the size of the table directory. Stepping into org.apache.carbondata.core.datastore.impl.FileFactory.getDirectorySize:
public static long getDirectorySize(String filePath) throws IOException {
  FileFactory.FileType fileType = getFileType(filePath);
  switch (fileType) {
    case LOCAL:
    default:
      filePath = getUpdatedFilePath(filePath, fileType);
      File file = new File(filePath);
      return FileUtils.sizeOfDirectory(file);
    case HDFS:
    case ALLUXIO:
    case VIEWFS:
      Path path = new Path(filePath);
      FileSystem fs = path.getFileSystem(configuration);
      return fs.getContentSummary(path).getLength();
  }
}
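A side note on why the NameNode reports "File does not exist": on Linux, Hadoop's Path treats a backslash as an ordinary character, not a separator, so /opt\test\user is a single path component rather than three nested directories. A minimal sketch (no cluster required) illustrating the difference:

import org.apache.hadoop.fs.Path;

public class PathDemo {
    public static void main(String[] args) {
        // On Linux, backslashes are literal characters inside a Hadoop Path.
        Path bad = new Path("/opt\\test\\user");  // one component: "opt\test\user"
        Path good = new Path("/opt/test/user");   // three components: opt, test, user
        System.out.println(bad.depth());   // 1
        System.out.println(good.depth());  // 3
    }
}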
The filePath passed in here is obviously the one from the exception above: /opt\test\user
So where is it passed in from? Moving on to org.apache.spark.sql.hive.CarbonRelation.sizeInBytes:
def sizeInBytes: Long = {
  val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
    tableMeta.carbonTable.getAbsoluteTableIdentifier)
  if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
    val tablePath = CarbonStorePath.getCarbonTablePath(
      tableMeta.storePath, tableMeta.carbonTableIdentifier).getPath
    val fileType = FileFactory.getFileType(tablePath)
    if (FileFactory.isFileExist(tablePath, fileType)) {
      tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
      sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath)
    }
  }
  sizeInBytesLocalValue
}
As we can see, the tablePath argument passed in comes from this statement:
val tablePath = CarbonStorePath.getCarbonTablePath(
  tableMeta.storePath, tableMeta.carbonTableIdentifier).getPath
Following it one level deeper:
public static CarbonTablePath getCarbonTablePath(String storePath,
    CarbonTableIdentifier tableIdentifier) {
  return new CarbonTablePath(tableIdentifier,
      storePath + File.separator + tableIdentifier.getDatabaseName()
      + File.separator + tableIdentifier.getTableName());
}
And there is the problem!
storePath + File.separator + tableIdentifier.getDatabaseName() + File.separator + tableIdentifier.getTableName()
The separator used here is File.separator, which is \ on Windows and / on Linux.
So when the client code runs on Windows, the table path derived from the SQL statement is built with Windows separators; once the job is submitted to Spark, the Linux/HDFS side cannot resolve the path, and the query fails.
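The fix follows directly: HDFS paths always use /, no matter which OS the client runs on, so the table path must be built with a fixed forward slash instead of File.separator. A minimal sketch of the corrected method (hardcoding "/" here; a shared separator constant would work equally well):

public static CarbonTablePath getCarbonTablePath(String storePath,
    CarbonTableIdentifier tableIdentifier) {
  // HDFS paths always use "/", regardless of the client OS,
  // so avoid the platform-dependent File.separator here.
  return new CarbonTablePath(tableIdentifier,
      storePath + "/" + tableIdentifier.getDatabaseName()
      + "/" + tableIdentifier.getTableName());
}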