所以我正在阅读
this article关于如何在ZMQ中为(X)PUB /(X)SUB消息传递创建代理/代理.有什么建筑应该是这样漂亮的图片:
但是当我看到XSUB
socket description时,由于其出局路由策略是N / A,因此我没有通过它转发所有订阅
那么在ZeroMQ中如何实现(un)订阅转发,这样的转发应用程序的最小用户代码是什么(可以在简单的Publisher和Subscriber样本之间插入)?
解决方法
XPUB接收消息 – 接收到的唯一消息是已连接订户的订阅,这些消息应通过XSUB上传到上游.
xpub = ctx.socket(zmq.XPUB) xpub.bind(xpub_url) xsub = ctx.socket(zmq.XSUB) xsub.bind(xsub_url) pub = ctx.socket(zmq.PUB) pub.bind(pub_url) zmq.proxy(xpub,xsub,pub)
这将将消息传递到/从xpub和xsub.或者,您可以添加PUB套接字来监视任一方向通过的流量.
如果您希望中间的用户代码实现额外的路由逻辑,您将执行此操作,
它重新实现了zmq_proxy的内部循环:
def broker(ctx): xpub = ctx.socket(zmq.XPUB) xpub.bind(xpub_url) xsub = ctx.socket(zmq.XSUB) xsub.bind(xsub_url) poller = zmq.Poller() poller.register(xpub,zmq.POLLIN) poller.register(xsub,zmq.POLLIN) while True: events = dict(poller.poll(1000)) if xpub in events: message = xpub.recv_multipart() print "[BROKER] subscription message: %r" % message[0] xsub.send_multipart(message) if xsub in events: message = xsub.recv_multipart() # print "publishing message: %r" % message xpub.send_multipart(message) # insert user code here