elasticsearch 数据导入到 mysql
程序员文章站
2022-06-11 11:38:27
...
最近在做一个数据分析的项目,数据来源是来自ES,但不允许每次去ES中拉取数据,所以需要从ES中拉取数据然后进行数据落地操作,本文选取落地到mysql数据库中,这样方便进行一些sql语句操作。
废话不多说,接下来正式开始, 由于要求快速,所以本文选择了Spark进行数据导出
1. ElasticSearch与Spark已经有集成的jar包,且我的测试环境基于以下版本进行, 添加Maven依赖
scala-version:2.11.8
spark-version:2.2.0
elasticsearch-version:5.5.0
<dependencies>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>6.2.4</version>
</dependency>
</dependencies>
2. 功能实现,从ES读取然后写入mysql数据库
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
import org.apache.spark.SparkConf
import org.elasticsearch.spark.rdd.EsSpark
object ES2Mysql {
def main(args: Array[String]): Unit = {
//配置spark与es连接
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("ES2Mysql")
.set("es.index.auto.create", "true")
.set("es.nodes", "127.0.0.1")
.set("es.port", "9200")
val spark = SparkSession.builder().config(conf).getOrCreate()
//从es中index为test中查找内容为1的所有数据
val rdd: RDD[(String, String)] = EsSpark.esJsonRDD(spark.sparkContext, "test", "?q=*1*")
//将rdd转换成df
import spark.implicits._
val ds: Dataset[String] = rdd.map(_._2).toDS()
val df: DataFrame = spark.read.json(ds)
//将数据导入到mysql,其中test数据库必须存在,但是test表可以不存在,会自动创建表
val jdbcUrl = "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&characterEncoding=UTF-8"
val prop = new Properties()
prop.setProperty("user", "root")
prop.setProperty("password", "1")
df.write.mode(SaveMode.Append).jdbc(jdbcUrl,"test", prop)
rdd.unpersist()
spark.stop()
}
}
到此,已经完成了批量数据从es导入到mysql中,有疑问欢迎大家一起讨论。
上一篇: Javascript中click与blur事件的顺序详析
下一篇: OpenGL学习之路之光照贴图