欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

项目整理-大数据分析项目

程序员文章站 2022-07-06 09:43:32
...

有大半年时间在做大数据分析,主要产品为OI。

OI 用到了flumesparkgraphite。学习了大数据的开发。

优点:flumespark源代码的学习,performance调优

 

OI

项目的框架:用flume做数据收集,spark做数据分析,graphite做数据显示,seyren做告警系统

项目的难点:sparkflume的框架的学习,performance的调优

遇到的问题:

1.这个问题整个小组研究了大半个月左右,一开始以为是scala list代码的问题,scala的某一版本中list的序列化是会按照列表成员的个数做递归调用的。后来发现用的不是那个版本。后来定位到是提交taskderiver 调用JavaSerialize反序列化RDD时抛出的问题,就在反序列化之前将byte数组保存到文件中。并调用代码发现是spark两次调用了UpdateStat 方法,导致了RDD的依赖没有被切断

2.经常抛出OutOfMemory问题,发现是spark太消耗内存了,spark专门有一章是说memory tuning的,对照着修改了一些参数解决了这个问题

 

3.死锁的问题,解决方法是设置wait的时间,或者用block queue

private void dequeueEvents(List<byte[]> events)  {
    	while ( eventQueue.size() == 0) {
			synchronized (dequeueMonitor) {
				dequeueMonitor.wait(1000);
			}
    	}
    	synchronized (eventQueue) {
    		events.addAll(eventQueue);
    		eventQueue.clear();
    	}
	synchronized (queueMonitor) {
		queueMonitor.notify();
	}
}

public void queueEvent (byte[] body) throws InterruptedException {
		while (eventQueue.size() >= maxEventQueueSize) {
			synchronized (queueMonitor) {
				queueMonitor.wait(1000);
			}
		}
		synchronized (eventQueue) {
			eventQueue.add(body);
		}
		synchronized (dequeueMonitor) {
			dequeueMonitor.notify();
		}
    }

4.Flume 经常抛出Session timeout exception,是因为当时网络不好

 

5. Spark调用hdfs接口的时候,经常报native方法找不到,需要在jrenative目录下添加hadoop相关的so文件

6. 修改whisperwrite函数,让新增的数据可以和原始数据相加,而不是直接覆盖

 

项目的优化:

1.写了专门用来监控performance相关参数的软件OI-Performance,graphite上能够很好的展示这些属性,便于观察,监控的属性有SystemCpuLoadProcessCpuLoadHeapMemoryUsageGC的时间,disk utilnetwork,以及一些业务属性

2.修改了spark中的storage level,减少了内存占用量

3. 调整了JVMnew-ratio的比例,延长变量在新生代中存活的时间,减少full GC的时间

4.减少了collector这部环节,让agent直接发送数据到flume,让压缩过的数据直接传输到spark,减少了磁盘的读写以及内存的开销

5. 减少了Avro格式转换成Seven自定义对象的环节,OI直接从generic record获取数据

6.替换flumechannel类型,让flume使用Memory Channel,减少磁盘的读写

 

基础知识补习:

1. ByteBuffer http://www.cnblogs.com/freeliver54/archive/2011/08/10/2133382.html

        sendBuffer.flip(); //limit =position,position = 0
        final int nbSentBytes = sendData(sendBuffer); //发送数据调用sendBuffer的get方法,数据的长度=limit-position
        sendBuffer.limit(sendBuffer.capacity());//重置limit为capacity
        sendBuffer.rewind();//position=0,mark = -1

 2. MogoDB的查询,查询很简单只要设置查询条件,调用相关的增删查方法就可以了

Query query = new Query();
        Criteria criteria = Criteria.where("instanceInfo").is(clientInfo.getInstanceInfo())
                .and("z7addr").is(clientInfo.getZ7addr());
        query.addCriteria(criteria);
        Update update = new Update();
        update.set("imei", clientInfo.getImei()).set("msisdn", clientInfo.getMsisdn())
                .set("clientVersion", clientInfo.getClientVersion()).set("osName", clientInfo.getOsName())
                .set("osVersion", clientInfo.getOsVersion())
                .set("appMap", clientInfo.getAppMap());
        mongoTemplate.upsert(query, update, clientInfo.getClass());

  3. JMX获取Java进程的heapmemory and Cpu Info

        MBean的介绍 http://blog.csdn.net/rudymatrix/article/details/1922868

    4.Scala语言可伸缩的语言 是一种多范式的编程语言,一种类似java的编程 ,设计初衷是要集成面向对象编程和函数式编程的各种特http://baike.baidu.com/view/1588150.htm?fr=aladdin