pyzmq的Monitor Queue 博客分类: python pythonzmq
程序员文章站
2024-02-23 12:21:04
...
前面讲过zmq的device,用来充当客户端与服务端的中间件,以增加灵活性,让服务端也变成可插拔。然而device是zmq封装好的,怎样才能一窥内部的数据流呢?看图
一看这图就明白了,MonitoredQueue在创建Queue同时,还提供第3个PUB socket来发布途经这个Queue的进出信息。
import time import zmq from zmq.devices.basedevice import ProcessDevice from zmq.devices.monitoredqueuedevice import MonitoredQueue from zmq.utils.strtypes import asbytes from multiprocessing import Process import random frontend_port = 5559 backend_port = 5560 monitor_port = 5562 number_of_workers = 2
创建这个MonitorQueue
def monitordevice(): in_prefix=asbytes('in') out_prefix=asbytes('out') monitoringdevice = MonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, in_prefix, out_prefix) monitoringdevice.bind_in("tcp://127.0.0.1:%d" % frontend_port) monitoringdevice.bind_out("tcp://127.0.0.1:%d" % backend_port) monitoringdevice.bind_mon("tcp://127.0.0.1:%d" % monitor_port) monitoringdevice.setsockopt_in(zmq.HWM, 1) monitoringdevice.setsockopt_out(zmq.HWM, 1) monitoringdevice.start() print "Program: Monitoring device has started"
REP server
def server(backend_port): print "Program: Server connecting to device" context = zmq.Context() socket = context.socket(zmq.REP) socket.connect("tcp://127.0.0.1:%s" % backend_port) server_id = random.randrange(1,10005) while True: message = socket.recv() print "Server: Received - %s" % message socket.send("Response from server #%s" % server_id)
REQ client
def client(frontend_port, client_id): print "Program: Worker #%s connecting to device" % client_id context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://127.0.0.1:%s" % frontend_port) request_num = 1 socket.send ("Request #%s from client#%s" % (request_num, client_id)) # Get the reply. message = socket.recv_multipart() print "Client: Received - %s" % message
最后来一个SUB客户端接收从MonitorQueue的PUB端发布的消息
def monitor(): print "Starting monitoring process" context = zmq.Context() socket = context.socket(zmq.SUB) print "Collecting updates from server..." socket.connect ("tcp://127.0.0.1:%s" % monitor_port) socket.setsockopt(zmq.SUBSCRIBE, "") while True: string = socket.recv_multipart() print "Monitoring Client: %s" % string
然后在不同进程中分别启动以上各项
monitoring_p = Process(target=monitordevice) monitoring_p.start() server_p = Process(target=server, args=(backend_port,)) server_p.start() monitorclient_p = Process(target=monitor) monitorclient_p.start() time.sleep(2) for client_id in range(number_of_workers): Process(target=client, args=(frontend_port, client_id,)).start() time.sleep(10) server_p.terminate() monitorclient_p.terminate() monitoring_p.terminate()
如此即可观察到路过Queue的数据流。
整个过程简单明了,没有什么弯弯绕,感觉zmq上手还是很容易的。
推荐阅读
-
pyzmq的Monitor Queue 博客分类: python pythonzmq
-
pyzmq的4种模式(PAIR)笔记 博客分类: python pythonzmq
-
pyzmq的4种模式(REP/REQ)笔记 博客分类: python pythonzmq
-
pyzmq的4种模式(PUB/SUB)笔记 博客分类: python pythonzmq
-
pyzmq的4种模式(PUSH/PULL)笔记 博客分类: python pythonzmq
-
pyzmq的4种模式(PAIR)笔记 博客分类: python pythonzmq
-
pyzmq提升REQ/REP模式可靠性(1) 博客分类: python pythonzmq
-
pyzmq的Polling and Sockets 博客分类: python pythonzmq
-
pyzmq的Device笔记 博客分类: python pythonzmq
-
pyzmq提升REQ/REP模式可靠性(1) 博客分类: python pythonzmq