通过springboot 同步pgsql数据到kafka 发送到logstash 再同步到es
程序员文章站
2022-03-07 11:25:54
安装 es下载安装包 es安装包地址kafka安装brew install kafka 安装的配置文件位置/usr/local/etc/kafka/server.properties/usr/local/etc/kafka/zookeeper.properties启动kafka运行kafka需要使用Zookeeper,所以你需要先启动Zookeeper,如果你没有Zookeeper,你可以使用kafka自带打包和配置好的Zookeeper启动zookeeperzookee...
安装 es
下载安装包 es安装包地址
kafka安装
brew install kafka
安装的配置文件位置
/usr/local/etc/kafka/server.properties
/usr/local/etc/kafka/zookeeper.properties
启动kafka
运行kafka需要使用Zookeeper,所以你需要先启动Zookeeper,如果你没有Zookeeper,你可以使用kafka自带打包和配置好的Zookeeper
启动zookeeper
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
启动kafka
kafka-server-start /usr/local/etc/kafka/server.properties
安装logstash
安装步骤略
重点:配置文件
# Sample Logstash configuration for creating a simple
# Kafka -> Logstash -> Elasticsearch pipeline.
input {
tcp {
host => "127.0.0.1"
port => 5544
type => tcp
codec => json
}
kafka {
bootstrap_servers => "localhost:9092"
topics => ["BookList","BookDuplicate"]
decorate_events => true #可向事件添加Kafka元数据比如 topic,消费者组
}
jdbc {
jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/folio"
jdbc_user => "folio"
jdbc_password => "folio123" #更换自己的数据库用户名和密码
jdbc_driver_library => "/Users/yanziyu/logstash/logstash-7.7.1/bin/postgresql-42.2.14.jar" #更换自己的驱动地址
jdbc_driver_class => "org.postgresql.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "300000"
use_column_value => "true"
tracking_column => "id"
#这是要操作的sql 表,下面会提供
statement => "select jsonb::TEXT from shlibrary_mod_shl_inventory.booklist_add"
schedule => "* * * * *"
type => "jdbc_booklist"
jdbc_default_timezone =>"Asia/Shanghai"
}
jdbc {
jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/folio"
jdbc_user => "folio"
jdbc_password => "folio123" #更换自己的数据库用户名和密码
jdbc_driver_library => "/Users/yanziyu/logstash/logstash-7.7.1/bin/postgresql-42.2.14.jar" #更换自己的驱动地址
jdbc_driver_class => "org.postgresql.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "300000"
use_column_value => "true"
tracking_column => "id"
#这是要操作的sql 表,下面会提供
statement => "select jsonb::TEXT from shlibrary_mod_shl_inventory.bookduplicate_add"
schedule => "* * * * *"
type => "jdbc_bookduplicate"
jdbc_default_timezone =>"Asia/Shanghai"
}
}
filter {
json {
source => "message" #将message中数据分成对应的field
#remove_field => ["message"]
}
mutate {
remove_field => ["@version","@timestamp"] #移除字段
}
}
output {
if [type] == "tcp" {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "dataloader-es-log"
}
}
if [type]== "jdbc_booklist" {
kafka {
bootstrap_servers => "localhost:9092" #kafka服务器地址
topic_id => "BookList_Add"
batch_size => 5
codec => "json" #写入的时候使用json编码,因为logstash收集后会转换成json格式
}
}
if [type]== "jdbc_bookduplicate" {
kafka {
bootstrap_servers => "localhost:9092" #kafka服务器地址
topic_id => "BookDuplicate_Add"
batch_size => 5
codec => "json" #写入的时候使用json编码,因为logstash收集后会转换成json格式
}
}
if [@metadata][kafka][topic] == "BookList" {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "booklist"
document_id => "%{id}" #将message中的id设置为_doc的Id
}
}
if [@metadata][kafka][topic] == "BookDuplicate" {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "bookduplicate"
document_id => "%{id}" #将message中的id设置为_doc的Id
}
}
}
本文地址:https://blog.csdn.net/yanziyu0721730/article/details/107456723