ElasticSearch入门:增删改查
程序员文章站
2022-07-05 08:05:42
...
描述:ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。下面介绍了利用Python API接口进行数据查询,方便其他系统的调用。(具体学习可查看https://blog.csdn.net/column/details/18392.html)
安装API
pip install elasticsearch
建立es连接
from elasticsearch import Elasticsearch
es = Elasticsearch([{'host':'10.10.13.12','port':9200}])
数据检索功能
es.search(index='logstash-2015.08.20', q='http_status_code:5* AND server_name:"web1"', from_='124119')
常用参数
- index - 索引名
- q - 查询指定匹配 使用Lucene查询语法
- from_ - 查询起始点 默认0
- doc_type - 文档类型
- size - 指定查询条数 默认10
- field - 指定字段 逗号分隔
- sort - 排序 字段:asc/desc
- body - 使用Query DSL
- scroll - 滚动查询
增删改查
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2018/9/10 16:28
# @Author : HJH
# @File : es.py
# @Software: PyCharm
from elasticsearch import Elasticsearch
from elasticsearch import helpers
class ESUtil(object):
def __init__(self, localhost):
self.localhost = localhost
self.es = Elasticsearch(localhost)
def create_train_index(self, index_name):
'''
创建模型索引
'''
_index_mappings = {
"mappings": {
index_name: {
"properties": {
"train_time": {
"type": "double",
"index": True
},
"status": {
"type": "keyword",
"index": True,
}
}
}
}
}
if self.es.indices.exists(index=index_name) is not True:
res = self.es.indices.create(index=index_name, body=_index_mappings)
# print(res)
def insert_data(self, index_value, content_map):
'''
单条插入数据
'''
return self.es.index(index=index_value, doc_type=index_value, body=content_map)
def delete_data(self, index_value, query_field, query_text):
'''通过其他字段删除数据'''
return self.es.delete_by_query(index=index_value, body={'query': {'term': {query_field: query_text}}})
def delete_data_by_id(self, index_value, myid):
'''通过id删除数据'''
return self.es.delete(index=index_value, doc_type=index_value, id=myid)
def delete_index(self, index_value):
'''删除索引'''
return self.es.indices.delete(index=index_value)
def update_data(self, index_value, my_id, content_map):
'''
更新插入数据
'''
return self.es.update(index=index_value, doc_type=index_value, id=my_id, body=content_map)
def get_data(self, index_value, id_value):
'''
通过主键查询数据
'''
return self.es.get(index=index_value, id=id_value)
def search_data_10(self, index_value, query_field, query_text):
'''
搜索数据返回查询后的结果列表,只返回10个
'''
result_list = []
id_list = []
res = self.es.search(index=index_value, body={'query': {'term': {query_field: query_text}}})
search_list = res["hits"]["hits"]
for search in search_list:
result_list.append(search["_source"])
id_list.append(search['_id'])
return result_list, id_list
def search_data_user_defined(self, index_value, query_field, query_text, search_offset, search_size):
'''
搜索数据返回查询后的结果列表,自定义返回个数,最多1万个
'''
final_result = []
es_result = self.es.search(
index=index_value,
body={"query": {'term': {query_field: query_text}}},
from_=search_offset,
size=search_size
)
result_items = es_result['hits']['hits']
for item in result_items:
final_result.append(item['_source'])
return final_result
def search_data_all(self, index_value, query_field, query_text, scroll='5m', timeout="1m"):
'''
搜索数据返回查询后的结果列表,返回全部
'''
final_result = []
es_result = helpers.scan(
client=self.es,
query={"query": {'term': {query_field: query_text}}},
scroll=scroll,
index=index_value
)
for item in es_result:
final_result.append(item['_source'])
return final_result
def search_lte_time(self, index_value, query_time, scroll='5m'):
'''
搜索数据返回查询后的结果列表,返回全部
'''
final_result = []
es_result = helpers.scan(
client=self.es,
query={"query": {'range': {'update_time': {'lte': query_time}}}, "size": 2},
scroll=scroll,
index=index_value
)
for item in es_result:
final_result.append(item['_source'])
return final_result
def search_gt_time(self, index_value, query_time, scroll='5m'):
'''
搜索数据返回查询后的结果列表,返回全部
'''
final_result = []
es_result = helpers.scan(
client=self.es,
query={"query": {'range': {'update_time': {'gt': query_time}}}},
scroll=scroll,
index=index_value
)
for item in es_result:
final_result.append(item['_source'])
return final_result
def get_max(self, index, query):
'''找到最大值'''
body = {
"query": {
"match_all": {}
},
"sort": {
query: {
"order": "desc"
}
}
}
res = self.es.search(index=index, body=body)
search_list = res["hits"]["hits"]
if len(search_list) > 0:
status = search_list[0]
else:
status = None
return status
def rename_idex(self, src, des):
'''对索引重命名'''
body = {"query": {"match_all": {}}}
return helpers.reindex(client=self.es, source_index=src, target_index=des, target_client=self.es,
query=body, chunk_size=1000)
if __name__ == "__main__":
es = ESUtil(['10.9.27.153:9200'])
es.create_index("train")
# 调用上述函数,此处省略