2022-06-27 23:21:21
同步模块 同步流程业务数据改变–>修改关系型数据库数据–>将修改的记录主键、操作等信息放入队列充当生产者–> 事件调度任务利用多线程获取数据充当消费者,获取队首数据–>ES检索服务通过队列数据获取到相应的记录数据更新,同步到ES,实现实时同步更新。
主要代码实现public class InstantiationTracingBeanPostProcessor implements ApplicationListener { private static Logger log = LoggerFactory.getLogger(InstantiationTracingBeanPostProcessor.class); @Autowired private EventQueue eventQueue; @Autowired private EsSyncTask esSyncTask; private static final String THREAD_COUNT = SystemConfig.get("es.sync.thread.count"); @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { if (contextRefreshedEvent.getApplicationContext().getParent() == null) { System.out.println("Application starting ..."); try { System.out.println("Event start ..."); eventQueue.popQueue(); int threadCount = 1; try { threadCount = Integer.parseInt(THREAD_COUNT); } catch (Exception e) { log.error("Wrong thread count configuration \"s%\", use default 1 thread to sync.", THREAD_COUNT); } System.out.println("Elasticsearch sync thread is starting, current thread count : " + threadCount); esSyncTask.start(threadCount); } catch (InterruptedException e) { log.error("Tasks start error, make sure redis is running : ", e); } System.out.println("application start end..."); } } }
public void start(int threadCount) { if (StringUtils.hasLength(ES_SYNC_RETRY_TIMES_CONFIG)) { ES_SYNC_RETRY_TIMES = Integer.valueOf(ES_SYNC_RETRY_TIMES_CONFIG); } ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < threadCount; i++) { executorService.execute(new SyncThread()); } } /** * 同步线程 */ class SyncThread implements Runnable { @Override public void run() { while (true) { String threadName = Thread.currentThread().getName() + ":" + Thread.currentThread().getId(); String json = redisService.removeFromHead(RedisConstant.ES_DATA_SYNC_KEY); if (StringUtils.hasLength(json)) { long start = System.currentTimeMillis(); log.info("[" + threadName + "]Start to sync data : " + json); SaveData saveData = JSON.parseObject(json, SaveData.class); String id = saveData.getId(); if (!StringUtils.hasLength(id)) { log.warn("No id in saveData be found."); continue; } int type = saveData.getType(); int oper = saveData.getOper(); if (EsSyncConst.SyncType.HOTEL.codeOf().equals(type)) { syncHotel(id, saveData); } else if (EsSyncConst.SyncType.HOTEL_SCORE.codeOf().equals(type)) { syncHotelScore(id, saveData); } else if (EsSyncConst.SyncType.ROOM_PRICE.codeOf().equals(type)) { syncRoomPrice(id, saveData); } else if (EsSyncConst.SyncType.ROOM_PRICE_BY_ROOM.codeOf().equals(type)) { syncRoomPricesByRoom(id, saveData); } else if (EsSyncConst.SyncType.NIGHT_USER_BY_ROOM.codeOf().equals(type)) { syncNightUserByRoom(id, saveData); } else { log.warn("Type in saveData is unsupported : " + type); } long end = System.currentTimeMillis(); log.info("[" + threadName + "]Sync data finished, spend : " + (end - start) + "ms."); } try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } }
上一篇: 关于店铺装修设计的几个要点!