Elasticsearch全文检索企业开发记录总结:数据同步
程序员文章站
2022-03-15 11:21:53
同步机制
概述
本次项目中数据实时同步没有使用一些同步插件例如go-mysql-elasticsearch、elasticsearch-jdbc等同步技术,而是根据企业业务和项目的结构的特...
同步机制
概述
本次项目中数据实时同步没有使用一些同步插件例如go-mysql-elasticsearch、elasticsearch-jdbc等同步技术,而是根据企业业务和项目的结构的特殊性采用了更适合项目和业务需求的多线程任务调度数据同步的机制。
同步模块 同步流程业务数据改变–>修改关系型数据库数据–>将修改的记录主键、操作等信息放入队列充当生产者–> 事件调度任务利用多线程获取数据充当消费者,获取队首数据–>ES检索服务通过队列数据获取到相应的记录数据更新,同步到ES,实现实时同步更新。
主要代码实现public class InstantiationTracingBeanPostProcessor implements ApplicationListener { private static Logger log = LoggerFactory.getLogger(InstantiationTracingBeanPostProcessor.class); @Autowired private EventQueue eventQueue; @Autowired private EsSyncTask esSyncTask; private static final String THREAD_COUNT = SystemConfig.get("es.sync.thread.count"); @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { if (contextRefreshedEvent.getApplicationContext().getParent() == null) { System.out.println("Application starting ..."); try { System.out.println("Event start ..."); eventQueue.popQueue(); int threadCount = 1; try { threadCount = Integer.parseInt(THREAD_COUNT); } catch (Exception e) { log.error("Wrong thread count configuration \"s%\", use default 1 thread to sync.", THREAD_COUNT); } System.out.println("Elasticsearch sync thread is starting, current thread count : " + threadCount); esSyncTask.start(threadCount); } catch (InterruptedException e) { log.error("Tasks start error, make sure redis is running : ", e); } System.out.println("application start end..."); } } }
public void start(int threadCount) { if (StringUtils.hasLength(ES_SYNC_RETRY_TIMES_CONFIG)) { ES_SYNC_RETRY_TIMES = Integer.valueOf(ES_SYNC_RETRY_TIMES_CONFIG); } ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < threadCount; i++) { executorService.execute(new SyncThread()); } } /** * 同步线程 */ class SyncThread implements Runnable { @Override public void run() { while (true) { String threadName = Thread.currentThread().getName() + ":" + Thread.currentThread().getId(); String json = redisService.removeFromHead(RedisConstant.ES_DATA_SYNC_KEY); if (StringUtils.hasLength(json)) { long start = System.currentTimeMillis(); log.info("[" + threadName + "]Start to sync data : " + json); SaveData saveData = JSON.parseObject(json, SaveData.class); String id = saveData.getId(); if (!StringUtils.hasLength(id)) { log.warn("No id in saveData be found."); continue; } int type = saveData.getType(); int oper = saveData.getOper(); if (EsSyncConst.SyncType.HOTEL.codeOf().equals(type)) { syncHotel(id, saveData); } else if (EsSyncConst.SyncType.HOTEL_SCORE.codeOf().equals(type)) { syncHotelScore(id, saveData); } else if (EsSyncConst.SyncType.ROOM_PRICE.codeOf().equals(type)) { syncRoomPrice(id, saveData); } else if (EsSyncConst.SyncType.ROOM_PRICE_BY_ROOM.codeOf().equals(type)) { syncRoomPricesByRoom(id, saveData); } else if (EsSyncConst.SyncType.NIGHT_USER_BY_ROOM.codeOf().equals(type)) { syncNightUserByRoom(id, saveData); } else { log.warn("Type in saveData is unsupported : " + type); } long end = System.currentTimeMillis(); log.info("[" + threadName + "]Sync data finished, spend : " + (end - start) + "ms."); } try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } }
上一篇: 今日头条运营秘诀分享:让你的文章阅读量和收入都“飞”起来
下一篇: HDFS读写数据的原理