欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

pythonETL工具的使用

程序员文章站 2024-01-18 16:38:58
应对小规模业务场景使用专业的etl会有点费劲,还得增加运维监控成本,想想直接写个框架,用代码直接做etl数据接入 安装 pip install pyetl 使用 首...

应对小规模业务场景使用专业的etl会有点费劲,还得增加运维监控成本,想想直接写个框架,用代码直接做etl数据接入

安装

pip install pyetl

使用

首先看下最简单的demo

from pyetl import Etl
class TestConfig:
    # 这里直接是把源库和目的库配置成了一个,只是为了演示使用
    DST_URI = SRC_URI = {'uri': 'mysql+pymysql://root:hadoop@localhost:3306/test'}
# 'test_src'和'test_dst'分别是源表名称和目的表名称
app = Etl('test_src', 'test_dst')
app.config(TestConfig)
app.run()

我们正常的业务需求肯定比这个复杂的多,这就涉及转换操作,这里的处理方式是添加udf函数

最简单的数据转换,比如某个字段全部大写处理(下面以id字段为例,一般不可能有这种简单的转换,这里只做演示)

app = Etl('src_table', 'dst_table', unique='id') # unique这个参数作用是确定唯一键,数据插入时会做merge操作
app.config(TestConfig)
@app.add('id')(lambda x:x.upper()) # add(参数可以是str或list) 可以向任务注册某个或某几个字段的转换函数,可以是一个字段分解为多个字段,也可以是多个字段合并为一个字段
@app.befor(lambda app:app.dst.empty('dst_table')) # befor 函数是在开始一次etl前的预处理操作
@app.after(lambda app:app.dst.empty('src_table')) # after 函数是在一次etl完成后的扫尾操作
app.run(where='limit 10') # 这里where可以对数据源做数据筛选过滤过滤

下面是增量更新场景

mapping = {'id': 'id_code'} 
# mapping是当源库和目的库之间表字段不一致时配置的映射关系(key是目的库字段名,value是源库字段名,value可以是list表示两个字段合并到目的字段,怎么合并需要用add注册具体的处理函数)
# src_update为更新标志字段(是源库的字段名),每次拉取数据都会记录时间点
app = Etl('src_table', 'dst_table', mapping=mapping, unique='id', src_update='update_time')
app.config(TestConfig)
app.run(days=1) # 这里的days作用是在原来更新时间点的基础上向前推days天,days为0时忽虐src_update字段记录的时间直接全表接入

创建项目

这里提供了一个简单的项目结构做参考,直接使用pyetl命令行生成一个简单的项目结构,其中的job.py就是我们的etl任务,数据库配置都在config.py里,根据实际环境添加

pyetl -b [project name]

pythonETL工具的使用

以上命令创建好一个工程文件,需要根据个人环境进行配置,下面是一些参数说明
app/config.py文件(主要是SRC_URI和DST_URI参数)
    SRC_URI(源库配置)
    DST_URI(目的库配置)
    SRC_PLACEHOLDER(数据源驱动的占位符形式, 以sqlalchemy连接形式配置时不需要关注)
    QUERY_SIZE(单次获取数据量,根据实际环境机器性能,可以不关注)
    INSERT_SIZE(单次插入数据量,根据实际环境机器性能,可以不关注)

app/etl/job.py文件,单个etl任务示例(主要是src_tab和dst_tab参数)
    src_tab(源表名称)
    dst_tab(目的表名称)
    mapping(可选关键字参数,目的表到源表的字段映射,当源表和目的表字段名称不一致是使用)
    src_update(可选关键字参数,源表的数据更新标志,只要是增长类型的都可以,比如数据变更时间字段,非空以增量形式插入)
    dst_unique(可选关键字参数,目的表的唯一键,非空以merge形式插入)

由于不同数据库驱动配置的形式都会不一样,因此这里统一的都采用字典的形式配置的,下面以连接impala为例:

DST_URI = {

‘host’: ‘192.168.1.1’,

‘port’: 21050,

‘use_kerberos’: True,

‘kerberos_service_name’: ‘impala’,

‘timeout’: 3600,

‘driver’: ‘impala.dbapi’,

}

这里要在原有impala驱动参数配置的基础上增加driver参数,来让代码识别对应的数据库驱动程序,没办法python没有像java一样统一的jdbc。