使用pyspark模仿sqoop从oracle导数据到hive的主要功能(自动建表,分区导入,增量,解决数据换行符问题)
最近公司开始做大数据项目,让我使用sqoop(1.6.4版本)导数据进行数据分析计算,然而当我们将所有的工作流都放到azkaban上时整个流程跑完需要花费13分钟,而其中导数据(增量)就占了4分钟左右,老板给我提供了使用 spark 导数据的思路,学习整理了一个多星期,终于实现了sqoop的主要功能。
这里我使用的是pyspark完成的所有操作。
条件:hdfs平台,pyspark,ubuntu系统
运行:我这里是在 /usr/bin 目录下(或者指定在此目录下 )运行的python文件,也可以使用系统自带的pyspark
1 ./spark-submit --jars "/home/engyne/spark/ojdbc7.jar" --master local /home/engyne/spark/SparkDataBase.py
其中--jars 是指定连接oracle的驱动,ojdbc7.jar对应的是oracle12版本,--master local /...指定的是运行的python文件
注意:我的代码没有解决中文问题,所以不管是注释还是代码中都不能出现中文,记得删除!!!
1、pyspark连接oracle,导数据到hive(后面的代码需要在此篇代码基础上进行,重复代码不再copy了)
1 import sys 2 from pyspark.sql import HiveContext 3 from pyspark import SparkConf, SparkContext, SQLContext 4 5 conf = SparkConf().setAppName('inc_dd_openings') 6 sc = SparkContext(conf=conf) 7 sqlContext = HiveContext(sc) 8 9 #以下是为了在console中打印出表内容 10 reload(sys) 11 sys.setdefaultencoding("utf-8") 12 13 get_df_url = "jdbc:oracle:thin:@//192.168.1.1:1521/ORCLPDB" 14 get_df_driver = "oracle.jdbc.driver.OracleDriver" 15 get_df_user = "xxx" 16 get_df_password = "xxx" 17 18 df = sqlContext.read.format("jdbc") \ 19 .option("url", get_df_url) \ 20 .option("driver", get_df_driver) \ 21 .option("dbtable", "tableName") \ 22 .option("user", get_df_user).option("password", get_df_password) \ 23 .load() 24 #df.show() #可以查看到获取的表的内容,默认显示20行 25 sqlContext.sql("use databaseName") #databaseName指定使用hive中的数据库 26 #创建临时表 27 df.registerTempTable("tempTable") 28 #创建表并写入数据 29 sqlContext.sql("create table %s as select * from tempTable")
2、pyspark在hive中创建动态分区表
1 #修改一下hive的默认设置以支持动态分区 2 sqlContext.sql("set hive.exec.dynamic.partition=true") 3 sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict") 4 #设置hive支持创建分区文件的最大值 5 sqlContext.sql("SET hive.exec.max.dynamic.partitions=100000") 6 sqlContext.sql("SET hive.exec.max.dynamic.partitions.pernode=100000")
这里需要先手动创建分区表,我使用dataframe的dtypes属性获取到表结构,然后循环拼接表的每个字段在hive中所对应的类型
最后写入表数据的代码是:
1 sqlContext.sql("insert overwrite table STUDENT partition(AGE) SELECT ID,NAME,UPDATETIME,AGE FROM tempTable")
3、实现增量导入数据
我这里使用了MySql数据库,用来存储增量导入的信息,创建表(job)
DROP TABLE IF EXISTS `job`; CREATE TABLE `job` ( `id` int(10) NOT NULL AUTO_INCREMENT, `database_name` varchar(50) DEFAULT NULL, --数据库名称 `table_name` varchar(100) DEFAULT NULL, --需要增量导入的表名 `partition_column_name` varchar(100) DEFAULT NULL, --分区的字段名(这里只考虑对一个字段分区,如果多个字段这里应该使用一对多表结构吧) `partition_column_desc` varchar(50) DEFAULT NULL, --分区字段类型 `check_column` varchar(50) DEFAULT NULL, --根据(table_name中)此字段进行增量导入校验(我这里例子使用的是updatetime) `last_value` varchar(255) DEFAULT NULL, --校验值 `status` int(1) NOT NULL, --是否使用(1表示此job激活) PRIMARY KEY (`id`) ) INCREMENTAL=InnoDB AUTO_INCREMENT=81 DEFAULT CHARSET=utf8;
存储STUDENT表增量导入信息(这里是为了演示)
insert into `job`(`id`,`database_name`,`table_name`,`partition_column_name`,`partition_column_desc`,`check_column`,`last_value`,`status`)values (1,'test_datebase','STUDENT','AGE','string','UPDATETIME','2018-07-30',1)
python 连接MySql的方法我这里就直接怼代码了,具体详解大家就看
Ubuntu需要安装MySQLdb( sudo apt-get install python-mysqldb )
import MySQLdb # insert update delete def conMysqlDB_exec(sqlStr): db = MySQLdb.connect("192.168.xxx.xxx", "xx", "xx", "xx", charset='utf8' ) cursor = db.cursor() try: cursor.execute(sqlStr) db.commit() result = True except: print("---->MySqlError: execute error") result = False db.rollback() db.close return result # select def conMysqlDB_fetchall(sqlStr): db = MySQLdb.connect("192.168.xxx.xxx", "xx", "xx", "xx", charset='utf8' ) cursor = db.cursor() results = [] try: cursor.execute(sqlStr) results = cursor.fetchall() except: print("---->MySqlError: unable to fecth data") db.close return results
查询增量信息,使用spark进行导入
findJobSql = "SELECT * FROM job where status=1"
result = conMysqlDB_fetchall(findJobSql) databaseName = val[1] tableName = val[2] partitionColumnName = val[3] partitionColumnDesc = val[4] checkColumn = val[5] lastValue = val[6] sqlContext.sql("use database") df = sqlContext.read.format("jdbc") \ .option("url", "jdbc:oracle:thin:@//192.168.xxx.xxx:1521/ORCLPDB") \ .option("driver", "oracle.jdbc.driver.OracleDriver") \ .option("dbtable", "(select * from %s where to_char(%s, 'yyyy-MM-dd')>'%s')" % (tableName, checkColumn, lastValue)) \ #这里是关键,直接查询出新增的数据,这样后面的速度才能提升,否则要对整个表的dataframe进行操作,慢死了,千万不要相信dataframe的filter,where这些东西,4万多条数据要查3分钟!!! .option("user", "xxx").option("password", "xxx") \ .load()
def max(a, b):
if a>b:
return a
else:
return b
try: #获取到新增字段的最大值!!!(这块也困了我好久)这里使用的是python的reduce函数,调用的max方法 nowLastValue = df.rdd.reduce(max)[checkColumn]
df.registerTempTable("temp")#写入内容 saveSql = "insert into table student select * from temp" sqlContext.sql(saveSql) #更新mysql表,使lastValue是表最新值 updataJobSql = "UPDATE job SET last_value='%s' WHERE table_name='%s'" % (nowLastValue, tableName) if conMysqlDB_exec(updataJobSql): print("---->SUCCESS: incremental import success") except ValueError: print("---->INFO: No new data added!") except: print("---->ERROR: other error")
4、解决导入数据换行符问题
有时候oracle中的数据中会存在换行符(" \n ")然而hive1.1.0中数据换行默认识别的也是\n,最坑的是还不能对它进行修改(目前我没有查出修改的方法,大家要是有办法欢迎在评论区讨论)
那我只能对数据进行处理了,以前使用sqoop的时候也有这个问题,所幸sqoop有解决换行符的语句,,,,巴拉巴拉,,,扯远了
解决换行符需要dataframe的map方法,然后使用lambda表达式进行replace,总结好就是下面的代码(第3行)
解释:这是个for循环里面加if else 判断,整个需要用 [ ] 包起来,没错这是个list ,如果不包就报错,lambda x 获取到的是表中一行行的数据,for循环对每一行进行遍历,然后对一行中每个字段
进行判断,是否是unicode或者str类型,(一般只有是这两个类型才存在换行符)如果是则进行replace处理,否则不做处理。
转化好之后这是个rdd类型的数据,需要转化为dataframe类型才能写入hive
1 #df自带获取schema的方法,不要学我去拼凑出来(
相关文章:
-
-
1、首先解压jdk文件,例: 解压完成后进入jdk的bin目录里面,用 测试jdk是否可用。测试可用后开始配置环境: 进入/etc/profile文... [阅读全文]
-
小编带大家来分析一下,零基础入门学习大数据可以从事哪些工作呢? 2018年随着当代信息技术的迅猛发展,大数据在人们的工作、生产、生活、学习、娱乐等方... [阅读全文]
-
受访专家:上海交通大学医学院附属瑞金医院呼吸科博士周剑平上海读者罗女士问:近期流感高发,我现在带孩子... [阅读全文]
-
系统路径 文件操作 资源文件assets和RW res/raw:文件会被映射到R.java文件中,访问的时候直接通过资源ID访问,没有有目录结构 a... [阅读全文]
-
1.测试盗链(www.html2.com 盗取 www.html5.com的图片) 2.防止盗链 符合盗链 —— 重写 说明:if ($invali... [阅读全文]
-
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论