欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

使用ElasticSearch6.0快速实现全文搜索功能的示例代码

程序员文章站 2022-06-11 12:11:29
本文不涉及elasticsearch具体原理,只记录如何快速的导入mysql中的数据进行全文检索。 工作中需要实现一个搜索功能,并且导入现有数据库数据,组长推荐用elas...

本文不涉及elasticsearch具体原理,只记录如何快速的导入mysql中的数据进行全文检索。

工作中需要实现一个搜索功能,并且导入现有数据库数据,组长推荐用elasticsearch实现,网上翻一通教程,都是比较古老的文章了,无奈只能自己摸索,参考es的文档,总算是把服务搭起来了,记录下,希望有同样需求的朋友可以少走弯路,能按照这篇教程快速的搭建一个可用的elasticsearch服务。

es的搭建

es搭建有直接下载zip文件,也有docker容器的方式,相对来说,docker更适合我们跑es服务。可以方便的搭建集群或建立测试环境。这里使用的也是容器方式,首先我们需要一份dockerfile:

from docker.elastic.co/elasticsearch/elasticsearch-oss:6.0.0
# 提交配置 包括新的elasticsearch.yml 和 keystore.jks文件
copy --chown=elasticsearch:elasticsearch conf/ /usr/share/elasticsearch/config/
# 安装ik
run ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.0.0/elasticsearch-analysis-ik-6.0.0.zip
# 安装readonlyrest
run ./bin/elasticsearch-plugin install https://github.com/hyy-yu/beziercurvedemo/raw/master/readonlyrest-1.16.14_es6.0.0.zip

user elasticsearch
cmd ./bin/elasticsearch

这里对上面的操作做一下说明:

  1. 首先在dockerfile下的同级目录中需要建立一个conf文件夹,保存elasticsearch.yml文件(稍后给出)和keystore.jks。(jks是自签名文件,用于https,如何生成请自行搜索)
  2. ik是一款很流行的中文分词库,使用它来支持中文搜索。
  3. readonlyrest是一款开源的es插件,用于用户管理、安全验证,土豪可以使用es自带的x-pack包,有更完善的安全功能。

elactic配置 elasticsearch.yml

cluster.name: "docker-cluster"
network.host: 0.0.0.0

# minimum_master_nodes need to be explicitly set when bound on a public ip
# set to 1 to allow single node clusters
# details: https://github.com/elastic/elasticsearch/pull/17288
discovery.zen.minimum_master_nodes: 1

# 禁止系统对es交换内存
bootstrap.memory_lock: true

http.type: ssl_netty4

readonlyrest:
 enable: true
 ssl:
  enable: true
  keystore_file: "server.jks"
  keystore_pass: server
  key_pass: server

 access_control_rules:

  - name: "block 1 - root"
   type: allow
   groups: ["admin"]

  - name: "user read only - paper"
   groups: ["user"]
   indices: ["paper*"]
   actions: ["indices:data/read/*"]

 users:

  - username: root
   auth_key_sha256: cb7c98bae153065db931980a13bd45ee3a77cb8f27a7dfee68f686377acc33f1
   groups: ["admin"]

  - username: xiaoming
   auth_key: xiaoming:xiaoming
   groups: ["user"]

这里bootstrap.memory_lock: true是个坑,这里文档已经说明了,有的os会在运行时把暂时不用的内存交换到硬盘的一块区域,然而这种行为会让es的资源占用率飙升,甚至让系统无法响应。

配置文件里已经很明显了,一个root用户属于admin组,而admin有所有权限,xiaoming同学因为在user组,只能访问paper索引,并且只能读取,不能操作。更详细的配置请见:

至此,es的准备工作算是做完了,docker build -t esimage:tag 一下,docker run -p 9200:9200 esimage:tag跑起来。
如果https://127.0.0.1:9200/返回

{
  "name": "vakwrir",
  "cluster_name": "docker-cluster",
  "cluster_uuid": "ysydowkvrh2swz907s2m_w",
  "version": {
    "number": "6.0.0",
    "build_hash": "8f0685b",
    "build_date": "2017-11-10t18:41:22.859z",
    "build_snapshot": false,
    "lucene_version": "7.0.1",
    "minimum_wire_compatibility_version": "5.6.0",
    "minimum_index_compatibility_version": "5.0.0"
  },
  "tagline": "you know, for search"
}

我们本次教程的主角算是出场了,分享几个常用的api调戏调试es用:

{{url}}替换成你本地的es地址。

  1. 查看所有插件:{{url}}/_cat/plugins?v
  2. 查看所有索引:{{url}}/_cat/indices?v
  3. 对es进行健康检查:{{url}}/_cat/health?v
  4. 查看当前的磁盘占用率:{{url}}/_cat/allocation?v

导入mysql数据

这里我使用的是mysql数据,其实其它的数据库也是一样,关键在于如何导入,网上教程会推荐logstash、beat、es的mysql插件进行导入,我也都实验过,配置繁琐,文档稀少,要是数据库结构复杂一点,导入是个劳心劳神的活计,所以并不推荐。其实es在各个语言都有对应的api库,你在语言层面把数据组装成json,通过api库发送到es即可。流程大致如下:

使用ElasticSearch6.0快速实现全文搜索功能的示例代码

我使用的是golang的es库elastic,其它语言可以去github上自行搜索,操作的方式都是一样的。

接下来使用一个简单的数据库做介绍:

paper表

id name
1 北京第一小学模拟卷
2 江西北京通用高考真题

province表

id name
1 北京
2 江西

paper_province表

paper_id province_id
1 1
2 1
2 2

如上,paper和province是多对多关系,现在把paper数据打入es,,可以按paper名称模糊搜索,也可通过province进行筛选。json数据格式如下:

{
  "id":1,
  "name": "北京第一小学模拟卷",
  "provinces":[
    {
      "id":1,
      "name":"北京"
    }
  ]
}

首先准备一份mapping.json文件,这是在es中数据的存储结构定义,

{
  "mappings":{
    "docs":{
  "include_in_all": false, 
      "properties":{
        "id":{
          "type":"long"
        },
        "name":{
          "type":"text",
          "analyzer":"ik_max_word" // 使用最大词分词器
        },
        "provinces":{
          "type":"nested",
          "properties":{
            "id":{
              "type":"integer"
            },
            "name":{
              "type":"text",
              "index":"false" // 不索引
            }
          }
        }
      }
    }
  },
  "settings":{
    "number_of_shards":1,
    "number_of_replicas":0
  }
}

需要注意的是取消_all字段,这个默认的_all会收集所有的存储字段,实现无条件限制的搜索,缺点是空间占用大。

shard(分片)数我设置为了1,没有设置replicas(副本),毕竟这不是一个集群,处理的数据也不是很多,如果有大量数据需要处理可以自行设置分片和副本的数量。

首先与es建立连接,ca.crt与jks自签名有关。当然,在这里我使用insecureskipverify忽略了证书文件的验证。

func initelasticsearch() {
 pool := x509.newcertpool()
 crt, err0 := ioutil.readfile("conf/ca.crt")
 if err0 != nil {
 cannotopenes(err0, "read crt file err")
 return
 }

 pool.appendcertsfrompem(crt)
 tr := &http.transport{
 tlsclientconfig: &tls.config{rootcas: pool, insecureskipverify: true},
 }
 httpclient := &http.client{transport: tr}

 //后台构造elasticclient
 var err error
 elasticclient, err = elastic.newclient(elastic.seturl(myconfig.elasticurl),
 elastic.seterrorlog(getlogger()),
 elastic.setgzip(true),
 elastic.sethttpclient(httpclient),
 elastic.setsniff(false), // 集群嗅探,单节点记得关闭。
 elastic.setscheme("https"),
 elastic.setbasicauth(myconfig.elasticusername, myconfig.elasticpassword))
 if err != nil {
 cannotopenes(err, "search_client_error")
 return
 }
 //elasticclient构造完成

 //查询是否有paper索引
 exist, err := elasticclient.indexexists(myconfig.elasticindexname).do(context.background())
 if err != nil {
 cannotopenes(err, "exist_paper_index_check")
 return
 }

 //索引存在且通过完整性检查则不发送任何数据
 if exist {
 if !isindexintegrity(elasticclient) {
  //删除当前索引  准备重建
  deleteresponse, err := elasticclient.deleteindex(myconfig.elasticindexname).do(context.background())
  if err != nil || !deleteresponse.acknowledged {
  cannotopenes(err, "delete_index_error")
  return
  }
 } else {
  return
 }
 }

 //后台查询数据库,发送数据到elasticsearch中
 go fetchdbgetallpaperandsendtoes()
}

type papersearch struct {
 paperid  int64   `gorm:"primary_key;column:f_paper_id;type:bigint(20)" json:"id"`
 name    string  `gorm:"column:f_name;size:80" json:"name"`
 provinces []province `gorm:"many2many:t_paper_province;" json:"provinces"`    // 试卷适用的省份
}

func fetchdbgetallpaperandsendtoes() {
 //fetch paper
 var allpaper []papersearch

 getdb().table("t_papers").find(&allpaper)

 //province
 for i := range allpaper {
 var allpro []province
 getdb().table("t_provinces").joins("inner join `t_paper_province` on `t_paper_province`.`province_f_province_id` = `t_provinces`.`f_province_id`").
  where("t_paper_province.paper_f_paper_id = ?", allpaper[i].paperid).find(&allpro)
 allpaper[i].provinces = allpro
 }

 if len(allpaper) > 0 {
 //send to es - create index
 createservice := getelasticsearch().createindex(myconfig.elasticindexname)
 // 此处的index_default_setting就是上面mapping.json中的内容。
 createservice.body(index_default_setting)
 createresult, err := createservice.do(context.background())
 if err != nil {
  cannotopenes(err, "create_paper_index")
  return
 }

 if !createresult.acknowledged || !createresult.shardsacknowledged {
  cannotopenes(err, "create_paper_index_fail")
 }

 // - send all paper
 bulkrequest := getelasticsearch().bulk()

 for i := range allpaper {
  indexreq := elastic.newbulkindexrequest().optype("create").index(myconfig.elasticindexname).type("docs").
  id(helper.int64tostring(allpaper[i].paperid)).
  doc(allpaper[i])

  bulkrequest.add(indexreq)
 }

 // do sends the bulk requests to elasticsearch
 bulkresponse, err := bulkrequest.do(context.background())
 if err != nil {
  cannotopenes(err, "insert_docs_error")
  return
 }

 // bulk request actions get cleared
 if len(bulkresponse.created()) != len(allpaper) {
  cannotopenes(err, "insert_docs_nums_error")
  return
 }
 //send success
 }
}

跑通上面的代码后,使用{{url}}/_cat/indices?v看看es中是否出现了新创建的索引,使用{{url}}/papers/_search看看命中了多少文档,如果文档数等于你发送过去的数据量,搜索服务就算跑起来了。

搜索

现在就可以通过provinceid和q来搜索试卷,默认按照相关度评分排序。

//q 搜索字符串 provinceid 限定省份id limit page 分页参数
func searchpaper(q string, provinceid uint, limit int, page int) (list []papersearch, totalpage int, currentpage int, pageisend int, returnerr error) {
 //不满足条件,使用数据库搜索
 if !canuseelasticsearch && !myconfig.useelasticsearch {
 return searchpaperlocal(q, courseid, gradeid, provinceid, papertypeid, limit, page)
 }

 list = make([]papersimple, 0)
 totalpage = 0
 currentpage = page
 pageisend = 0
 returnerr = nil

 client := getelasticsearch()
 if client == nil {
 return searchpaperlocal(q, courseid, gradeid, provinceid, papertypeid, limit, page)
 }

 //elasticsearch有问题,使用数据库搜索
 if !isindexintegrity(client) {
 return searchpaperlocal(q, courseid, gradeid, provinceid, papertypeid, limit, page)
 }

 if !client.isrunning() {
 client.start()
 }
 defer client.stop()

 q = html.escapestring(q)
 boolquery := elastic.newboolquery()
 // paper.name
 matchquery := elastic.newmatchquery("name", q)

 //省份
 if provinceid > 0 && provinceid != default_province_all {
 probool := elastic.newboolquery()
 tpro := elastic.newtermquery("provinces.id", provinceid)
 pronest := elastic.newnestedquery("provinces", probool.must(tpro))
 boolquery.must(pronest)
 }

 boolquery.must(matchquery)

 for _, e := range termquerys {
 boolquery.must(e)
 }

 highligt := elastic.newhighlight()
 highligt.field(elastic_search_search_field_name)
 highligt.pretags(elastic_search_search_field_tag_start)
 highligt.posttags(elastic_search_search_field_tag_end)
 searchresult, err2 := client.search(myconfig.elasticindexname).
 highlight(highligt).
 query(boolquery).
 from((page - 1) * limit).
 size(limit).
 do(context.background())

 if err2 != nil {
 // handle error
 getlogger().logerr("搜索时出错 "+err2.error(), "search_error")
 // handle error
 returnerr = errors.new("搜索时出错")
 } else {
 if searchresult.hits.totalhits > 0 {
  // iterate through results
  for _, hit := range searchresult.hits.hits {
  var p papersearch
  err := json.unmarshal(*hit.source, &p)
  if err != nil {
   // deserialization failed
   getlogger().logerr("搜索时出错 "+err.error(), "search_deserialization_error")
   returnerr = errors.new("搜索时出错")
   return
  }

  if len(hit.highlight[elastic_search_search_field_name]) > 0 {
   p.name = hit.highlight[elastic_search_search_field_name][0]
  }

  list = append(list, p)
  }

  count := searchresult.totalhits()

  currentpage = page
  if count > 0 {
  totalpage = int(math.ceil(float64(count) / float64(limit)))
  }
  if currentpage >= totalpage {
  pageisend = 1
  }
 } else {
  // no hits
 }
 }
 return
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。