Python gevent + zmq + redis 搭配小实践
前些日子与 darkforday 交流时听说了 Python 中 基于 libev 封装的 gevent 库, 编码和性能上也是值得关注的. 简单介绍下 gevent (来自 tutorial ): “ gevent is a concurrency library based around libev. It provides a clean API for a variety of concu
前些日子与 darkforday 交流时听说了 Python 中
基于 libev 封装的 gevent 库, 编码和性能上也是值得关注的.
简单介绍下 gevent (来自 tutorial ):
“
gevent is a concurrency library based around libev.
It provides a clean API for a variety of concurrency
and network related tasks.
翻译即:
gevent 是一个基于 libev的并发库, 提供了大量清楚简洁的并发和网络
相关任务的API
“
现在在使用 Python 工作开发, 刚好可以使用 gevent 试试, 先给
自己定一个小需求, 写一个小demo实践.
那么场景来了:
开发一个小服务程序, 负责从 redis 中 pop 消息, 然后将消息打包后
交给 [zeromq] 的 DEALER socket 转发, 如果使用C/C++实现,
想到的有以下实现:
1. 使用boost的asio接口, 自己实现 redis pop 协议接收消息, 需要 boost buffer缓存.
zeromq dealer 注册到 zmq poller 中, 定期的接收消息
2. 使用 libev libevent 等接口, 方法与1相似
3. 封装 epoll, 然后再与1相似, 自己造*实现可调整的 buffer.
当然, Python 有现成的 redis-py 和 pyzmq 实现接口, 另外pyzmq 支持
gevent 调用, 即将其 socket 交给 gevent 管理. 而 redis-py 也可以将其
socket 类更换为 gevent 的socket[ 参见博客 Concurrent connections to Redis with gevent and redis-py ].
那么两个库的socket类都使用了 gevent 的 socket 类替代(这个想法也许可以用在C/C++中改进).
代码参考:
gevent with redis-py
gevent with zmq and zmq poller
完整的demo实现代码放在 github gist
代码详解:
本机测试定义地址 Redis-server 地址和 Router 服务
地址(Router代码未放上来)
# test ip and port REDIS_IP = "127.0.0.1" REDIS_PORT = 6379 ROUTER_IP = "127.0.0.1" ROUTER_PORT = 59144 # also we could use 44944, just interesting number in chinese :)
导入 gevent 库, 在 1.0.1版本后 gevent.init.py 里
all未引入 socket, 所以在不修改源码的情况下, 使用
from … import … 语句使用 gevent.socket
# gevent, we use gevent to manipulate io tasks (here all socket classes) import gevent # after gevent 1.0.1 in __init__.py, the __all__ has no 'socket' attribute # another way: from ... import ... from gevent import socket as gevent_socket
导入 redis, 引入相关类型和定义 Redis key
# redis import redis.connection # redis's socket resign to gevent.socket redis.connection.socket = gevent_socket # Instantiate two redis client # redis_pop_client pop list # redis_setvalue_client set value from redis import RedisError from redis import ConnectionPool from redis import Redis REDIS_CONNECTION_POOL = ConnectionPool(max_connections = 8, host = REDIS_IP, port = REDIS_PORT) redis_pop_client = Redis(connection_pool=REDIS_CONNECTION_POOL) redis_setvalue_client = Redis(connection_pool=REDIS_CONNECTION_POOL) # Redis key TEST_REDIS_LIST_KEY = "kaka:test:list" TEST_REDIS_SETVALUE_KEY = "kaka:test:key"
导入 zmq, pyzmq 的 zmq.green 基于gevent实现
# zmq, based on gevent from zmq import green as zmq context = zmq.Context() dealer = context.socket(zmq.DEALER) dealer.set_hwm(1000) # zmq default high water mark is 1000, explicitly here router_addr = "tcp://" + ROUTER_IP + ":" + str(ROUTER_PORT) dealer.connect(router_addr) # regiser dealer in zmq poller, # in each poll task, poller will check dealer ZMQ_POLLIN event poller = zmq.Poller() poller.register(dealer, zmq.POLLIN)
在定义完成业务逻辑代码后, 将业务逻辑的函数引用传入 gevent.spawn中,
再整合成一个 task list, 最后 gevent.joinall(tasks) 进入 gevent loop
处理 Redis 与 zmq的网络通信任务.
参考资料:
gevent源码分析-(深度分析gevent运行流程)
gevent 官方文档
gevent 官方文档 中文
gevent socket
gevent pool
gevent with redis-py
gevent with zmq
题外话:
1. gevent 的协程式的异步开发给自己之前写的C/C++网络通信开发
提供了一个新的思路.
2. 在Python中把 zmq 和 redis的socket类都交给 gevent管理,
得益于Python 语言和第三方接口的支持, 在之前使用C/C++或者Lua
时并未很好地实现(造*).
3. 写完小例子后, 跑去 * 回答之前看过的问题了 :)
原创文章,转载请注明: 转载自kaka_ace's blog
本文链接地址: Python gevent + zmq + redis 搭配小实践