欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

elasticsearch 数据导入到 mysql

程序员文章站 2022-06-11 11:38:27
...

最近在做一个数据分析的项目,数据来源是来自ES,但不允许每次去ES中拉取数据,所以需要从ES中拉取数据然后进行数据落地操作,本文选取落地到mysql数据库中,这样方便进行一些sql语句操作。

废话不多说,接下来正式开始, 由于要求快速,所以本文选择了Spark进行数据导出

1.  ElasticSearch与Spark已经有集成的jar包,且我的测试环境基于以下版本进行, 添加Maven依赖

scala-version:2.11.8

spark-version:2.2.0

elasticsearch-version:5.5.0

<dependencies>
 
   <dependency>
       <groupId>org.elasticsearch</groupId>
       <artifactId>elasticsearch-spark-20_2.11</artifactId>
       <version>6.2.4</version>
   </dependency>
 
</dependencies>

2. 功能实现,从ES读取然后写入mysql数据库

import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
import org.apache.spark.SparkConf
import org.elasticsearch.spark.rdd.EsSpark


object ES2Mysql {

  def main(args: Array[String]): Unit = {
    //配置spark与es连接
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("ES2Mysql")
      .set("es.index.auto.create", "true")
      .set("es.nodes", "127.0.0.1")
      .set("es.port", "9200")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    //从es中index为test中查找内容为1的所有数据
    val rdd: RDD[(String, String)] = EsSpark.esJsonRDD(spark.sparkContext, "test", "?q=*1*")

    //将rdd转换成df
    import spark.implicits._
    val ds: Dataset[String] = rdd.map(_._2).toDS()
    val df: DataFrame = spark.read.json(ds)

    //将数据导入到mysql,其中test数据库必须存在,但是test表可以不存在,会自动创建表
    val jdbcUrl = "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&characterEncoding=UTF-8"
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "1")
    df.write.mode(SaveMode.Append).jdbc(jdbcUrl,"test", prop)

    rdd.unpersist()
    spark.stop()
  }
}

到此,已经完成了批量数据从es导入到mysql中,有疑问欢迎大家一起讨论。