欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

记录一次稽核数据的过程

程序员文章站 2022-07-13 08:54:11
...

背景介绍:因工作原因,需要核对内容中心的ring数据和本系统ring数据的差异;即对比双方ring_id的差异

概要说明:
1:内容中心的ring数据:提供一份文件,里面每行即ring_id,通过程序读出来,存到map中,约190万。
2:本系统全量的ring_id通过游标循环读出来,存入map中,约230万。
3:循环比较两个map的数据差异,将差异数据写入txt文件

将ring_id作为key存入HashMap:写入和查找都很快,也不需要按顺序存放。
160万条数据读取到map中耗时1-5分钟,200万条数据通过游标循环读出来耗时10分钟以内。

详细过程:
1:读取提供的文件

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Month;
import java.util.HashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.Assert;

/**
 * 内容中心ringID缓存工具类
 */
@Slf4j
public class RelationCacheUtils {

    // 线程安全 {ring_id:null}   初始化值:200万,避免不断扩容占用资源
    private static HashMap<String, Object> contentRingCaches = new HashMap<>(2000000);    

    // 文件路径
    private static String linuxFilePathFormat = "/home/user/zmq/ringid.txt";
    private static String windowsFilePathFormat = "C:/Users/zhan/Desktop/aa/aaaa/ringid.txt";

    private static String getFilePath() {
        String filePathFormat = System.getProperty("os.name").contains("Windows") ? windowsFilePathFormat : linuxFilePathFormat;
        return filePathFormat;
    }

    /**
     * 重新加载所有上传的ring_id数据
     *
     * @param
     */
    public static void reloadCaches(){
        clearCaches();
        doLoadCaches();
    }

    /**
     * 清空缓存, 释放内存空间
     * 供后续优化扩展
     */
    public static synchronized void clearCaches() {
        contentRingCaches.clear();
    }

    /**
     * 是否需要生成差异文件
     *
     * @param ringId
     * @return 缓存命中则需要,否则不需要
     */
    public static boolean needGenerateBill(String ringId) {
        return contentRingCaches.containsKey(ringId);
    }

	/**   返回缓存后的map   */
    public static HashMap<String, Object> getDate() {
        return contentRingCaches;
    }

    /**
     * 使用文件迭代器缓存ring_id数据
     * @param  , 目前有3、4、5、6月
     */
    private static synchronized void doLoadCaches() {

        String filePath = getFilePath();
        log.info("加载缓存文件:{}...", filePath);
        File srcFile = new File(filePath);
        if (!srcFile.exists()) {
            log.error("文件不存在:{}", filePath);
            return;
        }
        long start = System.currentTimeMillis();

        // 打印行数
        printLines(srcFile);

        // 文件迭代器
        LineIterator lineIterator;

        try {
            lineIterator = FileUtils.lineIterator(srcFile);
        } catch (Exception e) {
            log.error("获取迭代器失败:{}", e.getMessage());
            return;
        }

        while (lineIterator.hasNext()) {
            String ringId = StringUtils.trimToEmpty(lineIterator.next());
            if (StringUtils.isNotBlank(ringId)) {
                contentRingCaches.put(ringId, null);
            }
        }
        long end = System.currentTimeMillis();
        long cost = (end - start);
        log.info("缓存行数{}条, 耗时{}ms", contentRingCaches.size(), cost);
    }

    private static void printLines(File srcFile) {
        String fileName = StringUtils.substringBeforeLast(srcFile.getName(), ".");

        // 总行数
        long totalLines = 0;
        try {
            totalLines = Files.lines(Paths.get(srcFile.getPath())).count();
        } catch (IOException ex) {
            log.error("文件行数读取失败:{}", ex.getMessage());
        }
        log.info("当前文件:{}, 共有{}行数据", fileName, totalLines);
    }
}


2:获取本系统的全量数据:

<!-- Returns the RING_ID column of every row in vrbt_ring, one String per row.
     fetchSize and resultSetType=FORWARD_ONLY are set on the statement; the article reads
     the result through a MyBatis Cursor rather than as a single list. -->
<select id="getTableCursor" fetchSize="1000" resultType="java.lang.String" resultSetType="FORWARD_ONLY">
    select RING_ID from vrbt_ring
  </select>

resultSetType="FORWARD_ONLY" 详解:https://zhuanlan.zhihu.com/p/260336151

mapper文件方法:

Cursor<String> getTableCursor();     // the cursor's element type may be an entity object or a String
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.cursor.Cursor;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Iterator;

/**
 * Loads every value returned by the {@code VrbtRingMapper.getTableCursor} statement into a
 * static in-memory map by walking a MyBatis cursor.
 *
 * <p>The backing map is a plain {@link HashMap}; only {@link #clearCaches()} is synchronized.
 */
@Slf4j
@Service
public class AllDBhandle {

    @Autowired
    SqlSessionFactory mybatisSessionFactory;

    @Autowired
    private VrbtRingService ringService;

    // Fully qualified id of the mapper statement that opens the cursor; set in init().
    static String statement;

    // {ring_id: null}, pre-sized for roughly three million entries.
    private static HashMap<String, Object> localRingCaches = new HashMap<>(3000000);

    // The session cache is cleared each time this many rows have been read.
    static final int BATCH_LOG_SIZE = 1000;


    /**
     * Resolves the cursor statement id once the bean has been constructed.
     */
    @PostConstruct
    public void init() {
        statement = getMethodStatement();
    }

    /**
     * Empties the cache and reads the whole table again.
     */
    public void reloadCaches() {
        clearCaches();
        getLocalDb();
    }

    /**
     * Removes every cached ring ID.
     */
    public static synchronized void clearCaches() {
        localRingCaches.clear();
    }

    /**
     * Tells whether the given ring ID is present in the local cache.
     *
     * @param ringId ring ID to look up
     * @return {@code true} when the ID is cached, {@code false} otherwise
     */
    public static boolean needGenerateBill(String ringId) {
        return localRingCaches.containsKey(ringId);
    }

    /**
     * Returns the backing map itself (not a copy); its keys are the cached ring IDs.
     *
     * @return the live cache map
     */
    public static HashMap<String, Object> getDate() {
        return localRingCaches;
    }

    /**
     * Supplies the id of the mapper statement used to open the cursor.
     *
     * @return the fully qualified statement id
     */
    private String getMethodStatement() {
        return "cn.womusic.vrbt.db.dao.mapper.VrbtRingMapper.getTableCursor";
    }


    /**
     * Streams the cursor and stores every returned value as a key of the cache.
     *
     * <p>Any exception is logged and swallowed; the cache then holds whatever was read so far.
     */
    public void getLocalDb() {
        final long openedAt = System.currentTimeMillis();

        try (
                SqlSession sqlSession = mybatisSessionFactory.openSession();
                Cursor<String> ringCursor = sqlSession.selectCursor(statement)
        ) {
            final long cursorReadyAt = System.currentTimeMillis();
            log.info("获取数据表游标耗时: {}ms", (cursorReadyAt - openedAt));

            int processed = 0;
            for (String ringId : ringCursor) {
                localRingCaches.put(ringId, null);

                // Evaluated before the counter moves, so the first clear happens on row 1001.
                boolean batchBoundary = processed > 0 && processed % BATCH_LOG_SIZE == 0;
                if (batchBoundary) {
                    sqlSession.clearCache();
                }
                processed++;
            }
            log.info("执行结束, 总数:{}条, 总耗时:{}ms", processed, (System.currentTimeMillis() - cursorReadyAt));

        } catch (Exception e) {
            log.error("执行异常:{}", e.getMessage(), e);
        }
    }

}

3:实际对比数据:

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Compares the ring IDs held by the content center with those held locally and writes the
 * IDs found on only one side to two text files.
 */
@Slf4j
@Service
public class MybatisDbService {

    @Autowired
    private AllDBhandle dBhandle;

    /**
     * Reloads both ring-ID caches and writes the differences in each direction.
     *
     * <p>{@code error1.txt} receives the IDs that exist locally but not in the imported file;
     * {@code error2.txt} receives the IDs that exist in the imported file but not locally.
     * Both files are appended to, never truncated.
     *
     * @throws Exception if loading fails with an unchecked error or a result file cannot be written
     */
    public void checkDB() throws Exception {
        log.info("进入数据服务类。。。");
        // Output files for the two difference sets.
        File file1 = new File("/home/user/zmq/error1.txt");
        File file2 = new File("/home/user/zmq/error2.txt");

        RelationCacheUtils.reloadCaches();      // read the imported file
        dBhandle.reloadCaches();                // read the whole local table

        // Ring IDs from the imported file.
        HashMap<String, Object> contentRingCaches = RelationCacheUtils.getDate();
        // Ring IDs from the local database.
        HashMap<String, Object> localRingCaches = AllDBhandle.getDate();

        int num = writeMissingKeys(localRingCaches, contentRingCaches, file1);
        log.info("循环管理中心铃音文件,差异文件共计:{}", num);

        int count = writeMissingKeys(contentRingCaches, localRingCaches, file2);
        log.info("循环内容中心铃音文件,差异文件共计:{}", count);
    }

    /**
     * Appends every key of {@code source} that is absent from {@code other} to {@code target}.
     *
     * <p>The keys are collected first and written with a single call, so the file is opened
     * once instead of once per key. Each key is followed by {@code "\r\n"}. Nothing is written
     * (and no file is created) when there is no difference.
     *
     * @param source map whose keys are checked
     * @param other map the keys are looked up in
     * @param target file the missing keys are appended to, encoded as UTF-8
     * @return the number of keys written
     * @throws IOException if the file cannot be written
     */
    private static int writeMissingKeys(Map<String, Object> source, Map<String, Object> other, File target)
            throws IOException {
        List<String> missing = new ArrayList<>();
        for (String key : source.keySet()) {
            if (!other.containsKey(key)) {
                missing.add(key);
            }
        }
        if (!missing.isEmpty()) {
            FileUtils.writeLines(target, "UTF-8", missing, "\r\n", true);
        }
        return missing.size();
    }

}

4:启动服务调用:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.http.util.Asserts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.ImportResource;

/**
 * Spring Boot entry point that runs the ring-ID audit once: it boots the application context,
 * looks up {@link MybatisDbService} and calls {@code checkDB()}.
 */
@SpringBootApplication(exclude = {
        TransactionAutoConfiguration.class,
        RedisAutoConfiguration.class,
        HibernateJpaAutoConfiguration.class,
        DataSourceAutoConfiguration.class,
        KafkaAutoConfiguration.class
})
@ImportResource({"classpath*:spring-context.xml"})
public class CheckDBApplication {

    private static Logger logger = LoggerFactory.getLogger(CheckDBApplication.class);

    /**
     * Starts the application and runs the audit; any exception is logged and the JVM exits
     * with status 1.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        try {
            runAudit(args);
        } catch (Exception e) {
            logger.error("异常: {}", e.getMessage(), e);
            System.exit(1);
        }

    }

    /**
     * Boots Spring, fetches the audit service and executes it.
     *
     * @param args command-line arguments forwarded to Spring Boot
     * @throws Exception whatever start-up or the audit itself throws
     */
    private static void runAudit(String[] args) throws Exception {
        logger.info("开始启动Springboot...");
        ConfigurableApplicationContext context = new SpringApplication(CheckDBApplication.class).run(args);
        logger.info("Spring加载完成");

        MybatisDbService auditService = context.getBean(MybatisDbService.class);
        Asserts.notNull(auditService, "InformixTableDispacher");
        auditService.checkDB();
    }
}



此处记录动态获取文件数据:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Month;
import java.util.HashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.Assert;

/**
 * 订购关系缓存工具类
 *
 * @author linzt
 * @version 2021年06月25日
 */
@Slf4j
public class RelationCacheUtils {

  // 线程安全 {phoneNumber:null}
  private static HashMap<String, Object> orderedUserCaches = new HashMap<>(4000000);

  // 号码文件路径
  private static String linuxFilePathFormat = "/home/user/order-relation-user-file/%d月视频彩铃产品用户.csv";
  private static String windowsFilePathFormat = "D:/话单上报/%d月视频彩铃产品用户.csv";

  private static String getFilePath(Month month) {
    String filePathFormat = System.getProperty("os.name").contains("Windows") ? windowsFilePathFormat : linuxFilePathFormat;
    return String.format(filePathFormat, month.getValue());
  }

  /**
   * 是否命中
   *
   * @param phoneNumber
   * @return 缓存命中则需要,否则不需要
   */
  public static boolean needGenerateBill(String phoneNumber) {
    return orderedUserCaches.containsKey(phoneNumber);
  }

  /**
   * 重新加载给定月份的数据
   *
   * @param month 指定月份
   */
  public static void reloadCaches(Month month){
    clearCaches();
    doLoadCaches(month);
  }

  /**
   * 清空缓存, 释放内存空间
   * 供后续优化扩展
   */
  public static synchronized void clearCaches() {
    orderedUserCaches.clear();
  }

  /**
   * 使用文件迭代器缓存订购用户数据
   * @param month 指定月份, 目前有3、4、5、6月
   */
  private static synchronized void doLoadCaches(Month month) {
    Assert.notNull(month, "月份不能为空!");
    String filePath = getFilePath(month);
    log.info("加载缓存文件:{}...", filePath);
    File srcFile = new File(filePath);
    if (!srcFile.exists()) {
      log.error("文件不存在:{}", filePath);
      return;
    }
    long start = System.currentTimeMillis();

    // 打印行数
    printLines(srcFile);

    // 文件迭代器
    LineIterator lineIterator;

    try {
      lineIterator = FileUtils.lineIterator(srcFile);
    } catch (Exception e) {
      log.error("获取迭代器失败:{}", e.getMessage());
      return;
    }

    while (lineIterator.hasNext()) {
      String callNumber = StringUtils.trimToEmpty(lineIterator.next());
      if (StringUtils.isNotBlank(callNumber)) {
        orderedUserCaches.put(callNumber, null);
      }
    }
    long end = System.currentTimeMillis();
    long cost = (end - start);
    log.info("缓存行数{}条, 耗时{}ms", orderedUserCaches.size(), cost);
  }

  private static void printLines(File srcFile) {
    String fileName = StringUtils.substringBeforeLast(srcFile.getName(), ".");

    // 总行数
    long totalLines = 0;
    try {
      totalLines = Files.lines(Paths.get(srcFile.getPath())).count();
    } catch (IOException ex) {
      log.error("文件行数读取失败:{}", ex.getMessage());
    }
    log.info("当前文件:{}, 共有{}行数据", fileName, totalLines);
  }


}

此处记录动态获取游标方法:

import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.cursor.Cursor;
import org.apache.ibatis.cursor.defaults.DefaultCursor;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.defaults.DefaultSqlSession;

import java.util.Iterator;

/**
 * Base handler that copies every row of one Oracle table to a Kafka topic using a single
 * worker thread and a MyBatis cursor.
 *
 * <p>The mapper belonging to entity type {@code T} must declare a {@code getTableCursor}
 * statement, otherwise the task is rejected. Example:
 *
 * <pre>{@code
 * <select id="getTableCursor" fetchSize="500" resultMap="BaseResultMap" resultSetType="FORWARD_ONLY">
 *   select * from vrbt_rotate_ring
 * </select>
 * }</pre>
 *
 * <p>Created 2021-05-18.
 *
 * @param <T> entity type of the table being synchronized
 * @see VrbtRotateRingMapper#getTableCursor
 * @see KafkaTopics Kafka topic constants
 * @see DefaultCursor default MyBatis cursor implementation
 * @see DefaultSqlSession default MyBatis session implementation
 * @author linzt
 */
@Slf4j
public abstract class NoPartitionHandler<T extends GenericEntity> extends BaseJobHandler<T> {

  /**
   * Starts one non-daemon worker thread that performs the synchronization.
   */
  @Override
  protected void asyncTickWork() {
    log.info("任务开始...");
    Thread workerThread = new Thread(new NoPatitionTableSyncWorker(), kafkaTopic + "-线程1");
    workerThread.setDaemon(false);
    // Register before starting: a worker that finishes quickly would otherwise unregister
    // itself first and leave a stale registration behind.
    registTask(kafkaTopic, workerThread);
    workerThread.start();
  }


  /**
   * Worker that walks the table cursor and sends every row to Kafka.
   */
  private class NoPatitionTableSyncWorker implements Runnable {


    @Override
    public void run() {

      long start = System.currentTimeMillis();
      try (
          SqlSession session = mybatisSessionFactory.openSession();
          Cursor<T> oracleTableCursor = session.selectCursor(statement)
      ) {
        long currentTime = System.currentTimeMillis();
        log.info("获取数据表游标耗时: {}ms", (currentTime - start));

        int sum = 0;
        Iterator<T> iterator = oracleTableCursor.iterator();

        // Walk the cursor row by row.
        while (iterator.hasNext()) {
          // The base class stops a running task by interrupting its thread; honour that here
          // so a forced restart does not leave two workers sending the same table.
          if (Thread.currentThread().isInterrupted()) {
            log.warn("任务被中断, 提前结束");
            break;
          }
          kafkaService.send(kafkaTopic, getContent(iterator.next()));

          if (sum > 0 && sum % BATCH_LOG_SIZE == 0) {
            session.clearCache(); // drop cached objects regularly to keep memory bounded
          }
          sum++;
        }
        log.info("执行结束, 总数:{}条, 总耗时:{}ms", sum, (System.currentTimeMillis() - currentTime));

      } catch (Exception e) {
        log.error("执行异常:{}", e.getMessage(), e);

      } finally {
        unRegistTask(kafkaTopic, Thread.currentThread());
      }
    }
  }

  /**
   * Builds the string sent to Kafka for one row; implemented by subclasses.
   *
   * @param t row to convert
   * @return the message payload
   */
  protected abstract String getContent(T t);

}

import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.lang.reflect.ParameterizedType;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Base class for XXL-Job handlers bound to one entity type {@code T}.
 *
 * <p>On {@link #execute(String)} it resolves the Kafka topic and the MyBatis cursor statement,
 * verifies that both exist, rejects (or, with the argument {@code "force"}, interrupts) a task
 * still registered for the same topic, and then delegates to {@link #asyncTickWork()}.
 *
 * <p>Worker threads are tracked per topic in {@link #currentTaskMap}; the register and
 * unregister methods of this class guard it with an internal lock.
 *
 * @param <T> entity type, resolved reflectively from the subclass's generic superclass
 */
@Slf4j
public abstract class BaseJobHandler<T extends GenericEntity> extends IJobHandler {

  @Autowired
  KafkaService kafkaService;

  @Autowired
  SqlSessionFactory mybatisSessionFactory;

  String kafkaTopic;

  String statement; // id of the mapper statement that opens the cursor

  private Class<T> entityClass;

  // topic -> worker threads currently registered for it
  protected Map<String, Set<Thread>> currentTaskMap = new HashMap<>();

  private final ReentrantLock lock = new ReentrantLock();

  static final int BATCH_LOG_SIZE = 1000; // the session cache is cleared every 1000 entities

  protected String SPLIT = "|";
  protected String LINE = System.lineSeparator();

  /**
   * Resolves the entity class from the first type argument of the generic superclass.
   */
  public BaseJobHandler() {
    ParameterizedType parameterizedType =
        (ParameterizedType) this.getClass().getGenericSuperclass();
    // Safe as long as the direct subclass binds its first type argument to the entity class.
    @SuppressWarnings("unchecked")
    Class<T> resolved = (Class<T>) parameterizedType.getActualTypeArguments()[0];
    this.entityClass = resolved;

    log.info("泛型类型:{},", entityClass.getSimpleName());
  }

  /**
   * Resolves the Kafka topic and the cursor statement id.
   */
  @PostConstruct
  public void init() {
    this.kafkaTopic = getKafkaTopic();
    this.statement = getMethodStatement();
  }

  /**
   * Validates the job, then hands the work to {@link #asyncTickWork()}.
   *
   * @param args job argument; {@code "force"} terminates a task already running for the topic
   * @return a response stating that the task was accepted
   * @throws IllegalStateException if validation fails or a task is running and not forced
   */
  @Override
  public ReturnT<String> execute(String args) {

    // Validate arguments and environment.
    initAndCheck(args);

    // Dispatch the work.
    asyncTickWork();

    return buildResponse("任务已受理");
  }

  /**
   * Re-initializes topic and statement, checks that both exist and handles a running task.
   *
   * @param args job argument passed to {@link #killPreTaskIfNessary(String)}
   */
  private void initAndCheck(String args) {
    init();
    try {
      Assert.state(kafkaService.existTopic(kafkaTopic), "主题不存在!");
      Assert.state(mybatisSessionFactory.getConfiguration().hasStatement(statement), "方法不存在!");
    } catch (Exception e) {
      throwEx(e.getMessage());
    }
    killPreTaskIfNessary(args);
  }

  /**
   * If threads are still registered for the topic, either fails or, when the argument is
   * {@code "force"}, interrupts and unregisters each of them.
   *
   * @param xxlJobArgs job argument
   */
  private void killPreTaskIfNessary(String xxlJobArgs) {
    // Work on a snapshot: unRegistTask mutates the live set, which would otherwise break the
    // iteration with a ConcurrentModificationException.
    List<Thread> running = snapshotTasks(kafkaTopic);
    if (running.isEmpty()) {
      return;
    }

    boolean forceContinue = StringUtils.equals(xxlJobArgs, "force");
    if (!forceContinue) {
      throwEx("当前已有任务处理");
    }

    logAll("已有任务, 强制终止线程后继续处理...");
    for (Thread thread : running) {
      try {
        thread.interrupt();
      } catch (Exception e) {
        throwEx(thread.getName() + "终止线程失败:" + e.getMessage());
      }
      unRegistTask(kafkaTopic, thread);
    }
  }

  /**
   * Copies the threads currently registered for a topic while holding the lock.
   *
   * @param topic topic to look up
   * @return a new list, empty when nothing is registered
   */
  private List<Thread> snapshotTasks(String topic) {
    lock.lock();
    try {
      Set<Thread> threads = currentTaskMap.get(topic);
      return threads == null ? new ArrayList<>() : new ArrayList<>(threads);
    } finally {
      lock.unlock();
    }
  }

  /** Logs the message to both logs and throws it as an {@link IllegalStateException}. */
  private void throwEx(String message) {
    logAll(message);
    throw new IllegalStateException(message);
  }

  /** Logs the message to both logs and wraps it in a response. */
  private ReturnT<String> buildResponse(String message) {
    logAll(message);
    return new ReturnT<>(message);
  }

  /** Writes the message to the application log and to the XXL-Job log. */
  private void logAll(String message) {
    log.info(message);
    XxlJobLogger.log(message);
  }

  /**
   * Builds the id of the {@code getTableCursor} statement of the entity's mapper.
   *
   * @return the fully qualified statement id
   */
  private String getMethodStatement() {
    String statementFormat = "cn.womusic.vrbt.db.dao.mapper.%sMapper.getTableCursor";
    return String.format(statementFormat, entityClass.getSimpleName());
  }

  /**
   * Registers a worker thread for a topic.
   *
   * @param kafkaTopic topic the thread works on
   * @param thread worker thread
   */
  protected void registTask(String kafkaTopic, Thread thread) {
    lock.lock();
    try {
      log.info("{}注册工人:{}", kafkaTopic, thread.getName());
      currentTaskMap.computeIfAbsent(kafkaTopic, topic -> new HashSet<>()).add(thread);
    } finally {
      lock.unlock();
    }
  }

  /**
   * Removes one worker thread from a topic and drops the topic entry once it is empty.
   *
   * <p>Only the given thread is removed, so a late-finishing old worker cannot erase the
   * registration of a newer one. Does nothing when no thread is registered for the topic.
   *
   * @param kafkaTopic topic the thread worked on
   * @param thread thread to remove; {@code null} means the calling thread
   */
  protected void unRegistTask(String kafkaTopic, Thread thread) {
    // Resolve the default before the thread is used for logging.
    Thread target = (thread != null) ? thread : Thread.currentThread();
    lock.lock();
    try {
      Set<Thread> topicThreads = currentTaskMap.get(kafkaTopic);
      if (CollectionUtils.isEmpty(topicThreads)) {
        return;
      }
      log.info("{}完成了", target.getName());
      topicThreads.remove(target);
      if (topicThreads.isEmpty()) {
        currentTaskMap.remove(kafkaTopic);
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * Forgets all registered tasks when the bean is destroyed.
   */
  @PreDestroy
  public void preDestroy() {
    lock.lock();
    try {
      currentTaskMap.clear();
    } finally {
      lock.unlock();
    }
  }


  /**
   * Supplies the Kafka topic; the topic must have been created beforehand.
   *
   * @return topic name
   */
  protected abstract String getKafkaTopic();


  /**
   * Splits up and starts the asynchronous work; implemented by subclasses.
   */
  protected abstract void asyncTickWork();

  /**
   * Formats a date for output.
   *
   * @param date date to format, may be {@code null}
   * @return an empty string for {@code null}, otherwise the result of
   *         {@code DateTimeUtils.formatBasic}
   */
  protected String filterDate(Date date) {
    return date == null ? "" : DateTimeUtils.formatBasic(date);
  }

  /**
   * Replaces {@code null} with an empty string.
   *
   * @param str value, may be {@code null}
   * @return {@code str}, or {@code ""} when it is {@code null}
   */
  protected String defalutString(String str) {
    return StringUtils.defaultString(str, "");
  }

}

import com.xxl.job.core.handler.annotation.JobHandler;
import org.springframework.stereotype.Component;

/**
 * Job handler that turns each {@code VrbtRotate} row into a delimited string for Kafka.
 * The original author measured about 48 seconds for 3.02 million rows.
 *
 * @author linzt
 * @date 2021-05-18
 */
@Component
@JobHandler(value = "rotateHandler")
public class RotateHandler extends NoPartitionHandler<VrbtRotate> {

  /**
   * @return the topic constant {@code KafkaTopics.VRBT_ROTATE}
   */
  @Override
  public String getKafkaTopic() {
    return KafkaTopics.VRBT_ROTATE;
  }

  /**
   * Serializes one row: every field is followed by {@code SPLIT}, including the last one.
   * Dates pass through {@code filterDate} and the channel id through {@code defalutString}.
   *
   * @param rotate row to serialize
   * @return the delimited line
   */
  @Override
  protected String getContent(VrbtRotate rotate) {
    StringBuilder line = new StringBuilder();
    line.append(rotate.getRotateId()).append(SPLIT);
    line.append(rotate.getPhoneNumber()).append(SPLIT);
    line.append(rotate.getRingTotal()).append(SPLIT);
    line.append(rotate.getRingMax()).append(SPLIT);
    line.append(filterDate(rotate.getCreateDate())).append(SPLIT);
    line.append(filterDate(rotate.getUpdateDate())).append(SPLIT);
    line.append(rotate.getRotateName()).append(SPLIT);
    line.append(rotate.getIsVideo()).append(SPLIT);
    line.append(defalutString(rotate.getChannelId())).append(SPLIT);
    return line.toString();
  }
}

相关标签: file java