欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Airflow笔记-MySqlOperator使用及conn配置

程序员文章站 2022-07-09 19:31:01
1. 依赖 的数据库交互通过 模块来实现, 使用前需要安装相关依赖: 2. 使用 使用 执行sql任务的一个简单例子: 3. 参数 接收几个参数: : 待执行的sql语句; : mysql数据库配置ID, Airflow的conn配置有两种配置方式,一是通过 来配置环境变量实现,二是通过web界面配 ......

1. 依赖

mysqloperator 的数据库交互通过 mysqldb 模块来实现, 使用前需要安装相关依赖:

pip install apache-airflow[mysql]

2. 使用

使用 mysqloperator 执行sql任务的一个简单例子:

from airflow import dag
from airflow.utils.dates import days_ago
from airflow.operators.mysql_operator import mysqloperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': false,
    'start_date': days_ago(1),
    'email': ['j_hao104@163.com'],
    'email_on_failure': true,
    'email_on_retry': false,
}

dag = dag(
    'mysqloperatorexample',
    default_args=default_args,
    description='mysqloperatorexample',
    schedule_interval="30 18 * * *")

insert_sql = "insert into log select * from temp_log"


task = mysqloperator(
    task_id='select_sql',
    sql=insert_sql,
    mysql_conn_id='mysql_conn',
    autocommit=true,
    dag=dag)

3. 参数

mysqloperator 接收几个参数:

  • sql: 待执行的sql语句;
  • mysql_conn_id: mysql数据库配置id, airflow的conn配置有两种配置方式,一是通过os.environ来配置环境变量实现,二是通过web界面配置到代码中,具体的配置方法会在下文描述;
  • parameters: 相当于mysqldb库的execute 方法的第二参数,比如: cur.execute('insert into userinfo values(%s,%s)',('alex',18));
  • autocommit: 自动执行 commit;
  • database: 用于覆盖conn配置中的数据库名称, 这样方便于连接统一个mysql的不同数据库;

4. conn配置

Airflow笔记-MySqlOperator使用及conn配置

建议conn配置通过web界面来配置,这样不用硬编码到代码中,关于配置中的各个参数:

  • conn id: 对应 mysqloperator 中的 mysql_conn_id
  • host: 数据库ip地址;
  • schema: 库名, 可以被mysqloperator中的database重写;
  • login: 登录用户名;
  • password: 登录密码;
  • port: 数据库端口;
  • extra: mysqldb.connect的额外参数,包含charsetcursorssllocal_infile

其中cursor的值的对应关系为: sscursor —> mysqldb.cursors.sscursor; dictcursor —> mysqldb.cursors.dictcursor; ssdictcursor —> mysqldb.cursors.ssdictcursor