ZMQ模式详解——发布/订阅模式
一个例子
1.一个服务器负责生成天气相关数据(邮编、温度、湿度),然后将这些数据发布到所有需要知悉天气的客户端;
2.一个客户端需要时刻更新和获取最新的邮政编码,在没有获取之前可默认为纽约地区。
特点:
1.一个发布者,多个订阅者的关系,1:n;
2.当发布者数据变化时发布数据,所有订阅者均能够接收到数据并处理。
这就是发布/订阅模式。
特别提示
1.使用SUB设置一个订阅时,必须使用zmq_setsockopt()对消息进行过滤,例如:
# python:
# Subscribe to zipcode, default is NYC, 10001
zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001"
# Python 2 - ascii bytes to unicode str
if isinstance(zip_filter, bytes):
zip_filter = zip_filter.decode('ascii')
socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)
// c++:
// Subscribe to zipcode, default is NYC, 10001
const char *filter = (argc > 1)? argv [1]: "10001 ";
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));
另外值得注意的是:zmq_setsockopt()过滤是根据参数前缀进行过滤的。如下图所示:
当zmq_setsockopt()的第二个参数设置为空时,表示不过滤任何消息:
socket.setsockopt_string(zmq.SUBSCRIBE, '')
setsockopt的详细解释,可参考http://api.zeromq.org/3-2:zmq-setsockopt。
2. PUB-SUB模式是异步的
订阅者调用zmq_send()来发送消息是会报错的,同样发布者使用zmq_recv()来接收消息也会报错。
3.PUB和SUB谁bind谁connect并无严格要求(虽本质并无区别),但仍建议PUB使用bind,SUB使用connect
4.”slow joiner”可能导致发布者的第一笔消息总是丢失
由于套接字建立连接需要时间,如果在发布者已经发出消息,但订阅者仍未就绪的情况下就可能出现订阅者无法收到消息的情况。为了解决这个问题,需要做发布者订阅者的同步机制。
同步机制探讨:
- 通过delay或sleep延迟启动:这不是一个好方法,由于不同机器性能不同,可能导致需要延迟的时间不同,无法根本解决这个问题,同时这种编码方式也过于粗暴;
-
在丢失的消息不影响程序正确性的基础上,可以不用考虑同步。
为了进一步探讨同步机制,可参考:http://zguide.zeromq.org/page:all#sockets-and-patterns
5.一个订阅者(subcriber)可以链接超过一个发布者(publisher)。数据到达后将交叉存取(公平队列),以保证发布者之间的数据不会淹没。
例如:
import sys
import zmq
# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
print("Collecting updates from weather server...")
socket.connect("tcp://localhost:5556")
socket.connect("tcp://localhost:5557")
# Subscribe to zipcode, default is NYC, 10001
zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001"
# Python 2 - ascii bytes to unicode str
if isinstance(zip_filter, bytes):
zip_filter = zip_filter.decode('ascii')
socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)
# Process 5 updates
total_temp = 0
while True:
string = socket.recv_string()
#zipcode, temperature, relhumidity = string.split()
print(string)
可以看到是交替接收的。
在实际使用过程中,需要区分是哪一个发布者发过来的消息,可直接在发送消息时在前面加上发布者特有标识,在订阅者使用:
subscriber.setsockopt(zmq.SUBSCRIBE, b"A")
进行过滤。
当一个订阅者订阅了多个发布者时,还比较容易处理(如上例所示),但当该订阅者还有其他类型的endpoint时,就需要考虑怎么保证从两个endpoint获取数据时,一方不会阻塞另外一方。
为此,zmq提供了两种方法:
import zmq
import time
# Prepare our context and sockets
context = zmq.Context()
# Connect to task ventilator
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
# Connect to weather server
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")
# Process messages from both sockets
# We prioritize traffic from the task ventilator
while True:
# Process any waiting tasks
while True:
try:
msg = receiver.recv(zmq.DONTWAIT)
except zmq.Again:
break
# process task
# Process any waiting weather updates
while True:
try:
msg = subscriber.recv(zmq.DONTWAIT)
except zmq.Again:
break
# process weather update
# No activity, so sleep for 1 msec
time.sleep(0.001)
import zmq
# Prepare our context and sockets
context = zmq.Context()
# Connect to task ventilator
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
# Connect to weather server
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")
# Initialize poll set
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)
# Process messages from both sockets
while True:
try:
socks = dict(poller.poll())
except KeyboardInterrupt:
break
if receiver in socks:
message = receiver.recv()
# process task
if subscriber in socks:
message = subscriber.recv()
# process weather update
6.如果一个发布者(publisher)没有任何订阅者(subcriber)连接,则发布者会简单的丢弃所有的消息。
7.如果使用TCP同时订阅者(subcriber)很慢,这会导致消息在发布者(publisher)端排队,造成消息堆积影响程序性能,为了解决这个问题,需要合理设置high-water mark(高水位线)。
ZeroMQ uses the concept of HWM (high-water mark) to define the capacity of its internal pipes. Each connection out of a socket or into a socket has its own pipe, and HWM for sending, and/or receiving, depending on the socket type. Some sockets (PUB, PUSH) only have send buffers. Some (SUB, PULL, REQ, REP) only have receive buffers. Some (DEALER, ROUTER, PAIR) have both send and receive buffers.
When your socket reaches its HWM, it will either block or drop data depending on the socket type. PUB and ROUTER sockets will drop data if they reach their HWM, while other socket types will block. Over the inproc transport, the sender and receiver share the same buffers, so the real HWM is the sum of the HWM set by both sides.
要注意的是,PUB、ROUTER套接字在到达HWM后会丢弃数据,其他的会阻塞。另外,inproc的transport,发送端和接收端共享同一个缓存,因此实际HWM是两者HWM之和。
下面是对必须处理未知订阅方的发布方来说的一个更明智的“最佳实践”:
总是给套接字设置一个基于期望的订阅方数量的最大值,你打算用于队列的内存的数量,和一个消息平均大小的高水位线。例如,如果你希望有5000个订阅方,有1G的内存可有,消息平均200字节,那么一个安全的高水位线应该是(1000000000/200/5000)=1000.