欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  数据库

Python gevent + zmq + redis 搭配小实践

程序员文章站 2022-05-22 13:25:17
...

前些日子与 darkforday 交流时听说了 Python 中 基于 libev 封装的 gevent 库, 编码和性能上也是值得关注的. 简单介绍下 gevent (来自 tutorial ): “ gevent is a concurrency library based around libev. It provides a clean API for a variety of concu

前些日子与 darkforday 交流时听说了 Python 中
基于 libev 封装的 gevent 库, 编码和性能上也是值得关注的.

简单介绍下 gevent (来自 tutorial ):

gevent is a concurrency library based around libev.
It provides a clean API for a variety of concurrency
and network related tasks.

翻译即:
gevent 是一个基于 libev的并发库, 提供了大量清楚简洁的并发和网络
相关任务的API

现在在使用 Python 工作开发, 刚好可以使用 gevent 试试, 先给
自己定一个小需求, 写一个小demo实践.
那么场景来了:
开发一个小服务程序, 负责从 redis 中 pop 消息, 然后将消息打包后
交给 [zeromq] 的 DEALER socket 转发, 如果使用C/C++实现,
想到的有以下实现:
1. 使用boost的asio接口, 自己实现 redis pop 协议接收消息, 需要 boost buffer缓存.
zeromq dealer 注册到 zmq poller 中, 定期的接收消息

2. 使用 libev libevent 等接口, 方法与1相似

3. 封装 epoll, 然后再与1相似, 自己造*实现可调整的 buffer.

当然, Python 有现成的 redis-py 和 pyzmq 实现接口, 另外pyzmq 支持
gevent 调用, 即将其 socket 交给 gevent 管理. 而 redis-py 也可以将其
socket 类更换为 gevent 的socket[ 参见博客 Concurrent connections to Redis with gevent and redis-py ].
那么两个库的socket类都使用了 gevent 的 socket 类替代(这个想法也许可以用在C/C++中改进).

代码参考:
gevent with redis-py
gevent with zmq and zmq poller

完整的demo实现代码放在 github gist

代码详解:
本机测试定义地址 Redis-server 地址和 Router 服务
地址(Router代码未放上来)

# test ip and port 
REDIS_IP = "127.0.0.1"
REDIS_PORT = 6379
ROUTER_IP = "127.0.0.1"
ROUTER_PORT = 59144 # also we could use 44944, just interesting number in chinese :) 

导入 gevent 库, 在 1.0.1版本后 gevent.init.py 里
all未引入 socket, 所以在不修改源码的情况下, 使用
from … import … 语句使用 gevent.socket

# gevent, we use gevent to manipulate io tasks (here all socket classes)
import gevent
# after gevent 1.0.1 in   __init__.py, the  __all__ has no 'socket' attribute   
# another way:   from ... import ... 
from gevent import socket as gevent_socket 

导入 redis, 引入相关类型和定义 Redis key

# redis
import redis.connection
# redis's socket resign to gevent.socket 
redis.connection.socket = gevent_socket
#  Instantiate two redis client
# redis_pop_client        pop list 
# redis_setvalue_client   set value 
from redis import RedisError
from redis import ConnectionPool
from redis import Redis  
REDIS_CONNECTION_POOL = ConnectionPool(max_connections = 8, host = REDIS_IP, port = REDIS_PORT)
redis_pop_client = Redis(connection_pool=REDIS_CONNECTION_POOL)
redis_setvalue_client = Redis(connection_pool=REDIS_CONNECTION_POOL)
# Redis key 
TEST_REDIS_LIST_KEY = "kaka:test:list"
TEST_REDIS_SETVALUE_KEY = "kaka:test:key"

导入 zmq, pyzmq 的 zmq.green 基于gevent实现

# zmq, based on gevent  
from zmq import green as zmq
context = zmq.Context()
dealer = context.socket(zmq.DEALER)
dealer.set_hwm(1000) # zmq  default  high water mark is 1000, explicitly  here 
router_addr = "tcp://" + ROUTER_IP + ":" + str(ROUTER_PORT)
dealer.connect(router_addr)
# regiser dealer in zmq poller, 
# in each poll task, poller will check dealer ZMQ_POLLIN event 
poller = zmq.Poller()
poller.register(dealer, zmq.POLLIN)

在定义完成业务逻辑代码后, 将业务逻辑的函数引用传入 gevent.spawn中,
再整合成一个 task list, 最后 gevent.joinall(tasks) 进入 gevent loop
处理 Redis 与 zmq的网络通信任务.

参考资料:
gevent源码分析-(深度分析gevent运行流程)
gevent 官方文档
gevent 官方文档 中文
gevent socket
gevent pool
gevent with redis-py
gevent with zmq

题外话:
1. gevent 的协程式的异步开发给自己之前写的C/C++网络通信开发
提供了一个新的思路.
2. 在Python中把 zmq 和 redis的socket类都交给 gevent管理,
得益于Python 语言和第三方接口的支持, 在之前使用C/C++或者Lua
时并未很好地实现(造*).
3. 写完小例子后, 跑去 * 回答之前看过的问题了 :)

原创文章,转载请注明: 转载自kaka_ace's blog

本文链接地址: Python gevent + zmq + redis 搭配小实践