欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

pyzmq的Monitor Queue 博客分类: python pythonzmq

程序员文章站 2024-02-23 12:21:04
...

    前面讲过zmq的device,用来充当客户端与服务端的中间件,以增加灵活性,让服务端也变成可插拔。然而device是zmq封装好的,怎样才能一窥内部的数据流呢?看图pyzmq的Monitor Queue
            
    
    博客分类: python pythonzmq

    一看这图就明白了,MonitoredQueue在创建Queue同时,还提供第3个PUB socket来发布途经这个Queue的进出信息。

import time
import zmq
from zmq.devices.basedevice import ProcessDevice
from zmq.devices.monitoredqueuedevice import MonitoredQueue
from zmq.utils.strtypes import asbytes
from multiprocessing import Process
import random

frontend_port = 5559
backend_port = 5560
monitor_port = 5562
number_of_workers = 2

   创建这个MonitorQueue

def monitordevice():
    in_prefix=asbytes('in')
    out_prefix=asbytes('out')
    monitoringdevice = MonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, in_prefix, out_prefix)
    
    monitoringdevice.bind_in("tcp://127.0.0.1:%d" % frontend_port)
    monitoringdevice.bind_out("tcp://127.0.0.1:%d" % backend_port)
    monitoringdevice.bind_mon("tcp://127.0.0.1:%d" % monitor_port)
    
    monitoringdevice.setsockopt_in(zmq.HWM, 1)
    monitoringdevice.setsockopt_out(zmq.HWM, 1)
    monitoringdevice.start()  
    print "Program: Monitoring device has started"

   REP server

def server(backend_port):
    print "Program: Server connecting to device"
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.connect("tcp://127.0.0.1:%s" % backend_port)
    server_id = random.randrange(1,10005)
    while True:
        message = socket.recv()
        print "Server: Received - %s" % message  
        socket.send("Response from server #%s" % server_id)

   REQ client

def client(frontend_port, client_id):
    print "Program: Worker #%s connecting to device" % client_id
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://127.0.0.1:%s" % frontend_port)
    request_num = 1
    socket.send ("Request #%s from client#%s" % (request_num, client_id))
    #  Get the reply.
    message = socket.recv_multipart()
    print "Client: Received - %s" % message

   最后来一个SUB客户端接收从MonitorQueue的PUB端发布的消息

def monitor():
    print "Starting monitoring process"
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    print "Collecting updates from server..."
    socket.connect ("tcp://127.0.0.1:%s" % monitor_port)
    socket.setsockopt(zmq.SUBSCRIBE, "")
    while True:
        string = socket.recv_multipart()
        print "Monitoring Client: %s" % string

   然后在不同进程中分别启动以上各项

monitoring_p = Process(target=monitordevice)
monitoring_p.start()  
server_p = Process(target=server, args=(backend_port,))
server_p.start()  
monitorclient_p = Process(target=monitor)
monitorclient_p.start()  
time.sleep(2)   

for client_id in range(number_of_workers):
    Process(target=client, args=(frontend_port, client_id,)).start()

time.sleep(10)
server_p.terminate()
monitorclient_p.terminate()
monitoring_p.terminate()

    如此即可观察到路过Queue的数据流。

    整个过程简单明了,没有什么弯弯绕,感觉zmq上手还是很容易的。

 

 

相关标签: python zmq