如何构建一个大数据量的搜索引擎
程序员文章站
2022-04-29 08:17:19
...
构建一个大数据量的搜索引擎,数据很重要,数据来源在哪里呢?一方面可以从站内结构化数据库导入,如MySQL,Oracle等数据库,构建一个站内搜索引擎,提高查询速度.另一方面构建一个分布式爬虫,每天定时抓取数据,不断地添加到索引库.典型地如百度,谷歌等全文检索引擎.
我们现在要做的就是第二种.说难不难,下面直接上代码.
1.定义一个实体,与索引库的type数据字段名一致.
package com.ytdx.entity;
import java.io.Serializable;
/**
* ES 索引对应实体
* @author lhy
*
*/
/**
 * Entity mirroring the fields stored in the ES index ("home") type ("blob").
 * Field names must match the index mapping exactly.
 *
 * @author lhy
 */
public class Blob implements Serializable {

    // Serializable classes should pin a serialVersionUID so that class
    // evolution does not silently break deserialization.
    private static final long serialVersionUID = 1L;

    private Integer id;      // article id
    private String title;    // article title
    private String describe; // article summary / description
    private String url;      // article URL
    private String imageUrl; // thumbnail image URL (indexed under key "ImageUrl")
    private String postInfo; // publishing info (author, date, counters)

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getDescribe() {
        return describe;
    }

    public void setDescribe(String describe) {
        this.describe = describe;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getImageUrl() {
        return imageUrl;
    }

    public void setImageUrl(String imageUrl) {
        this.imageUrl = imageUrl;
    }

    public String getPostInfo() {
        return postInfo;
    }

    public void setPostInfo(String postInfo) {
        this.postInfo = postInfo;
    }

    @Override
    public String toString() {
        return "Blob [id=" + id + ", title=" + title + ", describe=" + describe + ", url=" + url + ", ImageUrl="
                + imageUrl + ", postInfo=" + postInfo + "]";
    }
}
2.ES服务操作的工具类
package com.ytdx.util;
import java.net.InetAddress;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
/**
* ES服务器操作工具类
* @author lhy
* @date 2018.04.20
*/
/**
 * Utility for opening and closing a TransportClient connection to a local
 * Elasticsearch node (transport protocol, port 9300 — not the HTTP port).
 *
 * @author lhy
 * @date 2018.04.20
 */
public class EsUtil {

    private static String name = "127.0.0.1"; // ES node host
    private static Integer port = 9300;       // transport port (9300), not the REST port (9200)

    // Cluster name must match the server's cluster.name, otherwise the
    // transport client refuses the connection.
    static Settings set = Settings.builder().put("cluster.name", "elasticsearch").build();

    /** Utility class with only static methods: not instantiable. */
    private EsUtil() {
    }

    /**
     * Connects a local transport client to the ES server.
     *
     * @return a connected {@link Client}, or {@code null} if the connection failed
     */
    public static Client EsConnect() {
        TransportClient client = null;
        try {
            client = new PreBuiltTransportClient(set)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName(name), port));
            System.out.println("ES服务器连接成功!");
            return client;
        } catch (Exception e) {
            // Caller receives null in this case; callers should check for it.
            System.out.println("ES服务器连接失败!");
            e.printStackTrace();
        }
        return client;
    }

    /**
     * Closes the given client if non-null; safe to call with {@code null}.
     *
     * @param client the client returned by {@link #EsConnect()}
     */
    public static void closeClient(Client client) {
        if (client != null) {
            client.close();
            System.out.println("Client已关闭!");
        }
    }
}
3.索引库的批量操作
package com.ytdx.es;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentFactory;
import com.ytdx.entity.Blob;
/**
 * Index-maintenance helper for the "home" index / "blob" type: create the
 * mapping, bulk-add documents, and drop the whole index.
 */
public class BulkIndex {

    private final Client client;
    private String index = "home";
    private String type = "blob";

    public BulkIndex(Client client) {
        super();
        this.client = client;
    }

    /**
     * Adds one blog document to the index via a bulk request.
     * Logs per-item failure messages instead of throwing, so one bad
     * document does not abort a crawl run.
     *
     * @param blob the crawled article to index
     */
    public void BulkAddBlob(Blob blob) throws Exception {
        BulkRequestBuilder builder = client.prepareBulk();
        builder.add(client.prepareIndex(index, type)
                .setSource(XContentFactory.jsonBuilder().startObject()
                        // BUG FIX: "id" was declared in the mapping but never
                        // indexed; include it so searches can return it.
                        .field("id", blob.getId())
                        .field("title", blob.getTitle())
                        .field("describe", blob.getDescribe())
                        .field("url", blob.getUrl())
                        .field("ImageUrl", blob.getImageUrl())
                        .field("postInfo", blob.getPostInfo())
                        .endObject()));
        BulkResponse response = builder.execute().actionGet();
        // Surface per-item failures (bulk responses don't throw for them).
        if (response.hasFailures()) {
            for (BulkItemResponse item : response.getItems()) {
                if (item.isFailed()) {
                    System.out.println("失败信息:--------" + item.getFailureMessage());
                }
            }
        }
    }

    /** Deletes the whole index (throws if the delete is rejected). */
    public void BulkDelIndex() throws Exception {
        client.admin().indices().prepareDelete(index).execute().actionGet();
        System.out.println("索引删除成功!");
    }

    /**
     * Creates the index with an explicit mapping for the "blob" type.
     *
     * BUG FIX: the original indexed a *document* whose body looked like a
     * mapping ("properties": {...}), which does not define a mapping at all —
     * mappings must go through the indices-admin API. Also, since ES 5 the
     * "store" option must be a boolean, not the string "yes".
     */
    public void BulkAdd() throws Exception {
        client.admin().indices().prepareCreate(index)
                .addMapping(type, XContentFactory.jsonBuilder().startObject()
                        .startObject("properties")
                        .startObject("id").field("type", "integer").field("store", true).endObject()
                        .startObject("title").field("type", "text").field("store", true).endObject()
                        .startObject("describe").field("type", "text").field("store", true).endObject()
                        .startObject("url").field("type", "text").field("store", true).endObject()
                        .startObject("ImageUrl").field("type", "text").field("store", true).endObject()
                        .startObject("postInfo").field("type", "text").field("store", true).endObject()
                        .endObject().endObject())
                .execute().actionGet();
        System.out.println("索引库创建成功!");
    }
}
4.分布式爬虫定时抓取数据,构建大数据量的索引库
package com.ytdx.task;
import org.elasticsearch.client.Client;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.ytdx.entity.Blob;
import com.ytdx.es.BulkIndex;
import com.ytdx.util.EsUtil;
@Component("FetchDataTask")
@Component("FetchDataTask")
public class FetchData {

    /**
     * Scheduled crawl: drops and recreates the "home" index, then re-fetches
     * the cnblogs listing pages. Cron "0 30 12 ? * *" fires at 12:30 PM daily
     * (the original comment said "上午"/AM, but hour 12 is noon).
     */
    @Scheduled(cron = "0 30 12 ? * *") // every day at 12:30 PM
    public void TimeTask() throws Exception {
        Client client = EsUtil.EsConnect();
        try {
            BulkIndex bulk = new BulkIndex(client);
            bulk.BulkDelIndex(); // drop the previous index first
            bulk.BulkAdd();      // recreate the index with its mapping
            String URL = "https://www.cnblogs.com/";
            Fetch(client, URL);  // fetch the front page first
            // BUG FIX: the original built "https://www.cnblogs.com/#p" + l,
            // but a '#fragment' is never sent to the server, so all 199
            // requests fetched page 1 again. Use the server-side paging path.
            for (int l = 2; l <= 200; l++) { // pages 2..200
                String url = "https://www.cnblogs.com/sitehome/p/" + l;
                Fetch(client, url);
            }
            System.out.println("系统抓取博客数据成功!");
        } finally {
            // BUG FIX: the transport client was never closed, leaking a
            // connection on every scheduled run.
            EsUtil.closeClient(client);
        }
    }

    /**
     * Fetches one listing page, parses every post entry, and bulk-indexes it.
     *
     * @param client connected ES client
     * @param URL    listing page to fetch
     */
    public void Fetch(Client client, String URL) throws Exception {
        BulkIndex bulk = new BulkIndex(client);
        Document doc = Jsoup.connect(URL).get();
        Element e = doc.getElementById("post_list");
        if (e == null) {
            // Layout changed or the request was blocked: skip this page
            // instead of crashing the whole crawl with an NPE.
            return;
        }
        Elements es = e.children();
        for (int i = 0; i < es.size(); i++) {
            Element nodes = es.get(i);
            Element item = nodes.getElementsByClass("post_item_body").first();
            if (item == null) { // entry without a body (e.g. an ad slot)
                continue;
            }
            String title = item.getElementsByTag("h3").get(0).getElementsByTag("a").text();
            String describe = item.getElementsByTag("p").text();
            String url = item.getElementsByTag("h3").get(0).getElementsByTag("a").attr("href");
            String ImageUrl = "";
            // Thumbnail is optional: only present when the summary links an image.
            if (item.getElementsByTag("p").get(0).getElementsByTag("a").size() >= 1) {
                ImageUrl = item.getElementsByTag("p").get(0).getElementsByTag("a").get(0)
                        .getElementsByTag("img").attr("src");
            }
            String postInfo = item.getElementsByClass("post_item_foot").first().text();
            // Map the scraped fields onto the entity and index it.
            Blob blob = new Blob();
            blob.setTitle(title);
            blob.setDescribe(describe);
            blob.setUrl(url);
            blob.setImageUrl(ImageUrl);
            blob.setPostInfo(postInfo);
            bulk.BulkAddBlob(blob);
        }
    }
}
5.后台的数据检索功能实现
package com.ytdx.es;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import com.ytdx.entity.Blob;
/**
 * Read-side search helper for the "home" index / "blob" type.
 */
public class EsQuery {

    private final Client client;
    private String index = "home";
    private String type = "blob";

    public EsQuery(Client client) {
        super();
        this.client = client;
    }

    /**
     * Converts a search response's hits into a list of {@link Blob}.
     *
     * BUG FIX: the original used an else-if chain, so at most ONE field was
     * copied per hit (a hit with a title never got its describe/url/etc.).
     * Each source field must be checked and copied independently.
     *
     * @param client   unused, kept for signature compatibility with callers
     * @param response the executed search response
     * @return one Blob per hit, fields filled from the hit's _source
     */
    public List<Blob> responseToList(Client client, SearchResponse response) {
        SearchHits hits = response.getHits();
        List<Blob> list = new ArrayList<Blob>();
        for (int i = 0; i < hits.getHits().length; i++) {
            Map<String, Object> map = hits.getAt(i).getSourceAsMap();
            Blob blob = new Blob();
            if (map.containsKey("id")) {
                blob.setId(Integer.valueOf(map.get("id").toString()));
            }
            if (map.containsKey("title")) {
                blob.setTitle(map.get("title").toString());
            }
            if (map.containsKey("describe")) {
                blob.setDescribe(map.get("describe").toString());
            }
            if (map.containsKey("url")) {
                blob.setUrl(map.get("url").toString());
            }
            if (map.containsKey("ImageUrl")) {
                blob.setImageUrl(map.get("ImageUrl").toString());
            }
            if (map.containsKey("postInfo")) {
                blob.setPostInfo(map.get("postInfo").toString());
            }
            list.add(blob);
        }
        return list;
    }

    /**
     * Returns up to 30 documents from the index (match-all query).
     */
    public List<Blob> getAll() {
        SearchResponse response = client.prepareSearch(index).setTypes(type) // index + type to search
                .setQuery(QueryBuilders.matchAllQuery())
                .setSize(30)
                .setScroll(TimeValue.timeValueMinutes(30))
                .execute()
                .actionGet();
        return responseToList(client, response);
    }

    /**
     * Full-text search over "title" and "describe" with the matched keywords
     * wrapped in a red highlight span; returns the first page (10 hits).
     *
     * @param values the user's search terms
     */
    public List<Blob> getFilter(String values) {
        String filter1 = "title";
        String filter2 = "describe";
        HighlightBuilder hiBuilder = new HighlightBuilder();
        hiBuilder.preTags("<span style=\"color:red\">"); // highlight matched keywords
        hiBuilder.postTags("</span>");
        hiBuilder.field(filter1);
        hiBuilder.preTags("<span style=\"color:red\">");
        hiBuilder.postTags("</span>");
        hiBuilder.field(filter2);
        SearchResponse response = client.prepareSearch(index).setTypes(type)
                .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                .setQuery(QueryBuilders
                        .multiMatchQuery(values, filter1, filter2))
                .setFrom(0).setSize(10) // first page of 10 results
                .setScroll(TimeValue.timeValueMinutes(30)) // scroll context kept 30 minutes
                .setExplain(true)
                .highlighter(hiBuilder)
                .execute()
                .actionGet();
        return responseToList(client, response);
    }
}
6.最后用SSM把接口映射到页面,这里就不赘述了.看看具体检索效果:索引数据达到6万多条,检索毫秒级返回,搜索速度非同一般.