pythonETL工具的使用
程序员文章站
2024-01-18 16:38:58
应对小规模业务场景使用专业的etl会有点费劲,还得增加运维监控成本,想想直接写个框架,用代码直接做etl数据接入
安装
pip install pyetl
使用
首...
应对小规模业务场景使用专业的etl会有点费劲,还得增加运维监控成本,想想直接写个框架,用代码直接做etl数据接入
安装
pip install pyetl
使用
首先看下最简单的demo
from pyetl import Etl class TestConfig: # 这里直接是把源库和目的库配置成了一个,只是为了演示使用 DST_URI = SRC_URI = {'uri': 'mysql+pymysql://root:hadoop@localhost:3306/test'} # 'test_src'和'test_dst'分别是源表名称和目的表名称 app = Etl('test_src', 'test_dst') app.config(TestConfig) app.run()
我们正常的业务需求肯定比这个复杂的多,这就涉及转换操作,这里的处理方式是添加udf函数
最简单的数据转换,比如某个字段全部大写处理(下面以id字段为例,一般不可能有这种简单的转换,这里只做演示)
app = Etl('src_table', 'dst_table', unique='id') # unique这个参数作用是确定唯一键,数据插入时会做merge操作 app.config(TestConfig) @app.add('id')(lambda x:x.upper()) # add(参数可以是str或list) 可以向任务注册某个或某几个字段的转换函数,可以是一个字段分解为多个字段,也可以是多个字段合并为一个字段 @app.befor(lambda app:app.dst.empty('dst_table')) # befor 函数是在开始一次etl前的预处理操作 @app.after(lambda app:app.dst.empty('src_table')) # after 函数是在一次etl完成后的扫尾操作 app.run(where='limit 10') # 这里where可以对数据源做数据筛选过滤过滤
下面是增量更新场景
mapping = {'id': 'id_code'} # mapping是当源库和目的库之间表字段不一致时配置的映射关系(key是目的库字段名,value是源库字段名,value可以是list表示两个字段合并到目的字段,怎么合并需要用add注册具体的处理函数) # src_update为更新标志字段(是源库的字段名),每次拉取数据都会记录时间点 app = Etl('src_table', 'dst_table', mapping=mapping, unique='id', src_update='update_time') app.config(TestConfig) app.run(days=1) # 这里的days作用是在原来更新时间点的基础上向前推days天,days为0时忽虐src_update字段记录的时间直接全表接入
创建项目
这里提供了一个简单的项目结构做参考,直接使用pyetl命令行生成一个简单的项目结构,其中的job.py就是我们的etl任务,数据库配置都在config.py里,根据实际环境添加
pyetl -b [project name]
以上命令创建好一个工程文件,需要根据个人环境进行配置,下面是一些参数说明 app/config.py文件(主要是SRC_URI和DST_URI参数) SRC_URI(源库配置) DST_URI(目的库配置) SRC_PLACEHOLDER(数据源驱动的占位符形式, 以sqlalchemy连接形式配置时不需要关注) QUERY_SIZE(单次获取数据量,根据实际环境机器性能,可以不关注) INSERT_SIZE(单次插入数据量,根据实际环境机器性能,可以不关注) app/etl/job.py文件,单个etl任务示例(主要是src_tab和dst_tab参数) src_tab(源表名称) dst_tab(目的表名称) mapping(可选关键字参数,目的表到源表的字段映射,当源表和目的表字段名称不一致是使用) src_update(可选关键字参数,源表的数据更新标志,只要是增长类型的都可以,比如数据变更时间字段,非空以增量形式插入) dst_unique(可选关键字参数,目的表的唯一键,非空以merge形式插入)
由于不同数据库驱动配置的形式都会不一样,因此这里统一的都采用字典的形式配置的,下面以连接impala为例:
DST_URI = {
‘host’: ‘192.168.1.1’,
‘port’: 21050,
‘use_kerberos’: True,
‘kerberos_service_name’: ‘impala’,
‘timeout’: 3600,
‘driver’: ‘impala.dbapi’,
}
这里要在原有impala驱动参数配置的基础上增加driver参数,来让代码识别对应的数据库驱动程序,没办法python没有像java一样统一的jdbc。