欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ElasticSearch入门:增删改查

程序员文章站 2022-07-05 08:05:42
...

描述:ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。下面介绍了利用Python API接口进行数据查询,方便其他系统的调用。(具体学习可查看https://blog.csdn.net/column/details/18392.html

安装API

pip install elasticsearch

建立es连接

from elasticsearch import Elasticsearch

es = Elasticsearch([{'host':'10.10.13.12','port':9200}])

数据检索功能

es.search(index='logstash-2015.08.20', q='http_status_code:5* AND server_name:"web1"', from_='124119')

常用参数

  • index - 索引名
  • q - 查询指定匹配 使用Lucene查询语法
  • from_ - 查询起始点  默认0
  • doc_type - 文档类型
  • size - 指定查询条数 默认10
  • field - 指定字段 逗号分隔
  • sort - 排序  字段:asc/desc
  • body - 使用Query DSL
  • scroll - 滚动查询

增删改查

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/9/10 16:28
# @Author  : HJH
# @File    : es.py
# @Software: PyCharm

from elasticsearch import Elasticsearch
from elasticsearch import helpers


class ESUtil(object):
    def __init__(self, localhost):
        self.localhost = localhost
        self.es = Elasticsearch(localhost)

    def create_train_index(self, index_name):
        '''
        创建模型索引
        '''

        _index_mappings = {
            "mappings": {
                index_name: {
                    "properties": {
                        "train_time": {
                            "type": "double",
                            "index": True
                        },
                        "status": {
                            "type": "keyword",
                            "index": True,
                        }
                    }
                }
            }
        }
        if self.es.indices.exists(index=index_name) is not True:
            res = self.es.indices.create(index=index_name, body=_index_mappings)
            # print(res)

    def insert_data(self, index_value, content_map):
        '''
        单条插入数据
        '''
        return self.es.index(index=index_value, doc_type=index_value, body=content_map)

    def delete_data(self, index_value, query_field, query_text):
        '''通过其他字段删除数据'''
        return self.es.delete_by_query(index=index_value, body={'query': {'term': {query_field: query_text}}})

    def delete_data_by_id(self, index_value, myid):
        '''通过id删除数据'''
        return self.es.delete(index=index_value, doc_type=index_value, id=myid)

    def delete_index(self, index_value):
        '''删除索引'''
        return self.es.indices.delete(index=index_value)

    def update_data(self, index_value, my_id, content_map):
        '''
        更新插入数据
        '''
        return self.es.update(index=index_value, doc_type=index_value, id=my_id, body=content_map)

    def get_data(self, index_value, id_value):
        '''
        通过主键查询数据
        '''
        return self.es.get(index=index_value, id=id_value)

    def search_data_10(self, index_value, query_field, query_text):
        '''
        搜索数据返回查询后的结果列表,只返回10个
        '''
        result_list = []
        id_list = []
        res = self.es.search(index=index_value, body={'query': {'term': {query_field: query_text}}})
        search_list = res["hits"]["hits"]
        for search in search_list:
            result_list.append(search["_source"])
            id_list.append(search['_id'])

        return result_list, id_list

    def search_data_user_defined(self, index_value, query_field, query_text, search_offset, search_size):
        '''
        搜索数据返回查询后的结果列表,自定义返回个数,最多1万个
        '''
        final_result = []
        es_result = self.es.search(
            index=index_value,
            body={"query": {'term': {query_field: query_text}}},
            from_=search_offset,
            size=search_size
        )
        result_items = es_result['hits']['hits']
        for item in result_items:
            final_result.append(item['_source'])
        return final_result

    def search_data_all(self, index_value, query_field, query_text, scroll='5m', timeout="1m"):
        '''
        搜索数据返回查询后的结果列表,返回全部
        '''
        final_result = []
        es_result = helpers.scan(
            client=self.es,
            query={"query": {'term': {query_field: query_text}}},
            scroll=scroll,
            index=index_value
        )
        for item in es_result:
            final_result.append(item['_source'])
        return final_result

    def search_lte_time(self, index_value, query_time, scroll='5m'):
        '''
        搜索数据返回查询后的结果列表,返回全部
        '''
        final_result = []

        es_result = helpers.scan(
            client=self.es,
            query={"query": {'range': {'update_time': {'lte': query_time}}}, "size": 2},
            scroll=scroll,
            index=index_value
        )

        for item in es_result:
            final_result.append(item['_source'])
        return final_result

    def search_gt_time(self, index_value, query_time, scroll='5m'):
        '''
        搜索数据返回查询后的结果列表,返回全部
        '''
        final_result = []
        es_result = helpers.scan(
            client=self.es,
            query={"query": {'range': {'update_time': {'gt': query_time}}}},
            scroll=scroll,
            index=index_value
        )
        for item in es_result:
            final_result.append(item['_source'])
        return final_result

    def get_max(self, index, query):
        '''找到最大值'''
        body = {
            "query": {
                "match_all": {}
            },
            "sort": {
                query: {
                    "order": "desc"
                }
            }
        }
        res = self.es.search(index=index, body=body)
        search_list = res["hits"]["hits"]
        if len(search_list) > 0:
            status = search_list[0]
        else:
            status = None

        return status

    def rename_idex(self, src, des):
        '''对索引重命名'''
        body = {"query": {"match_all": {}}}
        return helpers.reindex(client=self.es, source_index=src, target_index=des, target_client=self.es,
                               query=body, chunk_size=1000)


if __name__ == "__main__":
    es = ESUtil(['10.9.27.153:9200'])
    es.create_index("train")
    # 调用上述函数,此处省略