Installing Apache Airflow

I. Install Python 3

First, install Python 3 on the CentOS 7 server. For the steps, see this post:
Installing Python 3.7 on CentOS 7

II. Install MySQL 5.7.32

Next, install MySQL 5.7 to store the Airflow metadata. For the steps, see this post:
Installing MySQL 5.7.32 offline on CentOS 7

III. Install Apache Airflow

1) Upgrade pip

[root@localhost bin]# pip3 install --upgrade pip
[root@localhost bin]# pip3 install --upgrade setuptools

2) Install development libraries and base packages

Be sure to install these development libraries and base packages first; they prevent many of the errors that otherwise occur while installing Airflow.


yum -y groupinstall 'Development Tools'
yum -y install lapack-devel blas-devel bzip2-devel ncurses-devel readline-devel sqlite-devel
yum -y install tcl-devel tk-devel libxml2-devel xz-devel gdbm-devel openssl-devel zlib-devel
yum -y install cyrus-sasl-devel gcc gcc-c++ libffi-devel
yum -y install python3-devel libevent-devel mysql-devel
  

3) Create a Python virtual environment

Create a Python virtual environment and install Airflow inside it.

# Create the virtual environment
[root@localhost moudle]# python3 -m venv /moudle/apache-airflow
# Enter the directory that holds the virtual environment
[root@localhost moudle]# cd /moudle/apache-airflow
# Activate the virtual environment
[root@localhost apache-airflow]# source /moudle/apache-airflow/bin/activate
# Install Apache Airflow inside the virtual environment (the PyPI package is
# named apache-airflow; the old "airflow" name can no longer be installed)
(apache-airflow) [root@localhost apache-airflow]# pip3 install "apache-airflow[mysql,jdbc,celery]" -i https://mirrors.aliyun.com/pypi/simple
# Install additional packages that Apache Airflow needs
(apache-airflow) [root@localhost apache-airflow]# pip3 install flask_bcrypt sasl thrift_sasl pyhs2 -i https://mirrors.aliyun.com/pypi/simple
(apache-airflow) [root@localhost apache-airflow]# pip3 install pymysql mysql -i https://mirrors.aliyun.com/pypi/simple
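It may help to sanity-check the installation before continuing. A minimal sketch, run with python3 inside the activated virtual environment:

# Sketch: confirm Airflow and the MySQL driver import cleanly.
import airflow
import pymysql

print(airflow.__version__)  # a 1.10.x release at the time of writing
print(pymysql.VERSION)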

4) Set the Airflow environment variables

vim /etc/profile
export AIRFLOW_HOME=/moudle/apache-airflow
export PATH=$PATH:$AIRFLOW_HOME/bin

# Apply the environment variables
source /etc/profile
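A quick way to confirm the variable is visible to new processes (a trivial sketch, run in a fresh shell):

# Check that AIRFLOW_HOME reached the environment.
import os
print(os.environ.get("AIRFLOW_HOME"))  # expect: /moudle/apache-airflow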

5) Initialize Airflow

Initializing Airflow generates the configuration file airflow.cfg.
If the run ends with Done. and no errors, the initialization succeeded. Note that at this point Airflow still uses its default SQLite backend; it is switched to MySQL in the following steps.

(apache-airflow) [root@localhost airflow]# airflow initdb
DB: sqlite:////moudle/apache-airflow/airflow.db
[2020-11-28 11:27:09,311] {db.py:378} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
......
Done.

6) Edit the Airflow configuration file airflow.cfg

vim /moudle/apache-airflow/airflow.cfg
[core]

dags_folder = /moudle/apache-airflow/dags

base_log_folder = /moudle/apache-airflow/logs

# Logging level
logging_level = INFO

# Logging level for Flask-appbuilder UI
fab_logging_level = WARN

colored_console_log = True

# Log format for when Colored logs is enabled
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter

# Format of Log line
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s

# Log filename format
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /moudle/apache-airflow/logs/dag_processor_manager/dag_processor_manager.log


default_timezone = utc

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = CeleryExecutor

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
sql_alchemy_conn = mysql://airflow:123456!a@192.168.239.131/airflow

# The encoding for the databases
sql_engine_encoding = utf-8

# If SqlAlchemy should pool database connections.
sql_alchemy_pool_enabled = True

# The SqlAlchemy pool size is the maximum number of database connections
# in the pool. 0 indicates no limit.
sql_alchemy_pool_size = 30

load_examples = True

# Where your Airflow plugins are stored
plugins_folder = /moudle/apache-airflow/plugins


# The class to use for running task instances in a subprocess
task_runner = StandardTaskRunner

[cli]
endpoint_url = http://192.168.239.133:8088

[api]
# How to authenticate users of the API. See
# https://airflow.apache.org/docs/stable/security.html for possible values.
# ("airflow.api.auth.backend.default" allows all requests for historic reasons)
#auth_backend = airflow.api.auth.backend.deny_all
auth_backend = airflow.api.auth.backend.default

[webserver]
# The base url of your website as airflow cannot guess what domain or
# cname you are using. This is used in automated emails that
# airflow sends to point links to the right web server
base_url = http://192.168.239.133:8088

default_ui_timezone = Asia/Shanghai

web_server_host = 192.168.239.133

web_server_port = 8088

# Default DAG view. Valid values are:
# tree, graph, duration, gantt, landing_times
dag_default_view = tree

# "Default DAG orientation. Valid values are:"
# LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
dag_orientation = LR



[email]
email_backend = airflow.utils.email.send_email_smtp

[smtp]

# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = localhost
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
# smtp_user =
# Example: smtp_password = airflow
# smtp_password =
smtp_port = 25
smtp_mail_from = aaa@qq.com

[celery]
celery_app_name = airflow.executors.celery_executor
worker_concurrency = 16

# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
broker_url = sqla+mysql://airflow:123456!a@192.168.239.131:3306/airflow

# The Celery result_backend. When a job finishes, it needs to update the
# metadata of the job. Therefore it will post a message on a message bus,
# or insert it into a database (depending of the backend)
# This status is used by the scheduler to update the state of the task
# The use of a database is highly recommended
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
result_backend = db+mysql://airflow:123456!a@192.168.239.131:3306/airflow

# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
# it ``airflow flower``. This defines the IP that Celery Flower runs on
flower_host = 0.0.0.0

# The root URL for Flower
# Example: flower_url_prefix = /flower
flower_url_prefix =
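After saving, you can read a few effective values back with the standard library to make sure the edits landed in the file Airflow will load. A hedged sketch using configparser (the %% escapes in the log-format options are handled by its interpolation):

# Sketch: read back key settings from the edited airflow.cfg.
import configparser

cfg = configparser.ConfigParser()
cfg.read("/moudle/apache-airflow/airflow.cfg")

for section, key in [("core", "executor"),
                     ("core", "sql_alchemy_conn"),
                     ("webserver", "base_url"),
                     ("celery", "broker_url")]:
    print(f"{section}.{key} = {cfg.get(section, key)}")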

7) Create an airflow database in MySQL to store the Airflow metadata

[root@localhost ~]# mysql -uroot -p
mysql> create database airflow;
mysql> grant all on airflow.* to 'airflow'@'localhost' identified by '123456!a';
mysql> flush privileges;
mysql> grant all on airflow.* to 'airflow'@'%' identified by '123456!a';
mysql> flush privileges;

mysql> show global variables like '%timestamp%';
+---------------------------------+-------+
| Variable_name                   | Value |
+---------------------------------+-------+
| explicit_defaults_for_timestamp | OFF   |
| log_timestamps                  | UTC   |
+---------------------------------+-------+
2 rows in set (0.00 sec)

Airflow's MySQL backend requires explicit_defaults_for_timestamp to be enabled, so turn it on:

mysql> set global explicit_defaults_for_timestamp = 1;
Query OK, 0 rows affected (0.00 sec)
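Before re-running the initialization, it is worth confirming that the connection URI from airflow.cfg actually reaches the new database. A minimal sketch with SQLAlchemy (installed as an Airflow dependency), using the pymysql driver explicitly:

# Sketch: verify the metadata-database URI connects before airflow initdb.
from sqlalchemy import create_engine

# Same credentials and host as sql_alchemy_conn in airflow.cfg.
engine = create_engine("mysql+pymysql://airflow:123456!a@192.168.239.131:3306/airflow")
with engine.connect() as conn:
    print(conn.execute("select version()").fetchone())
    print(conn.execute("show variables like 'explicit_defaults_for_timestamp'").fetchone())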

8) Re-initialize Airflow so that its metadata is stored in the MySQL airflow database

(apache-airflow) [root@localhost apache-airflow]# airflow initdb
DB: mysql://airflow:***@192.168.239.131/airflow
[2020-11-28 12:31:09,310] {db.py:378} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl MySQLImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> e3a246e0dc1, current schema
INFO  [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
INFO  [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations
INFO  [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 338e90f54d61, More logging into task_instance
INFO  [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 52d714495f0, job_id indices
INFO  [alembic.runtime.migration] Running upgrade 52d714495f0 -> 502898887f84, Adding extra to Log
INFO  [alembic.runtime.migration] Running upgrade 502898887f84 -> 1b38cef5b76e, add dagrun
INFO  [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 2e541a1dcfed, task_duration
INFO  [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 40e67319e3a9, dagrun_config
INFO  [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 561833c1c74b, add password column to user
INFO  [alembic.runtime.migration] Running upgrade 561833c1c74b -> 4446e08588, dagrun start end
INFO  [alembic.runtime.migration] Running upgrade 4446e08588 -> bbc73705a13e, Add notification_sent column to sla_miss
INFO  [alembic.runtime.migration] Running upgrade bbc73705a13e -> bba5a7cfc896, Add a column to track the encryption state of the 'Extra' field in connection
INFO  [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 1968acfc09e3, add is_encrypted column to variable table
INFO  [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 2e82aab8ef20, rename user table
INFO  [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 211e584da130, add TI state index
INFO  [alembic.runtime.migration] Running upgrade 211e584da130 -> 64de9cddf6c9, add task fails journal table
INFO  [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> f2ca10b85618, add dag_stats table
INFO  [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 4addfa1236f1, Add fractional seconds to mysql tables
INFO  [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 8504051e801b, xcom dag task indices
INFO  [alembic.runtime.migration] Running upgrade 8504051e801b -> 5e7d17757c7a, add pid field to TaskInstance
INFO  [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 127d2bf2dfa7, Add dag_id/state index on dag_run table
INFO  [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, add max tries column to task instance
INFO  [alembic.runtime.migration] Running upgrade cc1e65623dc7 -> bdaa763e6c56, Make xcom value column a large binary
INFO  [alembic.runtime.migration] Running upgrade bdaa763e6c56 -> 947454bf1dff, add ti job_id index
INFO  [alembic.runtime.migration] Running upgrade 947454bf1dff -> d2ae31099d61, Increase text size for MySQL (not relevant for other DBs' text types)
INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 0e2a74e0fc9f, Add time zone awareness
INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 33ae817a1ff4, kubernetes_resource_checkpointing
INFO  [alembic.runtime.migration] Running upgrade 33ae817a1ff4 -> 27c6a30d7c24, kubernetes_resource_checkpointing
INFO  [alembic.runtime.migration] Running upgrade 27c6a30d7c24 -> 86770d1215c0, add kubernetes scheduler uniqueness
INFO  [alembic.runtime.migration] Running upgrade 86770d1215c0, 0e2a74e0fc9f -> 05f30312d566, merge heads
INFO  [alembic.runtime.migration] Running upgrade 05f30312d566 -> f23433877c24, fix mysql not null constraint
INFO  [alembic.runtime.migration] Running upgrade f23433877c24 -> 856955da8476, fix sqlite foreign key
INFO  [alembic.runtime.migration] Running upgrade 856955da8476 -> 9635ae0956e7, index-faskfail
INFO  [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> dd25f486b8ea, add idx_log_dag
INFO  [alembic.runtime.migration] Running upgrade dd25f486b8ea -> bf00311e1990, add index to taskinstance
INFO  [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> 0a2a5b66e19d, add task_reschedule table
INFO  [alembic.runtime.migration] Running upgrade 0a2a5b66e19d, bf00311e1990 -> 03bc53e68815, merge_heads_2
INFO  [alembic.runtime.migration] Running upgrade 03bc53e68815 -> 41f5f12752f8, add superuser field
INFO  [alembic.runtime.migration] Running upgrade 41f5f12752f8 -> c8ffec048a3b, add fields to dag
INFO  [alembic.runtime.migration] Running upgrade c8ffec048a3b -> dd4ecb8fbee3, Add schedule interval to dag
INFO  [alembic.runtime.migration] Running upgrade dd4ecb8fbee3 -> 939bb1e647c8, task reschedule fk on cascade delete
INFO  [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 6e96a59344a4, Make TaskInstance.pool not nullable
INFO  [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> d38e04c12aa2, add serialized_dag table
INFO  [alembic.runtime.migration] Running upgrade d38e04c12aa2 -> b3b105409875, add root_dag_id to DAG
INFO  [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> 74effc47d867, change datetime to datetime2(6) on MSSQL tables
INFO  [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 004c1210f153, increase queue name size limit
INFO  [alembic.runtime.migration] Running upgrade c8ffec048a3b -> a56c9515abdc, Remove dag_stat table
INFO  [alembic.runtime.migration] Running upgrade a56c9515abdc, 004c1210f153, 74effc47d867, b3b105409875 -> 08364691d074, Merge the four heads back together
INFO  [alembic.runtime.migration] Running upgrade 08364691d074 -> fe461863935f, increase_length_for_connection_password
INFO  [alembic.runtime.migration] Running upgrade fe461863935f -> 7939bcff74ba, Add DagTags table
INFO  [alembic.runtime.migration] Running upgrade 7939bcff74ba -> a4c2fd67d16b, add pool_slots field to task_instance
INFO  [alembic.runtime.migration] Running upgrade a4c2fd67d16b -> 852ae6c715af, Add RenderedTaskInstanceFields table
INFO  [alembic.runtime.migration] Running upgrade 852ae6c715af -> 952da73b5eff, add dag_code table
INFO  [alembic.runtime.migration] Running upgrade 952da73b5eff -> a66efa278eea, Add Precision to execution_date in RenderedTaskInstanceFields table
INFO  [alembic.runtime.migration] Running upgrade a66efa278eea -> da3f683c3a5a, Add dag_hash Column to serialized_dag table
INFO  [alembic.runtime.migration] Running upgrade da3f683c3a5a -> 92c57b58940d, Create FAB Tables
INFO  [alembic.runtime.migration] Running upgrade 92c57b58940d -> 03afc6b6f902, Increase length of FAB ab_view_menu.name column
WARNI [airflow.models.crypto] cryptography not found - values will not be stored encrypted.
Done.
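After a successful run, the metadata tables live in MySQL rather than SQLite. As a quick check, you can list a few of them with SQLAlchemy (a sketch, same URI as above):

# Sketch: list some of the metadata tables airflow initdb created.
from sqlalchemy import create_engine, inspect

engine = create_engine("mysql+pymysql://airflow:123456!a@192.168.239.131:3306/airflow")
print(sorted(inspect(engine).get_table_names())[:10])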

9) Start Airflow

Since the CeleryExecutor is used, three processes must be started in order: the webserver, the scheduler, and a worker.

airflow webserver
# Or start it as a daemon:
#airflow webserver -D


airflow scheduler
#airflow scheduler -D


airflow worker
# Or start it as a daemon:
#airflow worker -D

If all three processes start without errors, Airflow is running; open the Airflow web UI to view the scheduling page.
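Once the webserver is up, you can also check it from the command line; recent Airflow 1.10 releases expose a /health endpoint. A minimal sketch, assuming the base_url configured above:

# Sketch: query the webserver health endpoint.
import urllib.request

with urllib.request.urlopen("http://192.168.239.133:8088/health") as resp:
    print(resp.status, resp.read().decode())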
Next, test Airflow's scheduling.

10) Test Airflow scheduling: a sample application

Prepare a Python script:

vim airflow_test.py
print("hi~你好~")

Then schedule airflow_test.py through the Airflow DAG script 5m_001.py:

vim 5m_001.py
# -*- coding: utf-8 -*-
### Import the required modules
import airflow
from airflow.models import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator

### Default arguments applied to every task in the DAG
args = {
    'owner': 'aps', ## DAG owner, shown in the web UI
    #'start_date': datetime(2018, 4, 10, 20, 25), ## absolute start date
    'start_date': airflow.utils.dates.days_ago(0), ## relative start date: midnight today
    #'email': ['aaa@qq.com', 'aaa@qq.com', 'aaa@qq.com', 'aaa@qq.com', 'aaa@qq.com'], ## alert e-mail recipients
    'email_on_failure': True, ## send e-mail on task failure
    'email_on_retry': False, ## send e-mail on task retry
    'depends_on_past': False, ## whether a run depends on the previous run
    #'wait_for_downstream': True, ## whether to wait for downstream tasks of the previous run
    #'trigger_rule': u'all_success', ## trigger rule
    #'priority_weight': 2, ## priority
    'retries': 0, ## number of retries when a task fails (can also be set per task)
    'retry_delay': timedelta(minutes=1) ## interval between retries
}


### Define the DAG itself
dag = DAG(
    dag_id='5m_001', ## must be unique; keeping it identical to the file name is recommended
    default_args=args,
    schedule_interval='* * * * *' ## schedule in crontab syntax (here: every minute)
    )


########################################################## APS ##########################################################

test = BashOperator(
    task_id = 'test',
    bash_command = 'python3 /moudle/data/airflow_test.py',
    #bash_command = 'sleep 1',
    dag = dag)


#update123.set_upstream(t_staging_optical_quality_raw_data)

Place the DAG script 5m_001.py in the example_dags folder under the Airflow installation directory (these files are picked up because load_examples = True in airflow.cfg):

ls /moudle/apache-airflow/lib/python3.6/site-packages/airflow/example_dags/
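Rather than waiting for the scheduler to pick the file up, you can parse it with Airflow's DagBag to catch import errors early. A minimal sketch, run inside the activated virtual environment:

# Sketch: confirm 5m_001.py parses and the DAG is discovered.
from airflow.models import DagBag

dagbag = DagBag(
    dag_folder="/moudle/apache-airflow/lib/python3.6/site-packages/airflow/example_dags",
    include_examples=False)

print(dagbag.import_errors)        # expect {} if the file parses cleanly
print("5m_001" in dagbag.dags)     # expect True once the DAG is registered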

In the Airflow web UI you can then check the run status of DAG 5m_001, view the task logs, and inspect the DAG code.
This completes the Apache Airflow installation.