flask+uwsgi+docker的一个小应用
程序员文章站
2022-07-12 17:14:57
...
项目架构
需求
- 可以根据预设字段爬取平台信息
- 数据清洗后存入数据库
- 可以定时自动发送邮件给相关处理人
- 可以可视化展示数据
框架
app
├── docker-compose.yml # docker编排文件
├── flask #项目主目录
│ ├── cookie.txt #爬虫平台的登录cookie
│ ├── count.txt #程序运行计数
│ ├── Dockerfile #flask的dockerfile
│ ├── my_app #web应用主目录
│ │ ├── __init__.py # 应用初始化
│ │ ├── models.py #登录验证模型
│ │ ├── templates #html模板
│ │ │ ├── base.html
│ │ │ ├── login.html
│ │ │ └── pro.html
│ │ └── views.py #url
│ ├── requirements.txt
│ ├── run.py #app入口
│ ├── static #静态文件
│ │ ├── 3G.png
│ │ └── 4G.png
│ └── uwsgi.ini #uwsgi.ini配置文件
└── nginx #nginx容器文件
├── Dockerfile
└── nginx.conf
核心代码
app实现
app主要由初始化文件__init__.py
、model.py
、view.py
、run.py
和简单的jinjia2
模板构成实现一个简单的flask
页面
-
__init__.py
app初始化
"""
Version: 0.1
Author: 郭凯
Date: 2020/5/20
File: __init__.py.py
IDE: PyCharm
"""
from flask import Flask
# 实例化一个web应用
app = Flask(__name__)
# 载入views,如果有扩展可以载入其他,存放url,有点Django的意思
from my_app import views
-
model.py
只是一个简单的用户模型
"""
Version: 0.1
Author: 郭凯
Date: 2020/5/20
File: __init__.py.py
IDE: PyCharm
"""
from flask_login import UserMixin
class User(UserMixin):
pass
# 这里可以直接用数据库,也可以做加密处理,这里就直接偷懒了
users = [
{'id': 'guokai', 'username': '郭凯', 'password': 'guokai'},
{'id': 'test', 'username': 'test', 'password': 'test'},
....
]
def query_user(user_id):
for user in users:
if user_id == user['id']:
return user
-
view.py
这个程序有点乱,其实只要写url就好,这边我把逻辑写进去了,其实是根据原来的逻辑改的flask
"""
Version: 0.1
Author: 郭凯
Date: 2020/5/20
File: views.py.py
IDE: PyCharm
"""
# 基础函数
import telnetlib
import re
import requests
from bs4 import BeautifulSoup
from my_app import app
from flask import request, redirect, url_for, render_template, flash
from flask_login import LoginManager, login_user, logout_user, current_user, login_required
from my_app.models import User, query_user
import pymysql
import pandas as pd
import matplotlib.pyplot as plt
import time
app.secret_key = '1234567'
login_manager = LoginManager()
login_manager.login_view = 'login'
login_manager.login_message_category = 'info'
login_manager.login_message = '请登录'
login_manager.init_app(app)
# 单例实现,实现网站计数
# 为了使类只能出现一个实例,我们可以使用 __new__ 来控制实例的创建过程,代码如下:
# class MyClass(object):
# _instance = None
#
# def __new__(cls, *args, **kwargs):
# if not cls._instance:
# cls._instance = super(MyClass, cls).__new__(cls, *args, **kwargs)
# return cls._instance
#
#
# class HerClass(MyClass):
# count = 0
#
#
# counts = HerClass().count
# counts = 0
# 加载用户信息
@login_manager.user_loader
def load_user(user_id):
if query_user(user_id) is not None:
curr_user = User()
curr_user.id = user_id
return curr_user
# 登录
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
user_id = request.form.get('userid')
user = query_user(user_id)
if user is not None and request.form['password'] == user['password']:
curr_user = User()
curr_user.id = user_id
# 通过Flask-Login的login_user方法登录用户
login_user(curr_user)
return redirect(url_for('index'))
flash('Wrong username or password!')
# GET 请求
return render_template('login.html')
# 登出
@app.route('/logout')
@login_required
def logout():
logout_user()
return 'Logged out successfully!'
# IP加SN号码查询光功率
def base(msg_get):
# 精准提取IP地址
ip = re.search(r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b',
msg_get).group(0)
sn = re.search(r'\d{5,10}', msg_get).group(0)
# pon = re.search(r'\d+-\d+-\d+', msg_get).group(0)
res = optical_power(ip, sn)
return 'text', res
# 产品号码查询IP,SN
def get_sn(clogCode):
这里是一个爬虫,保密
return opt_res
@app.route('/pro/', methods=['POST'])
@login_required
# 产品号码查询光功率
def pro():
# 提取产品号码
with open('count.txt', 'r') as f:
counts = int(f.read())
counts += 1
with open('count.txt', 'w') as f1:
f1.write(str(counts))
msg_get = request.form['msg']
product = re.search(r'\d+', msg_get).group(0)
res = get_sn(product)
print(res)
try:
b = dict(item.split(":") for item in res.split(";"))
except:
b = res
print(b)
return render_template("pro.html", res=b)
# return res
=
@app.route('/ess', methods=['GET'])
@login_required
def cess():
p_3g = get_3g_pic()
p_4g = get_4g_pic()
return render_template('ess.html', path_3g_=p_3g, path_4g_=p_4g)
# 连接数据库
def get_conn():
conn = pymysql.connect(host='这里保密', port=3306, user='crawler', passwd='这里保密', db='crawler',
charset='utf8')
return conn
# 下发sql
def query_all(cur, sql, args):
cur.execute(sql, args)
return cur.fetchall()
# 读取数据库
def read_sql(sql_ku):
conn = get_conn()
cur = conn.cursor(cursor=pymysql.cursors.DictCursor)
sql = "select * from %s" % sql_ku
results = query_all(cur=cur, sql=sql, args=None)
# df = pd.DataFrame(results)
# print(results)
return results
# 3G短信图片
def get_3g_pic():
res = read_sql("export")
for i in res:
i["ENDTIME"] = i["ENDTIME"][0:13]
df = pd.DataFrame(res)
plt.figure(dpi=128, figsize=(20, 12))
x_data = df["ENDTIME"].value_counts().sort_index(ascending=False).index
y_data = df["ENDTIME"].value_counts().sort_index(ascending=False).values
x_data_limit = x_data[:24]
y_data_limit = y_data[:24]
plt.xticks(rotation=45)
plt.title("3G SUM", fontsize=24)
plt.xlabel("time") # x轴上的名字
plt.ylabel("sum") # y轴上的名字
# plt.xlim(50)
# 显示数值
for x, y in zip(x_data_limit, y_data_limit):
plt.text(x, y + 0.05, '%.0f' % y, ha='center', va='bottom', fontsize=11)
plt.bar(x_data_limit, y_data_limit)
path_3g = '/3G_%s.png' % time.strftime('%Y-%m-%d_%H', time.localtime())
plt.savefig('./static' + path_3g)
return path_3g
# 4G短信图片
def get_4g_pic():
res = read_sql("export_sgs")
for i in res:
i["ENDTIME"] = i["ENDTIME"][0:13]
df = pd.DataFrame(res)
plt.figure(dpi=128, figsize=(20, 12))
x_data = df["ENDTIME"].value_counts().sort_index(ascending=False).index
y_data = df["ENDTIME"].value_counts().sort_index(ascending=False).values
x_data_limit = x_data[:24]
y_data_limit = y_data[:24]
plt.xticks(rotation=45)
plt.title("4G SUM", fontsize=24)
plt.xlabel("time") # x轴上的名字
plt.ylabel("sum") # y轴上的名字
# plt.xlim(50)
# 显示数值
for x, y in zip(x_data_limit, y_data_limit):
plt.text(x, y + 0.05, '%.0f' % y, ha='center', va='bottom', fontsize=11)
plt.bar(x_data_limit, y_data_limit)
path_4g = '/4G_%s.png' % time.strftime('%Y-%m-%d_%H', time.localtime())
plt.savefig('./static' + path_4g)
return path_4g
# 登录系统,保存cookie
def cook():
with open('cookie.txt', 'r') as f:
ck = f.read()
headers = {
"""
请求头
"""
}
res = requests.get('这里保密', headers=headers)
login_status = re.search(r'您没有登录本系统或者您的登录已经超时;请登录本系统!', res.text)
while login_status is not None:
res = requests.get('这里保密')
cookies = requests.utils.dict_from_cookiejar(res.cookies)
# print(cookies)
login_data = {
”“”
登录字段
“”“
}
for item, value in cookies.items():
ck = item + '=' + value
headers = {
"""
请求头
"""
}
requests.post('这里保密', data=login_data, headers=headers)
res = requests.get('这里保密', headers=headers)
login_status = re.search(r'您没有登录本系统或者您的登录已经超时;请登录本系统!', res.text)
print('登录成功!!')
with open('cookie.txt', 'w') as f:
f.write(ck)
print(ck)
return ck
# 查看光猫光功率
def optical_power(ip, sn):
try:
telnet = telnetlib.Telnet(ip, 23)
"""
设备登录过程
"""
print('登录成功')
except:
print('OLT IP:%s;ONU SN号:%s;OLT不在线或管理IP输错!!' % (ip, sn))
return 'OLT IP:%s;ONU SN号:%s;错误:OLT不在线或管理IP输错!!' % (ip, sn)
telnet.write(bytes("enable\n", encoding="utf8"))
telnet.expect([b">", b"#"], timeout=20)
telnet.write(bytes("config\n", encoding="utf8"))
telnet.expect([b">", b"#"], timeout=20)
telnet.write(bytes("switch language-mode\n", encoding="utf8"))
telnet.expect([b">", b"#"], timeout=20)
print('切换语言成功')
telnet.write(bytes("display ont info by-loid %s\n" % sn, encoding="utf8"))
telnet.write(bytes(" ", encoding="utf8"))
y = telnet.read_until(b"#")
y = str(y, encoding="gbk", errors='ignore')
if re.search(r'(?<=ONT编号 : )\d+(?=\s)', y) is not None:
pon_id = re.search(r'(?<=ONT编号 : )\d+(?=\s)', y).group(0)
pon = re.search(r'(?<=框/槽/端口 : )\S+(?=\s)', y).group(0)
else:
telnet.write(bytes("display ont info by-password %s\n" % sn, encoding="utf8"))
y = telnet.read_until(b"#")
y = str(y, encoding="gbk", errors='ignore')
if re.search(r'/\d{1,2}\s+(\d+)', y) is not None:
pon_id = re.search(r'/\d{1,2}\s+(\d+)', y).group(1)
pon = re.search(r'(?<=\s)\d+/\d+/\d+(?=\s)', y).group(0)
print(pon)
else:
telnet.write(bytes("quit\nquit\ny\n", encoding="utf8"))
print('OLT IP:%s\nONU SN号:%s\nONT编号无法查询!!\n请检查IP及SN编码是否正确!!' % (ip, sn))
return 'OLT IP:%s;ONU SN号:%s;错误:ONT编号无法查询!!' % (ip, sn)
# telnet.expect([b">", b"#"], timeout=20)
# print('Id查询成功')
pon_list = re.findall(r'\d+', pon)
telnet.write(bytes("interface gpon %d/%d\n" % (int(pon_list[0]), int(pon_list[1])), encoding="utf8"))
telnet.expect([b">", b"#"], timeout=20)
print('进入PON')
telnet.write(bytes("display ont optical-info %d %d\n" % (int(pon_list[2]), int(pon_id)), encoding="utf8"))
telnet.write(bytes(" q", encoding="utf8"))
print('读取')
x = telnet.read_until(b"#")
res = str(x, encoding="gbk", errors='ignore')
if re.search(r'接收功率\(dBm\)\s+:\s*(-*\d+\.\d*)', res) is not None:
cat_res = re.search(r'接收功率\(dBm\)\s+:\s*(-*\d+\.\d*)', res).group(1)
olt_res = re.search(r'OLT接收ONT光功率\(dBm\)\s+:\s*(-*\d+\.\d*)', res).group(1)
result = 'OLT IP:%s;ONU SN号:%s;光猫接收光功率(dBm):%s;OLT接收光猫光功率(dBm):%s' % (ip, sn, cat_res, olt_res)
else:
result = 'OLT IP:%s;ONU SN号:%s;错误:光猫不在线' % (ip, sn)
print(result)
telnet.write(bytes("quit\nquit\nquit\ny\n", encoding="utf8"))
print('退出')
return result
# 查看光猫网口有没有接网线,端口协商速率
def negotiation_rate():
pass
# 查看光猫历史注册记录
def history_register():
pass
# 查看光猫MAC地址
def mac_address():
pass
# 查看未注册光猫
def no_register():
pass
# 查看PON口下光猫收光
def pon_power():
pass
# 查看PON口下历史告警清单(取前10条)
def pon_waring():
pass
@app.route('/')
@login_required
def index():
with open('count.txt', 'r') as f2:
counts_show = int(f2.read())
return render_template('base.html', count=counts_show, user_name=query_user(current_user.get_id())['username'])
-
run.py
程序入口
from my_app import app
if __name__ == "__main__":
app.run()
看一下简单的效果
- 登录首页
- 登录后的样子
- 可视化展示,这边其实是绘图后保存成静态文件,然后jinjia2调用
部署
项目采用nginx+uwsgi+docker部署
-
nginx
简单的Dockerfile
# Use the Nginx image
# 使用Nginx镜像
FROM nginx
# Remove the default nginx.conf
# 移除官方的配置文件, 并换为自己的
RUN rm /etc/nginx/conf.d/default.conf
# Replace with our own nginx.conf
COPY nginx.conf /etc/nginx/conf.d/
-
nginx
配置文件
server {
listen 80; # 监听80端口
charset UTF-8;
client_max_body_size 30M;
location / {
include uwsgi_params;
uwsgi_pass flask:8080; # flask指容器名字,该配置是指将信息转发至flask容器的8080端口
}
}
- app的Dockerfile
# Use the Python3.6 image
# 使用python 3.6作为基础镜像
FROM python:3.6
# Set the working directory to /app
# 设置工作目录,作用是启动容器后直接进入的目录名称
WORKDIR /app
# Copy the current directory contents into the container at /app
# . 表示和Dockerfile同级的目录
# 该句将当前目录下的文件复制到docker镜像的/app目录中
ADD . /app
# Install the dependencies
# 安装相关依赖
RUN pip install -r requirements.txt
# run the command to start uWSGI
# 容器启动后要执行的命令 -> 启动uWSGI服务器
CMD ["uwsgi", "uwsgi.ini"]
uwsgi.ini
[uwsgi]
wsgi-file = run.py # 指定要加载的WSGI文件,也即包含Flask实例(app)的文件名称
callable = app # 指出uWSGI加载的模块中哪个变量将被调用, 也即Flask实例名称
socket = :8080 # 指定socket文件,也可以指定为127.0.0.1:9000,用于配置监听特定端口的套接字
processes = 4 # 指定开启的工作进程数量(这里是开启4个进程)
threads = 2 # 设置每个工作进程的线程数
master = true # 启动主进程,来管理其他进程,其它的uwsgi进程都是这个master进程的子进程
chmod-socket = 660 # unix socket是个文件,所以会受到unix系统的权限限制。如果我们的uwsgi客户端没有权限访问uWSGI socket,可以用这个选项设置unix socket的权限
vacuum = true # 当服务器退出的时候自动删除unix socket文件和pid文件
die-on-term = true # ?
buffer-size = 65535 # 设置用于uwsgi包解析的内部缓存区大小为128k。默认是4k
limit-post = 104857600 # 限制http请求体大小
docker-compose.yml
version: "3.6"
services:
flask:
build: ./flask # 指向相关镜像的Dockerfile所在目录
container_name: flask
restart: always
environment: # 配置容器的环境变量
- APP_NAME=MyFlaskApp
expose: # 将该容器的8080端口开放给同一网络下的其他容器和服务
- 8080
nginx:
build: ./nginx
container_name: nginx
restart: always
ports: # HOST:CONTAINER 将主机的80端口映射到容器的80端口,相当于将nginx容器的80端口开放给外部网络
- "80:80"
- 部署,cd到app目录下执行
docker-compose build
docker-compose up
数据清洗及定时邮件
- 数据清洗的数据涉及太多隐私,不做展示,一般都是用
pandas
就对了 - 定时邮件 ——部分代码
def send_mail(to, title, res_, starttime, stoptime):
with open('send_mail.json', 'r', encoding='utf8') as f:
body = json.load(f)
sender_mail = body['sender']
sender_pass = body['password'] # 同样是乱打的
# 设置总的邮件体对象,对象类型为mixed
msg_root = MIMEMultipart('mixed')
# 邮件添加的头尾信息等
# msg_root['From'] = '郭凯<aaa@qq.com>'
msg_root['To'] = body['to'][0]
# 邮件的主题,显示在接收邮件的预览页面
subject = '%s%s——%s' % (title, starttime, stoptime)
msg_root['subject'] = Header(subject, 'utf-8')
# # 构造文本内容
# text_info = '测试成功'
# text_sub = MIMEText(text_info, 'plain', 'utf-8')
# msg_root.attach(text_sub)
#
# # 构造超文本
# 构造超文本
html_info = res_
html_sub = MIMEText(html_info, 'html', 'utf-8')
msg_root.attach(html_sub)
# 构造附件
txt_file = open(r'./结果.xlsx', 'rb').read()
txt = MIMEText(txt_file, 'base64', 'utf-8')
txt["Content-Type"] = 'application/octet-stream'
# 以下代码可以重命名附件为hello_world.txt
txt.add_header('Content-Disposition', 'attachment', filename='结果.xlsx')
msg_root.attach(txt)
try:
sftp_obj = smtplib.SMTP_SSL(body['send_server'], body['port'])
# 登录之前先调用starttls()函数
# sftp_obj.starttls()
sftp_obj.login(sender_mail, sender_pass)
# for i in body['to']:
sftp_obj.sendmail(sender_mail, to, msg_root.as_string())
sftp_obj.quit()
print('sendemail successful!')
except Exception as e:
print('sendemail failed next is the reason')
print(e)
if __name__ == '__main__':
with open('send_mail.json', 'r', encoding='utf8') as f:
body1 = json.load(f)
print('开始')
start_time1 = time.strftime('%Y-%m-%d %H:00:00', time.localtime(time.time() - 2 * 60 * 60))
end_time1 = time.strftime('%Y-%m-%d %H:00:00', time.localtime(time.time()))
print('%s——%s' % (start_time1, end_time1))
print('获取数据中,请耐心等待····')
# t1 = threading.Thread(target=get_res_sgs, args=(start_time1, end_time1))
# t2 = threading.Thread(target=get_res, args=(start_time1, end_time1))
data1 = get_res(start_time1, end_time1)
print('原始数据写入数据库中,请稍后')
write_sql(data1, 'crawler')
df1 = read_sql('contact')
df2 = pd.DataFrame(data1)
# 利用内连接判断df2
# print(df2['PHONE'].to_list())
df3 = df2[df2['PHONE'].isin(df1['PHONE'].to_list())]
# 以df1为基准,
# df3 = pd.merge(df1, df2, how='left', on=['PHONE'], right_index=True)
# df3.sort_index(inplace=True)
df3.to_excel("结果.xlsx", sheet_name='详细结果')
df_dic = df3.to_dict(orient='records')
print('筛选出%d条数据' % len(df_dic))
if len(df_dic) != 0:
print('筛选数据写入数据库中,请稍后')
write_sql(df_dic, 'export')
print('生成邮件中》》')
html_ = get_html(start_time1, end_time1, df_dic)
print('发送邮件中。。')
for i in body1['to']:
send_mail(i, '3G短信', html_, start_time1, end_time1)
# sgs数据
data1 = get_res_sgs(start_time1, end_time1)
print('原始数据写入数据库中,请稍后')
write_sql(data1, 'clawer_sgs')
df1 = read_sql('contact')
df2 = pd.DataFrame(data1)
# 利用内连接判断df2
# print(df2['PHONE'].to_list())
df3 = df2[df2['PHONE'].isin(df1['PHONE'].to_list())]
# 以df1为基准,
# df3 = pd.merge(df1, df2, how='left', on=['PHONE'], right_index=True)
# df3.sort_index(inplace=True)
df3.to_excel("结果.xlsx", sheet_name='详细结果')
df_dic = df3.to_dict(orient='records')
print('筛选出%d条数据' % len(df_dic))
if len(df_dic) != 0:
print('筛选数据写入数据库中,请稍后')
write_sql(df_dic, 'export_sgs')
print('生成邮件中》》')
html_ = get_html(start_time1, end_time1, df_dic)
print('发送邮件中。。')
for i in body1['to']:
send_mail(i, '4G短信', html_, start_time1, end_time1)
# print('程序休眠中。。。。。')
# time.sleep(2 * 60 * 60)
# send_mail(1, 2, 3)
- 效果