记录一次稽核数据的过程
程序员文章站
2022-07-13 08:54:11
...
背景介绍:因工作原因,需要核对内容中心的ring数据和本系统ring数据的差异;即对比双方ring_id的差异
概要说明:
1:内容中心的ring数据:提供一份文件,里面每行即ring_id,通过程序读出来,存到map中,约190万。
2:本系统全量的ring_id通过游标循环读出来,存入map中,约230万。
3:循环比较两个map的数据差异,将差异数据写入txt文件
使用 HashMap 存储(ring_id 作为 key,value 为 null):插入和查找的平均时间复杂度都是 O(1),而且不需要维持顺序。
160万条文件数据读取到 map 中耗时 1~5 分钟;200万条数据库数据通过游标循环读出来耗时 10 分钟以内。
详细过程:
1:读取提供的文件
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Month;
import java.util.HashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.Assert;
/**
* 内容中心ringID缓存工具类
*/
@Slf4j
public class RelationCacheUtils {
// 线程安全 {ring_id:null} 初始化值:200万,避免不断扩容占用资源
private static HashMap<String, Object> contentRingCaches = new HashMap<>(2000000);
// 文件路径
private static String linuxFilePathFormat = "/home/user/zmq/ringid.txt";
private static String windowsFilePathFormat = "C:/Users/zhan/Desktop/aa/aaaa/ringid.txt";
private static String getFilePath() {
String filePathFormat = System.getProperty("os.name").contains("Windows") ? windowsFilePathFormat : linuxFilePathFormat;
return filePathFormat;
}
/**
* 重新加载所有上传的ring_id数据
*
* @param
*/
public static void reloadCaches(){
clearCaches();
doLoadCaches();
}
/**
* 清空缓存, 释放内存空间
* 供后续优化扩展
*/
public static synchronized void clearCaches() {
contentRingCaches.clear();
}
/**
* 是否需要生成差异文件
*
* @param ringId
* @return 缓存命中则需要,否则不需要
*/
public static boolean needGenerateBill(String ringId) {
return contentRingCaches.containsKey(ringId);
}
/** 返回缓存后的map */
public static HashMap<String, Object> getDate() {
return contentRingCaches;
}
/**
* 使用文件迭代器缓存ring_id数据
* @param , 目前有3、4、5、6月
*/
private static synchronized void doLoadCaches() {
String filePath = getFilePath();
log.info("加载缓存文件:{}...", filePath);
File srcFile = new File(filePath);
if (!srcFile.exists()) {
log.error("文件不存在:{}", filePath);
return;
}
long start = System.currentTimeMillis();
// 打印行数
printLines(srcFile);
// 文件迭代器
LineIterator lineIterator;
try {
lineIterator = FileUtils.lineIterator(srcFile);
} catch (Exception e) {
log.error("获取迭代器失败:{}", e.getMessage());
return;
}
while (lineIterator.hasNext()) {
String ringId = StringUtils.trimToEmpty(lineIterator.next());
if (StringUtils.isNotBlank(ringId)) {
contentRingCaches.put(ringId, null);
}
}
long end = System.currentTimeMillis();
long cost = (end - start);
log.info("缓存行数{}条, 耗时{}ms", contentRingCaches.size(), cost);
}
private static void printLines(File srcFile) {
String fileName = StringUtils.substringBeforeLast(srcFile.getName(), ".");
// 总行数
long totalLines = 0;
try {
totalLines = Files.lines(Paths.get(srcFile.getPath())).count();
} catch (IOException ex) {
log.error("文件行数读取失败:{}", ex.getMessage());
}
log.info("当前文件:{}, 共有{}行数据", fileName, totalLines);
}
}
2:获取本系统的全量数据:
<!-- Selects every RING_ID of vrbt_ring for the mapper method Cursor<String> getTableCursor().
     fetchSize="1000" is a JDBC hint for how many rows the driver fetches per round trip, and
     resultSetType="FORWARD_ONLY" requests a forward-only result set, so rows can be streamed
     through the MyBatis cursor instead of being materialized all at once. -->
<select id="getTableCursor" fetchSize="1000" resultType="java.lang.String" resultSetType="FORWARD_ONLY">
select RING_ID from vrbt_ring
</select>
resultSetType="FORWARD_ONLY"(结果集只能向前遍历)详解:https://zhuanlan.zhihu.com/p/260336151
Mapper 接口中的方法:
// Mapper-interface method backed by the select whose id is "getTableCursor".
// The cursor element type may be an entity object or a String; here it yields RING_ID strings.
Cursor<String> getTableCursor();
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.cursor.Cursor;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Iterator;
/**
 * Loads every RING_ID of the local vrbt_ring table into an in-memory set.
 *
 * <p>Rows are streamed through the MyBatis cursor statement
 * {@code cn.womusic.vrbt.db.dao.mapper.VrbtRingMapper.getTableCursor} and stored as keys of
 * {@link #localRingCaches}; values are always {@code null}.
 */
@Slf4j
@Service
public class AllDBhandle {
    @Autowired
    SqlSessionFactory mybatisSessionFactory;
    // NOTE(review): not used by this class; kept so the bean's dependencies stay unchanged.
    @Autowired
    private VrbtRingService ringService;
    static String statement; // fully-qualified id of the cursor statement
    // {ring_id: null}; pre-sized to 3,000,000 entries. NOTE: HashMap is not thread-safe.
    private static HashMap<String, Object> localRingCaches = new HashMap<>(3000000);
    static final int BATCH_LOG_SIZE = 1000; // clear the SqlSession local cache every 1000 rows

    /** Resolves the cursor statement id once the bean has been created. */
    @PostConstruct
    public void init() {
        // statement is static: assign it statically rather than through "this".
        statement = getMethodStatement();
    }

    /** Clears the cache and reloads every RING_ID from the database. */
    public void reloadCaches() {
        clearCaches();
        getLocalDb();
    }

    /** Empties the cache to release memory. */
    public static synchronized void clearCaches() {
        localRingCaches.clear();
    }

    /**
     * Tells whether the given ring_id exists in the local database snapshot.
     *
     * @param ringId ring_id to look up
     * @return {@code true} if the id was loaded from the database, {@code false} otherwise
     */
    public static boolean needGenerateBill(String ringId) {
        return localRingCaches.containsKey(ringId);
    }

    /** Returns the live cache map itself (not a copy). */
    public static HashMap<String, Object> getDate() {
        return localRingCaches;
    }

    /**
     * Returns the fully-qualified id of the cursor statement.
     *
     * @return mapper statement id
     */
    private String getMethodStatement() {
        String statementFormat = "cn.womusic.vrbt.db.dao.mapper.VrbtRingMapper.getTableCursor";
        return statementFormat;
    }

    /**
     * Streams every RING_ID through the cursor statement into the cache.
     *
     * <p>Best effort: any exception is logged and swallowed, so after a failure the cache holds
     * only the rows read before the error.
     */
    public void getLocalDb() {
        long start = System.currentTimeMillis();
        try (
            SqlSession session = mybatisSessionFactory.openSession(); // a new session for this load
            Cursor<String> oracleTableCursor = session.selectCursor(statement)
        ) {
            long currentTime = System.currentTimeMillis();
            log.info("获取数据表游标耗时: {}ms", (currentTime - start));
            int sum = 0;
            Iterator<String> iterator = oracleTableCursor.iterator();
            // Walk the cursor
            while (iterator.hasNext()) {
                localRingCaches.put(iterator.next(), null);
                if (sum > 0 && sum % BATCH_LOG_SIZE == 0) {
                    session.clearCache(); // drop the session's local cache so it does not keep growing
                }
                sum++;
            }
            log.info("执行结束, 总数:{}条, 总耗时:{}ms", sum, (System.currentTimeMillis() - currentTime));
        } catch (Exception e) {
            log.error("执行异常:{}", e.getMessage(), e);
        }
    }
}
3:实际对比数据:
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class MybatisDbService {
    @Autowired
    private AllDBhandle dBhandle;

    /**
     * Reloads both ring_id sets and writes the differences between them to two text files.
     *
     * <ul>
     *   <li>{@code error1.txt}: ring_ids in the local database but missing from the content-center file;</li>
     *   <li>{@code error2.txt}: ring_ids in the content-center file but missing from the local database.</li>
     * </ul>
     *
     * <p>Each id is written on its own CRLF-terminated line in UTF-8. Both files are overwritten on
     * every run. Previously they were appended to, so results of earlier runs leaked into the
     * current diff. Each file is now written in one pass instead of being reopened per line.
     *
     * @throws Exception if a diff file cannot be written
     */
    public void checkDB() throws Exception {
        log.info("进入数据服务类。。。");
        // Diff file paths
        File file1 = new File("/home/user/zmq/error1.txt");
        File file2 = new File("/home/user/zmq/error2.txt");
        RelationCacheUtils.reloadCaches(); // load the content-center export file
        dBhandle.reloadCaches();           // load every ring_id from the local database
        // Imported file data (the live map behind RelationCacheUtils.needGenerateBill)
        HashMap<String, Object> contentRingCaches = RelationCacheUtils.getDate();
        // Local database data (the live map behind AllDBhandle.needGenerateBill)
        HashMap<String, Object> localRingCaches = AllDBhandle.getDate();

        List<String> missingInContent = collectMissing(localRingCaches, contentRingCaches);
        writeDiffFile(file1, missingInContent);
        log.info("循环管理中心铃音文件,差异文件共计:{}", missingInContent.size());

        List<String> missingInLocal = collectMissing(contentRingCaches, localRingCaches);
        writeDiffFile(file2, missingInLocal);
        log.info("循环内容中心铃音文件,差异文件共计:{}", missingInLocal.size());
    }

    /**
     * Returns the keys of {@code source} that are absent from {@code other}, in source's iteration order.
     */
    private static List<String> collectMissing(Map<String, Object> source, Map<String, Object> other) {
        List<String> missing = new ArrayList<>();
        for (String key : source.keySet()) {
            if (!other.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Overwrites {@code file} with one id per CRLF-terminated line, UTF-8 encoded, in a single write. */
    private static void writeDiffFile(File file, List<String> ids) throws IOException {
        FileUtils.writeLines(file, "UTF-8", ids, "\r\n");
    }
}
4:启动服务调用:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.http.util.Asserts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.ImportResource;
@SpringBootApplication(exclude = {
        TransactionAutoConfiguration.class,
        RedisAutoConfiguration.class,
        HibernateJpaAutoConfiguration.class,
        DataSourceAutoConfiguration.class,
        KafkaAutoConfiguration.class
})
@ImportResource({"classpath*:spring-context.xml"})
public class CheckDBApplication {
    private static Logger logger = LoggerFactory.getLogger(CheckDBApplication.class);

    /**
     * Boots the Spring context, runs the one-shot ring_id reconciliation, then closes the context.
     *
     * <p>The context is closed via try-with-resources so its resources are released once the check
     * finishes; previously it was never closed. Any failure is logged and the JVM exits with status 1.
     *
     * @param args Spring Boot command-line arguments
     */
    public static void main(String[] args) {
        try {
            logger.info("开始启动Springboot...");
            SpringApplication springApplication = new SpringApplication(CheckDBApplication.class);
            try (ConfigurableApplicationContext springIOC = springApplication.run(args)) {
                logger.info("Spring加载完成");
                MybatisDbService dispacher = springIOC.getBean(MybatisDbService.class);
                // The assertion name now matches the bean actually looked up (was a copy-paste leftover).
                Asserts.notNull(dispacher, "MybatisDbService");
                dispacher.checkDB();
            }
        } catch (Exception e) {
            logger.error("异常: {}", e.getMessage(), e);
            System.exit(1);
        }
    }
}
此处记录动态获取文件数据:
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Month;
import java.util.HashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.Assert;
/**
* 订购关系缓存工具类
*
* @author linzt
* @version 2021年06月25日
*/
@Slf4j
public class RelationCacheUtils {
// 线程安全 {phoneNumber:null}
private static HashMap<String, Object> orderedUserCaches = new HashMap<>(4000000);
// 号码文件路径
private static String linuxFilePathFormat = "/home/user/order-relation-user-file/%d月视频彩铃产品用户.csv";
private static String windowsFilePathFormat = "D:/话单上报/%d月视频彩铃产品用户.csv";
private static String getFilePath(Month month) {
String filePathFormat = System.getProperty("os.name").contains("Windows") ? windowsFilePathFormat : linuxFilePathFormat;
return String.format(filePathFormat, month.getValue());
}
/**
* 是否命中
*
* @param phoneNumber
* @return 缓存命中则需要,否则不需要
*/
public static boolean needGenerateBill(String phoneNumber) {
return orderedUserCaches.containsKey(phoneNumber);
}
/**
* 重新加载给定月份的数据
*
* @param month 指定月份
*/
public static void reloadCaches(Month month){
clearCaches();
doLoadCaches(month);
}
/**
* 清空缓存, 释放内存空间
* 供后续优化扩展
*/
public static synchronized void clearCaches() {
orderedUserCaches.clear();
}
/**
* 使用文件迭代器缓存订购用户数据
* @param month 指定月份, 目前有3、4、5、6月
*/
private static synchronized void doLoadCaches(Month month) {
Assert.notNull(month, "月份不能为空!");
String filePath = getFilePath(month);
log.info("加载缓存文件:{}...", filePath);
File srcFile = new File(filePath);
if (!srcFile.exists()) {
log.error("文件不存在:{}", filePath);
return;
}
long start = System.currentTimeMillis();
// 打印行数
printLines(srcFile);
// 文件迭代器
LineIterator lineIterator;
try {
lineIterator = FileUtils.lineIterator(srcFile);
} catch (Exception e) {
log.error("获取迭代器失败:{}", e.getMessage());
return;
}
while (lineIterator.hasNext()) {
String callNumber = StringUtils.trimToEmpty(lineIterator.next());
if (StringUtils.isNotBlank(callNumber)) {
orderedUserCaches.put(callNumber, null);
}
}
long end = System.currentTimeMillis();
long cost = (end - start);
log.info("缓存行数{}条, 耗时{}ms", orderedUserCaches.size(), cost);
}
private static void printLines(File srcFile) {
String fileName = StringUtils.substringBeforeLast(srcFile.getName(), ".");
// 总行数
long totalLines = 0;
try {
totalLines = Files.lines(Paths.get(srcFile.getPath())).count();
} catch (IOException ex) {
log.error("文件行数读取失败:{}", ex.getMessage());
}
log.info("当前文件:{}, 共有{}行数据", fileName, totalLines);
}
}
此处记录动态获取游标方法:
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.cursor.Cursor;
import org.apache.ibatis.cursor.defaults.DefaultCursor;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.defaults.DefaultSqlSession;
import java.util.Iterator;
/**
 * Base class that streams a whole Oracle table to Kafka on a single worker thread.<br>
 * The mapper matching the generic type M must declare a getTableCursor method, otherwise the job is rejected.<br>
 * Example:<br>
 * <select id="getTableCursor" fetchSize="500" resultMap="BaseResultMap" resultSetType="FORWARD_ONLY">
 * select * from vrbt_rotate_ring
 * </select>
 *
 * @see VrbtRotateRingMapper#getTableCursor
 * @see KafkaTopics Kafka topic constants
 * @see DefaultCursor MyBatis default cursor implementation
 * @see DefaultSqlSession MyBatis default session implementation
 * @author linzt
 * @author 2021-05-18
 */
@Slf4j
public abstract class NoPartitionHandler<T extends GenericEntity> extends BaseJobHandler<T> {
    /**
     * Starts one non-daemon worker thread that pushes every row to {@code kafkaTopic}.
     */
    @Override
    protected void asyncTickWork() {
        log.info("任务开始...");
        NoPatitionTableSyncWorker<T> noPatitionTableSyncWorker = new NoPatitionTableSyncWorker<>();
        Thread workerThread = new Thread(noPatitionTableSyncWorker, kafkaTopic + "-线程1");
        workerThread.setDaemon(false);
        // Register BEFORE starting: a fast worker could otherwise finish and unregister first,
        // leaving a stale registration that makes every later run fail with "当前已有任务处理".
        registTask(kafkaTopic, workerThread);
        workerThread.start();
    }

    /**
     * Worker that reads the table through a MyBatis cursor and sends each row to Kafka.
     *
     * @param <E> Oracle entity type
     */
    private class NoPatitionTableSyncWorker<E extends GenericEntity> implements Runnable {
        @Override
        @SuppressWarnings("unchecked")
        public void run() {
            long start = System.currentTimeMillis();
            try (
                SqlSession session = mybatisSessionFactory.openSession(); // a new session for this run
                Cursor<E> oracleTableCursor = session.selectCursor(statement)
            ) {
                long currentTime = System.currentTimeMillis();
                log.info("获取数据表游标耗时: {}ms", (currentTime - start));
                int sum = 0;
                Iterator<E> iterator = oracleTableCursor.iterator();
                // Honor interruption: a "force" re-run interrupts running workers. Without this check
                // the loop ignored the interrupt and kept sending until the table was exhausted.
                while (!Thread.currentThread().isInterrupted() && iterator.hasNext()) {
                    kafkaService.send(kafkaTopic, getContent((T) iterator.next()));
                    if (sum > 0 && sum % BATCH_LOG_SIZE == 0) {
                        session.clearCache(); // drop the session's local cache so it does not keep growing
                    }
                    sum++;
                }
                if (Thread.currentThread().isInterrupted()) {
                    log.warn("任务被中断, 已发送{}条", sum);
                }
                log.info("执行结束, 总数:{}条, 总耗时:{}ms", sum, (System.currentTimeMillis() - currentTime));
            } catch (Exception e) {
                log.error("执行异常:{}", e.getMessage(), e);
            } finally {
                unRegistTask(kafkaTopic, Thread.currentThread());
            }
        }
    }

    /**
     * Serializes one entity into the message sent to Kafka; implemented by subclasses.
     *
     * @param t entity read from the cursor
     * @return message body
     */
    protected abstract String getContent(T t);
}
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.lang.reflect.ParameterizedType;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public abstract class BaseJobHandler<T extends GenericEntity> extends IJobHandler {
    @Autowired
    KafkaService kafkaService;
    @Autowired
    SqlSessionFactory mybatisSessionFactory;
    String kafkaTopic;
    String statement; // id of the mapper's getTableCursor statement (see getMethodStatement)
    private Class<T> entityClass;
    // topic -> worker threads currently running for that topic; every access in this class holds `lock`
    protected Map<String, Set<Thread>> currentTaskMap = new HashMap<>();
    private final ReentrantLock lock = new ReentrantLock();
    static final int BATCH_LOG_SIZE = 1000; // clear the SqlSession cache every 1000 entities
    protected String SPLIT = "|";
    protected String LINE = System.lineSeparator();

    /**
     * Resolves the entity type from the runtime class's generic superclass.
     * Assumes the concrete handler directly extends a parameterized type whose first type argument
     * is the entity, e.g. {@code RotateHandler extends NoPartitionHandler<VrbtRotate>}.
     */
    @SuppressWarnings("unchecked")
    public BaseJobHandler() {
        ParameterizedType parameterizedType =
                (ParameterizedType) this.getClass().getGenericSuperclass();
        this.entityClass = (Class<T>) parameterizedType.getActualTypeArguments()[0];
        log.info("泛型类型:{},", entityClass.getSimpleName());
    }

    /** Resolves the Kafka topic and the cursor statement id from the subclass. */
    @PostConstruct
    public void init() {
        this.kafkaTopic = getKafkaTopic();
        this.statement = getMethodStatement();
    }

    /**
     * xxl-job entry point: validates, dispatches the work asynchronously and returns at once.
     *
     * @param args job argument; "force" interrupts a task already running for the same topic
     * @return response carrying "任务已受理"
     * @throws IllegalStateException if validation fails or a task is already running without "force"
     */
    @Override
    public ReturnT<String> execute(String args) {
        // Validate parameters
        initAndCheck(args);
        // Dispatch the work
        asyncTickWork();
        return buildResponse("任务已受理");
    }

    /** Re-resolves topic/statement, checks both exist, then deals with any task already running. */
    private void initAndCheck(String args) {
        init();
        try {
            Assert.state(kafkaService.existTopic(kafkaTopic), "主题不存在!");
            Assert.state(mybatisSessionFactory.getConfiguration().hasStatement(statement), "方法不存在!");
        } catch (Exception e) {
            throwEx(e.getMessage(), e);
        }
        killPreTaskIfNessary(args);
    }

    /**
     * Fails if the topic already has running workers, unless {@code xxlJobArgs} is "force", in which
     * case those workers are interrupted and unregistered. Interrupting only requests a stop; the
     * worker must check its interrupt flag.
     *
     * @param xxlJobArgs job argument passed by xxl-job
     */
    private void killPreTaskIfNessary(String xxlJobArgs) {
        // Iterate a snapshot: unRegistTask removes from the live set, and iterating that set directly
        // threw ConcurrentModificationException as soon as a topic had more than one worker.
        List<Thread> runningThreads = snapshotTasks(kafkaTopic);
        if (runningThreads.isEmpty()) {
            return;
        }
        if (!StringUtils.equals(xxlJobArgs, "force")) {
            throwEx("当前已有任务处理");
        }
        logAll("已有任务, 强制终止线程后继续处理...");
        for (Thread thread : runningThreads) {
            try {
                thread.interrupt();
            } catch (Exception e) {
                throwEx(thread.getName() + "终止线程失败:" + e.getMessage(), e);
            }
            unRegistTask(kafkaTopic, thread);
        }
    }

    /** Returns a copy of the threads registered for {@code topic} (empty if none), taken under the lock. */
    private List<Thread> snapshotTasks(String topic) {
        lock.lock();
        try {
            Set<Thread> topicThreads = currentTaskMap.get(topic);
            return topicThreads == null ? new ArrayList<>() : new ArrayList<>(topicThreads);
        } finally {
            lock.unlock();
        }
    }

    /** Logs {@code message} to both loggers and throws it as an IllegalStateException. */
    private void throwEx(String message) {
        throwEx(message, null);
    }

    /** Same as {@link #throwEx(String)} but keeps {@code cause} (may be null) for diagnostics. */
    private void throwEx(String message, Throwable cause) {
        logAll(message);
        throw new IllegalStateException(message, cause);
    }

    /** Logs {@code message} and wraps it as the content of a ReturnT. */
    private ReturnT<String> buildResponse(String message) {
        logAll(message);
        return new ReturnT<String>(message);
    }

    /** Writes {@code message} to both the application log and the xxl-job execution log. */
    private void logAll(String message) {
        log.info(message);
        XxlJobLogger.log(message);
    }

    /**
     * Builds the statement id {@code cn.womusic.vrbt.db.dao.mapper.<Entity>Mapper.getTableCursor}.
     *
     * @return mapper statement id
     */
    private String getMethodStatement() {
        String statementFormat = "cn.womusic.vrbt.db.dao.mapper.%sMapper.getTableCursor";
        return String.format(statementFormat, entityClass.getSimpleName());
    }

    /**
     * Records {@code thread} as a running worker of {@code kafkaTopic}.
     *
     * @param kafkaTopic topic the worker publishes to
     * @param thread     worker thread
     */
    protected void registTask(String kafkaTopic, Thread thread) {
        lock.lock();
        try {
            log.info("{}注册工人:{}", kafkaTopic, thread.getName());
            currentTaskMap.computeIfAbsent(kafkaTopic, topic -> new HashSet<>()).add(thread);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes {@code thread} (or the current thread when null) from the workers of {@code kafkaTopic},
     * dropping the topic entry once no worker is left.
     *
     * <p>Only the given thread is removed. Previously a topic with a single registered thread was
     * wiped regardless of which thread called, so an interrupted old worker finishing late could
     * erase the registration of the new one. The null check also now runs before the thread is
     * logged, which previously caused an NPE.
     *
     * @param kafkaTopic topic the worker publishes to
     * @param thread     worker thread, or null for the current thread
     */
    protected void unRegistTask(String kafkaTopic, Thread thread) {
        Thread target = (thread == null) ? Thread.currentThread() : thread;
        lock.lock();
        try {
            Set<Thread> topicThreads = currentTaskMap.get(kafkaTopic);
            if (CollectionUtils.isEmpty(topicThreads)) {
                return;
            }
            log.info("{}完成了", target.getName());
            topicThreads.remove(target);
            if (topicThreads.isEmpty()) {
                currentTaskMap.remove(kafkaTopic);
            }
        } finally {
            lock.unlock();
        }
    }

    /** Forgets every registered worker when the bean is destroyed. */
    @PreDestroy
    public void preDestroy() {
        lock.lock();
        try {
            currentTaskMap.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the Kafka topic. The topic must be created beforehand; execute() rejects the run otherwise.
     *
     * @return topic name
     */
    protected abstract String getKafkaTopic();

    /**
     * Starts the work asynchronously; implemented by subclasses.
     */
    protected abstract void asyncTickWork();

    /** Formats {@code date} with DateTimeUtils.formatBasic, or returns "" when null. */
    protected String filterDate(Date date) {
        return date == null ? "" : DateTimeUtils.formatBasic(date);
    }

    /** Returns {@code str}, or "" when null. */
    protected String defalutString(String str) {
        return StringUtils.defaultString(str, "");
    }
}
import com.xxl.job.core.handler.annotation.JobHandler;
import org.springframework.stereotype.Component;
/**
 * xxl-job handler "rotateHandler" that publishes VrbtRotate entities to KafkaTopics.VRBT_ROTATE.
 * Each entity is turned into one string of fields, each field followed by SPLIT (including the last).
 * Author's measurement: 3.02 million rows in 48 seconds.
 *
 * @author linzt
 * @date 2021-05-18
 */
@Component
@JobHandler(value = "rotateHandler")
public class RotateHandler extends NoPartitionHandler<VrbtRotate> {

    /** Topic that receives the serialized rotate records. */
    @Override
    public String getKafkaTopic() {
        return KafkaTopics.VRBT_ROTATE;
    }

    /**
     * Serializes a rotate record as pipe-terminated fields. Dates are formatted through filterDate
     * ("" when null), the channel id through defalutString ("" when null); any other null field is
     * rendered as "null".
     */
    @Override
    protected String getContent(VrbtRotate rotate) {
        Object[] columns = {
                rotate.getRotateId(),
                rotate.getPhoneNumber(),
                rotate.getRingTotal(),
                rotate.getRingMax(),
                filterDate(rotate.getCreateDate()),
                filterDate(rotate.getUpdateDate()),
                rotate.getRotateName(),
                rotate.getIsVideo(),
                defalutString(rotate.getChannelId())
        };
        StringBuilder line = new StringBuilder();
        for (Object column : columns) {
            line.append(column).append(SPLIT);
        }
        return line.toString();
    }
}
上一篇: 记一次vue项目打包发布过程
下一篇: post