欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

datax同步数据到elasticsearch、mongodb、hdfs(hive)示例

程序员文章站 2022-06-16 08:42:20
...

环境组件安装

服务器
192.168.23.132(spark01)
192.168.23.133(spark02)
192.168.23.134(spark03)

jdk1.8
mysql5.7(单机部署,此示例安装在192.168.23.132)
zk + elasticsearch + mongo(此示例安装在192.168.23.132、192.168.23.133、192.168.23.134)
elasticsearch-head-master(es界面化单机部署,此示例安装在192.168.23.132)

测试数据准备
mysql数据库名datax、表名park,10条测试数据

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for park
-- ----------------------------
DROP TABLE IF EXISTS `park`;
CREATE TABLE `park`  (
  `park_id` int(11) NOT NULL,
  `shop_park_id` bigint(20) NULL DEFAULT NULL,
  `park_code` int(11) NULL DEFAULT NULL,
  `park_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,
  `park_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,
  `park_city` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,
  `park_alias` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,
  `update_time` timestamp(0) NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP(0),
  PRIMARY KEY (`park_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of park
-- ----------------------------
INSERT INTO `park` VALUES (1, 801, 9527, 'A01园', 'A', '北京', 'XA', '2020-03-31 11:12:58');
INSERT INTO `park` VALUES (2, 802, 9528, 'A02园', 'A', '北京', 'XB', '2020-03-31 11:13:01');
INSERT INTO `park` VALUES (3, 803, 9529, 'A03园', 'A', '北京', 'XC', '2020-03-31 11:13:04');
INSERT INTO `park` VALUES (4, 804, 9530, 'B01园', 'B', '上海 ', 'HB', '2020-03-31 11:13:07');
INSERT INTO `park` VALUES (5, 805, 9531, 'B02园', 'B', '上海', 'HA', '2020-03-31 11:13:09');
INSERT INTO `park` VALUES (6, 806, 9532, 'C01园', 'C', '广州 ', 'WA', '2020-03-31 11:13:11');
INSERT INTO `park` VALUES (7, 807, 9533, 'C02园', 'C', '广州 ', 'WL', '2020-03-31 11:13:22');
INSERT INTO `park` VALUES (8, 808, 9534, 'C03园', 'C', '广州 ', 'WB', '2020-03-31 11:13:14');
INSERT INTO `park` VALUES (9, 809, 9535, 'C04园', 'C', '广州 ', 'WH', '2020-03-31 11:13:18');
INSERT INTO `park` VALUES (10, 810, 9536, 'D01园', 'D', '深圳', 'ZA', '2020-03-31 11:13:26');

SET FOREIGN_KEY_CHECKS = 1;

datax同步数据到elasticsearch、mongodb、hdfs(hive)示例

datax安装

下载地址https://github.com/alibaba/DataX,直接解压即可使用
此demo安装目录为 /export/servers/datax/
tar -zxvf /opt/datax.tar.gz -C /export/servers
运行demo示例,检查是否能正常启动datax(job.json为datax自带demo,路径 /export/servers/datax/job)
cd /export/servers/datax/job
python …/bin/datax.py …/job/job.json
运行结果如下
datax同步数据到elasticsearch、mongodb、hdfs(hive)示例

同步mysql数据到mongodb

job配置
cd /export/servers/datax/job
mkdir mongo_test.json

{
    "job": {
        "setting": {
            "speed": {
                 "channel":1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "connection": [
                            {
                                "querySql": [
                                    "select park_id,shop_park_id,park_code,park_name,park_type,park_city,park_alias,update_time from park;"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.23.132:3306/datax"
                                ]
                            }
                        ]
                    }
                },
            	"writer": {
		    "name": "mongodbwriter",
        	    "parameter": {
			"address": [
                            "192.168.23.132:27017"
                        ],
                        "userName": "",
                        "userPassword": "",
                        "dbName": "test",
                        "collectionName": "mongo_test",
			"writeMode": {
                            "isReplace": "false",
                            "replaceKey": "_id"
                        },
			"column": [
              			{ "name": "park_id","type": "long" },
		                { "name": "shop_park_id","type": "long" },
		                { "name": "park_code","type": "long" },
		                { "name": "park_name","type": "string" },
		                { "name": "park_type", "type": "string" },
		                { "name": "park_city", "type": "string" },
	  	                { "name": "park_alias", "type": "string" },
		                { "name": "update_time", "type": "date" }
            ]
          }
        }


	    }
        ]
    }
}

”isReplace”: “false”,不更新,直接插入,速度最快.

执行该job
cd /export/servers/datax/job
python …/bin/datax.py …/job/mongo_test.json
datax同步数据到elasticsearch、mongodb、hdfs(hive)示例
windows cmd命令登录检查数据
mongo --host 192.168.23.132 --port 27017
datax同步数据到elasticsearch、mongodb、hdfs(hive)示例

同步mysql数据到elasticsearch

1.由于此示例下载的datax无es writer组件
先要自行配置好es writer组件,datax同步mysql数据到elasticsearch组件下载地址
2.将组件解压到 /export/servers/datax/plugin/writer
datax同步数据到elasticsearch、mongodb、hdfs(hive)示例
3.job配置
cd /export/servers/datax/job
mkdir es_test.json

{
    "job": {
        "setting": {
            "speed": {
                 "channel":1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "connection": [
                            {
                                "querySql": [
       				    "select park_id,shop_park_id,park_code,park_name,park_type,park_city,park_alias,update_time from park;"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.23.132:3306/datax"
                                ]
                            }
                        ]
                    }
                },
            	"writer": {
		    "name": "elasticsearchwriter",
        	    "parameter": {
            		"endpoint": "http://192.168.23.132:9200",
            		"accessId": "12312313",
                        "accessKey": "1232222222",
			"index": "test-datax",
            		"type": "default",
            		"cleanup": true,
            		"settings": {"index" :{"number_of_shards": 3, "number_of_replicas": 2}},
            		"discovery": false,
            		"batchSize": 1000,
            		"column": [
              			{ "name": "park_id","type": "long" },
		                { "name": "shop_park_id","type": "long" },
		                { "name": "park_code","type": "integer" },
		                { "name": "park_name","type": "string" },
		                { "name": "park_type", "type": "string" },
		                { "name": "park_city", "type": "string" },
	  	                { "name": "park_alias", "type": "string" },
		                { "name": "update_time", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"}
            ]
          }
        }


	    }
        ]
    }
}

endpoint 描述:ElasticSearch的连接地址 必选:是
accessId 描述:http auth中的user 必选:否
accessKey 描述:http auth中的password 必选:否
index 描述:elasticsearch中的index名 必选:是
type 描述:elasticsearch中index的type名 必选:否 默认值:index名
cleanup 描述:是否删除原表 必选:否 默认值:false
batchSize 描述:每次批量数据的条数 必选:否 默认值:1000
trySize 描述:失败后重试的次数 必选:否 默认值:30
timeout 描述:客户端超时时间 必选:否 默认值:600000
discovery 描述:启用节点发现将(轮询)并定期更新客户机中的服务器列表。 必选:否 默认值:false
compression 描述:http请求,开启压缩 必选:否 默认值:true
multiThread 描述:http请求,是否有多线程 必选:否 默认值:true
ignoreWriteError 描述:忽略写入错误,不重试,继续写入 必选:否 默认值:false
ignoreParseError 描述:忽略解析数据格式错误,继续写入 必选:否 默认值:true
alias 描述:数据导入完成后写入别名 必选:否
aliasMode 描述:数据导入完成后增加别名的模式,append(增加模式), exclusive(只留这一个) 必选:否 默认值:append
settings 描述:创建index时候的settings, 与elasticsearch官方相同 必选:否
splitter 描述:如果插入数据是array,就使用指定分隔符 必选:否 默认值:-,-
column 描述:elasticsearch所支持的字段类型,样例中包含了全部 必选:是
dynamic 描述: 不使用datax的mappings,使用es自己的自动mappings 必选: 否 默认值: false

官方文档说明

执行该job
cd /export/servers/datax/job
python …/bin/datax.py …/job/es_test.json
datax同步数据到elasticsearch、mongodb、hdfs(hive)示例
浏览器登录es管理界面
http://192.168.23.132:9100/
datax同步数据到elasticsearch、mongodb、hdfs(hive)示例

同步SQL server数据到hdfs,并加载到hive

job配置
table1.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "sqlserverreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": ["$connstr"],
                                "querySql": ["select * from $table_name where creationtime >= '${start_time}' and creationtime < '${end_time}'"]
                            }
                        ],
                        "password": "$password",
                        "username": "$username"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                                 {"name": "Id","type": "bigint"},
                                 {"name": "PkId","type": "int"},
                                 {"name": "Barcode","type": "String"},
                                 {"name": "ClassId","type": "int"},
                                 {"name": "Qty","type": "int"},
                                 {"name": "TerminalId","type": "int"},
                                 {"name": "Remark","type": "String"},
                                 {"name": "CreationTime","type": "TIMESTAMP"},
                                 {"name": "SaleClassId","type": "int"}
                                ],
                        "defaultFS": "$defaultFS",
                        "fieldDelimiter": "$separator",
                        "fileName": "$fileName",
                        "fileType": "$fileType",
                        "path": "$datadir",
                        "writeMode": "$writeMode"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "3"
            },
            "errorLimit":{
                "percentage":0.02,
                "record":0
            }
        }
    }
}

执行脚本execute.sh

#!/bin/bash
source /etc/profile
#数据库连接信息
connstr="jdbc:sqlserver://11.99.99.99:1433;DatabaseName=databs"
username="root"
password="123456"
#源数据库表
#名获取开始和结束的日期
if [ $# -gt 1 ]
then
    table_name=$1
    start_time=$2
    end_time=$3
else
    table_name="table1,table2,table3,table4"
    start_time=`date -d "-1 day" +%Y-%m-%d`
    end_time=`date +%Y-%m-%d`
fi

#hdfs地址信息
defaultFS="hdfs://cdh03:8020"
#datax导入文件后的存储格式
fileType="text"
#写入hdfs的模式
writeMode="append"
#文件数据分隔符
separator='\t'
#hive库
database="ods_data_all"

array=(${table_name//,/ })

for var in ${array[@]}
do

  #datax数据导入目录
  datadir="/data/datax/databs/${var}/${start_time}/"
  #datax数据导入存储的文件名
  fileName='$start_time'
  #创建数据导入目录命令
  hdfs dfs -mkdir -p $datadir
  #运行datax的命令
  python /opt/datax/bin/datax.py -p "-Dconnstr='$connstr' -Dusername='$username' -Dpassword='$password' -Dtable_name='$var' -Dstart_time='$start_time' -Dend_time='$end_time' -DdefaultFS='$defaultFS' -DfileType='$fileType' -Ddatadir='$datadir' -DfileName='$fileName' -DwriteMode='$writeMode' -Dseparator='$separator'" --jvm="-Xms1G -Xmx1G" /opt/datax/job/${var}.json
  #将数据加载进hive表
  hive -e "use ${database};load data inpath '${defaultFS}/${datadir}/*' into table $var;"
done

总任务执行脚本execute.sh
sh execute.sh table_name start_time end_time

一共可以传三个参数
table_name:你要执行同步的表名,多个表用逗号分隔开(比如"table1,table2")
start_time,end_time:同步数据的时间段,这个时间段在每张表的where条件里面用时间戳字段来过滤

如果你提前确定了表名就可以在脚本里面配好,时间段默认是前一天。这样就不用传参数了直接执行脚本: sh execute.sh (记得要提前在hive里面建好对应表,导完数据会load进hive表)

转载自datax介绍及生产脚本配置


参考链接
官方GitHub源码
通过datax导入数据到elasticsearch
DataX写入mongoDB速度比较
DataX 数据转储笔记
datax介绍及生产脚本配置

datax同步数据到elasticsearch、mongodb、hdfs(hive)示例

相关标签: 组件