Django实现聊天机器人
演示效果如下所示:
实现原理
用户在聊天界面调用celery异步任务,celery异步任务执行完毕后发送结果给channels,然后channels通过websocket将结果实时推送给用户。对于简单的算术运算,celery一般自行计算就好了。对于网上查找诗人简介这样的任务,celery会调用python爬虫(requests+parsel)爬取古诗文网站上的诗人简介,把爬取结果实时返回给用户。
接下来我们来看下具体的代码实现吧。
第一步 安装环境依赖
首先在虚拟环境中安装django和以下主要项目依赖。本项目使用了最新版本,为3.x版本。
# 主要项目依赖 pip install django pip install channels pip install channels_redis pip install celery pip install redis pip install eventlet # windows only # 爬虫依赖 pip install requests pip install parsel
新建一个名为myproject的项目,新建一个app名为bots。如果windows下安装报错,如何解决自己网上去找吧,很容易解决。修改settings.py, 将channels和chat加入到installed_apps里,并添加相应配置,如下所示:
installed_apps = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'channels', # channels应用 'bots', # bots应用 ] # 设置asgi应用 asgi_application = 'myproject.asgi.application' # 生产环境中使用redis做后台,安装channels_redis import os channel_layers = { "default": { "backend": "channels_redis.core.redischannellayer", "config": { "hosts": [os.environ.get('redis_url', 'redis://127.0.0.1:6379/2')], }, }, }
最后将bots应用的urls.py加入到项目urls.py中去,这和常规django项目无异。
# myproject/urls.py from django.conf.urls import include from django.urls import path from django.contrib import admin urlpatterns = [ path('bots/', include('bots.urls')), path('admin/', admin.site.urls), ]
第二步 配置celery
pip安装好celery和redis后,我们要对其进行配置。分别修改myproject目录下的__init__.py和celery.py(新建), 添加如下代码:
# __init__.py from .celery import app as celery_app __all__ = ('celery_app',) # celery.py import os from celery import celery # 设置环境变量 os.environ.setdefault('django_settings_module', 'myproject.settings') # 实例化 app = celery('myproject') # namespace='celery'作用是允许你在django配置文件中对celery进行配置 # 但所有celery配置项必须以celery开头,防止冲突 app.config_from_object('django.conf:settings', namespace='celery') # 自动从django的已注册app中发现任务 app.autodiscover_tasks() # 一个测试任务 @app.task(bind=true) def debug_task(self): print(f'request: {self.request!r}')
接着修改settings.py, 增加如下celery配置:
# celery配置 celery_broker_url = "redis://127.0.0.1:6379/0" celery_timezone = time_zone # celery内容等消息的格式设置,默认json celery_accept_content = ['application/json', ] celery_task_serializer = 'json' celery_result_serializer = 'json'
完整celery配置见:django进阶:万字长文教你使用celery执行异步和周期性任务(多图)
第三步 编写机器人聊天主页面
本例我们只需要利用django普通视图函数编写1个页面,用于展示首页(index)与用户交互的聊天页面。这个页面对应的路由及视图函数如下所示:
# bots/urls.py from django.urls import path from . import views urlpatterns = [ path('', views.index, name='index'), ] # bots/views.py from django.shortcuts import render def index(request): return render(request, 'bots/index.html', {})
接下来我们编写模板文件index.html,它的路径位置如下所示:
bots/ __init__.py templates/ bots/ index.html urls.py views.py
index.html内容如下所示。
<!doctype html> <html> <head> <meta charset="utf-8"/> <title>django+channels+celery聊天机器人</title> </head> <body> <textarea id="chat-log" cols="100" rows="20" readonly></textarea> <br/> <input id="chat-message-input" type="text" size="100" placeholder="输入`help`获取帮助信息."/><br/><input id="chat-message-submit" type="button" value="send"/> <script> var wss_protocol = (window.location.protocol == 'https:') ? 'wss://': 'ws://'; var chatsocket = new websocket( wss_protocol + window.location.host + '/ws/bots/' ); chatsocket.onopen = function(e) { document.queryselector('#chat-log').value += ('欢迎来到大江狗django聊天机器人. 请输入`help`获取帮助信息.\n')} chatsocket.onmessage = function(e) { var data = json.parse(e.data); var message = data['message']; document.queryselector('#chat-log').value += (message + '\n'); }; chatsocket.onclose = function(e) { document.queryselector('#chat-log').value += ('socket closed unexpectedly, please reload the page.\n')}; document.queryselector('#chat-message-input').focus(); document.queryselector('#chat-message-input').onkeyup = function(e) { if (e.keycode === 13) { // enter, return document.queryselector('#chat-message-submit').click(); } }; document.queryselector('#chat-message-submit').onclick = function(e) { var messageinputdom = document.queryselector('#chat-message-input'); var message = messageinputdom.value; chatsocket.send(json.stringify({ 'message': message })); messageinputdom.value = ''; }; </script> </body> </html>
第四步 编写后台websocket路由及处理方法
当 channels 接受 websocket 连接时, 它也会根据根路由配置去查找相应的处理方法。只不过channels的websocket路由不在urls.py中配置,处理函数也不写在views.py。在channels中,这两个文件分别变成了routing.py和consumers.py。
在bots应用下新建routing.py, 添加如下代码。它的作用是将发送至ws/bots/的websocket请求转由botconsumer处理。
from django.urls import re_path from . import consumers websocket_urlpatterns = [ re_path(r'ws/bots/$', consumers.botconsumer.as_asgi()), ]
注意:定义websocket路由时,推荐使用常见的路径前缀 (如/ws) 来区分 websocket 连接与普通 http 连接, 因为它将使生产环境中部署 channels 更容易,比如nginx把所有/ws的请求转给channels处理。
与django类似,我们还需要把这个app的websocket路由加入到项目的根路由中去。编辑myproject/asgi.py, 添加如下代码:
# myproject/asgi.py import os from channels.auth import authmiddlewarestack from channels.routing import protocoltyperouter, urlrouter from django.core.asgi import get_asgi_application import chat.routing import bots.routing os.environ.setdefault("django_settings_module", "myproject.settings") application = protocoltyperouter({ "http": get_asgi_application(), # websocket请求使用的路由 "websocket": authmiddlewarestack( urlrouter( bots.routing.websocket_urlpatterns ) ) })
接下来在bots应用下新建consumers.py, 添加如下代码:
import json from asgiref.sync import async_to_sync from channels.generic.websocket import websocketconsumer from . import tasks commands = { 'help': { 'help': '命令帮助信息.', }, 'add': { 'args': 2, 'help': '计算两个数之和, 例子: `add 12 32`.', 'task': 'add' }, 'search': { 'args': 1, 'help': '通过名字查找诗人介绍,例子: `search 李白`.', 'task': 'search' }, } class botconsumer(websocketconsumer): def receive(self, text_data): text_data_json = json.loads(text_data) message = text_data_json['message'] response_message = '请输入`help`获取命令帮助信息。' message_parts = message.split() if message_parts: command = message_parts[0].lower() if command == 'help': response_message = '支持的命令有:\n' + '\n'.join( [f'{command} - {params["help"]} ' for command, params in commands.items()]) elif command in commands: if len(message_parts[1:]) != commands[command]['args']: response_message = f'命令`{command}`参数错误,请重新输入.' else: getattr(tasks, commands[command]['task']).delay(self.channel_name, *message_parts[1:]) response_message = f'收到`{message}`任务.' async_to_sync(self.channel_layer.send)( self.channel_name, { 'type': 'chat.message', 'message': response_message } ) def chat_message(self, event): message = event['message'] # send message to websocket self.send(text_data=json.dumps({ 'message': f'[机器人]: {message}' }))
上面代码中最重要的一行如下所示。botconsumer在接收到路由转发的前端消息后,对其解析,将当前频道名和解析后的参数一起交由celery异步执行。celery执行任务完成以后会将结果发到这个频道,这样就实现了channels和celery的通信。
getattr(tasks, commands[command]['task']).delay(self.channel_name, *message_parts[1:])
第五步 编写celery异步任务
在bots目录下新建`tasks.py`,添加如下代码:
from asgiref.sync import async_to_sync from celery import shared_task from channels.layers import get_channel_layer from parsel import selector import requests channel_layer = get_channel_layer() @shared_task def add(channel_name, x, y): message = '{}+{}={}'.format(x, y, int(x) + int(y)) async_to_sync(channel_layer.send)(channel_name, {"type": "chat.message", "message": message}) print(message) @shared_task def search(channel_name, name): spider = poemspider(name) result = spider.parse_page() async_to_sync(channel_layer.send)(channel_name, {"type": "chat.message", "message": str(result)}) print(result) class poemspider(object): def __init__(self, keyword): self.keyword = keyword self.url = "https://so.gushiwen.cn/search.aspx" def parse_page(self): params = {'value': self.keyword} response = requests.get(self.url, params=params) if response.status_code == 200: # 创建selector类实例 selector = selector(response.text) # 采用xpath选择器提取诗人介绍 intro = selector.xpath('//textarea[starts-with(@id,"txtareauthor")]/text()').get() print("{}介绍:{}".format(self.keyword, intro)) if intro: return intro print("请求失败 status:{}".format(response.status_code)) return "未找到诗人介绍。"
以上两个任务都以channel_name为参数,任务执行完毕后通过channel_layer的send方法将结果发送到指定频道。
注意:
- 默认获取channel_layer的方式是调用接口:channels.layers.get_channel_layer()。如果是在consumer中调用接口的话可以直接使用self.channel_layer。
- 对于channel layer的方法(包括send()、group_send(),group_add()等)都属于异步方法,这意味着在调用的时候都需要使用await,而如果想要在同步代码中使用它们,就需要使用装饰器asgiref.sync.async_to_sync
第六步 运行看效果
如果不出意外,你现在的项目布局应该如下所示。说实话,整个项目一共没几个文件,python的简洁和效率真是出了名的好啊。
连续运行如下命令,就可以看到我们文初的效果啦。
# 启动django测试服务器 python manage.py makemigrations python manage.py migrate python manage.py runserver # windows下启动celery需eventlet # 启动celery前确定redis服务已开启哦 celery -a myproject worker -l info -p eventlet
小结
本文我们使用django + channels + celery + redis打造了一个聊天机器人,既会算算术,还会查古诗文。借用这个实现原理,你可以打造非常有趣的实时聊天应用哦,比如在线即时问答,在线客服,实时查询订单,django版的siri美女等等。
django channels + websocket + celery聊天机器人项目源码地址:
以上就是django实现聊天机器人的详细内容,更多关于django 聊天机器人的资料请关注其它相关文章!