LogStash 导入 oracle(mysql)数据
程序员文章站
2022-06-11 11:56:53
...
LogStash 导入 oracle(mysql)数据
版本:
ES 7.13.3
LogStash 7.13.4
1. 背景
因为一些原因需要将oracle的数据导入到Es中,A、B两个表的数据构成商品信息,组成products索引。
分词器采用ik_max_word 对name相关的字段采用搜索自动补全功能。
2. 数据导入
首先默认我们已经按转好了 ES和Logstash环境。
下载 oracle驱动
2.1 编写sql文件
#ROWNUM 是oracle自带的 可以查出当前数据是第多少条数据,用于做#ROWNUM 是oracle自带的 可以查出当前数据是第多少条数据,用于做功能导入数据使用
#suggest 会根据这个字段做猜你所想的功能具体分下见下文
SELECT ROWNUM,NAME AS suggest,STATUS,AGE FORM A
SELECT ROWNUM,NAME AS suggest FROM B
2.1 编写数据输入源和输出源代文件
input:
1.配置数据库相关信息见注释
2.配置增量导入实现的标记,需要注意/path/id/ 文件夹是需要真实存在的 。当然我们也可以根据时间做增量,具体场景具体分析。
output
1.es配置 地址、账号、密码、类型和document_id等
2.分词器魔板文件,如果需要制定分析器的话。
input {
stdin {
}
jdbc {
jdbc_connection_string => "jdbc:oracle:thin:@ip/databaseName"
jdbc_user => "userName"
jdbc_password => "pwd"
jdbc_driver_library =>
"/path/ojdbc6.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
codec => plain { charset => "UTF-8"}
statement_filepath => "/path/your.sql"
schedule => "*/2 * * * *"
type => "bp_dealer_rp"
use_column_value => "true"
record_last_run => "true"
# 增量数据需要,将最后一次查询的rownum存入到下边的配置文件中去,下次导入数据 直接从rownum之后导入,需要根据自己的场景选择对应的 增量导入模式
tracking_column => "rownum"
last_run_metadata_path=> "/path/id/file1"
clean_run =>"false"
}
}
filter {
}
output {
elasticsearch {
hosts => ["40.18.15.23:9200"]
user => "elastic"
password => "pwd"
index => "products"
# 因为是多个表导入到一个索引当中 后边添加type字段区分不同表的数据。
document_id => "%{rownum}-type1"
# 模板名称
template_name => "products"
#自定义配置位置 绝对路径
template => "/path/ik.json"
#重写模板
template_overwrite => true
#manage_template => false
}
stdout {
codec => json_lines
}
}
2.2 修改pipeline文件
pipeline 是logstash的一个工作流可以执行多个任务,path.conf配置的就是我们上文所编写的文件,这里边需要配置两个任务。
- pipeline.id: task1
pipeline.workers: 2
pipeline.batch.size: 256
path.config: ""
queue.type: persisted
- pipeline.id: task2
pipeline.workers: 2
pipeline.batch.size: 256
path.config: ""
queue.type: persisted
2.3 自定义分词器模板
因为需要指定需要的分词器,有两种方法
1.直接创建索引指定对应的分词器后再用logstash导入
2.配置logstash自定义分词器魔板
在上文中的template 目录下创建对应的 json文件
{
"template": "products",
"index_patterns": [
"products"
],
"settings": {
"index": {
"number_of_shards": "1",
"refresh_interval": "5s"
}
},
"mappings": {
"properties": {
"suggest" : {
"type": "completion",
"analyzer": "ik_smart",
"search_analyzer": "ik_smart"
},
"filed1": {
"type": "text",
#指定分词器类型
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"filed1": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"filed1": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"filed1": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
}
}
}
猜你所想功能
我们做了如下配置,contexts主要是我们为了进行条件过滤添加的如果需要在插叙的时候通过STATUS进行过滤,就可以配置上这个信息,支持多个条件的筛选
"suggest" : {
"type": "completion",
"analyzer": "ik_smart",
"search_analyzer": "ik_smart",
"contexts": [
{
"name": "delete_status_cat",
"type": "category",
"path": "status"
}
]
}
启动logstash
#直接启动即可,这里会自动加载我们之前修改的pepilines文件
logstash
Kibana查看效果
mappings:
{
"products" : {
"order" : 0,
"index_patterns" : [
"products"
],
"settings" : {
"index" : {
"number_of_shards" : "1",
"refresh_interval" : "5s"
}
},
"mappings" : {
"properties" : {
"suggest" : {
"search_analyzer" : "ik_smart",
"analyzer" : "ik_smart",
"type" : "completion"
}
},
#省略其他字段
...
"aliases" : { }
}
}
查询猜你所想功能数据
POST /products/_search?pretty
{
"_source": "suggest" ,
"suggest": {
"my-suggest": {
"prefix":"xx",
"completion":{
"field":"suggest",
"skip_duplicates":true,
"size":5
#如果需要其他字段筛选加上contest标签即可
}
}
}
}
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 0,
"relation" : "eq"
},
"max_score" : null,
"hits" : [ ]
},
"suggest" : {
"my-suggest" : [
{
"text" : "XX",
"offset" : 0,
"length" : 2,
"options" : [
{
"text" : "XX易方达版",
"_index" : "products",
"_type" : "_doc",
"_id" : "33-jijin",
"_score" : 1.0,
"_source" : {
"suggest" : "XX易方达版"
}
},
{
"text" : "XX对接1111",
"_index" : "products",
"_type" : "_doc",
"_id" : "105-jijin",
"_score" : 1.0,
"_source" : {
"suggest" : "XX对接1111"
}
},
{
"text" : "XXnishiaosdas",
"_index" : "products",
"_type" : "_doc",
"_id" : "72-jijin",
"_score" : 1.0,
"_source" : {
"suggest" : "XXnishiaosdas"
}
},
{
"text" : "XX三生三世十里桃花",
"_index" : "products",
"_type" : "_doc",
"_id" : "2-jijin",
"_score" : 1.0,
"_source" : {
"suggest" : "XX三生三世十里桃花"
}
},
{
"text" : "XX萨德萨斯大恶趣味趣味",
"_index" : "products",
"_type" : "_doc",
"_id" : "46-jijin",
"_score" : 1.0,
"_source" : {
"suggest" : "XX萨德萨斯大恶趣味趣味"
}
}
]
}
]
}
}
效果
{
"products" : {
"order" : 0,
"index_patterns" : [
"products"
],
"settings" : {
"index" : {
"number_of_shards" : "1",
"refresh_interval" : "5s"
}
},
"mappings" : {
"properties" : {
"suggest" : {
"search_analyzer" : "ik_smart",
"analyzer" : "ik_smart",
"type" : "completion"
}
},
"aliases" : { }
}
}
POST /products/_search?pretty
{
"_source": "suggest" ,
"suggest": {
"my-suggest": {
"prefix":"xx",
"completion":{
"field":"suggest",
"skip_duplicates":true,
"size":5
}
}
}
}
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 0,
"relation" : "eq"
},
"max_score" : null,
"hits" : [ ]
},
"suggest" : {
"my-suggest" : [
{
"text" : "XX",
"offset" : 0,
"length" : 2,
"options" : [
{
"text" : "XX易方达版",
"_index" : "products",
"_type" : "_doc",
"_id" : "33-jijin",
"_score" : 1.0,
"_source" : {
"suggest" : "XX易方达版"
}
},
{
"text" : "XX对接30",
"_index" : "products",
"_type" : "_doc",
"_id" : "105-jijin",
"_score" : 1.0,
"_source" : {
"suggest" : "XX对接30"
}
},
{
"text" : "XX对接35",
"_index" : "products",
"_type" : "_doc",
"_id" : "72-jijin",
"_score" : 1.0,
"_source" : {
"suggest" : "XX对接35"
}
},
{
"text" : "XX对接37",
"_index" : "products",
"_type" : "_doc",
"_id" : "2-jijin",
"_score" : 1.0,
"_source" : {
"suggest" : "XX对接37"
}
},
{
"text" : "XX对接40",
"_index" : "products",
"_type" : "_doc",
"_id" : "46-jijin",
"_score" : 1.0,
"_source" : {
"suggest" : "XX对接40"
}
}
]
}
]
}
}
上一篇: mac下配置nginx