Polling and Sockets in pyzmq
2024-02-23 11:58:28
Polling and Sockets
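When a single thread has several sockets that all need to send and receive data, zmq provides polling over sockets so that the thread does not have to block in recv().
The example below creates a command server that tells the worker when to exit. The worker receives its subscription from a publisher and prints it, and exits when it gets the 'Exit' command.
1. PUSH server (command service)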
    import zmq
    import time
    import sys
    import random
    from multiprocessing import Process

    def server_push(port="5556"):
        context = zmq.Context()
        socket = context.socket(zmq.PUSH)
        socket.bind("tcp://*:%s" % port)
        print("Running server on port:", port)
        # Send "Continue" six times, one per second, then "Exit" and stop
        for reqnum in range(10):
            if reqnum < 6:
                socket.send_string("Continue")
            else:
                socket.send_string("Exit")
                break
            time.sleep(1)
2. PUB server (publishes messages)
    def server_pub(port="5558"):
        context = zmq.Context()
        socket = context.socket(zmq.PUB)
        socket.bind("tcp://*:%s" % port)
        publisher_id = random.randrange(0, 9999)
        print("Running server on port:", port)
        # Publish 10 messages, one per second, on topic 8 or 9
        for reqnum in range(10):
            topic = random.randrange(8, 10)
            messagedata = "server#%s" % publisher_id
            print("%s %s" % (topic, messagedata))
            socket.send_string("%d %s" % (topic, messagedata))
            time.sleep(1)
3. Client
    def client(port_push, port_sub):
        context = zmq.Context()
        socket_pull = context.socket(zmq.PULL)
        socket_pull.connect("tcp://localhost:%s" % port_push)
        print("Connected to server with port %s" % port_push)
        socket_sub = context.socket(zmq.SUB)
        socket_sub.connect("tcp://localhost:%s" % port_sub)
        # Only receive messages whose topic starts with "9"
        socket_sub.setsockopt_string(zmq.SUBSCRIBE, "9")
        print("Connected to publisher with port %s" % port_sub)
        # Initialize the Poller
        poller = zmq.Poller()
        poller.register(socket_pull, zmq.POLLIN)
        poller.register(socket_sub, zmq.POLLIN)
        # Work on requests from both server and publisher
        should_continue = True
        while should_continue:
            # Blocks until at least one registered socket is ready
            socks = dict(poller.poll())
            if socket_pull in socks and socks[socket_pull] == zmq.POLLIN:
                message = socket_pull.recv_string()
                print("Received control command: %s" % message)
                if message == "Exit":
                    print("Received exit command, client will stop receiving messages")
                    should_continue = False
            if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
                string = socket_sub.recv_string()
                topic, messagedata = string.split()
                print("Processing ... ", topic, messagedata)
Running it
    if __name__ == "__main__":
        # Now we can run a few servers
        server_push_port = "5556"
        server_pub_port = "5558"
        Process(target=server_push, args=(server_push_port,)).start()
        Process(target=server_pub, args=(server_pub_port,)).start()
        Process(target=client, args=(server_push_port, server_pub_port,)).start()
A look at the API
poll(timeout=None)
Poll the registered 0MQ or native fds for I/O.
Parameters: timeout (float, int) – The timeout in milliseconds. If None, no timeout (infinite). This is in milliseconds to be compatible with select.poll().
Returns: events – The list of events that are ready to be processed. This is a list of tuples of the form (socket, event), where the 0MQ Socket or integer fd is the first element, and the poll event mask (POLLIN, POLLOUT) is the second. It is common to call events = dict(poller.poll()), which turns the list of tuples into a mapping of socket : event.
Return type: list of tuples
As for POLLIN and POLLOUT:
- flag (int, default=POLLIN|POLLOUT) – 0MQ poll flags. If flag|POLLIN, recv events will be flushed. If flag|POLLOUT, send events will be flushed. Both flags can be set at once, which is the default.
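Because the event mask is a set of bit flags, an equality test such as `socks[sock] == zmq.POLLIN` only matches when POLLIN is the sole flag set. The sketch below illustrates the difference without needing a running socket. The constants are stand-ins that mirror the values of zmq.POLLIN and zmq.POLLOUT, and the socket names and the `ready_for_recv` helper are hypothetical, used for illustration only.

```python
# Stand-in values for illustration; they mirror zmq.POLLIN (1) and zmq.POLLOUT (2).
POLLIN = 1
POLLOUT = 2


def ready_for_recv(events, sock):
    """Return True if `sock` appears in the poll result with the POLLIN bit set.

    `events` has the same shape as the list returned by poll():
    a list of (socket, event mask) tuples. This helper is hypothetical.
    """
    mask = dict(events).get(sock, 0)
    return bool(mask & POLLIN)


# A hypothetical poll() result; plain strings stand in for socket objects.
events = [("socket_pull", POLLIN), ("socket_sub", POLLIN | POLLOUT)]
socks = dict(events)

# Equality fails once POLLOUT is also set on the same socket.
equality_check = socks["socket_sub"] == POLLIN
# The bitwise test still detects that data can be received.
bitwise_check = ready_for_recv(events, "socket_sub")
# A socket missing from the result is simply not ready.
missing_check = ready_for_recv(events, "socket_other")

print(equality_check, bitwise_check, missing_check)
```

In the client shown earlier the sockets are registered for POLLIN only, so the equality test works there; the bitwise form matters mainly when a socket is registered for both flags.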
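In this way, the poller keeps polling the state of the sockets registered with it, much like registering a channel with a selector in Java NIO. When a socket is found ready to receive data (POLLIN), the business code runs.
However, handling events with a chain of 'if' statements is a bit ugly, so pyzmq provides ZMQStream, a class modeled on the IOStream of tornado's ioloop, to handle polling events. This also makes it possible to use callbacks.
First, install tornado: pip install tornado
Then, rework the code above: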
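For readers more familiar with selector-style APIs, the same register-then-poll pattern can be sketched with Python's standard `selectors` module. This is an analogy only, not how pyzmq is implemented: plain `socket.socketpair()` connections stand in for the zmq sockets, and the `poll_once` function and the "command"/"data" labels are hypothetical names chosen for this sketch.

```python
import selectors
import socket


def poll_once(timeout=1.0):
    """Register two sockets, make one readable, and poll a single time."""
    sel = selectors.DefaultSelector()
    cmd_r, cmd_w = socket.socketpair()    # stands in for the PULL socket
    data_r, data_w = socket.socketpair()  # stands in for the SUB socket
    try:
        # Registering is the counterpart of poller.register(sock, zmq.POLLIN).
        sel.register(cmd_r, selectors.EVENT_READ, data="command")
        sel.register(data_r, selectors.EVENT_READ, data="data")

        # Only the data channel gets a message, so only it becomes ready.
        data_w.sendall(b"9 server#42")

        received = {}
        # select() returns (key, mask) pairs for the ready sockets only.
        for key, mask in sel.select(timeout=timeout):
            if mask & selectors.EVENT_READ:
                received[key.data] = key.fileobj.recv(1024)
        return received
    finally:
        sel.close()
        for s in (cmd_r, cmd_w, data_r, data_w):
            s.close()


result = poll_once()
print(result)
```

The command channel never appears in the result because nothing was sent on it, which is the same reason the zmq client checks whether each socket is present in the dict before calling recv().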
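    import zmq
    import time
    import sys
    import random
    from multiprocessing import Process
    from zmq.eventloop import ioloop, zmqstream

    ioloop.install()

ioloop.install() tells tornado.ioloop.IOLoop to use zmq's poller. (In pyzmq 17 and later this call is deprecated and no longer needed, because pyzmq works with tornado's regular IOLoop.)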
The PUSH server and PUB server code stays the same.
Pull the two functions that handle the business logic out into callbacks:
    def getcommand(msg):
        # msg is a list of message frames (bytes)
        print("Received control command: %s" % msg)
        if msg[0] == b"Exit":
            print("Received exit command, client will stop receiving messages")
            # Stop the loop on exit
            ioloop.IOLoop.instance().stop()

    def process_message(msg):
        print("Processing ... %s" % msg)
The client changes to this:
    def client(port_push, port_sub):
        context = zmq.Context()
        socket_pull = context.socket(zmq.PULL)
        socket_pull.connect("tcp://localhost:%s" % port_push)
        # Wrap the socket in a stream and attach the callback
        stream_pull = zmqstream.ZMQStream(socket_pull)
        stream_pull.on_recv(getcommand)
        print("Connected to server with port %s" % port_push)
        socket_sub = context.socket(zmq.SUB)
        socket_sub.connect("tcp://localhost:%s" % port_sub)
        socket_sub.setsockopt_string(zmq.SUBSCRIBE, "9")
        stream_sub = zmqstream.ZMQStream(socket_sub)
        stream_sub.on_recv(process_message)
        print("Connected to publisher with port %s" % port_sub)
        # Runs until getcommand() stops the loop
        ioloop.IOLoop.instance().start()
        print("Worker has stopped processing messages.")

    if __name__ == "__main__":
        # Now we can run a few servers
        server_push_port = "5556"
        server_pub_port = "5558"
        Process(target=server_push, args=(server_push_port,)).start()
        Process(target=server_pub, args=(server_pub_port,)).start()
        Process(target=client, args=(server_push_port, server_pub_port,)).start()
Wrap the original socket in a ZMQStream, start the ioloop instance, and nothing else needs attention. It is pretty much foolproof.
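To make the callback idea concrete without tornado, here is a rough standard-library sketch of what an event loop does on the caller's behalf: it polls, then hands each received message to the callback registered for that socket. `MiniLoop` is a hypothetical class written for this illustration and is not part of pyzmq or tornado. Plain socket pairs stand in for the zmq sockets, and each message is assumed to arrive in a single recv() call.

```python
import selectors
import socket


class MiniLoop:
    """Hypothetical toy loop: polls sockets and dispatches to callbacks."""

    def __init__(self):
        self.selector = selectors.DefaultSelector()
        self.running = False

    def on_recv(self, sock, callback):
        # Store the callback next to the socket, like stream.on_recv(cb).
        self.selector.register(sock, selectors.EVENT_READ, data=callback)

    def stop(self):
        self.running = False

    def start(self):
        self.running = True
        while self.running:
            # Handle every ready socket in the batch before re-checking
            # the running flag.
            for key, _mask in self.selector.select(timeout=1.0):
                key.data(key.fileobj.recv(1024))


loop = MiniLoop()
log = []
cmd_r, cmd_w = socket.socketpair()
sub_r, sub_w = socket.socketpair()


def getcommand(msg):
    log.append(("command", msg))
    if msg == b"Exit":
        loop.stop()


def process_message(msg):
    log.append(("data", msg))


loop.on_recv(cmd_r, getcommand)
loop.on_recv(sub_r, process_message)

sub_w.sendall(b"9 server#42")
cmd_w.sendall(b"Exit")
loop.start()  # returns once getcommand() has seen "Exit"

loop.selector.close()
for s in (cmd_r, cmd_w, sub_r, sub_w):
    s.close()
print(log)
```

The client code no longer contains any 'if socket in socks' branches; the loop owns the polling and the callbacks own the business logic, which is the same division of labor ZMQStream and the ioloop provide.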