Mongodb数据同步到Hive
思路:利用Mongodb的export工具导出数据成json格式;
load该json数据到临时hive表的一个字段;
从该临时hive表解析json落地成最终的表;
代码:
1. 利用Mongodb的export工具导出数据成json格式
$mongo_path/bin/mongoexport -h $host:27017 -u $user -p $password -d $db -c $collection --fields _id,listContent,title,content --type json -q $query -o $mongodb_data_path/article_inc.json
2.load该json数据到临时hive表的一个字段
hive -e "truncate table schema_name.table_name;"
hive -e "load data local inpath '$mongodb_data_path/article_inc.json' overwrite into table schema_name.table_name;"
3. 从该临时hive表解析json落地成最终的表
insert overwrite table schema2_name.table2_name
select
json_tuple("_id","title", "listContent", "content")
from schema_name.table_name;</code>
具体例子:(shell脚本)
yesterday=`date -d 'yesterday' '+%Y-%m-%d'`
DB_NAME=ods
MONGO_DATA=/home/hadoop/xxx/mongoExport/xxx.json
#-- MONGODB连接配置信息
mongoexport --host=(mongodb url) \
--port=端口 \
--username=用户名 \
--password=密码 \
--db=具体db \
--collection=表 \
--type=json \
--query="{crawl_time:{\"\$gte\":'$yesterday'}}" \
--out=${MONGO_DATA}
hive -e "CREATE TABLE IF NOT EXISTS ${DB_NAME}.xxx (params string);"
hive -e "LOAD DATA LOCAL INPATH '${MONGO_DATA}' OVERWRITE INTO TABLE ${DB_NAME}.xxx;"
rm -f ${MONGO_DATA}
hive -e "
insert overwrite table ${DB_NAME}.xxx partition (dt)
select
primary_key,
phone,
plat_code,
crawl_time,
jrjt_del_dt,
dt
from (
select primary_key,phone,plat_code,crawl_time,jrjt_del_dt,to_date(crawl_time) dt
from ${DB_NAME}.xxx
lateral view json_tuple(params,'primary_key','phone','plat_code','crawl_time','jrjt_del_dt') v1 as primary_key,
phone,plat_code,crawl_time,jrjt_del_dt
) base;"
上一篇: 数据仓库之数据质量管理
下一篇: GBASE 8s 数据库基本语法
推荐阅读
-
用python简单实现mysql数据同步到ElasticSearch的教程
-
MySQL数据以全量和增量方式,同步到ES搜索引擎
-
python读取json文件并将数据插入到mongodb的方法
-
cwrsync实现从linux到windows的数据同步备份
-
Python实现读取SQLServer数据并插入到MongoDB数据库的方法示例
-
用python简单实现mysql数据同步到ElasticSearch的教程
-
详解MongoDB数据还原及同步解决思路
-
SQL数据同步到ELK(四)- 利用SQL SERVER Track Data相关功能同步数据(上)
-
本地数据库(sql server)插入一条新数据时,同步到服务器数据库
-
DMHS单机库到单机库数据同步