欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

elasticsearch结合canal实战-问题搜索系统

程序员文章站 2022-07-05 14:26:41
...

你是否有这样的困惑?刚进一家公司后遇到各种bug,而这些bug很多和公司所使用的技术栈有关,有些问题你排查起来很难定位到具体错误原因,解决起来会很废时间,当然你可以咨询同事,但是他们也不一定有空,也可能不知道,于是在接触es与canal后我想写这样的一个简单的问题搜索系统,帮助你快速搜索到类似到问题,而不是在百度里面去大海捞针。

  • 系统还不是很完善,还有很多不足到地方,比如前端页面显示数据到问题,本人主要是后台开发,前端这部分还没有写,如果有感兴趣的小伙伴,欢迎加入,GitHub:https://github.com/zhuifengzheng/question-search.git

1、适用场景

1、你自己的问题库,记录你学习中遇到的问题,下次遇到类似问题时如果记不清之前解答思路了,就可以通过问题搜索系统搜索你之前记录的答案。

2、团队的问题库,在团队中的每个人,遇到开发中的bug,之后解决了该bug,就可以记录到问题搜索系统,这样如果团队中还有其他人遇到类似到问题,那么就可以通过问题搜索系统找到解答思路。

3、公司的问题库,公司中的开发遇到的bug以及解决的思路都可以记录到问题系统,这样对于之后到新人会有很大帮助。

2、相关版本

需要提前安装下面软件

1、jdk1.8

2、elasticsearch 6.8.x

3、kibana 6.8.x

4、ik分词器 6.8.x

5、canal 1.3.x

6、mysql 5.7

说明:mysql的binlog日志需求提前开启

3、表结构


CREATE TABLE `question` (

  `id` bigint(20) NOT NULL AUTO_INCREMENT,

  `name` varchar(200) DEFAULT NULL,#问题名称

  `english_name` varchar(200) DEFAULT NULL,#问题英文名称

  `description` varchar(255) DEFAULT NULL,#问题描述

  `content` text,#问题的具体内容

  `create_by` varchar(20) DEFAULT NULL,#创建问题的人

  `update_by` varchar(20) DEFAULT NULL,#更新问题的人

  `create_time` datetime DEFAULT NULL,

  `update_time` datetime DEFAULT NULL,

  PRIMARY KEY (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4;

4、elasticsearch创建索引脚本


{
  "aliases": {
    "index_question": {}
  },
  "settings": {
    "number_of_replicas": 0,
    "number_of_shards": 2
  },
  "mappings": {
    "index_question": {
      "properties": {
        "id": {
          "type": "long"
        },
        "name": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_smart"
        },
        "english_name": {
          "type": "text",
          "analyzer": "english"
        },
        "description": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_smart"
        },
        "content": {
          "type": "text"
        },
        "create_by": {
          "type": "keyword"
        },
        "update_by": {
          "type": "keyword"
        },
        "create_time": {
          "format": "yyyy-MM-dd HH:mm:ss",
          "type": "date"
        },
        "update_time": {
          "format": "yyyy-MM-dd HH:mm:ss",
          "type": "date"
        }
      }
    }
  }
}

5、初始化canal连接


@Bean
    public CanalConnector getCanalConnector() {
        // CanalConnectors.newSingleConnector
        log.info("===开始连接cannal");
        canalConnector = CanalConnectors.newClusterConnector(Lists.newArrayList(
                new InetSocketAddress(canalProperties.getHostname(), canalProperties.getPort())),
                canalProperties.getDestination(), canalProperties.getUsername(), canalProperties.getPassword()
        );

        canalConnector.connect();

        canalConnector.subscribe("er.question");
//        canalConnector.subscribe("*.*");

        canalConnector.rollback();
        log.info("===cannal连接初始化成功");
        return canalConnector;
    }

6、canal监听binlog日志


public class CanalScheduling implements Runnable {


    @Autowired
    private CanalConnector canalConnector;

    @Autowired
    private RestHighLevelClient highLevelClient;

    /**
     * 这里是针对数据多的情况,一般来说可以写个定时任务,每天凌晨执行一次更新即可
     *
     * @Scheduled(cron = "0 0 0 * * ? *") -> 每天凌晨执行一次即可
     * @Scheduled(fixedDelay = 2000) -> 每两秒执行一次,数据更新频繁可以用
     */
    @Override
    @Scheduled(fixedDelay = 2000)
    public void run() {
        log.info("===开始同步mysql中的数据");
        long batchId = -1;
        try {
            // 一次取1000条数据
            int batchSize = 1000;
            Message message = canalConnector.getWithoutAck(batchSize);
            batchId = message.getId();
            List<CanalEntry.Entry> entries = message.getEntries();
            if (batchId != -1 && entries.size() > 0) {
                entries.forEach(entry -> {
                    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                        // 解析处理
                        publishCanalEvent(entry);
                    }
                });
            }
            // 提交确认消费完毕
            canalConnector.ack(batchId);
        } catch (Exception e) {
            e.printStackTrace();
            // 失败的话进行回滚
            canalConnector.rollback(batchId);
        }

    }

    /**
     * 解析收到的日志事件
     *
     * @param entry
     */
    private void publishCanalEvent(CanalEntry.Entry entry) {
        log.info("收到canal消息{}", entry.toString());
        if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
            return;
        }

        CanalEntry.RowChange change;
        try {
            change = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
            return;
        }
        CanalEntry.EventType eventType = change.getEventType();
        change.getRowDatasList().forEach(rowData -> {

            List<CanalEntry.Column> columns;
            // 对于es来说 只要关注 delete 和 update 还有insert
            if (eventType == CanalEntry.EventType.DELETE) {
                // 删除是拿到之前到数据,不然删除后就没有了
                columns = rowData.getBeforeColumnsList();
            } else {
                // 修改后到数据
                columns = rowData.getAfterColumnsList();
            }
            // 解析成map成格式
            Map<String, Object> dataMap = parseColumnsToMap(columns);
            // 存入es中
            indexES(dataMap, eventType);

        });
    }

    /**
     * 解析变化的列
     *
     * @param columns
     * @return
     */
    Map<String, Object> parseColumnsToMap(List<CanalEntry.Column> columns) {
        Map<String, Object> jsonMap = new HashMap<>();
        columns.forEach(column -> {
            if (column == null) {
                return;
            }
            jsonMap.put(column.getName(), column.getValue());
        });
        return jsonMap;
    }

    /**
     * 修改es数据
     *
     * @param dataMap
     * @param eventType
     */
    private void indexES(Map<String, Object> dataMap, CanalEntry.EventType eventType) {
        try {
            if (eventType == CanalEntry.EventType.DELETE) {
                log.info("删除索引Id={},type={},value={}", dataMap.get("id"), eventType.toString(), dataMap.toString());
                DeleteRequest deleteRequest = new DeleteRequest("book", "_doc", String.valueOf(dataMap.get("id")));
                highLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
            } else {
                //如果是又业务关联的 这里就要写自己的业务代码
                log.info("更新索引Id={},type={},value={}", dataMap.get("id"), eventType.toString(), dataMap.toString());

                IndexRequest indexRequest = new IndexRequest("question", "index_question");
                indexRequest.id(String.valueOf(dataMap.get("id")));
                indexRequest.source(dataMap);
                highLevelClient.index(indexRequest, RequestOptions.DEFAULT);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

7、elasticsearch写入数据的相关操作说明

这里不贴冗长的代码了,感兴趣的小伙伴可以去github上面查看源码,这里只讲思路

1、全量更新索引:主要是初始化es中的数据,将mysql中的数据导入到es。

2、增量更新索引:主要是通过canal监听mysql,数据库有增删改操作时更新操作后到数据到es中。