Installing Apache Airflow
I. Install Python 3
First, install Python 3 on the CentOS 7 server. For the Python 3 installation steps, refer to this post:
Installing Python 3.7 on CentOS 7
II. Install MySQL 5.7.32
Next, install MySQL 5.7 as the database that stores the Airflow metadata. For the MySQL 5.7 installation steps, refer to this post:
Offline installation of MySQL 5.7.32 on CentOS 7
III. Install Apache Airflow
1) Upgrade pip
[aaa@qq.com bin]# pip3 install --upgrade pip
[aaa@qq.com bin]# pip3 install --upgrade setuptools
2) Install development libraries and base packages
Be sure to install the development libraries and base packages below; this avoids many of the errors that can otherwise occur while installing Airflow.
yum -y groupinstall 'Development Tools'
yum -y install lapack-devel blas-devel bzip2-devel ncurses-devel readline-devel sqlite-devel
yum -y install tcl-devel tk-devel libxml2-devel xz-devel gdbm-devel openssl-devel zlib-devel
yum -y install cyrus-sasl-devel gcc gcc-devel libffi-devel openssl-devel gcc-c++ libsasl2-dev
yum install -y python3-devel libevent-devel mysql-devel
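Before moving on, it can be worth a quick check that the compiler and the MySQL client headers are actually in place, since Python packages built later (for example the MySQL driver) need them at build time. This check is an extra step, not part of the original instructions; mysql_config is provided by mysql-devel:
gcc --version
mysql_config --version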
3) Create a Python virtual environment
Create a Python virtual environment and install Airflow inside it.
# Create the virtual environment
[aaa@qq.com moudle]# python3 -m venv /moudle/apache-airflow
# Enter the directory that holds the virtual environment
[aaa@qq.com moudle]# cd /moudle/apache-airflow
# Activate the virtual environment
[aaa@qq.com apache-airflow]# source /moudle/apache-airflow/bin/activate
# Install Apache Airflow inside the virtual environment
(apache-airflow) [aaa@qq.com apache-airflow]# pip3 install apache-airflow[mysql,jdbc,celery] -i https://mirrors.aliyun.com/pypi/simple
# Install some additional packages that Apache Airflow needs
(apache-airflow) [aaa@qq.com apache-airflow]# pip3 install flask_bcrypt sasl thrift_sasl pyhs2 -i https://mirrors.aliyun.com/pypi/simple
(apache-airflow) [aaa@qq.com apache-airflow]# pip3 install pymysql mysql -i https://mirrors.aliyun.com/pypi/simple
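At this point a quick sanity check confirms that Airflow really was installed into the virtual environment (assuming the venv is still activated):
(apache-airflow) [aaa@qq.com apache-airflow]# airflow version
(apache-airflow) [aaa@qq.com apache-airflow]# pip3 list | grep -i airflow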
4) Set the Airflow environment variables
vim /etc/profile
export AIRFLOW_HOME=/moudle/apache-airflow
export PATH=$PATH:$AIRFLOW_HOME/bin
# Apply the environment variables
source /etc/profile
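To confirm the variables took effect in the current shell (the airflow binary should resolve to the virtual environment's bin directory):
echo $AIRFLOW_HOME
which airflow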
5) Initialize Airflow
Initialize Airflow to generate the Airflow configuration file airflow.cfg.
If the output ends with "Done." and no errors are reported, the initialization succeeded.
(apache-airflow) [aaa@qq.com airflow]# airflow initdb
DB: sqlite:////moudle/apache-airflow/airflow.db
[2020-11-28 11:27:09,311] {db.py:378} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
......
Done.
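After initialization, AIRFLOW_HOME contains the generated configuration plus a temporary SQLite database; the listing below is what a default 1.10 init typically produces (exact contents may vary by version):
ls /moudle/apache-airflow
# airflow.cfg  airflow.db  bin  include  lib  logs  pyvenv.cfg  unittests.cfg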
6) Edit the Airflow configuration file airflow.cfg
vim /moudle/apache-airflow/airflow.cfg
[core]
dags_folder = /moudle/apache-airflow/dags
base_log_folder = /moudle/apache-airflow/logs
# Logging level
logging_level = INFO
# Logging level for Flask-appbuilder UI
fab_logging_level = WARN
colored_console_log = True
# Log format for when Colored logs is enabled
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
# Format of Log line
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
# Log filename format
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /moudle/apache-airflow/logs/dag_processor_manager/dag_processor_manager.log
default_timezone = utc
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = CeleryExecutor
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
sql_alchemy_conn = mysql://airflow:123456!aaa@qq.com/airflow
# The encoding for the databases
sql_engine_encoding = utf-8
# If SqlAlchemy should pool database connections.
sql_alchemy_pool_enabled = True
# The SqlAlchemy pool size is the maximum number of database connections
# in the pool. 0 indicates no limit.
sql_alchemy_pool_size = 30
load_examples = True
# Where your Airflow plugins are stored
plugins_folder = /moudle/apache-airflow/plugins
# The class to use for running task instances in a subprocess
task_runner = StandardTaskRunner
endpoint_url = http://192.168.239.133:8088
[api]
# How to authenticate users of the API. See
# https://airflow.apache.org/docs/stable/security.html for possible values.
# ("airflow.api.auth.backend.default" allows all requests for historic reasons)
#auth_backend = airflow.api.auth.backend.deny_all
auth_backend = airflow.api.auth.backend.default
[webserver]
# The base url of your website as airflow cannot guess what domain or
# cname you are using. This is used in automated emails that
# airflow sends to point links to the right web server
base_url = http://192.168.239.133:8088
default_ui_timezone = Asia/Shanghai
web_server_host = 192.168.239.133
web_server_port = 8088
# Default DAG view. Valid values are:
# tree, graph, duration, gantt, landing_times
dag_default_view = tree
# "Default DAG orientation. Valid values are:"
# LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
dag_orientation = LR
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = localhost
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
# smtp_user =
# Example: smtp_password = airflow
# smtp_password =
smtp_port = 25
smtp_mail_from = aaa@qq.com
[celery]
celery_app_name = airflow.executors.celery_executor
worker_concurrency = 16
# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
broker_url = sqla+mysql://airflow:123456!aaa@qq.com:3306/airflow
# The Celery result_backend. When a job finishes, it needs to update the
# metadata of the job. Therefore it will post a message on a message bus,
# or insert it into a database (depending of the backend)
# This status is used by the scheduler to update the state of the task
# The use of a database is highly recommended
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
result_backend = db+mysql://airflow:123456!aaa@qq.com:3306/airflow
# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
# it ``airflow flower``. This defines the IP that Celery Flower runs on
flower_host = 0.0.0.0
# The root URL for Flower
# Example: flower_url_prefix = /flower
flower_url_prefix =
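One note on the connection strings above: the mysql:// scheme goes through the MySQLdb/mysqlclient C driver. If that driver cannot be built on the host, a common fallback (an alternative, not what is configured above) is the pure-Python PyMySQL driver installed earlier, selected by changing the URL scheme; the same +pymysql suffix also works in broker_url and result_backend. The password below is a placeholder for the value used in this environment:
sql_alchemy_conn = mysql+pymysql://airflow:<password>@192.168.239.131:3306/airflow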
7) Create the airflow database in MySQL to store the Airflow metadata
[aaa@qq.com ~]# mysql -uroot -p
mysql> create database airflow;
mysql> grant all on airflow.* to 'airflow'@'localhost' identified by '123456!a';
mysql> flush privileges;
mysql> grant all on airflow.* to 'airflow'@'%' identified by '123456!a';
mysql> flush privileges;
mysql> show global variables like '%timestamp%';
+---------------------------------+-------+
| Variable_name | Value |
+---------------------------------+-------+
| explicit_defaults_for_timestamp | OFF |
| log_timestamps | UTC |
+---------------------------------+-------+
2 rows in set (0.00 sec)
mysql> set global explicit_defaults_for_timestamp =1;
Query OK, 0 rows affected (0.00 sec)
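Note that set global only affects the running instance and is lost when MySQL restarts. To make explicit_defaults_for_timestamp permanent, the same setting can also be written to the MySQL configuration file (path assumed to be /etc/my.cnf here) and MySQL restarted:
vim /etc/my.cnf
[mysqld]
explicit_defaults_for_timestamp = 1
systemctl restart mysqld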
8) Re-initialize Airflow so that the Airflow metadata is stored in the MySQL airflow database
(apache-airflow) [aaa@qq.com apache-airflow]# airflow initdb;
DB: mysql://airflow:***@192.168.239.131/airflow
[2020-11-28 12:31:09,310] {db.py:378} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl MySQLImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
INFO [alembic.runtime.migration] Running upgrade -> e3a246e0dc1, current schema
INFO [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
INFO [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations
INFO [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 338e90f54d61, More logging into task_instance
INFO [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 52d714495f0, job_id indices
INFO [alembic.runtime.migration] Running upgrade 52d714495f0 -> 502898887f84, Adding extra to Log
INFO [alembic.runtime.migration] Running upgrade 502898887f84 -> 1b38cef5b76e, add dagrun
INFO [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 2e541a1dcfed, task_duration
INFO [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 40e67319e3a9, dagrun_config
INFO [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 561833c1c74b, add password column to user
INFO [alembic.runtime.migration] Running upgrade 561833c1c74b -> 4446e08588, dagrun start end
INFO [alembic.runtime.migration] Running upgrade 4446e08588 -> bbc73705a13e, Add notification_sent column to sla_miss
INFO [alembic.runtime.migration] Running upgrade bbc73705a13e -> bba5a7cfc896, Add a column to track the encryption state of the 'Extra' field in connection
INFO [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 1968acfc09e3, add is_encrypted column to variable table
INFO [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 2e82aab8ef20, rename user table
INFO [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 211e584da130, add TI state index
INFO [alembic.runtime.migration] Running upgrade 211e584da130 -> 64de9cddf6c9, add task fails journal table
INFO [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> f2ca10b85618, add dag_stats table
INFO [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 4addfa1236f1, Add fractional seconds to mysql tables
INFO [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 8504051e801b, xcom dag task indices
INFO [alembic.runtime.migration] Running upgrade 8504051e801b -> 5e7d17757c7a, add pid field to TaskInstance
INFO [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 127d2bf2dfa7, Add dag_id/state index on dag_run table
INFO [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, add max tries column to task instance
INFO [alembic.runtime.migration] Running upgrade cc1e65623dc7 -> bdaa763e6c56, Make xcom value column a large binary
INFO [alembic.runtime.migration] Running upgrade bdaa763e6c56 -> 947454bf1dff, add ti job_id index
INFO [alembic.runtime.migration] Running upgrade 947454bf1dff -> d2ae31099d61, Increase text size for MySQL (not relevant for other DBs' text types)
INFO [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 0e2a74e0fc9f, Add time zone awareness
INFO [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 33ae817a1ff4, kubernetes_resource_checkpointing
INFO [alembic.runtime.migration] Running upgrade 33ae817a1ff4 -> 27c6a30d7c24, kubernetes_resource_checkpointing
INFO [alembic.runtime.migration] Running upgrade 27c6a30d7c24 -> 86770d1215c0, add kubernetes scheduler uniqueness
INFO [alembic.runtime.migration] Running upgrade 86770d1215c0, 0e2a74e0fc9f -> 05f30312d566, merge heads
INFO [alembic.runtime.migration] Running upgrade 05f30312d566 -> f23433877c24, fix mysql not null constraint
INFO [alembic.runtime.migration] Running upgrade f23433877c24 -> 856955da8476, fix sqlite foreign key
INFO [alembic.runtime.migration] Running upgrade 856955da8476 -> 9635ae0956e7, index-faskfail
INFO [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> dd25f486b8ea, add idx_log_dag
INFO [alembic.runtime.migration] Running upgrade dd25f486b8ea -> bf00311e1990, add index to taskinstance
INFO [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> 0a2a5b66e19d, add task_reschedule table
INFO [alembic.runtime.migration] Running upgrade 0a2a5b66e19d, bf00311e1990 -> 03bc53e68815, merge_heads_2
INFO [alembic.runtime.migration] Running upgrade 03bc53e68815 -> 41f5f12752f8, add superuser field
INFO [alembic.runtime.migration] Running upgrade 41f5f12752f8 -> c8ffec048a3b, add fields to dag
INFO [alembic.runtime.migration] Running upgrade c8ffec048a3b -> dd4ecb8fbee3, Add schedule interval to dag
INFO [alembic.runtime.migration] Running upgrade dd4ecb8fbee3 -> 939bb1e647c8, task reschedule fk on cascade delete
INFO [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 6e96a59344a4, Make TaskInstance.pool not nullable
INFO [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> d38e04c12aa2, add serialized_dag table
INFO [alembic.runtime.migration] Running upgrade d38e04c12aa2 -> b3b105409875, add root_dag_id to DAG
INFO [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> 74effc47d867, change datetime to datetime2(6) on MSSQL tables
INFO [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 004c1210f153, increase queue name size limit
INFO [alembic.runtime.migration] Running upgrade c8ffec048a3b -> a56c9515abdc, Remove dag_stat table
INFO [alembic.runtime.migration] Running upgrade a56c9515abdc, 004c1210f153, 74effc47d867, b3b105409875 -> 08364691d074, Merge the four heads back together
INFO [alembic.runtime.migration] Running upgrade 08364691d074 -> fe461863935f, increase_length_for_connection_password
INFO [alembic.runtime.migration] Running upgrade fe461863935f -> 7939bcff74ba, Add DagTags table
INFO [alembic.runtime.migration] Running upgrade 7939bcff74ba -> a4c2fd67d16b, add pool_slots field to task_instance
INFO [alembic.runtime.migration] Running upgrade a4c2fd67d16b -> 852ae6c715af, Add RenderedTaskInstanceFields table
INFO [alembic.runtime.migration] Running upgrade 852ae6c715af -> 952da73b5eff, add dag_code table
INFO [alembic.runtime.migration] Running upgrade 952da73b5eff -> a66efa278eea, Add Precision to execution_date in RenderedTaskInstanceFields table
INFO [alembic.runtime.migration] Running upgrade a66efa278eea -> da3f683c3a5a, Add dag_hash Column to serialized_dag table
INFO [alembic.runtime.migration] Running upgrade da3f683c3a5a -> 92c57b58940d, Create FAB Tables
INFO [alembic.runtime.migration] Running upgrade 92c57b58940d -> 03afc6b6f902, Increase length of FAB ab_view_menu.name column
WARNI [airflow.models.crypto] cryptography not found - values will not be stored encrypted.
Done.
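To double-check that the metadata really landed in MySQL rather than the earlier SQLite file, the airflow database can be inspected directly:
mysql> use airflow;
mysql> show tables;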
9) Start Airflow
Since the CeleryExecutor is used, the following three processes have to be started, in order:
airflow webserver
# Or start it as a daemon:
#airflow webserver -D
airflow scheduler
#airflow scheduler -D
airflow worker
# Or start it as a daemon:
#airflow worker -D
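A quick way to confirm the three components are actually running (the webserver port comes from the configuration above; ss can be replaced with netstat -tnlp if net-tools is installed):
ps -ef | grep airflow | grep -v grep
ss -tnlp | grep 8088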
If all three processes start without errors, Airflow is up; the Airflow scheduling page can now be viewed in the web UI (base_url configured above).
Next, test the Airflow scheduling functionality.
10) Test the Airflow scheduling functionality with an example application
Prepare a Python script:
vim airflow_test.py
print("hi~你好~")
Use the Airflow DAG script 5m_001.py to call airflow_test.py on a schedule:
vim 5m_001.py
#-*- coding: utf-8 -*-
### Import the required modules (the defaults are fine)
import airflow
from airflow.models import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
### Define the default global arguments
args = {
    'owner': 'aps',  ## DAG owner, displayed in the web UI
    #'start_date': datetime(2018, 4, 10, 20, 25),  ## absolute start date, starting from a specific day
    'start_date': airflow.utils.dates.days_ago(0),  ## relative start date (days_ago(0) = today)
    #'email': ['aaa@qq.com', 'aaa@qq.com', 'aaa@qq.com', 'aaa@qq.com', 'aaa@qq.com'],  ## recipients of Airflow notification emails
    'email_on_failure': True,  ## send an email when a task fails
    'email_on_retry': False,  ## send an email when a task is retried
    'depends_on_past': False,  ## whether a run depends on the previous run
    #'wait_for_downstream': True,  ## whether to wait for the previous run's downstream tasks
    #'trigger_rule': u'all_success',  ## trigger rule
    #'priority_weight': 2,  ## priority weight
    'retries': 0,  ## number of retries after a task fails (can also be set per task)
    'retry_delay': timedelta(minutes=1)  ## interval between retries
}
### Define the DAG
dag = DAG(
    dag_id='5m_001',  ## dag_id must be unique; keeping it identical to the file name is recommended
    default_args=args,
    schedule_interval='* * * * *'  ## schedule interval, crontab syntax
)
########################################################## APS ##########################################################
test = BashOperator(
    task_id='test',
    bash_command='python3 /moudle/data/airflow_test.py',
    #bash_command='sleep 1',
    dag=dag)
#update123.set_upstream(t_staging_optical_quality_raw_data)
Place the DAG script 5m_001.py into the example_dags folder under the Airflow installation directory (it is picked up because load_examples = True; for real workloads the dags_folder configured above is the usual location):
ls /moudle/apache-airflow/lib/python3.6/site-packages/airflow/example_dags/
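Once the file is in place, the scheduler picks it up on its next parse. The DAG can also be exercised from the command line before waiting for the schedule (Airflow 1.10 CLI; the execution date below is only an example):
(apache-airflow) [aaa@qq.com apache-airflow]# airflow list_dags
(apache-airflow) [aaa@qq.com apache-airflow]# airflow list_tasks 5m_001
(apache-airflow) [aaa@qq.com apache-airflow]# airflow test 5m_001 test 2020-11-28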
Check the run status of DAG 5m_001 in the web UI.
View the task logs.
View the DAG code.
This completes the Apache Airflow installation.