Django通道简要介绍
根据官方文档,Django 通道是:
一个让 Django 不仅可以处理纯 HTTP 请求,包括 HTTP2 和WebSockets,也有能力在请求发送到缩略图或后台运算时也能运行运行代码。
如果你以前用过 Django,你就知道 project 是多重要。目前 Django 的诸多特性依赖于库,比如 Celery(在请求之外处理复杂任务),或者 Node.js,dijango-websocket-redis,或者 gevent-stocketio 来支持 WebSocket。
由于 Celery 的原因(它是一个事实标准),其他所有的实现方法以非标准的形式在 Django 的局限内有各自的问题。我们在以往的博文里提到了成功实现的不同方式。
一个合乎标准的方式更容易维护,更安全,多数开发者熟悉其内容也更容易交接。
在这篇博客中我会快速的介绍开发应用 Django 通道网站所涉及的概念,同时介绍一个用 WebSocket 给客户端推送通知的例子。
应用
我们举的例子是对用 gevent-socketio 实现博客实时通知应用的修改。目的是让你看到用 Django 通道在同样的条件下实施会有多简单,代码可以在GitHub上找到。
面向事件的Diango
默认的 Django 网站请求-响应模型:一个请求进来,被传递给视图,视图产生一个回应,然后回应被发送到客户端,所有一切都是单线程完成的。
在大多数应用中都完全胜任,但它有自己的局限。如果是多个请求就会让工作进程持续好长时间,后续的请求要排队等待。这就是用 Celery 来做缩略图之类事情的原因:在图片上传时,我们建立缩略图任务并及时响应到客户端,在此同时 Celery 在自己的进程中处理图片。
同样的情况会发生在和客户端的实时双向对话中。设想一个请求-响应,我们需要一个进程对一个客户端来收发消息直到连接终止。
Django 通道提供了另一个模型:面向事件的模型。在这个模型中,事件取代了请求和响应。一个请求事件被接收到会被传递给合适的处理者来产生一个新的响应事件被传回到客户端。
事件模型可以应用到其他情况而不只是对请求-响应模型的模仿。比如由外界条件触发的传感器,它产生一个事件给事件处理者,然后会依次产生另一个事件通知所有对原始事件感兴趣的人。
但是这个进程怎么工作的?我们需要在开始实例前认识下channel。
什么是通道
根据 Django 通道的官方文档,通道是:
…一个有序的,先进先出的消息队列,通常一次只有一个消息接收者。
多个生产者将消息写入 channel(用一个名字来识别),然后一个用户订阅了那么 channel 就可用,它会取出队列中的第一条消息。就是这么简单。
通道改变了Django的工作方式,让它像worker一样工作。每个worker听从通道上所有用户的吩咐当有消息是用户会被通知。要想这事发生,我们需要三个层:
接口服务器:连接网站与客户端,通过一个 WSGIn接头和一个独立的WebSocket服务器。
通道后端:它在接口和worker间传递消息。(为单一服务器提供存储,一个数据库或者Redis),Python 代码都在这里。
worker:它们收听所有的通道,当消息来时唤醒用户(函数)。
接口服务器把连接(HTTP,WebSocket等)转换成通道中的消息,worker负责处理这些消息。这里的门道在于消息不需要从接口服务器产生。消息可以在任何地方产生,view,form,signal,随你心意。
是时候干活了。
我们的第一个用户
我们从安装Django1.8(1.9也行)开始。首先,需要安装安装 channel 包,它是依赖 PyPi 的。如果你想安装最新版本的通道,看看官方介绍文档。
接下来把 channel 加入到 INSTALLED_APPS 设置中:
INSTALLED_APPS = ( ... 'channels_test', # Our test app 'channels', )
就是这样。通道默认配置使用在内存中的后端,它能很好的在单个服务器的网站进行工作。
我们将要写一个简单的用户来接收“http.message”通道上的消息,然后回应通道一个新消息。让我们在测试 Django 应用这中建立一个模组叫“consumer.py”:
# consumers.py from json import dumps from django.http import HttpResponse from django.utils.timezone import now def http_consumer(message): response = HttpResponse( "It is now {} and you've requested {} with {} as request parameters.".format( now(), message.content['path'], dumps(message.content['get']) ) ) message.reply_channel.send(response.channel_encode())
消息从标准“reques.http”通道中出来,用户写一个新消息在回应通道中回应。值得注意的是,他们是两种不同的通道:普通通道传递消息给用户,和回应通道。只用接口服务器侦听回应通道,它知道那个通道连接那个客户端,所以他知道回应该发给谁。
在开始进程前,我们需要一个方法来告诉 Django 将“request.http”通道消息发送给我们的新用户。在设置中继续创建一个模组叫“routing.py”:
channel_routing = { "http.request": "channels_test.consumers.http_consumer" }
现在我们运行服务器(是开发服务器或者 WSGI 服务器无关紧要),给我们的网站一个请求:
$ curl https://localhost:8000/some/path?foo=bar It is now 2016-02-01 11:49:25.166799+00:00 and you've requested /some/path with {"foo": ["bar"]} as request parameters.
我们得到了想要的请求,通道解决了我们大部分问题。现在让我们玩点更有趣的。
实时通知
我们已经多次提及了实时通知的要点,这给了我们一个很好的机会来应用一个通道,并比较了两个解决方案。
我们将改进现有的项目,追踪用户在特定地理位置发出有趣的实时通知。在这个项目中我们用到了 gevent-socketio], SocketIO, 和 RabbitMQ(还有 Node.js)。我们将用通道、普通WebSockets和Redis来做同样的事情。
如前所述,我们用 WebStocket 推送通知到客户端。通道对 WebStocket 有完整的支持。所有我们要做的不过是在我们的“tracker”应用中加上几个通道:
# routing.py channel_routing = { "websocket.connect": "tracker.consumers.websocket_connect", "websocket.keepalive": "tracker.consumers.websocket_keepalive", "websocket.disconnect": "tracker.consumers.websocket_disconnect" }
我们不在意“websocket.message”通道也不打算接收用户消息。我们的目标是向所有连接的客户端发出推送消息。用一个群来做这件事非常容易。让我们看看用户:
# tracker/consumers.py import logging from channels import Group from channels.sessions import channel_session from channels.auth import channel_session_user_from_http logger = logging.getLogger(__name__) # Connected to websocket.connect and websocket.keepalive @channel_session_user_from_http def websocket_connect(message): logger.info('websocket_connect. message = %s', message) # transfer_user(message.http_session, message.channel_session) Group("notifications").add(message.reply_channel) # Connected to websocket.keepalive @channel_session def websocket_keepalive(message): logger.info('websocket_keepalive. message = %s', message) Group("notifications").add(message.reply_channel) # Connected to websocket.disconnect @channel_session def websocket_disconnect(message): logger.info('websocket_disconnect. message = %s', message) Group("notifications").discard(message.reply_channel)
无论何时一个客户端连接,就会有一条消息通过“websocket.connect”发送,然后我们需要加上一个回应通道,还要把它放进“notifications”群。群允许我们同时发送相同的消息到所有的通道。我们要保持通道群的更新,即当客户端连接,我们将回应通道就啊如群;断开连接,将它移除。群也会在一定时间后清理通道,我们用“websocket.keepalive”通道把接收到keepalive消息的通道加入到“notifications”群。如果通道已存在不会被再次加入。
注意我们没有向群里发送任何东西。我们要在 AreaOfInterest 中的 Incident 被报告或更新时通知用户。我们简单的加入 post_save 信号:
# signals.py import logging from json import dumps from django.db.models.signals import post_save, post_delete from django.dispatch import receiver from channels import Group from .models import Incident, AreaOfInterest logger = logging.getLogger(__name__) def send_notification(notification): logger.info('send_notification. notification = %s', notification) Group("notifications").send({'text': dumps(notification)}) @receiver(post_save, sender=Incident) def incident_post_save(sender, **kwargs): send_notification({ 'type': 'post_save', 'created': kwargs['created'], 'feature': kwargs['instance'].geojson_feature }) if not kwargs['instance'].closed: areas_of_interest = [ area_of_interest.geojson_feature for area_of_interest in AreaOfInterest.objects.filter( polygon__contains=kwargs['instance'].location, severity__in=kwargs['instance'].alert_severities, ) ] if areas_of_interest: send_notification(dict( type='alert', feature=kwargs['instance'].geojson_feature, areas_of_interest=[ { 'id': area_of_interest['id'], 'name': area_of_interest['properties']['name'], 'severity': area_of_interest['properties']['severity'], 'url': area_of_interest['properties']['url'], } for area_of_interest in areas_of_interest ] )) @receiver(post_save, sender=AreaOfInterest) def area_of_interest_post_save(sender, **kwargs): send_notification({ 'type': 'post_save', 'created': kwargs['created'], 'feature': kwargs['instance'].geojson_feature }) @receiver(post_delete, sender=Incident) @receiver(post_delete, sender=AreaOfInterest) def post_delete(sender, **kwargs): send_notification({ 'type': 'post_delete', 'feature': kwargs['instance'].geojson_feature })
所有消息都发生在 send_notification,如你所见只是一行代码(回头去看旧的实现方法)。其余代码都和以前的一样。
至此,我们只用了内存通道后端,要使我们的统治系统在多服务器环境中工作,我们需要使用数据库后端或者 Redis 后端。让我们用后者,必须在setting.py模组中加入以下代码片段:
CHANNEL_LAYERS = { "default": { "BACKEND": "asgi_redis.RedisChannelLayer", "CONFIG": { "hosts": [("localhost", 6379)], }, "ROUTING": "tracker_project.routing.channel_routing", }, }
让这个后端工作,我们还需要安装 asgi_redis 包(内存和数据库层包含在通道包)。最后一件要在 Django 这边做的更该是建一个 asgi.py 模组,它的功能相当于WSGI服务器的wsgi.py。
# asgi.py import os from channels.asgi import get_channel_layer os.environ.setdefault("DJANGO_SETTINGS_MODULE", "tracker_project.settings") channel_layer = get_channel_layer()
这个模块将会被用来运行接口服务器。
我们还需要把客户端代码用 WebSocket 取代 Scoket.io,鉴于它们非常相似,我们不去涉及细节,你可以去这里看看。
剩下的就是运行服务器了,为了开发,我们可以用 runserver 管理命令。它已经修改用来运行 Daphne ASGI 服务器和一个 worker:
(tracker_project_venv)$ ./manage.py runserver Worker thread running, channels enabled Performing system checks... System check identified no issues (0 silenced). February 01, 2016 - 13:19:43 Django version 1.8.8, using settings 'tracker_project.settings' Starting development server at https://127.0.0.1:8000/ Quit the server with CONTROL-C.
在真实环境中,我们需要运行 Daphne 和足够多的 worker。Daphne 这样运行:
(tracker_project_venv)$ daphne tracker_project.asgi:channel_layer
我们在 asgi.py 模组告诉 Daphne 去寻找通道层。每个 worker 这样运行:
(venv)$ python ./manage.py runworker
所有东西具备且运行,你就可以报告事件接收通知,如果视图是打开的。
授权认证说明
我们没有说到的是 WebSocket 是有授权认证的
WebSocket 的初始连接是普通的 HTTP 请求,所以我们可以利用 Django 现有的通过会话管理运用通道的@channel_session_user_from_http 装饰器。如果用户没有得到授权(相当于没有有效会话)便不能回复通道消息,也不会加入“notification”群。
我们要确保构建 WebSocket 是通过了会话密钥:
var socket = new WebSocket('ws://localhost:8000?session_key=' + sessionKey);
还不完善,但我们实现了。有更多的 WebSocket 授权方式(我偏爱给予令牌的系统,比如 JWT),但要接受这个需要另一篇文章了。
结论
Django 通道将会根本上改变我们工作的方式。它让我们生活更容易,使我们能用 Django 网站解决更广泛地问题,但这个项目最大的特点在于:他是完全易于操作的。在我们的例子中,我们只是引入了我们喜爱的通道而已。我们不需要改变给予请求-响应的原有代码。我们得到了世上最好的。
通道还没有为生产环境准备好,但他一旦被集成到下一版本的 Django 中,不需要多长时间就能成为一个稳定的框架。另一个很酷的事情是,它会作为一个 Django 1.8 的外部应用,你可以将它集成到你现有的网站。