欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

flask+uwsgi+docker的一个小应用

程序员文章站 2022-07-12 17:14:57
...

项目架构

需求

  • 可以根据预设字段爬取平台信息
  • 数据清洗后存入数据库
  • 可以定时自动发送邮件给相关处理人
  • 可以可视化展示数据

框架

app
├── docker-compose.yml                                      # docker编排文件
├── flask                                                   #项目主目录 
│   ├── cookie.txt                                          #爬虫平台的登录cookie
│   ├── count.txt                                           #程序运行计数
│   ├── Dockerfile                                          #flask的dockerfile
│   ├── my_app                                              #web应用主目录
│   │   ├── __init__.py                                     # 应用初始化
│   │   ├── models.py                                       #登录验证模型
│   │   ├── templates                                       #html模板
│   │   │   ├── base.html
│   │   │   ├── login.html
│   │   │   └── pro.html
│   │   └── views.py                                       #url
│   ├── requirements.txt
│   ├── run.py                                             #app入口
│   ├── static                                             #静态文件
│   │   ├── 3G.png
│   │   └── 4G.png
│   └── uwsgi.ini                                          #uwsgi.ini配置文件
└── nginx                                                  #nginx容器文件
    ├── Dockerfile
    └── nginx.conf

核心代码

app实现

app主要由初始化文件__init__.pymodel.pyview.pyrun.py和简单的jinjia2模板构成实现一个简单的flask页面

  • __init__.py app初始化
"""
Version: 0.1
Author: 郭凯
Date: 2020/5/20
File: __init__.py.py
IDE: PyCharm

"""
from flask import Flask

# 实例化一个web应用
app = Flask(__name__)
# 载入views,如果有扩展可以载入其他,存放url,有点Django的意思
from my_app import views
  • model.py 只是一个简单的用户模型
"""
Version: 0.1
Author: 郭凯
Date: 2020/5/20
File: __init__.py.py
IDE: PyCharm

"""
from flask_login import UserMixin


class User(UserMixin):
    pass

# 这里可以直接用数据库,也可以做加密处理,这里就直接偷懒了
users = [
    {'id': 'guokai', 'username': '郭凯', 'password': 'guokai'},
    {'id': 'test', 'username': 'test', 'password': 'test'},
    ....
]


def query_user(user_id):
    for user in users:
        if user_id == user['id']:
            return user
  • view.py 这个程序有点乱,其实只要写url就好,这边我把逻辑写进去了,其实是根据原来的逻辑改的flask
"""
Version: 0.1
Author: 郭凯
Date: 2020/5/20
File: views.py.py
IDE: PyCharm

"""
# 基础函数
import telnetlib
import re
import requests
from bs4 import BeautifulSoup
from my_app import app
from flask import request, redirect, url_for, render_template, flash
from flask_login import LoginManager, login_user, logout_user, current_user, login_required
from my_app.models import User, query_user
import pymysql
import pandas as pd
import matplotlib.pyplot as plt
import time

app.secret_key = '1234567'

login_manager = LoginManager()
login_manager.login_view = 'login'
login_manager.login_message_category = 'info'
login_manager.login_message = '请登录'
login_manager.init_app(app)


# 单例实现,实现网站计数
# 为了使类只能出现一个实例,我们可以使用 __new__ 来控制实例的创建过程,代码如下:

# class MyClass(object):
#     _instance = None
#
#     def __new__(cls, *args, **kwargs):
#         if not cls._instance:
#             cls._instance = super(MyClass, cls).__new__(cls, *args, **kwargs)
#         return cls._instance
#
#
# class HerClass(MyClass):
#     count = 0
#
#
# counts = HerClass().count
# counts = 0


# 加载用户信息
@login_manager.user_loader
def load_user(user_id):
    if query_user(user_id) is not None:
        curr_user = User()
        curr_user.id = user_id

        return curr_user


# 登录
@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        user_id = request.form.get('userid')
        user = query_user(user_id)
        if user is not None and request.form['password'] == user['password']:
            curr_user = User()
            curr_user.id = user_id

            # 通过Flask-Login的login_user方法登录用户
            login_user(curr_user)

            return redirect(url_for('index'))

        flash('Wrong username or password!')

    # GET 请求
    return render_template('login.html')


# 登出
@app.route('/logout')
@login_required
def logout():
    logout_user()
    return 'Logged out successfully!'


# IP加SN号码查询光功率
def base(msg_get):
    # 精准提取IP地址
    ip = re.search(r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b',
                   msg_get).group(0)
    sn = re.search(r'\d{5,10}', msg_get).group(0)
    # pon = re.search(r'\d+-\d+-\d+', msg_get).group(0)

    res = optical_power(ip, sn)
    return 'text', res


# 产品号码查询IP,SN
def get_sn(clogCode):
    这里是一个爬虫,保密
    return opt_res


@app.route('/pro/', methods=['POST'])
@login_required
# 产品号码查询光功率
def pro():
    # 提取产品号码
    with open('count.txt', 'r') as f:
        counts = int(f.read())
    counts += 1
    with open('count.txt', 'w') as f1:
        f1.write(str(counts))
    msg_get = request.form['msg']
    product = re.search(r'\d+', msg_get).group(0)
    res = get_sn(product)
    print(res)
    try:
        b = dict(item.split(":") for item in res.split(";"))
    except:
        b = res
    print(b)

    return render_template("pro.html", res=b)
    # return res

=
@app.route('/ess', methods=['GET'])
@login_required
def cess():
    p_3g = get_3g_pic()
    p_4g = get_4g_pic()
    return render_template('ess.html', path_3g_=p_3g, path_4g_=p_4g)


# 连接数据库
def get_conn():
    conn = pymysql.connect(host='这里保密', port=3306, user='crawler', passwd='这里保密', db='crawler',
                           charset='utf8')
    return conn


# 下发sql
def query_all(cur, sql, args):
    cur.execute(sql, args)
    return cur.fetchall()


# 读取数据库
def read_sql(sql_ku):
    conn = get_conn()
    cur = conn.cursor(cursor=pymysql.cursors.DictCursor)
    sql = "select * from %s" % sql_ku
    results = query_all(cur=cur, sql=sql, args=None)
    # df = pd.DataFrame(results)
    # print(results)
    return results


# 3G短信图片
def get_3g_pic():
    res = read_sql("export")
    for i in res:
        i["ENDTIME"] = i["ENDTIME"][0:13]
    df = pd.DataFrame(res)
    plt.figure(dpi=128, figsize=(20, 12))
    x_data = df["ENDTIME"].value_counts().sort_index(ascending=False).index
    y_data = df["ENDTIME"].value_counts().sort_index(ascending=False).values
    x_data_limit = x_data[:24]
    y_data_limit = y_data[:24]
    plt.xticks(rotation=45)
    plt.title("3G SUM", fontsize=24)
    plt.xlabel("time")  # x轴上的名字
    plt.ylabel("sum")  # y轴上的名字
    # plt.xlim(50)
    # 显示数值
    for x, y in zip(x_data_limit, y_data_limit):
        plt.text(x, y + 0.05, '%.0f' % y, ha='center', va='bottom', fontsize=11)
    plt.bar(x_data_limit, y_data_limit)
    path_3g = '/3G_%s.png' % time.strftime('%Y-%m-%d_%H', time.localtime())
    plt.savefig('./static' + path_3g)
    return path_3g


# 4G短信图片
def get_4g_pic():
    res = read_sql("export_sgs")
    for i in res:
        i["ENDTIME"] = i["ENDTIME"][0:13]
    df = pd.DataFrame(res)
    plt.figure(dpi=128, figsize=(20, 12))
    x_data = df["ENDTIME"].value_counts().sort_index(ascending=False).index
    y_data = df["ENDTIME"].value_counts().sort_index(ascending=False).values

    x_data_limit = x_data[:24]
    y_data_limit = y_data[:24]
    plt.xticks(rotation=45)
    plt.title("4G SUM", fontsize=24)
    plt.xlabel("time")  # x轴上的名字
    plt.ylabel("sum")  # y轴上的名字
    # plt.xlim(50)
    # 显示数值
    for x, y in zip(x_data_limit, y_data_limit):
        plt.text(x, y + 0.05, '%.0f' % y, ha='center', va='bottom', fontsize=11)
    plt.bar(x_data_limit, y_data_limit)
    path_4g = '/4G_%s.png' % time.strftime('%Y-%m-%d_%H', time.localtime())
    plt.savefig('./static' + path_4g)
    return path_4g


# 登录系统,保存cookie
def cook():
    with open('cookie.txt', 'r') as f:
        ck = f.read()
    headers = {
        """
        请求头
        """
    }
    res = requests.get('这里保密', headers=headers)
    login_status = re.search(r'您没有登录本系统或者您的登录已经超时;请登录本系统!', res.text)
    while login_status is not None:
        res = requests.get('这里保密')
        cookies = requests.utils.dict_from_cookiejar(res.cookies)
        # print(cookies)
        login_data = {
            ”“”
            登录字段
            “”“

        }
        for item, value in cookies.items():
            ck = item + '=' + value
        headers = {
           """
        请求头
        """
        }
        requests.post('这里保密', data=login_data, headers=headers)
        res = requests.get('这里保密', headers=headers)
        login_status = re.search(r'您没有登录本系统或者您的登录已经超时;请登录本系统!', res.text)
    print('登录成功!!')
    with open('cookie.txt', 'w') as f:
        f.write(ck)
    print(ck)
    return ck


# 查看光猫光功率
def optical_power(ip, sn):
    try:
        telnet = telnetlib.Telnet(ip, 23)
        """
        设备登录过程
        """
        print('登录成功')
    except:
        print('OLT IP:%s;ONU SN号:%s;OLT不在线或管理IP输错!!' % (ip, sn))
        return 'OLT IP:%s;ONU SN号:%s;错误:OLT不在线或管理IP输错!!' % (ip, sn)
    telnet.write(bytes("enable\n", encoding="utf8"))
    telnet.expect([b">", b"#"], timeout=20)
    telnet.write(bytes("config\n", encoding="utf8"))
    telnet.expect([b">", b"#"], timeout=20)
    telnet.write(bytes("switch language-mode\n", encoding="utf8"))
    telnet.expect([b">", b"#"], timeout=20)
    print('切换语言成功')
    telnet.write(bytes("display ont info by-loid %s\n" % sn, encoding="utf8"))
    telnet.write(bytes("          ", encoding="utf8"))
    y = telnet.read_until(b"#")
    y = str(y, encoding="gbk", errors='ignore')
    if re.search(r'(?<=ONT编号                  : )\d+(?=\s)', y) is not None:
        pon_id = re.search(r'(?<=ONT编号                  : )\d+(?=\s)', y).group(0)
        pon = re.search(r'(?<=框/槽/端口               : )\S+(?=\s)', y).group(0)
    else:
        telnet.write(bytes("display ont info by-password %s\n" % sn, encoding="utf8"))
        y = telnet.read_until(b"#")
        y = str(y, encoding="gbk", errors='ignore')
        if re.search(r'/\d{1,2}\s+(\d+)', y) is not None:
            pon_id = re.search(r'/\d{1,2}\s+(\d+)', y).group(1)
            pon = re.search(r'(?<=\s)\d+/\d+/\d+(?=\s)', y).group(0)
            print(pon)
        else:
            telnet.write(bytes("quit\nquit\ny\n", encoding="utf8"))
            print('OLT IP:%s\nONU SN号:%s\nONT编号无法查询!!\n请检查IP及SN编码是否正确!!' % (ip, sn))
            return 'OLT IP:%s;ONU SN号:%s;错误:ONT编号无法查询!!' % (ip, sn)

    # telnet.expect([b">", b"#"], timeout=20)
    # print('Id查询成功')
    pon_list = re.findall(r'\d+', pon)
    telnet.write(bytes("interface  gpon %d/%d\n" % (int(pon_list[0]), int(pon_list[1])), encoding="utf8"))
    telnet.expect([b">", b"#"], timeout=20)
    print('进入PON')
    telnet.write(bytes("display  ont  optical-info %d %d\n" % (int(pon_list[2]), int(pon_id)), encoding="utf8"))
    telnet.write(bytes("  q", encoding="utf8"))
    print('读取')
    x = telnet.read_until(b"#")
    res = str(x, encoding="gbk", errors='ignore')
    if re.search(r'接收功率\(dBm\)\s+:\s*(-*\d+\.\d*)', res) is not None:
        cat_res = re.search(r'接收功率\(dBm\)\s+:\s*(-*\d+\.\d*)', res).group(1)
        olt_res = re.search(r'OLT接收ONT光功率\(dBm\)\s+:\s*(-*\d+\.\d*)', res).group(1)
        result = 'OLT IP:%s;ONU SN号:%s;光猫接收光功率(dBm):%s;OLT接收光猫光功率(dBm):%s' % (ip, sn, cat_res, olt_res)
    else:
        result = 'OLT IP:%s;ONU SN号:%s;错误:光猫不在线' % (ip, sn)
    print(result)
    telnet.write(bytes("quit\nquit\nquit\ny\n", encoding="utf8"))
    print('退出')
    return result


# 查看光猫网口有没有接网线,端口协商速率
def negotiation_rate():
    pass


# 查看光猫历史注册记录
def history_register():
    pass


# 查看光猫MAC地址
def mac_address():
    pass


# 查看未注册光猫
def no_register():
    pass


# 查看PON口下光猫收光
def pon_power():
    pass


# 查看PON口下历史告警清单(取前10条)
def pon_waring():
    pass


@app.route('/')
@login_required
def index():
    with open('count.txt', 'r') as f2:
        counts_show = int(f2.read())
    return render_template('base.html', count=counts_show, user_name=query_user(current_user.get_id())['username'])

  • run.py程序入口
from my_app import app

if __name__ == "__main__":
    app.run()

看一下简单的效果

  • 登录首页
    flask+uwsgi+docker的一个小应用
  • 登录后的样子
    flask+uwsgi+docker的一个小应用
  • 可视化展示,这边其实是绘图后保存成静态文件,然后jinjia2调用
    flask+uwsgi+docker的一个小应用

部署

    项目采用nginx+uwsgi+docker部署
  • nginx简单的Dockerfile
# Use the Nginx image
# 使用Nginx镜像
FROM nginx

# Remove the default nginx.conf
# 移除官方的配置文件, 并换为自己的
RUN rm /etc/nginx/conf.d/default.conf

# Replace with our own nginx.conf
COPY nginx.conf /etc/nginx/conf.d/
  • nginx配置文件
server {

   listen 80;                    # 监听80端口
   charset UTF-8;
   client_max_body_size 30M;
   location / {
       include uwsgi_params;
       uwsgi_pass flask:8080;   # flask指容器名字,该配置是指将信息转发至flask容器的8080端口
   }
}
  • app的Dockerfile
# Use the Python3.6 image
# 使用python 3.6作为基础镜像
FROM python:3.6

# Set the working directory to /app
# 设置工作目录,作用是启动容器后直接进入的目录名称
WORKDIR /app

# Copy the current directory contents into the container at /app
# . 表示和Dockerfile同级的目录
# 该句将当前目录下的文件复制到docker镜像的/app目录中
ADD . /app

# Install the dependencies
# 安装相关依赖
RUN pip install -r requirements.txt

# run the command to start uWSGI
# 容器启动后要执行的命令 -> 启动uWSGI服务器
CMD ["uwsgi", "uwsgi.ini"]
  • uwsgi.ini
[uwsgi]
wsgi-file = run.py         # 指定要加载的WSGI文件,也即包含Flask实例(app)的文件名称
callable = app             # 指出uWSGI加载的模块中哪个变量将被调用, 也即Flask实例名称
socket = :8080             # 指定socket文件,也可以指定为127.0.0.1:9000,用于配置监听特定端口的套接字
processes = 4              # 指定开启的工作进程数量(这里是开启4个进程)
threads = 2                # 设置每个工作进程的线程数
master = true              # 启动主进程,来管理其他进程,其它的uwsgi进程都是这个master进程的子进程
chmod-socket = 660         # unix socket是个文件,所以会受到unix系统的权限限制。如果我们的uwsgi客户端没有权限访问uWSGI socket,可以用这个选项设置unix socket的权限
vacuum = true              # 当服务器退出的时候自动删除unix socket文件和pid文件
die-on-term = true         # ?
buffer-size = 65535        # 设置用于uwsgi包解析的内部缓存区大小为128k。默认是4k
limit-post = 104857600     # 限制http请求体大小
  • docker-compose.yml
version: "3.6"

services:

  flask:
    build: ./flask           # 指向相关镜像的Dockerfile所在目录
    container_name: flask
    restart: always
    environment:             # 配置容器的环境变量
      - APP_NAME=MyFlaskApp
    expose:                  # 将该容器的8080端口开放给同一网络下的其他容器和服务
      - 8080

  nginx:
    build: ./nginx
    container_name: nginx
    restart: always
    ports:                   # HOST:CONTAINER 将主机的80端口映射到容器的80端口,相当于将nginx容器的80端口开放给外部网络
      - "80:80"
  • 部署,cd到app目录下执行
    docker-compose build
    docker-compose up

数据清洗及定时邮件

  • 数据清洗的数据涉及太多隐私,不做展示,一般都是用pandas就对了
  • 定时邮件 ——部分代码
def send_mail(to, title, res_, starttime, stoptime):
    with open('send_mail.json', 'r', encoding='utf8') as f:
        body = json.load(f)
    sender_mail = body['sender']
    sender_pass = body['password']  # 同样是乱打的

    # 设置总的邮件体对象,对象类型为mixed
    msg_root = MIMEMultipart('mixed')
    # 邮件添加的头尾信息等
    # msg_root['From'] = '郭凯<aaa@qq.com>'
    msg_root['To'] = body['to'][0]
    # 邮件的主题,显示在接收邮件的预览页面
    subject = '%s%s——%s' % (title, starttime, stoptime)
    msg_root['subject'] = Header(subject, 'utf-8')
    # # 构造文本内容
    # text_info = '测试成功'
    # text_sub = MIMEText(text_info, 'plain', 'utf-8')
    # msg_root.attach(text_sub)
    #
    # # 构造超文本
    # 构造超文本
    html_info = res_
    html_sub = MIMEText(html_info, 'html', 'utf-8')
    msg_root.attach(html_sub)
    # 构造附件
    txt_file = open(r'./结果.xlsx', 'rb').read()
    txt = MIMEText(txt_file, 'base64', 'utf-8')
    txt["Content-Type"] = 'application/octet-stream'
    # 以下代码可以重命名附件为hello_world.txt
    txt.add_header('Content-Disposition', 'attachment', filename='结果.xlsx')
    msg_root.attach(txt)

    try:
        sftp_obj = smtplib.SMTP_SSL(body['send_server'], body['port'])
        # 登录之前先调用starttls()函数
        # sftp_obj.starttls()
        sftp_obj.login(sender_mail, sender_pass)
        # for i in body['to']:
        sftp_obj.sendmail(sender_mail, to, msg_root.as_string())
        sftp_obj.quit()
        print('sendemail successful!')

    except Exception as e:
        print('sendemail failed next is the reason')
        print(e)


if __name__ == '__main__':
    with open('send_mail.json', 'r', encoding='utf8') as f:
        body1 = json.load(f)
    print('开始')
    start_time1 = time.strftime('%Y-%m-%d %H:00:00', time.localtime(time.time() - 2 * 60 * 60))
    end_time1 = time.strftime('%Y-%m-%d %H:00:00', time.localtime(time.time()))
    print('%s——%s' % (start_time1, end_time1))
    print('获取数据中,请耐心等待····')
    # t1 = threading.Thread(target=get_res_sgs, args=(start_time1, end_time1))
    # t2 = threading.Thread(target=get_res, args=(start_time1, end_time1))

    data1 = get_res(start_time1, end_time1)
    print('原始数据写入数据库中,请稍后')
    write_sql(data1, 'crawler')
    df1 = read_sql('contact')
    df2 = pd.DataFrame(data1)
    # 利用内连接判断df2
    # print(df2['PHONE'].to_list())
    df3 = df2[df2['PHONE'].isin(df1['PHONE'].to_list())]
    # 以df1为基准,
    # df3 = pd.merge(df1, df2, how='left', on=['PHONE'], right_index=True)
    # df3.sort_index(inplace=True)
    df3.to_excel("结果.xlsx", sheet_name='详细结果')
    df_dic = df3.to_dict(orient='records')

    print('筛选出%d条数据' % len(df_dic))
    if len(df_dic) != 0:
        print('筛选数据写入数据库中,请稍后')
        write_sql(df_dic, 'export')
    print('生成邮件中》》')
    html_ = get_html(start_time1, end_time1, df_dic)
    print('发送邮件中。。')
    for i in body1['to']:
        send_mail(i, '3G短信', html_, start_time1, end_time1)

    # sgs数据
    data1 = get_res_sgs(start_time1, end_time1)
    print('原始数据写入数据库中,请稍后')
    write_sql(data1, 'clawer_sgs')
    df1 = read_sql('contact')
    df2 = pd.DataFrame(data1)
    # 利用内连接判断df2
    # print(df2['PHONE'].to_list())
    df3 = df2[df2['PHONE'].isin(df1['PHONE'].to_list())]
    # 以df1为基准,
    # df3 = pd.merge(df1, df2, how='left', on=['PHONE'], right_index=True)
    # df3.sort_index(inplace=True)
    df3.to_excel("结果.xlsx", sheet_name='详细结果')
    df_dic = df3.to_dict(orient='records')

    print('筛选出%d条数据' % len(df_dic))
    if len(df_dic) != 0:
        print('筛选数据写入数据库中,请稍后')
        write_sql(df_dic, 'export_sgs')
    print('生成邮件中》》')
    html_ = get_html(start_time1, end_time1, df_dic)
    print('发送邮件中。。')
    for i in body1['to']:
        send_mail(i, '4G短信', html_, start_time1, end_time1)
    # print('程序休眠中。。。。。')
    # time.sleep(2 * 60 * 60)
# send_mail(1, 2, 3)
  • 效果
    flask+uwsgi+docker的一个小应用
相关标签: python