欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

flask微信公众号后端自动回复及部署(2)

程序员文章站 2022-03-28 11:10:47
...

对于一个比较完备的微信自动回复功能 有几个需要注意的点需要提前说明
项目github:https://github.com/sph116/flask_WX

1.本篇实现的自动回复功能,将会分离自动回复的问题回答的配置信息。如下图,可交给业务岗人员进行编辑。此模板文件以上转至github

flask微信公众号后端自动回复及部署(2)
其中关键词用来匹配用户输入的关键词,以进入不同的回复模块。
不同模块下,具有1级问题及2级问题,输入一级问题序号返回一级问题下的二级问题列表,输入二级问题序号,返回问题所匹配的答案。

2.每次用户发起的请求,微信都会对后端重复发起三次请求。若第一次请求未来得及返回数据,又再次接到第二次请求,将会导致重复的返回信息。所以我们需要对于接收的请求进行去重,重复的请求我们将只返回一条数据.。
微信的请求,会带上用户的id及时间戳,我的实现方式是使用redis进行去重判断。将用户id+时间戳作为key,每次接收请求若redis不存在此key则正常返回,若redis存在此key则返回空值。

3.我们根据序号来返回相关问题,但显然不同关键词模块下都有相同的序号。所以我们需要判断用户上一次输入,是在哪个关键词模块。这一部分的实现方式依然是使用redis,将用户名作为key值,用户所在的关键词模块为value,并设置3分钟左右的失效时间。每次接收到请求,根据用户id查找redis是否存在值吗,若不存在。则返回数据让用户重新选择输入关键词,若存在值。则进入相关的关键词模块。

相关源码:
flask app.py 使用gevent进行协程封装 提升并发量

from gevent import monkey; monkey.patch_all()   # 猴子补丁打在最前面 
import difflib
from gevent import pywsgi
from flask import Flask
from flask import request, jsonify
from wechatpy.utils import check_signature
from wechatpy.exceptions import InvalidSignatureException
from wechatpy.replies import TextReply, ArticlesReply
from wechatpy import parse_message
from Check_duplication import Redis
import pandas as pd
import re

app = Flask(__name__)

def Reply_text(msg, reply_text):
    """
    用于回复文本数据
    :param msg:
    :param reply_text:
    :return:
    """
    reply_data = TextReply(content=reply_text, message=msg)
    reply_xml = reply_data.render()
    return reply_xml

def Reply_Article(msg, reply_dic):
    reply = ArticlesReply(message=msg)
    reply.add_article(reply_dic)
    return reply.render()


def extract_que(que_path):
    """
    提取所有问题回复数据
    返回问题分级字典 和答案字典
    :param que_path:
    :return:
    """
    que_df = pd.read_excel(que_path)

    que_dict = {}  # 序号对应问题字典
    ans_dict = {}  # 问题对应答案字典
    for column in que_df:
        que_dict[column] = {}

        for que in list(que_df[column]):
            if type(que) == str:
                que = que.replace("\n", "")

                if "  " in que:
                    ques = que.split("答案:")[0]
                    ques = ques.replace(re.search("(\d+(\.\d+)?)", ques).group(), '').strip()
                    ans = que.split("答案:")[1]
                    ans_dict[ques] = ans.strip()

                else:
                    que_index = que.split(".")[0]
                    if len(que_index) == 2:
                        que = que[3:]
                    else:
                        que = que[2:]

                    que_dict[column][que_index] = {}

                    # que_dict[column][que_index][que] = {j[2: 5]: j.split('\n')[0][5:] for j in list(que_df[column]) if "  " in j and que_index+"." in j}
                    que_dict[column][que_index][que] = {}

                    ls = list(que_df[column])

                    # if que_index == "3":
                    #     pass
                    for j in ls:
                        if type(j) == str:
                            if "  " in j and que_index + "." in j:
                                j = j.strip()
                                if re.search("(\d+(\.\d+)?)", j).group().split('.')[0] != que_index:
                                    pass
                                else:
                                    if len(que_index) == 2:
                                        que_dict[column][que_index][que][j[0: 4]] = j.split('\n')[0][4:]
                                    else:
                                        que_dict[column][que_index][que][j[0: 3]] = j.split('\n')[0][3:]

    return que_dict, ans_dict

@app.route('/check_token', methods=['GET'])
def Check_token():
    """
    用来验证微信公众号后台链接
    :return:
    """
    rq_dict = request.args
    if len(rq_dict) == 0:
        return ""
    signature = request.args.get('signature')   # 提取请求参数
    timestamp = request.args.get('timestamp')
    nonce = request.args.get('nonce')
    echostr = request.args.get('echostr')
    try:
        check_signature(token='jxgj8888', signature=signature, timestamp=timestamp, nonce=nonce)  # 使用wechat库验证
    except InvalidSignatureException as e:
        return ''
    else:
        return echostr  # 返回数据




@app.route('/check_token', methods=['POST'])
def Reply_user():
    """
    用于自动回复客服消息
    :return:
    """

    que_dict, ans_dict = extract_que('问题数据路径')   # 加载问题及回复信息
    req_key_word = que_dict.keys()                                                  # 所有可回复的关键词

    wechat_send_data = request.data                           # 接收消息提醒 为xml格式
    msg = parse_message(wechat_send_data)                     # wechat模块解析数据
    FromUserName = msg.source           # 消息的发送用户
    CreateTime = msg.create_time        # 消息的创建时间
    ToUserName = msg.target             # 消息的目标用户


    duplication_flag = Redis.check_duplication("{}{}".format(FromUserName, CreateTime))  # 消息查重
    if duplication_flag == 1:
        pass
    else:
        print("推送重复")
        return ''   # 若重复 返回1

    if msg.type == "event":               # 为事件消息
        if msg.event == "subscribe":      # 关注事件回复
            return Reply_text(msg, '====自动回复=====\n   欢迎关注, 回复 {} 即可获取相关信息。').format("、".join(req_key_word))
        elif msg.event == "unsubscribe":  # 取关事件回复
            return Reply_text(msg, "====自动回复=====\n  下次再见。")
        elif msg.event == "click" and msg.key == "zhinengkefu":
            return Reply_text(msg, '====自动回复=====\n  回复 {} 即可获取相关信息。').format("、".join(req_key_word))
        else:
            return ''


    elif msg.type == "text":              # 为文本消息

        text_type = Redis.save_uesr_rec(FromUserName)     # 查询用户上次五分钟之内的浏览记录
        send_text = msg.content  # 用户发送的文本消息

        if text_type != False:     # 用户具有上次浏览记录
            text_type = text_type.decode("utf-8")

            if send_text in req_key_word:   # 如果输入为关键词
                Redis.save_uesr_rec(user_id=FromUserName, type='save', rec=send_text)   # 存储本次浏览记录
                ques = que_dict[send_text]
                reply_text = '====自动回复=====\n{}\n  请回复问题前序号, 例"1"'.format(
                    ''.join(["{}.{}\n".format(i, list(ques[i].keys())[0]) for i in ques]))
                return Reply_text(msg, reply_text)

            elif text_type in req_key_word:  # 具有上次浏览记录
                if re.search("(\d+(\.\d+)?)", send_text):   # 如果可以提取出数字
                    Redis.save_uesr_rec(user_id=FromUserName, type='save', rec=text_type)  # 存储本次浏览记录
                    ques_index = re.search("(\d+(\.\d+)?)", send_text).group()
                    try:
                        if "." not in ques_index:  # 询问1级标题
                            ques = list(que_dict[text_type][str(send_text)].values())[0]
                            reply_text = '====自动回复=====\n{}\n  请回复问题前序号, 例"1.1"'.format(
                                ''.join(["{}{}\n".format(i, ques[i]) for i in ques]))
                        else:       # 询问二级标题
                            ques = list(que_dict[text_type][str(send_text.split('.')[0])].values())[0]
                            que_ans = ans_dict[ques[str(send_text)]]
                            
                                return Reply_Article(msg, reply_dic)
                            reply_text = '====自动回复=====\n{}\n'.format(que_ans)
                    except Exception as e:
                        print("失败 输入信息为 {}-{} {}".format(send_text, text_type, e))
                        reply_text = '====自动回复=====\n  我不懂您的意思, 请回复 {} 即可获取相关信息。'.format("、".join(req_key_word))

                else:  # 无法提取出数字 首先进行模糊匹配 匹配失败 返回 如下
                    pro_que = difflib.get_close_matches(send_text, ans_dict.keys(), 1, cutoff=0.6)
                    if pro_que == []:
                        reply_text = '====自动回复=====\n  我不懂您的意思, 请回复 {} 即可获取相关信息。'.format("、".join(req_key_word))
                    else:
                        que = pro_que[0]
                        que_ans = ans_dict[que]
                        reply_text = '====自动回复=====\n 请问您要询问的问题是否是?\n {} \n回复:{}'.format(que, que_ans)
                return Reply_text(msg, reply_text)


        else:      #  用户不具有上次浏览记录
            if send_text in req_key_word:   # 根据用户回复关键字 返回相关问题
                Redis.save_uesr_rec(user_id=FromUserName, type='save', rec=send_text)   # 存储本次浏览记录
                ques = que_dict[send_text]
                reply_text = '====自动回复=====\n{}\n  请回复问题前序号, 例"1"'.format(
                    ''.join(["{}.{}\n".format(i, list(ques[i].keys())[0]) for i in ques]))
            else:  # 若无关键字 首先进行模糊匹配 匹配失败 返回建议信息

                pro_que = difflib.get_close_matches(send_text, ans_dict.keys(), 1, cutoff=0.6)
                if pro_que == []:
                    reply_text = '====自动回复=====\n  我不懂您的意思, 请回复 {} 即可获取相关信息。'.format("、".join(req_key_word))
                else:
                    que = pro_que[0]
                    que_ans = ans_dict[que]
                    reply_text = '====自动回复=====\n 请问您要询问的问题是否是?\n {} \n回复:{}'.format(que, que_ans)
            return Reply_text(msg, reply_text)


if __name__ == '__main__':
    app.debug = True  # 1.0以后版本不通过本方法启动调试模式
    server = pywsgi.WSGIServer(('0.0.0.0', 80), app)
    server.serve_forever()

    # app.run(debug=True, processes=True)

redis操作模块 Check_duplication.py




import redis

class Operation_Redis():

    """
    redis 数据库连接池 及增删改查
    """

    def __init__(self):
        self.host = ""
        self.psw = ""
        self.port = 6379
        self.db = 1
        self.pool = redis.ConnectionPool(host=self.host, password=self.psw, port=self.port, db=self.db, max_connections=50)

    def check_duplication(self, username_time):
        """
        检测键值是否存在于redis 若存在 若存在返回空字符串 不存在则插入
        :param username_time:
        :return:
        """
        r = redis.Redis(connection_pool=self.pool, decode_responses=True)
        flag = r.exists(username_time, '')    # 判断键值是否存在   hexists
        if flag == 0:
            r.set(username_time, '', ex=86400)  # 存储
            return 1
        elif flag == 1:
            return ''


    def save_uesr_rec(self, user_id, type='', rec=''):
        """
        判断用户上一次停留位置
        用户查看问题后 将位置存储mysql 超过五分钟删除
        下次查看 若大于5分钟 则返回重新浏览
        小于 则进入上次停留位置
        :param user_id:
        :param rec:
        :return:
        """
        r = redis.Redis(connection_pool=self.pool, decode_responses=True)
        flag = r.exists(user_id)  # 判断键是否存在   # hexists
        if flag == 1:
            rec = r.get(user_id)
            r.delete(user_id)
            if type != "":
                r.set(user_id, rec, ex=300)

            return rec
        else:
            if type != "":
                r.set(user_id, rec, ex=300)
            return False

Redis = Operation_Redis()

相关演示
flask微信公众号后端自动回复及部署(2)

相关标签: 实战项目