DataX examples: syncing data to Elasticsearch, MongoDB, and HDFS (Hive)
Environment and component setup
Servers
192.168.23.132(spark01)
192.168.23.133(spark02)
192.168.23.134(spark03)
jdk1.8
mysql5.7 (standalone; in this example installed on 192.168.23.132)
zk + elasticsearch + mongo (in this example installed on 192.168.23.132, 192.168.23.133, 192.168.23.134)
elasticsearch-head-master (ES web UI, standalone; in this example installed on 192.168.23.132)
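A quick way to confirm the components are reachable before going further is a few client calls from one of the nodes; this is only a sketch and assumes default ports and that the mysql and mongo clients are on the PATH:
java -version
mysql --version
curl -s http://192.168.23.132:9200            # elasticsearch should answer with cluster/version JSON
mongo --host 192.168.23.132 --port 27017 --eval 'db.version()'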
Test data preparation
MySQL database: datax, table: park, with 10 rows of test data
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for park
-- ----------------------------
DROP TABLE IF EXISTS `park`;
CREATE TABLE `park` (
`park_id` int(11) NOT NULL,
`shop_park_id` bigint(20) NULL DEFAULT NULL,
`park_code` int(11) NULL DEFAULT NULL,
`park_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,
`park_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,
`park_city` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,
`park_alias` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,
`update_time` timestamp(0) NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP(0),
PRIMARY KEY (`park_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of park
-- ----------------------------
INSERT INTO `park` VALUES (1, 801, 9527, 'A01园', 'A', '北京', 'XA', '2020-03-31 11:12:58');
INSERT INTO `park` VALUES (2, 802, 9528, 'A02园', 'A', '北京', 'XB', '2020-03-31 11:13:01');
INSERT INTO `park` VALUES (3, 803, 9529, 'A03园', 'A', '北京', 'XC', '2020-03-31 11:13:04');
INSERT INTO `park` VALUES (4, 804, 9530, 'B01园', 'B', '上海 ', 'HB', '2020-03-31 11:13:07');
INSERT INTO `park` VALUES (5, 805, 9531, 'B02园', 'B', '上海', 'HA', '2020-03-31 11:13:09');
INSERT INTO `park` VALUES (6, 806, 9532, 'C01园', 'C', '广州 ', 'WA', '2020-03-31 11:13:11');
INSERT INTO `park` VALUES (7, 807, 9533, 'C02园', 'C', '广州 ', 'WL', '2020-03-31 11:13:22');
INSERT INTO `park` VALUES (8, 808, 9534, 'C03园', 'C', '广州 ', 'WB', '2020-03-31 11:13:14');
INSERT INTO `park` VALUES (9, 809, 9535, 'C04园', 'C', '广州 ', 'WH', '2020-03-31 11:13:18');
INSERT INTO `park` VALUES (10, 810, 9536, 'D01园', 'D', '深圳', 'ZA', '2020-03-31 11:13:26');
SET FOREIGN_KEY_CHECKS = 1;
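One way to load the test data and confirm the 10 rows are present, assuming the dump above is saved as park.sql and the root/root account used in the job files below (adjust host and credentials to your setup):
mysql -h192.168.23.132 -uroot -proot -e "CREATE DATABASE IF NOT EXISTS datax DEFAULT CHARACTER SET utf8mb4;"
mysql -h192.168.23.132 -uroot -proot datax < park.sql
mysql -h192.168.23.132 -uroot -proot -e "SELECT COUNT(*) FROM datax.park;"    # should return 10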
DataX installation
Download from https://github.com/alibaba/DataX; it is ready to use after extracting.
In this demo the install directory is /export/servers/datax/
tar -zxvf /opt/datax.tar.gz -C /export/servers
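After extracting, the directory should contain at least the bin, job, lib and plugin subdirectories; a quick check:
ls /export/servers/datax/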
Run the bundled demo job to check that DataX starts correctly (job.json is DataX's built-in demo, located under /export/servers/datax/job)
cd /export/servers/datax/job
python ../bin/datax.py ../job/job.json
A successful run ends with DataX's job statistics summary (records read/written and failure count).
Syncing MySQL data to MongoDB
Job configuration
cd /export/servers/datax/job
vim mongo_test.json
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"connection": [
{
"querySql": [
"select park_id,shop_park_id,park_code,park_name,park_type,park_city,park_alias,update_time from park;"
],
"jdbcUrl": [
"jdbc:mysql://192.168.23.132:3306/datax"
]
}
]
}
},
"writer": {
"name": "mongodbwriter",
"parameter": {
"address": [
"192.168.23.132:27017"
],
"userName": "",
"userPassword": "",
"dbName": "test",
"collectionName": "mongo_test",
"writeMode": {
"isReplace": "false",
"replaceKey": "_id"
},
"column": [
{ "name": "park_id","type": "long" },
{ "name": "shop_park_id","type": "long" },
{ "name": "park_code","type": "long" },
{ "name": "park_name","type": "string" },
{ "name": "park_type", "type": "string" },
{ "name": "park_city", "type": "string" },
{ "name": "park_alias", "type": "string" },
{ "name": "update_time", "type": "date" }
]
}
}
}
]
}
}
"isReplace": "false" means no updates: records are inserted directly, which is the fastest option. Setting it to "true" together with replaceKey makes the writer overwrite existing documents that share the same key.
Run the job
cd /export/servers/datax/job
python ../bin/datax.py ../job/mongo_test.json
Check the data by logging in from a Windows cmd prompt (or any machine with the mongo shell):
mongo --host 192.168.23.132 --port 27017
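Inside the shell, switch to the test database and query the mongo_test collection. A non-interactive check with --eval also works (standard mongo shell calls, shown only as a sketch):
mongo --host 192.168.23.132 --port 27017 --eval 'db.getSiblingDB("test").mongo_test.count()'      # expect 10
mongo --host 192.168.23.132 --port 27017 --eval 'printjson(db.getSiblingDB("test").mongo_test.findOne())'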
Syncing MySQL data to Elasticsearch
1. The DataX package downloaded for this example does not include an ES writer plugin, so the elasticsearchwriter plugin has to be set up first (see the download link for the DataX MySQL-to-Elasticsearch writer plugin).
2. Extract the plugin into /export/servers/datax/plugin/writer, roughly as sketched below.
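Assuming the plugin was downloaded as an archive named elasticsearchwriter.zip under /opt (the actual file name depends on the download source), the step looks roughly like this:
unzip /opt/elasticsearchwriter.zip -d /export/servers/datax/plugin/writer/
ls /export/servers/datax/plugin/writer/       # an elasticsearchwriter directory should now be present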
3. Job configuration
cd /export/servers/datax/job
vim es_test.json
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"connection": [
{
"querySql": [
"select park_id,shop_park_id,park_code,park_name,park_type,park_city,park_alias,update_time from park;"
],
"jdbcUrl": [
"jdbc:mysql://192.168.23.132:3306/datax"
]
}
]
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://192.168.23.132:9200",
"accessId": "12312313",
"accessKey": "1232222222",
"index": "test-datax",
"type": "default",
"cleanup": true,
"settings": {"index" :{"number_of_shards": 3, "number_of_replicas": 2}},
"discovery": false,
"batchSize": 1000,
"column": [
{ "name": "park_id","type": "long" },
{ "name": "shop_park_id","type": "long" },
{ "name": "park_code","type": "integer" },
{ "name": "park_name","type": "string" },
{ "name": "park_type", "type": "string" },
{ "name": "park_city", "type": "string" },
{ "name": "park_alias", "type": "string" },
{ "name": "update_time", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"}
]
}
}
}
]
}
}
endpoint: the Elasticsearch connection address. Required: yes
accessId: user for HTTP auth. Required: no
accessKey: password for HTTP auth. Required: no
index: the index name in Elasticsearch. Required: yes
type: the type name under the index. Required: no. Default: the index name
cleanup: whether to delete and recreate the existing index. Required: no. Default: false
batchSize: number of records per bulk request. Required: no. Default: 1000
trySize: number of retries after a failure. Required: no. Default: 30
timeout: client timeout. Required: no. Default: 600000
discovery: enable node discovery (polling) to periodically refresh the server list in the client. Required: no. Default: false
compression: enable compression for HTTP requests. Required: no. Default: true
multiThread: whether HTTP requests use multiple threads. Required: no. Default: true
ignoreWriteError: ignore write errors, do not retry, keep writing. Required: no. Default: false
ignoreParseError: ignore data parse errors, keep writing. Required: no. Default: true
alias: alias to write after the data import finishes. Required: no
aliasMode: how the alias is added after the import finishes, append (add) or exclusive (keep only this one). Required: no. Default: append
settings: index settings used when the index is created, same as the official Elasticsearch settings. Required: no
splitter: delimiter to use when the inserted data is an array. Required: no. Default: -,-
column: the fields and their Elasticsearch types. Required: yes
dynamic: do not use DataX's mappings, use Elasticsearch's own dynamic mappings. Required: no. Default: false
Run the job
cd /export/servers/datax/job
python ../bin/datax.py ../job/es_test.json
Open the es-head management UI in a browser
http://192.168.23.132:9100/
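The index can also be checked from the command line through the standard Elasticsearch REST API (assuming curl is available):
curl 'http://192.168.23.132:9200/test-datax/_count?pretty'       # "count" should be 10
curl 'http://192.168.23.132:9200/test-datax/_search?pretty&size=1'
curl 'http://192.168.23.132:9200/test-datax/_settings?pretty'    # should show number_of_shards 3 and number_of_replicas 2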
Syncing SQL Server data to HDFS and loading it into Hive
Job configuration
table1.json
{
"job": {
"content": [
{
"reader": {
"name": "sqlserverreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["$connstr"],
"querySql": ["select * from $table_name where creationtime >= '${start_time}' and creationtime < '${end_time}'"]
}
],
"password": "$password",
"username": "$username"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name": "Id","type": "bigint"},
{"name": "PkId","type": "int"},
{"name": "Barcode","type": "String"},
{"name": "ClassId","type": "int"},
{"name": "Qty","type": "int"},
{"name": "TerminalId","type": "int"},
{"name": "Remark","type": "String"},
{"name": "CreationTime","type": "TIMESTAMP"},
{"name": "SaleClassId","type": "int"}
],
"defaultFS": "$defaultFS",
"fieldDelimiter": "$separator",
"fileName": "$fileName",
"fileType": "$fileType",
"path": "$datadir",
"writeMode": "$writeMode"
}
}
}
],
"setting": {
"speed": {
"channel": "3"
},
"errorLimit":{
"percentage":0.02,
"record":0
}
}
}
}
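The $connstr, $table_name and the other $-prefixed values in this job file are placeholders; datax.py fills them in at run time through its -p "-Dname=value" option, exactly as the script below does. A hand-run equivalent for a single table could look roughly like this (the two dates are example values for illustration):
python /opt/datax/bin/datax.py -p "-Dconnstr='jdbc:sqlserver://11.99.99.99:1433;DatabaseName=databs' -Dusername='root' -Dpassword='123456' -Dtable_name='table1' -Dstart_time='2020-03-30' -Dend_time='2020-03-31' -DdefaultFS='hdfs://cdh03:8020' -DfileType='text' -Ddatadir='/data/datax/databs/table1/2020-03-30/' -DfileName='2020-03-30' -DwriteMode='append' -Dseparator='\t'" /opt/datax/job/table1.json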
Execution script execute.sh
#!/bin/bash
source /etc/profile
# database connection info
connstr="jdbc:sqlserver://11.99.99.99:1433;DatabaseName=databs"
username="root"
password="123456"
# source table names
# get the start and end dates (from the arguments, or defaults)
if [ $# -ge 3 ]
then
table_name=$1
start_time=$2
end_time=$3
else
table_name="table1,table2,table3,table4"
start_time=`date -d "-1 day" +%Y-%m-%d`
end_time=`date +%Y-%m-%d`
fi
# HDFS address
defaultFS="hdfs://cdh03:8020"
# storage format of the files written by DataX
fileType="text"
# HDFS write mode
writeMode="append"
# field delimiter in the data files
separator='\t'
# Hive database
database="ods_data_all"
array=(${table_name//,/ })
for var in ${array[@]}
do
# target directory for the DataX import
datadir="/data/datax/databs/${var}/${start_time}/"
# file name for the imported data
fileName="${start_time}"
# create the target directory on HDFS
hdfs dfs -mkdir -p $datadir
# run DataX
python /opt/datax/bin/datax.py -p "-Dconnstr='$connstr' -Dusername='$username' -Dpassword='$password' -Dtable_name='$var' -Dstart_time='$start_time' -Dend_time='$end_time' -DdefaultFS='$defaultFS' -DfileType='$fileType' -Ddatadir='$datadir' -DfileName='$fileName' -DwriteMode='$writeMode' -Dseparator='$separator'" --jvm="-Xms1G -Xmx1G" /opt/datax/job/${var}.json
# load the data into the Hive table
hive -e "use ${database};load data inpath '${defaultFS}/${datadir}/*' into table $var;"
done
Running the full job with execute.sh
sh execute.sh table_name start_time end_time
Three arguments can be passed:
table_name: the table(s) to sync; separate multiple tables with commas (e.g. "table1,table2")
start_time, end_time: the time range to sync; each table's WHERE clause filters on its timestamp column using this range
If the table names are known in advance they can be configured directly in the script, and the time range defaults to the previous day, so the script can also be run without arguments: sh execute.sh. Remember to create the matching Hive tables beforehand, since the data is loaded into them right after the import; a sketch of the DDL follows.
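A minimal sketch of a matching Hive table, assuming the column list from table1.json, the ods_data_all database, the tab delimiter and the text storage format configured in the script (names and types should be adjusted to the real source tables):
hive -e "
use ods_data_all;
create table if not exists table1 (
  id           bigint,
  pkid         int,
  barcode      string,
  classid      int,
  qty          int,
  terminalid   int,
  remark       string,
  creationtime timestamp,
  saleclassid  int
)
row format delimited fields terminated by '\t'
stored as textfile;
"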
Reference links
DataX official GitHub source
Importing data into Elasticsearch with DataX
DataX write-speed comparison for MongoDB
DataX data transfer notes
DataX introduction and production script configuration