Airflow笔记-MySqlOperator使用及conn配置
程序员文章站
2022-07-09 19:31:01
1. 依赖 的数据库交互通过 模块来实现, 使用前需要安装相关依赖: 2. 使用 使用 执行sql任务的一个简单例子: 3. 参数 接收几个参数: : 待执行的sql语句; : mysql数据库配置ID, Airflow的conn配置有两种配置方式,一是通过 来配置环境变量实现,二是通过web界面配 ......
1. 依赖
mysqloperator
的数据库交互通过 mysqldb
模块来实现, 使用前需要安装相关依赖:
pip install apache-airflow[mysql]
2. 使用
使用 mysqloperator
执行sql任务的一个简单例子:
from airflow import dag from airflow.utils.dates import days_ago from airflow.operators.mysql_operator import mysqloperator default_args = { 'owner': 'airflow', 'depends_on_past': false, 'start_date': days_ago(1), 'email': ['j_hao104@163.com'], 'email_on_failure': true, 'email_on_retry': false, } dag = dag( 'mysqloperatorexample', default_args=default_args, description='mysqloperatorexample', schedule_interval="30 18 * * *") insert_sql = "insert into log select * from temp_log" task = mysqloperator( task_id='select_sql', sql=insert_sql, mysql_conn_id='mysql_conn', autocommit=true, dag=dag)
3. 参数
mysqloperator
接收几个参数:
-
sql
: 待执行的sql语句; -
mysql_conn_id
: mysql数据库配置id, airflow的conn配置有两种配置方式,一是通过os.environ
来配置环境变量实现,二是通过web界面配置到代码中,具体的配置方法会在下文描述; -
parameters
: 相当于mysqldb
库的execute
方法的第二参数,比如:cur.execute('insert into userinfo values(%s,%s)',('alex',18))
; -
autocommit
: 自动执行commit
; -
database
: 用于覆盖conn
配置中的数据库名称, 这样方便于连接统一个mysql的不同数据库;
4. conn
配置
建议conn
配置通过web界面来配置,这样不用硬编码到代码中,关于配置中的各个参数:
-
conn id
: 对应mysqloperator
中的mysql_conn_id
; -
host
: 数据库ip地址; -
schema
: 库名, 可以被mysqloperator
中的database
重写; -
login
: 登录用户名; -
password
: 登录密码; -
port
: 数据库端口; -
extra
:mysqldb.connect
的额外参数,包含charset
、cursor
、ssl
、local_infile
其中cursor
的值的对应关系为: sscursor
—> mysqldb.cursors.sscursor
; dictcursor
—> mysqldb.cursors.dictcursor
; ssdictcursor
—> mysqldb.cursors.ssdictcursor