欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

LogStash 导入 oracle(mysql)数据

程序员文章站 2022-06-11 11:56:53
...

LogStash 导入 oracle(mysql)数据

版本:

ES 7.13.3

LogStash 7.13.4

1. 背景

因为一些原因需要将oracle的数据导入到Es中,A、B两个表的数据构成商品信息,组成products索引。

分词器采用ik_max_word 对name相关的字段采用搜索自动补全功能。

2. 数据导入

首先默认我们已经按转好了 ES和Logstash环境。

下载 oracle驱动

2.1 编写sql文件
#ROWNUM 是oracle自带的 可以查出当前数据是第多少条数据,用于做#ROWNUM 是oracle自带的 可以查出当前数据是第多少条数据,用于做功能导入数据使用 
#suggest 会根据这个字段做猜你所想的功能具体分下见下文
SELECT ROWNUM,NAME AS suggest,STATUS,AGE FORM A

SELECT ROWNUM,NAME AS suggest FROM B
2.1 编写数据输入源和输出源代文件

input:

1.配置数据库相关信息见注释

2.配置增量导入实现的标记,需要注意/path/id/ 文件夹是需要真实存在的 。当然我们也可以根据时间做增量,具体场景具体分析。

output

1.es配置 地址、账号、密码、类型和document_id等

2.分词器魔板文件,如果需要制定分析器的话。

input {
   stdin {
   }
   jdbc {
          jdbc_connection_string => "jdbc:oracle:thin:@ip/databaseName"
          jdbc_user => "userName"
          jdbc_password => "pwd"
          jdbc_driver_library =>
          "/path/ojdbc6.jar"
      jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
          codec => plain { charset => "UTF-8"}
          statement_filepath => "/path/your.sql"
         schedule => "*/2 * * * *"
          type => "bp_dealer_rp"
        use_column_value => "true"
        record_last_run => "true"
        # 增量数据需要,将最后一次查询的rownum存入到下边的配置文件中去,下次导入数据 直接从rownum之后导入,需要根据自己的场景选择对应的 增量导入模式
         tracking_column => "rownum"
        last_run_metadata_path=> "/path/id/file1"
     clean_run =>"false"
   }
}

filter {

}

output {
   elasticsearch {
       hosts => ["40.18.15.23:9200"]
       user => "elastic"
      password => "pwd"
      index => "products"
      # 因为是多个表导入到一个索引当中 后边添加type字段区分不同表的数据。
     document_id => "%{rownum}-type1"
      # 模板名称
    template_name => "products"
    #自定义配置位置 绝对路径
    template => "/path/ik.json"
   #重写模板
    template_overwrite => true
   #manage_template => false
  }
  stdout {
       codec => json_lines
  }
}


2.2 修改pipeline文件

pipeline 是logstash的一个工作流可以执行多个任务,path.conf配置的就是我们上文所编写的文件,这里边需要配置两个任务。

- pipeline.id: task1
  pipeline.workers: 2
  pipeline.batch.size: 256
  path.config: ""
  queue.type: persisted


- pipeline.id: task2
  pipeline.workers: 2
  pipeline.batch.size: 256 
  path.config: ""
  queue.type: persisted  


2.3 自定义分词器模板

因为需要指定需要的分词器,有两种方法

1.直接创建索引指定对应的分词器后再用logstash导入

2.配置logstash自定义分词器魔板

在上文中的template 目录下创建对应的 json文件

{
 "template": "products",
  "index_patterns": [
    "products"
  ],
  "settings": {
    "index": {
      "number_of_shards": "1",
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "properties": {
      "suggest" : {
        "type": "completion",
        "analyzer": "ik_smart",
         "search_analyzer": "ik_smart"
      },
      "filed1": {
        "type": "text",
          #指定分词器类型
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "filed1": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "filed1": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "filed1": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      }
    }
} 

猜你所想功能

我们做了如下配置,contexts主要是我们为了进行条件过滤添加的如果需要在插叙的时候通过STATUS进行过滤,就可以配置上这个信息,支持多个条件的筛选

  "suggest" : {
        "type": "completion",
        "analyzer": "ik_smart",
         "search_analyzer": "ik_smart",
    
   				 "contexts": [
                          {
                             "name": "delete_status_cat",
                             "type": "category",
                             "path": "status"
                           }
                      ]
      }

启动logstash

#直接启动即可,这里会自动加载我们之前修改的pepilines文件
logstash

Kibana查看效果

mappings:

{
  "products" : {
    "order" : 0,
    "index_patterns" : [
      "products"
    ],
    "settings" : {
      "index" : {
        "number_of_shards" : "1",
        "refresh_interval" : "5s"
      }
    },
    "mappings" : {
      "properties" : {
        "suggest" : {
          "search_analyzer" : "ik_smart",
          "analyzer" : "ik_smart",
          "type" : "completion"
        }
    },
      #省略其他字段
      ...
    "aliases" : { }
  }
}

查询猜你所想功能数据

POST /products/_search?pretty
{
 "_source": "suggest" ,
 "suggest": {
   "my-suggest": {
     "prefix":"xx",
     "completion":{
       "field":"suggest",
       "skip_duplicates":true,
       "size":5
   		#如果需要其他字段筛选加上contest标签即可
     }
   }
 }
}
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 0,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "suggest" : {
    "my-suggest" : [
      {
        "text" : "XX",
        "offset" : 0,
        "length" : 2,
        "options" : [
          {
            "text" : "XX易方达版",
            "_index" : "products",
            "_type" : "_doc",
            "_id" : "33-jijin",
            "_score" : 1.0,
            "_source" : {
              "suggest" : "XX易方达版"
            }
          },
          {
            "text" : "XX对接1111",
            "_index" : "products",
            "_type" : "_doc",
            "_id" : "105-jijin",
            "_score" : 1.0,
            "_source" : {
              "suggest" : "XX对接1111"
            }
          },
          {
            "text" : "XXnishiaosdas",
            "_index" : "products",
            "_type" : "_doc",
            "_id" : "72-jijin",
            "_score" : 1.0,
            "_source" : {
              "suggest" : "XXnishiaosdas"
            }
          },
          {
            "text" : "XX三生三世十里桃花",
            "_index" : "products",
            "_type" : "_doc",
            "_id" : "2-jijin",
            "_score" : 1.0,
            "_source" : {
              "suggest" : "XX三生三世十里桃花"
            }
          },
          {
            "text" : "XX萨德萨斯大恶趣味趣味",
            "_index" : "products",
            "_type" : "_doc",
            "_id" : "46-jijin",
            "_score" : 1.0,
            "_source" : {
              "suggest" : "XX萨德萨斯大恶趣味趣味"
            }
          }
        ]
      }
    ]
  }
}


效果

{
  "products" : {
    "order" : 0,
    "index_patterns" : [
      "products"
    ],
    "settings" : {
      "index" : {
        "number_of_shards" : "1",
        "refresh_interval" : "5s"
      }
    },
    "mappings" : {
      "properties" : {
     
        "suggest" : {
          "search_analyzer" : "ik_smart",
          "analyzer" : "ik_smart",
          "type" : "completion"
        }
    },
    "aliases" : { }
  }
}

POST /products/_search?pretty
{
 "_source": "suggest" ,
 "suggest": {
   "my-suggest": {
     "prefix":"xx",
     "completion":{
       "field":"suggest",
       "skip_duplicates":true,
       "size":5
     }
   }
 }
}
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 0,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "suggest" : {
    "my-suggest" : [
      {
        "text" : "XX",
        "offset" : 0,
        "length" : 2,
        "options" : [
          {
            "text" : "XX易方达版",
            "_index" : "products",
            "_type" : "_doc",
            "_id" : "33-jijin",
            "_score" : 1.0,
            "_source" : {
              "suggest" : "XX易方达版"
            }
          },
          {
            "text" : "XX对接30",
            "_index" : "products",
            "_type" : "_doc",
            "_id" : "105-jijin",
            "_score" : 1.0,
            "_source" : {
              "suggest" : "XX对接30"
            }
          },
          {
            "text" : "XX对接35",
            "_index" : "products",
            "_type" : "_doc",
            "_id" : "72-jijin",
            "_score" : 1.0,
            "_source" : {
              "suggest" : "XX对接35"
            }
          },
          {
            "text" : "XX对接37",
            "_index" : "products",
            "_type" : "_doc",
            "_id" : "2-jijin",
            "_score" : 1.0,
            "_source" : {
              "suggest" : "XX对接37"
            }
          },
          {
            "text" : "XX对接40",
            "_index" : "products",
            "_type" : "_doc",
            "_id" : "46-jijin",
            "_score" : 1.0,
            "_source" : {
              "suggest" : "XX对接40"
            }
          }
        ]
      }
    ]
  }
}