基于hadoop、hive的离线数据分析问题集锦
1、小文件合并。nginx+flume+hdfs的架构采集、存储日志,但是flume采集的日志最终会有很多的小文件存储到hdfs,令人难受的是hdfs并不适用于处理大量小文件,但是好在hadoop的mapreduce提供了可以批量合并小文件的方式,这里直接上干货代码:
在这里插入代码片
package baobei.data.etl;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* flume会产生多个文件到hdfs,这里设置将小文件聚合为一个大文件,
* 方便后面的数据处理
*
*/
public class SmallFileCombiner {
static class SmallFileCombinerMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
NullWritable v = NullWritable.get();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//通过这种方式相当于是直接把值打印到磁盘文件中。value其实就是每一样的的文件内容
context.write(value, v);
}
}
/**
* 如果生产环境中,小文件的数量太多,那么累计起来的数量也是很庞大的,那么这时候就要设置切片的大小了。
*
* 即使用:CombineTextInputFormat.setMaxInputSplitSize(job, 1024*1024*150);
*/
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SmallFileCombiner.class);
job.setMapperClass(SmallFileCombinerMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//下面的方式将小文件划分为一个切片。
job.setInputFormatClass(CombineTextInputFormat.class);
//如果小文件的总和为224M,将setMaxInputSplitSize中的第二个参数设置成300M的时候,在
//hdfs://master:9000/output下只会生成一个part-m-00000这种文件
//如果将setMaxInputSplitSize中的第二个参数设置成150M的时候,在
//hdfs://master:9000/output下会生成part-m-00000 和 part-m-00001 两个文件
CombineTextInputFormat.setMaxInputSplitSize(job, 1024*1024*150);
CombineTextInputFormat.setInputPaths(job, new Path("hdfs://maoyunchao:9000/in/"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://maoyunchao:9000/output/"));
// job.setNumReduceTasks(0);
job.waitForCompletion(true);
}
}
2、集群内存不足。进行集群扩容,增加Datanode。
3、在写shell脚本的时候最好在脚本的前面加上source/etc/profile,这样就会去识别环境变量,否则在使用crontab定时执行脚本中的hive -f、hive -e等指令是会报错没有该指令。
4、在终端使用hive -e ''后面跟的是单引号,但是在shell脚本中hive -e ""后面的语句需要使用双引号。
5、在sqoop导出hive数据到mysql时报错check the manual that corresponds to your MySQL server version for the right syntax to use near。经过分析查询发现是mysql jar报的版本问题,刚开始我使用的是5.1.6版本,mysql jar包最好使用5.1.32版本,比较稳定,没有Bug。
6、查询速度过慢。12台机器(16G+8核),14个G,2000万左右的数据量,刚开始业务全部跑完大概需要1个40分钟左右,最后经过各种调优 ,像:Jvm重用、job并行执行、推测执行的关闭、reduce数目的设置、数据倾斜加盐优化(双重group随机后缀操作)、join优化(reduce join转化为map join、SMB join),最后执行时间1小时左右即可完成。其中执行速度最慢的就是外链分析的数据倾斜了,大概需要15分钟左右,优化后大概10分钟左右。
7、在写hql语句时在表前面加上数据库名eg:select * from baobei.pv的方式,否则会去默认的default库下寻找表。
8、把一些udf及一些配置写到$HIVE_HOME/bin下的.hiverc文件中,在每次执行hive语句时都会默认去执行。
9、调试的时候有时候会用到kill一些僵尸任务,杀掉mapreduce任务时可以使用 hadoop job -kill jobID、yarn application -kill ApplicationId的方式。
10、使用orcFile、parquet等的储存方式时直接建表然后导入数据,这样在查询时是会报错的,必须从另一张表上去查询出某些数据插入。eg:insert overwrite table xxx select …不能直接使用load data inpath …。
上一篇: 基于docker的服务器端使用jupyter notebook的方法
下一篇: 离线宝的调用方法