批量导入GIS数据到Elasticsearch中
GIS数据是指地理信息系统行业按照点、线、面等几何形状保存的矢量格式的数据,一般有通用的Shapefile数据、ESRI通过SDE保存在数据中的数据、Geodatabase数据、MAPGIS、SuperMap数据等。
ELK Stack套件中的Logstash可以实现从关系型数据库同步到Elasticsearch中,但是初始GIS数据往往涉及数量大、记录条数多等特点,研究如何将几亿、几十亿条记录快速导入到Elasticsearch中建立全文索引和空间索引就显得十分有必要。有多种方法可以批量将GIS数据导入到Elasticsearch中,但归根结底都是将JSON格式的文档插入到Elasticsearch的索引中。
大批量导入GIS数据到Elasticsearch中有两种方法,第一种是反复调用常规的PUT、POST请求,每个请求插入一条记录;第二种方法可以利用Bulk API来批量导入大量的文档。相比较而言,第二种方法因为减少了网络开销而比第一种方法效率更高。一次性批量导入的文件大小取决于多个因素:单个文档(document)的大小和复杂度,索引和查询的承受度以及集群的可用资源的多少。一般建议从1000到5000条记录,文件大小在5M到15M之间进行测试。
下面主要介绍利用Bulk API来进行批量导入数据。
方法1:在命令行环境下,利用CURL工具批量操作。
请求方式:
POST /_bulk
POST /<index>/_bulk
请求体(request body)格式:
action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
....
action_and_meta_data\n
optional_source\n
批量操作都定义在请求体中,以NDJSON(newline delimited JSON)结构进行定义。
BULK批量操作可以是index、create、delete、update四种操作的结合。每个action_and_meta_data定义一种操作方式。Index和create操作的下一行需要定义数据源,并且和标准的index API具有相同的op_type语义:如果索引中已经存在相同名称的文档,那么create操作返回失败,而index操作则会新增或是替换原有的文档。Update操作需要在下一行中定义需要更新的文档内容、增量内容或者脚本。Delete操作在下一行不需要数据源,语义上跟标准的delete API相同。
curl -X POST "localhost:9200/_bulk?pretty" -H 'Content-Type: application/json' -d'
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
方法2:利用客户端实现批量请求
可以使用Perl或是Python的客户端来实现批量请求已经更新索引。
支持Perl语言的详见https://metacpan.org/pod/Search::Elasticsearch::Client::5_0::Bulk
支持Python的详见https://elasticsearch-py.readthedocs.io/en/master/helpers.html。
方法3:利用cURL提交文件格式的批量请求
可以将NDJSON格式的Bulk批量操作的内容保存在一个文本文件中,利用curl来进行提交操作。这时只要把—data-binary标记替换-d即可。
$ cat requests
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
$ curl -s -H "Content-Type: application/x-ndjson" -XPOST localhost:9200/_bulk --data-binary "@requests"; echo
{"took":7, "errors": false, "items":[{"index":{"_index":"test","_type":"_doc","_id":"1","_version":1,"result":"created","forced_refresh":false}}]}