欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

基于logstash实现mysql与elasticsearch数据同步

程序员文章站 2022-05-09 14:17:37
...

Linux安装docker,docker安装mysql,es,kibana请看这篇文章
下载logstash
提取码:s4sn
下载mysql-connector-java-5.1.46.jar
提取码:r3gn
上传到服务器并解压

tar -zxvf logstash-6.4.3.tar.gz

进入 logstash-6.4.3目录
如果未安装jdk则用yum安装jdk
可以使用命令查询想要安装的jdk版本

1.查看jdk版本
yum search java | grep -i --color jdk
2.选择安装
yum install java-1.8.0-openjdk-devel.x86_64

在logstash-6.4.3目录下安装

bin/logstash-plugin install logstash-input-jdbc
bin/logstash-plugin install logstash-output-elasticsearch

mysql创建表如下
基于logstash实现mysql与elasticsearch数据同步
配置***.conf,并将jar和conf文件放到/usr/local/sql目录下,(自己指定即可,但注意配置)
指定jar路径并配置mysql数据库连接和同步表查询,配置es索引

input {
  jdbc {
    # 指定mysql连接jar
    jdbc_driver_library => "/usr/local/sql/mysql-connector-java-5.1.46.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # 连接mysql地址
    jdbc_connection_string => "jdbc:mysql://ip:3306/test?useSSL=false"
    jdbc_user => "root"
    jdbc_password => "root"
    # 设置心跳时间为每分钟
    schedule => "* * * * *"
    # 每分钟去查询上一次update_time时间以及之后的数据增量到es对应索引中
    statement => "SELECT * FROM user WHERE update_time >= :sql_last_value"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "update_time"
    last_run_metadata_path => "syncpoint_table"
  }
}


output {
    elasticsearch {
        # ES的IP地址及端口
        hosts => ["192.168.255.136:9200"]
        # 索引名称 可自定义
        index => "user"
        # 需要关联的数据库中有有一个id字段,对应类型中的id
        document_id => "%{id}"
        document_type => "user"
    }
    stdout {
        # JSON格式输出
        codec => json_lines
    }
}

然后进入到/usr/local/logstash-6.4.3/config目录下打开并编辑vi pipelines.yml在最后一行配置
可以配置多个

- pipeline.id: user
  path.config: "/usr/local/sql/mysql_1.conf"
- pipeline.id: member
  path.config: "/usr/local/sql/mysql_2.conf"

在启动mysql和es和kibana的前提下,进入到/usr/local/logstash-6.4.3目录启动logstash

./bin/logstash

启动成功我们可以看到sql执行去同步到es
基于logstash实现mysql与elasticsearch数据同步
kibana中查询可以看到数据已经同步过来
基于logstash实现mysql与elasticsearch数据同步
我们再添加一条新数据,这个时候可以看到新数据也已经同步过来
基于logstash实现mysql与elasticsearch数据同步
基于logstash实现mysql与elasticsearch数据同步
到此,基于logstash同步mysql-es已经完成