
Bulk-importing data from MySQL into Elasticsearch

程序员文章站 (Programmer Article Site), 2022-03-04 12:49:21


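Preface

This article shows how to bulk-import data from MySQL into Elasticsearch.


Note: the main content of the article follows. The examples below are for reference.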

1. First, an entity class that receives the rows read from the database

package com.itheima.domain;

import com.alibaba.fastjson.annotation.JSONField;

import java.util.Date;
import java.util.Map;

public class Goods {

    private int id;
    private String title;
    private double price;
    private int stock;
    private int saleNum;
    private Date createTime;
    private String categoryName;
    private String brandName;
    private Map<String, Object> spec; // parsed spec, indexed into the ES "object" field

    @JSONField(serialize = false) // excluded when fastjson converts this object to JSON
    private String specStr; // raw spec JSON string read from the database, e.g. "{}"


    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    public int getStock() {
        return stock;
    }

    public void setStock(int stock) {
        this.stock = stock;
    }

    public int getSaleNum() {
        return saleNum;
    }

    public void setSaleNum(int saleNum) {
        this.saleNum = saleNum;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public String getCategoryName() {
        return categoryName;
    }

    public void setCategoryName(String categoryName) {
        this.categoryName = categoryName;
    }

    public String getBrandName() {
        return brandName;
    }

    public void setBrandName(String brandName) {
        this.brandName = brandName;
    }

    public Map<String, Object> getSpec() {
        return spec;
    }

    public void setSpec(Map<String, Object> spec) {
        this.spec = spec;
    }

    public String getSpecStr() {
        return specStr;
    }

    public void setSpecStr(String specStr) {
        this.specStr = specStr;
    }


    @Override
    public String toString() {
        return "Goods{" +
                "id=" + id +
                ", title='" + title + '\'' +
                ", price=" + price +
                ", stock=" + stock +
                ", saleNum=" + saleNum +
                ", createTime=" + createTime +
                ", categoryName='" + categoryName + '\'' +
                ", brandName='" + brandName + '\'' +
                ", spec=" + spec +
                ", specStr='" + specStr + '\'' +
                '}';
    }
}


2. DAO layer

Code (example):

package com.itheima.mapper;

import com.itheima.domain.Goods;
import org.apache.ibatis.annotations.Mapper;
import org.springframework.stereotype.Repository;

import java.util.List;

@Repository
@Mapper
public interface GoodsMapper {

    /**
     * Query all goods.
     */
    List<Goods> findAll();

}

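3. Mapper XML

The spec column is aliased to specStr so that MyBatis maps the raw JSON string into Goods.specStr.

Code (example):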

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">


<mapper namespace="com.itheima.mapper.GoodsMapper">

    <select id="findAll" resultType="com.itheima.domain.Goods">
        select
              `id`         ,
              `title`       ,
              `price`       ,
              `stock`       ,
              `saleNum`     ,
              `createTime`  ,
              `categoryName`,
              `brandName`   ,
              `spec`  as specStr

         from goods


    </select>

</mapper>

4. application.yml: Elasticsearch and database connection settings

Code (example):

elasticsearch:    # ES host and port
  host: localhost
  port: 9200


# datasource
spring:
  datasource:
    url: jdbc:mysql://host:3306/health
    username: root
    password: itheima
    driver-class-name: com.mysql.jdbc.Driver


# mybatis
mybatis:
  mapper-locations: classpath:mapper/*Mapper.xml # location of the mapper XML files
  type-aliases-package: com.itheima.domain  # entity classes used by the DAO layer, registered as type aliases
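The elasticsearch.host and elasticsearch.port keys above are custom properties. The article injects a RestHighLevelClient with @Autowired but does not show where that bean comes from.

One common way is a configuration class that binds those keys and builds the client. This is a minimal sketch that assumes the Elasticsearch 7.x High Level REST Client and Spring Boot are on the classpath. The class name ElasticSearchConfig and the "http" scheme are assumptions, not taken from the original project.

```java
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Hypothetical config class: binds "elasticsearch.host" / "elasticsearch.port"
 * from application.yml and exposes a RestHighLevelClient bean for @Autowired.
 */
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchConfig {

    private String host; // bound from elasticsearch.host
    private int port;    // bound from elasticsearch.port

    public String getHost() { return host; }
    public void setHost(String host) { this.host = host; }
    public int getPort() { return port; }
    public void setPort(int port) { this.port = port; }

    // Assumes plain HTTP. Spring infers close() as the destroy method,
    // so the client is closed when the application context shuts down.
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        return new RestHighLevelClient(RestClient.builder(new HttpHost(host, port, "http")));
    }
}
```

Binding through setters keeps the YAML keys unchanged. If Spring Boot's own Elasticsearch auto-configuration is also present, it backs off when a RestHighLevelClient bean already exists.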

5. Create the index with a script (note: a field of type object here only accepts a JSON object, i.e. a serialized Map or Java object)

PUT goods
{
	"mappings": {
		"properties": {
			"title": {
				"type": "text",
				"analyzer": "ik_smart"
			},
			"price": { 
				"type": "double"
			},
			"createTime": {
				"type": "date"
			},
			"categoryName": {    
				"type": "keyword"
			},
			"brandName": {    
				"type": "keyword"
			},
			"spec": {        
				"type": "object"
			},
			"saleNum": {    
				"type": "integer"
			},
			"stock": {    
				"type": "integer"
			}
		}
	}
}
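The note about the object type is the reason for the specStr/spec split. If the spec column were indexed as-is, the document would carry spec as a JSON string, and Elasticsearch rejects that for a field mapped as object.

The stdlib-only sketch below prints both shapes side by side. SpecShapeDemo and its methods are hypothetical helpers for illustration. The article itself does the conversion with fastjson (JSON.parseObject into a Map, then JSON.toJSONString), whereas this sketch splices the string directly and assumes specStr is already a valid JSON object.

```java
/**
 * Illustration only: the two JSON shapes the "spec" field can take.
 */
public class SpecShapeDemo {

    // Escape backslashes and quotes so the text can be embedded as a JSON string value.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // specStr indexed as-is: "spec" holds a JSON *string*,
    // which an "object" mapping rejects with a mapping error.
    static String specAsString(String specStr) {
        return "{\"spec\":\"" + escape(specStr) + "\"}";
    }

    // What the "object" mapping expects: "spec" holds a nested JSON *object*.
    // Assumes specStr is already a valid JSON object, as stored in MySQL.
    static String specAsObject(String specStr) {
        return "{\"spec\":" + specStr + "}";
    }

    public static void main(String[] args) {
        String specStr = "{\"color\":\"red\",\"size\":\"XL\"}";
        System.out.println(specAsString(specStr)); // string form
        System.out.println(specAsObject(specStr)); // object form
    }
}
```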

6. Bulk-import from MySQL into Elasticsearch

import com.alibaba.fastjson.JSON;
import com.itheima.domain.Goods;
import com.itheima.mapper.GoodsMapper;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.IOException;
import java.util.List;
import java.util.Map;


@RunWith(SpringRunner.class)
@SpringBootTest
public class ElasticsearchImportTest {

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    @Autowired // inject the DAO
    private GoodsMapper goodsMapper;

    /**
     * Bulk-import all rows of the goods table from MySQL into Elasticsearch.
     */
    @Test
    public void bulkRequest() throws IOException {

        // 1. Query all rows from MySQL
        List<Goods> goodsList = goodsMapper.findAll();
        if (goodsList.isEmpty()) {
            return; // Elasticsearch rejects a bulk request with no actions, so there is nothing to send
        }

        // 2. Create the bulk request
        BulkRequest bulkRequest = new BulkRequest();

        // 2.1 Loop over goodsList and add one IndexRequest per row
        for (Goods goods : goodsList) {
            // The database stores spec as a JSON string, so it is read into specStr first
            // and then parsed into a Map. The ES "object" field only accepts a JSON object
            // (a Map or a Java object), not a string.
            String specStr = goods.getSpecStr();
            Map<String, Object> spec = JSON.parseObject(specStr, Map.class);
            goods.setSpec(spec);
            String jsonString = JSON.toJSONString(goods); // specStr is skipped via @JSONField(serialize = false)

            // Use the database id as the document _id and declare the source format as JSON
            IndexRequest indexRequest = new IndexRequest("goods")
                    .id(String.valueOf(goods.getId()))
                    .source(jsonString, XContentType.JSON);
            // Add it to the bulk request
            bulkRequest.add(indexRequest);
        }

        // 3. Send all index requests to ES in a single bulk call
        BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        // status() describes the request as a whole; individual items can still fail
        if (response.hasFailures()) {
            System.err.println(response.buildFailureMessage());
        }
        System.out.println(response.status()); // prints OK on success
    }
}
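BulkRequest hides the wire format. The _bulk endpoint receives newline-delimited JSON (NDJSON): for each document, one action line naming the index and _id, then one line with the source, and every line ends with a newline.

The test above sends every row in a single BulkRequest. For a large table you might instead split the list into fixed-size batches and call restHighLevelClient.bulk once per batch.

The stdlib-only sketch below shows both ideas. BulkBodySketch, Doc, partition and toBulkBody are hypothetical helpers, not part of the Elasticsearch client. The body it builds is a simplified version of what the client sends, and the batch size of 2 is purely illustrative; a real value would need tuning.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of a batched bulk import: split documents into batches and
 * build the NDJSON body that the _bulk endpoint expects for each batch.
 */
public class BulkBodySketch {

    // A document to index: its _id and its JSON source (e.g. JSON.toJSONString(goods)).
    static final class Doc {
        final String id;
        final String json;

        Doc(String id, String json) {
            this.id = id;
            this.json = json;
        }
    }

    // Split a list into consecutive sub-lists of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be > 0");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    // One action line plus one source line per document, each terminated by "\n".
    // Assumes ids and the index name need no JSON escaping.
    static String toBulkBody(String index, List<Doc> batch) {
        StringBuilder sb = new StringBuilder();
        for (Doc d : batch) {
            sb.append("{\"index\":{\"_index\":\"").append(index)
              .append("\",\"_id\":\"").append(d.id).append("\"}}\n");
            sb.append(d.json).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<Doc> docs = List.of(
                new Doc("1", "{\"title\":\"phone\"}"),
                new Doc("2", "{\"title\":\"laptop\"}"),
                new Doc("3", "{\"title\":\"tablet\"}"));
        // With batchSize = 2 this yields two bulk bodies: documents [1, 2] and [3].
        for (List<Doc> batch : partition(docs, 2)) {
            System.out.print(toBulkBody("goods", batch));
            System.out.println("---");
        }
    }
}
```

In the Spring test, each batch would become its own BulkRequest. Checking hasFailures() per batch then localizes any failed items.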
 


Summary

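That is all for this article. It read the goods rows from MySQL with MyBatis, parsed the spec JSON string into a Map so it fits the Elasticsearch object field, and sent all documents to Elasticsearch in one bulk request.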