elasticsearch结合canal实战-问题搜索系统
程序员文章站
2022-07-05 14:26:41
...
你是否有这样的困惑?刚进一家公司后遇到各种bug,而这些bug很多和公司所使用的技术栈有关,有些问题你排查起来很难定位到具体错误原因,解决起来会很废时间,当然你可以咨询同事,但是他们也不一定有空,也可能不知道,于是在接触es与canal后我想写这样的一个简单的问题搜索系统,帮助你快速搜索到类似到问题,而不是在百度里面去大海捞针。
- 系统还不是很完善,还有很多不足到地方,比如前端页面显示数据到问题,本人主要是后台开发,前端这部分还没有写,如果有感兴趣的小伙伴,欢迎加入,GitHub:https://github.com/zhuifengzheng/question-search.git
1、适用场景
1、你自己的问题库,记录你学习中遇到的问题,下次遇到类似问题时如果记不清之前解答思路了,就可以通过问题搜索系统搜索你之前记录的答案。
2、团队的问题库,在团队中的每个人,遇到开发中的bug,之后解决了该bug,就可以记录到问题搜索系统,这样如果团队中还有其他人遇到类似到问题,那么就可以通过问题搜索系统找到解答思路。
3、公司的问题库,公司中的开发遇到的bug以及解决的思路都可以记录到问题系统,这样对于之后到新人会有很大帮助。
2、相关版本
需要提前安装下面软件
1、jdk1.8
2、elasticsearch 6.8.x
3、kibana 6.8.x
4、ik分词器 6.8.x
5、canal 1.3.x
6、mysql 5.7
说明:mysql的binlog日志需求提前开启
3、表结构
CREATE TABLE `question` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(200) DEFAULT NULL,#问题名称
`english_name` varchar(200) DEFAULT NULL,#问题英文名称
`description` varchar(255) DEFAULT NULL,#问题描述
`content` text,#问题的具体内容
`create_by` varchar(20) DEFAULT NULL,#创建问题的人
`update_by` varchar(20) DEFAULT NULL,#更新问题的人
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4;
4、elasticsearch创建索引脚本
{
"aliases": {
"index_question": {}
},
"settings": {
"number_of_replicas": 0,
"number_of_shards": 2
},
"mappings": {
"index_question": {
"properties": {
"id": {
"type": "long"
},
"name": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"english_name": {
"type": "text",
"analyzer": "english"
},
"description": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"content": {
"type": "text"
},
"create_by": {
"type": "keyword"
},
"update_by": {
"type": "keyword"
},
"create_time": {
"format": "yyyy-MM-dd HH:mm:ss",
"type": "date"
},
"update_time": {
"format": "yyyy-MM-dd HH:mm:ss",
"type": "date"
}
}
}
}
}
5、初始化canal连接
@Bean
public CanalConnector getCanalConnector() {
// CanalConnectors.newSingleConnector
log.info("===开始连接cannal");
canalConnector = CanalConnectors.newClusterConnector(Lists.newArrayList(
new InetSocketAddress(canalProperties.getHostname(), canalProperties.getPort())),
canalProperties.getDestination(), canalProperties.getUsername(), canalProperties.getPassword()
);
canalConnector.connect();
canalConnector.subscribe("er.question");
// canalConnector.subscribe("*.*");
canalConnector.rollback();
log.info("===cannal连接初始化成功");
return canalConnector;
}
6、canal监听binlog日志
public class CanalScheduling implements Runnable {
@Autowired
private CanalConnector canalConnector;
@Autowired
private RestHighLevelClient highLevelClient;
/**
* 这里是针对数据多的情况,一般来说可以写个定时任务,每天凌晨执行一次更新即可
*
* @Scheduled(cron = "0 0 0 * * ? *") -> 每天凌晨执行一次即可
* @Scheduled(fixedDelay = 2000) -> 每两秒执行一次,数据更新频繁可以用
*/
@Override
@Scheduled(fixedDelay = 2000)
public void run() {
log.info("===开始同步mysql中的数据");
long batchId = -1;
try {
// 一次取1000条数据
int batchSize = 1000;
Message message = canalConnector.getWithoutAck(batchSize);
batchId = message.getId();
List<CanalEntry.Entry> entries = message.getEntries();
if (batchId != -1 && entries.size() > 0) {
entries.forEach(entry -> {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
// 解析处理
publishCanalEvent(entry);
}
});
}
// 提交确认消费完毕
canalConnector.ack(batchId);
} catch (Exception e) {
e.printStackTrace();
// 失败的话进行回滚
canalConnector.rollback(batchId);
}
}
/**
* 解析收到的日志事件
*
* @param entry
*/
private void publishCanalEvent(CanalEntry.Entry entry) {
log.info("收到canal消息{}", entry.toString());
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
return;
}
CanalEntry.RowChange change;
try {
change = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
return;
}
CanalEntry.EventType eventType = change.getEventType();
change.getRowDatasList().forEach(rowData -> {
List<CanalEntry.Column> columns;
// 对于es来说 只要关注 delete 和 update 还有insert
if (eventType == CanalEntry.EventType.DELETE) {
// 删除是拿到之前到数据,不然删除后就没有了
columns = rowData.getBeforeColumnsList();
} else {
// 修改后到数据
columns = rowData.getAfterColumnsList();
}
// 解析成map成格式
Map<String, Object> dataMap = parseColumnsToMap(columns);
// 存入es中
indexES(dataMap, eventType);
});
}
/**
* 解析变化的列
*
* @param columns
* @return
*/
Map<String, Object> parseColumnsToMap(List<CanalEntry.Column> columns) {
Map<String, Object> jsonMap = new HashMap<>();
columns.forEach(column -> {
if (column == null) {
return;
}
jsonMap.put(column.getName(), column.getValue());
});
return jsonMap;
}
/**
* 修改es数据
*
* @param dataMap
* @param eventType
*/
private void indexES(Map<String, Object> dataMap, CanalEntry.EventType eventType) {
try {
if (eventType == CanalEntry.EventType.DELETE) {
log.info("删除索引Id={},type={},value={}", dataMap.get("id"), eventType.toString(), dataMap.toString());
DeleteRequest deleteRequest = new DeleteRequest("book", "_doc", String.valueOf(dataMap.get("id")));
highLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
} else {
//如果是又业务关联的 这里就要写自己的业务代码
log.info("更新索引Id={},type={},value={}", dataMap.get("id"), eventType.toString(), dataMap.toString());
IndexRequest indexRequest = new IndexRequest("question", "index_question");
indexRequest.id(String.valueOf(dataMap.get("id")));
indexRequest.source(dataMap);
highLevelClient.index(indexRequest, RequestOptions.DEFAULT);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
7、elasticsearch写入数据的相关操作说明
这里不贴冗长的代码了,感兴趣的小伙伴可以去github上面查看源码,这里只讲思路
1、全量更新索引:主要是初始化es中的数据,将mysql中的数据导入到es。
2、增量更新索引:主要是通过canal监听mysql,数据库有增删改操作时更新操作后到数据到es中。