How CAT's Server Consumes Messages (Part 1)
1. Initializing RealtimeConsumer, the real-time consumer. It implements the container's Initializable lifecycle interface, so its initialize() runs when the component is created: it builds a PeriodManager with a duration of one hour (statistics are therefore aggregated per hour) and passes in the message analyzer manager and the server statistic manager.
public void initialize() throws InitializationException {
    m_periodManager = new PeriodManager(HOUR, m_analyzerManager, m_serverStateManager, m_logger);
    m_periodManager.init();

    Threads.forGroup("cat").start(m_periodManager);
}
The PeriodManager fixes the tolerance between adjacent periods at three minutes (EXTRATIME), the slack allowed around a period boundary. The PeriodStrategy it creates keeps the period duration together with two such tolerances: an extra time for letting the previous period finish and an ahead time for preparing the next one early.
public PeriodManager(long duration, MessageAnalyzerManager analyzerManager,
      ServerStatisticManager serverStateManager, Logger logger) {
    m_strategy = new PeriodStrategy(duration, EXTRATIME, EXTRATIME);
    m_active = true;
    m_analyzerManager = analyzerManager;
    m_serverStateManager = serverStateManager;
    m_logger = logger;
}
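The two constants referenced above are not shown in the snippets. Based on the one-hour period and the three-minute tolerance described here, they would be defined roughly as follows (assumed values, not quoted from the source):

// Assumed values matching the one-hour period and three-minute tolerance described above.
private static final long HOUR = 60 * 60 * 1000L;      // period duration passed in by RealtimeConsumer
private static final long EXTRATIME = 3 * 60 * 1000L;   // extra/ahead tolerance used by PeriodManager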
PeriodManager#init asks the strategy for the start time of the current period. On the first call next() always returns the start of the current hour and records it in m_lastStartTime. On later calls within the same hour, and before the 57-minute mark (duration minus aheadTime), next() returns 0, meaning the current period is still running; the first of these calls also sets m_lastEndTime to the period's start time. Once a call falls into the last three minutes of the hour (say minute 58), next() returns the start time of the next hour and advances m_lastStartTime, so the next period can be prepared ahead of time. Finally, once more than duration + extraTime has passed since the recorded m_lastEndTime, next() returns the negative of the old period's start time, signalling that it can be closed. Calls in the first 57 minutes of a period keep returning 0, and the cycle repeats hour after hour (see the walkthrough after the code below).
public void init() {
    long startTime = m_strategy.next(System.currentTimeMillis());

    startPeriod(startTime);
}

public long next(long now) {
    long startTime = now - now % m_duration;

    // for current period
    if (startTime > m_lastStartTime) {
        m_lastStartTime = startTime;
        return startTime;
    }

    // prepare next period ahead
    if (now - m_lastStartTime >= m_duration - m_aheadTime) {
        m_lastStartTime = startTime + m_duration;
        return startTime + m_duration;
    }

    // last period is over
    if (now - m_lastEndTime >= m_duration + m_extraTime) {
        long lastEndTime = m_lastEndTime;

        m_lastEndTime = startTime;
        return -lastEndTime;
    }

    return 0;
}
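To make the three branches concrete, here is a small walkthrough over one hour. The class name PeriodStrategyWalkthrough and the timestamps are purely illustrative; it assumes the constants above and the PeriodStrategy class from the CAT source, and the commented return values follow the behaviour just described:

public class PeriodStrategyWalkthrough {
    private static final long MINUTE = 60 * 1000L;
    private static final long HOUR = 60 * MINUTE;
    private static final long EXTRATIME = 3 * MINUTE;

    public static void main(String[] args) {
        PeriodStrategy strategy = new PeriodStrategy(HOUR, EXTRATIME, EXTRATIME);
        long t0 = System.currentTimeMillis() / HOUR * HOUR; // start of the current hour, say 10:00

        System.out.println(strategy.next(t0 + 5 * MINUTE));  // 10:05, first call: t0 -> start the current period
        System.out.println(strategy.next(t0 + 20 * MINUTE)); // 10:20, before the 57-minute mark: 0, m_lastEndTime is recorded
        System.out.println(strategy.next(t0 + 58 * MINUTE)); // 10:58, last 3 minutes: t0 + HOUR -> prepare the next period ahead
        System.out.println(strategy.next(t0 + 65 * MINUTE)); // 11:05, duration + extraTime exceeded: -t0 -> close the old period
    }
}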
startPeriod begins a period: it derives the end time from the strategy's duration and constructs a Period with the start and end times, the message analyzer manager, the server statistic manager and the logger.
private void startPeriod(long startTime) {
    long endTime = startTime + m_strategy.getDuration();
    Period period = new Period(startTime, endTime, m_analyzerManager, m_serverStateManager, m_logger);

    m_periods.add(period);
    period.start();
}
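startPeriod has a counterpart that is not quoted in this part: once next() reports a period as over, the manager presumably closes it. A minimal sketch, assuming Period exposes an isIn(timestamp) check and a finish() method:

// Sketch only: close the period covering startTime and drop it from m_periods.
private void endPeriod(long startTime) {
    int len = m_periods.size();

    for (int i = 0; i < len; i++) {
        Period period = m_periods.get(i);

        if (period.isIn(startTime)) { // assumed helper: startTime falls inside [m_startTime, m_endTime)
            period.finish();          // assumed: lets every PeriodTask drain its queue and save its reports
            m_periods.remove(i);
            break;
        }
    }
}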
Inside the Period constructor, one PeriodTask is created per analyzer instance to process its message type; each task holds that message analyzer, a message queue and the period's start time (a minimal sketch of PeriodTask follows the constructor below).
public Period(long startTime, long endTime, MessageAnalyzerManager analyzerManager,
      ServerStatisticManager serverStateManager, Logger logger) {
    m_startTime = startTime;
    m_endTime = endTime;
    m_analyzerManager = analyzerManager;
    m_serverStateManager = serverStateManager;
    m_logger = logger;

    List<String> names = m_analyzerManager.getAnalyzerNames();

    m_tasks = new HashMap<String, List<PeriodTask>>();
    for (String name : names) {
        List<MessageAnalyzer> messageAnalyzers = m_analyzerManager.getAnalyzer(name, startTime);

        for (MessageAnalyzer analyzer : messageAnalyzers) {
            MessageQueue queue = new DefaultMessageQueue(QUEUE_SIZE);
            PeriodTask task = new PeriodTask(analyzer, queue, startTime);

            task.enableLogging(m_logger);

            List<PeriodTask> analyzerTasks = m_tasks.get(name);

            if (analyzerTasks == null) {
                analyzerTasks = new ArrayList<PeriodTask>();
                m_tasks.put(name, analyzerTasks);
            }
            analyzerTasks.add(task);
        }
    }
}
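PeriodTask itself is not quoted in this part. As a rough sketch, assuming the task simply hands its queue to the analyzer when run and that the consumer enqueues decoded MessageTree objects, it would look like this; the real class also implements LogEnabled (hence task.enableLogging above), which is omitted here:

public class PeriodTask implements Runnable {
    private MessageAnalyzer m_analyzer;
    private MessageQueue m_queue;
    private long m_startTime;
    private int m_index;

    public PeriodTask(MessageAnalyzer analyzer, MessageQueue queue, long startTime) {
        m_analyzer = analyzer;
        m_queue = queue;
        m_startTime = startTime;
    }

    public void setIndex(int index) {
        m_index = index;
    }

    // producer side: each decoded message tree is offered to this task's queue
    public boolean enqueue(MessageTree tree) {
        return m_queue.offer(tree);
    }

    // consumer side: the analyzer loops over the queue until the period finishes
    @Override
    public void run() {
        m_analyzer.analyze(m_queue);
    }
}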
2. Initializing DefaultMessageAnalyzerManager. Its initialize() is again a container lifecycle hook: it looks up every MessageAnalyzer implementation registered in the container, keyed by role id, and keeps only the analyzer names after some filtering. The names are sorted so that "top" comes first and "state" last (everything else alphabetically), and "matrix" and "dependency" are then removed.
public void initialize() throws InitializationException {
    Map<String, MessageAnalyzer> map = lookupMap(MessageAnalyzer.class);

    // the looked-up instances are only needed to collect the analyzer names, so release them right away
    for (MessageAnalyzer analyzer : map.values()) {
        analyzer.destroy();
    }

    m_analyzerNames = new ArrayList<String>(map.keySet());

    Collections.sort(m_analyzerNames, new Comparator<String>() {
        @Override
        public int compare(String str1, String str2) {
            String state = "state";
            String top = "top";

            // "state" always sorts last
            if (state.equals(str1)) {
                return 1;
            } else if (state.equals(str2)) {
                return -1;
            }

            // "top" always sorts first
            if (top.equals(str1)) {
                return -1;
            } else if (top.equals(str2)) {
                return 1;
            }
            return str1.compareTo(str2);
        }
    });

    // these two analyzers are excluded from the real-time consumer here
    m_analyzerNames.remove("matrix");
    m_analyzerNames.remove("dependency");
}
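Purely for illustration, applying the same comparator to a hypothetical set of analyzer names shows the resulting order ("top" first, "state" last, the rest alphabetical, minus the two removed entries):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class AnalyzerOrderDemo {
    public static void main(String[] args) {
        List<String> names = new ArrayList<String>(Arrays.asList(
              "transaction", "event", "problem", "heartbeat", "top", "state", "matrix", "dependency"));

        Collections.sort(names, new Comparator<String>() {
            @Override
            public int compare(String str1, String str2) {
                if ("state".equals(str1)) { return 1; }   // "state" sorts last
                if ("state".equals(str2)) { return -1; }
                if ("top".equals(str1)) { return -1; }    // "top" sorts first
                if ("top".equals(str2)) { return 1; }
                return str1.compareTo(str2);              // everything else alphabetically
            }
        });
        names.remove("matrix");
        names.remove("dependency");

        System.out.println(names); // prints [top, event, heartbeat, problem, transaction, state]
    }
}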
In DefaultMessageAnalyzerManager#getAnalyzer, the analyzers of the period two slots before the current one are removed and destroyed first. The cache is declared as

private Map<Long, Map<String, List<MessageAnalyzer>>> m_analyzers = new HashMap<Long, Map<String, List<MessageAnalyzer>>>();

keyed by period start time on the outside, then by analyzer name, with the list of analyzer instances as the value. The method then collects all analyzers registered under the requested name, initializes each with the period's start time, duration and extra time, and stores them in this map (a sketch of getAnalyzer follows the snippet below).
public void initialize(long startTime, long duration, long extraTime) {
    m_extraTime = extraTime;
    m_startTime = startTime;
    m_duration = duration;

    loadReports();
}
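getAnalyzer itself is not quoted above. Here is a simplified sketch of what the paragraph describes, leaving out the synchronization of the real implementation; m_duration, m_extraTime and the createAnalyzers helper are assumptions standing in for however the manager obtains the hour duration, the extra time and the analyzer instances:

public List<MessageAnalyzer> getAnalyzer(String name, long startTime) {
    // drop and destroy the analyzers of the period two slots before the current one
    Map<String, List<MessageAnalyzer>> old = m_analyzers.remove(startTime - 2 * m_duration);

    if (old != null) {
        for (List<MessageAnalyzer> oldAnalyzers : old.values()) {
            for (MessageAnalyzer analyzer : oldAnalyzers) {
                analyzer.destroy();
            }
        }
    }

    Map<String, List<MessageAnalyzer>> map = m_analyzers.get(startTime);

    if (map == null) {
        map = new HashMap<String, List<MessageAnalyzer>>();
        m_analyzers.put(startTime, map);
    }

    List<MessageAnalyzer> analyzers = map.get(name);

    if (analyzers == null) {
        analyzers = new ArrayList<MessageAnalyzer>();

        for (MessageAnalyzer analyzer : createAnalyzers(name)) { // assumed helper: container lookup by name
            analyzer.initialize(startTime, m_duration, m_extraTime);
            analyzers.add(analyzer);
        }
        map.put(name, analyzers);
    }
    return analyzers;
}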
3. Take TransactionAnalyzer as an example first. Its initialization goes through the report manager DefaultReportManager and preloads the hourly reports of the current period:
protected void loadReports() {
    m_reportManager.loadHourlyReports(getStartTime(), StoragePolicy.FILE, m_index);
}
Here m_name is "transaction", specified when components-cat-consumer.xml is loaded:
public Map<String, T> loadHourlyReports(long startTime, StoragePolicy policy, int index) {
    Transaction t = Cat.newTransaction("Restore", m_name);
    Map<String, T> reports = m_reports.get(startTime);

    Cat.logEvent("Restore", m_name + ":" + index);

    ReportBucket bucket = null;

    if (reports == null) {
        reports = new ConcurrentHashMap<String, T>();
        m_reports.put(startTime, reports);
    }

    try {
        bucket = m_bucketManager.getReportBucket(startTime, m_name, index);

        for (String id : bucket.getIds()) {
            String xml = bucket.findById(id);
            T report = m_reportDelegate.parseXml(xml);

            reports.put(id, report);
        }
        m_reportDelegate.afterLoad(reports);
        t.setStatus(Message.SUCCESS);
    } catch (Throwable e) {
        t.setStatus(e);
        Cat.logError(e);
        m_logger.error(String.format("Error when loading %s reports of %s!", m_name, new Date(startTime)), e);
    } finally {
        t.complete();

        if (bucket != null) {
            m_bucketManager.closeBucket(bucket);
        }
    }
    return reports;
}
The report bucket manager DefaultReportBucketManager is initialized with the HDFS-local base directory for reports (under "target/bucket" by default):
public void initialize() throws InitializationException {
    m_reportBaseDir = m_configManager.getHdfsLocalBaseDir("report");
}

public ReportBucket getReportBucket(long timestamp, String name, int index) throws IOException {
    Date date = new Date(timestamp);
    ReportBucket bucket = lookup(ReportBucket.class);

    bucket.initialize(name, date, index);
    return bucket;
}
LocalReportBucket#initialize resolves the HDFS-local base directory and creates this period's files. The logical path follows the pattern "{0,date,yyyyMMdd}/{0,date,HH}/{1}/report-{2}", filled with the timestamp, index and name. If an index file already exists, its entries are loaded into memory: each entry records an offset into the data file; at that offset the record's length is stored, and the length tells how many bytes of report data follow (see the loadIndexes sketch after the code below).
public void initialize(String name, Date timestamp, int index) throws IOException {
    m_baseDir = m_configManager.getHdfsLocalBaseDir("report");
    m_writeLock = new ReentrantLock();
    m_readLock = new ReentrantLock();

    String logicalPath = m_pathBuilder.getReportPath(name, timestamp, index);
    File dataFile = new File(m_baseDir, logicalPath);
    File indexFile = new File(m_baseDir, logicalPath + ".idx");

    if (indexFile.exists()) {
        loadIndexes(indexFile);
    }

    final File dir = dataFile.getParentFile();

    if (!dir.exists() && !dir.mkdirs()) {
        throw new IOException(String.format("Fail to create directory(%s)!", dir));
    }

    m_logicalPath = logicalPath;
    m_writeDataFile = new BufferedOutputStream(new FileOutputStream(dataFile, true), 8192);
    m_writeIndexFile = new BufferedOutputStream(new FileOutputStream(indexFile, true), 8192);
    m_writeDataFileLength = dataFile.length();
    m_readDataFile = new RandomAccessFile(dataFile, "r");
}
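loadIndexes is not quoted here. A minimal sketch, assuming each index line holds the report id and the record's offset in the data file separated by a tab:

protected void loadIndexes(File indexFile) throws IOException {
    BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(indexFile), "utf-8"));

    try {
        String line;

        while ((line = reader.readLine()) != null) {
            // assumed index format: "<id>\t<offset-in-data-file>"
            String[] parts = line.split("\t");

            if (parts.length >= 2) {
                m_idToOffsets.put(parts[0], Long.parseLong(parts[1]));
            }
        }
    } finally {
        reader.close();
    }
}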
findById locates the report data in the bucket by id; loadHourlyReports parses each XML into a TransactionReport, caches it in memory, and finally runs the delegate's afterLoad post-processing.
public String findById(String id) throws IOException {
    Long offset = m_idToOffsets.get(id);

    if (offset != null) {
        m_readLock.lock();

        try {
            m_readDataFile.seek(offset);

            // the record starts with a line holding its length, followed by that many bytes of XML
            int num = Integer.parseInt(m_readDataFile.readLine());
            byte[] bytes = new byte[num];

            m_readDataFile.readFully(bytes);
            return new String(bytes, "utf-8");
        } catch (Exception e) {
            m_logger.error(String.format("Error when reading file(%s)!", m_readDataFile), e);
        } finally {
            m_readLock.unlock();
        }
    }
    return null;
}
In short, the procedure loads the current period's reports from the HDFS-local files into memory for real-time queries; at the end m_bucketManager.closeBucket(bucket) releases the files held by LocalReportBucket and clears its in-memory index. That is essentially all of DefaultMessageAnalyzerManager#getAnalyzer.
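The bucket's close() is not shown either; presumably it just closes the three file handles and clears the in-memory index, roughly:

public void close() {
    m_writeLock.lock();

    try {
        m_writeDataFile.close();
        m_writeIndexFile.close();
        m_readDataFile.close();
        m_idToOffsets.clear();
    } catch (IOException e) {
        m_logger.error(String.format("Error when closing bucket(%s)!", m_logicalPath), e);
    } finally {
        m_writeLock.unlock();
    }
}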
4. Each period thus contains a set of sub-tasks per message-type analyzer, i.e. Map<String, List<PeriodTask>> m_tasks. The period is appended to the manager's m_periods list and its start() method runs, assigning each sub-task its index and submitting it to the daemon thread group "Cat-RealtimeConsumer".
public void start() {
    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    m_logger.info(String.format("Starting %s tasks in period [%s, %s]", m_tasks.size(),
          df.format(new Date(m_startTime)), df.format(new Date(m_endTime - 1))));

    for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) {
        List<PeriodTask> taskList = tasks.getValue();

        for (int i = 0; i < taskList.size(); i++) {
            PeriodTask task = taskList.get(i);

            task.setIndex(i);
            Threads.forGroup("Cat-RealtimeConsumer").start(task);
        }
    }
}
Finally, the PeriodManager itself runs as a daemon thread: Threads.forGroup("cat").start(m_periodManager).
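The manager's run() method is what ties the strategy's return values to the period lifecycle. A simplified sketch of that loop, assuming it polls roughly once per second and reusing the endPeriod counterpart sketched earlier (the real implementation may well end periods asynchronously):

@Override
public void run() {
    while (m_active) {
        try {
            long value = m_strategy.next(System.currentTimeMillis());

            if (value > 0) {
                // a period should be started: the current one, or the next one a little ahead of time
                startPeriod(value);
            } else if (value < 0) {
                // the period identified by -value has exceeded duration + extraTime: close it
                endPeriod(-value);
            }
            // value == 0: still inside the current period, nothing to do
        } catch (Throwable e) {
            Cat.logError(e);
        }

        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            break;
        }
    }
}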