ZeroMQ(也称为 ØMQ,0MQ 或 zmq)是一个可嵌入的网络通信库(对 Socket 进行了封装)。 它提供了携带跨越多种传输协议(如:进程内,进程间,TCP 和多播)的原子消息的 sockets 。 有了ZeroMQ,咱们能够经过发布-订阅、任务分发、和请求-回复等模式来创建 N-N 的 socket 链接。 ZeroMQ 的异步 I / O 模型为咱们提供可扩展的基于异步消息处理任务的多核应用程序。 它有一系列语言API(几乎囊括全部编程语言),并可以在大多数操做系统上运行。java
传统的 TCP Socket 的链接是1对1的,能够认为“1个 socket = 1个链接”,每个线程独立维护一个 socket 。可是 ZMQ 摒弃了这种1对1的模式,ZMQ的 Socket 能够很轻松地实现1对N和N对N的链接模式,一个 ZMQ 的 socket 能够自动地维护一组链接,用户没法操做这些链接,用户只能操做套接字而不是链接自己。因此说在 ZMQ 的世界里,链接是私有的。python
ZMQ 提供了三种基本的通讯模型,分别是 Request-Reply 、Publish-Subscribe 和 Parallel Pipeline ,接下来举例说明三种模型并给出相应的代码实现。编程
以 “Hello World” 为例。客户端发起请求,并等待服务端回应请求。客户端发送一个简单的 “Hello”,服务端则回应一个 “World”。能够有 N 个客户端,一个服务端,所以是 1-N 链接。json
服务端代码以下:服务器
import org.zeromq.ZMQ; public class hwserver { public static void main(String[] args) throws InterruptedException { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket responder = context.socket(ZMQ.REP); responder.bind("tcp://*:5555"); while (!Thread.currentThread().isInterrupted()) { byte[] request = responder.recv(0); System.out.println("Received" + new String(request)); Thread.sleep(1000); String reply = "World"; responder.send(reply.getBytes(),0); } responder.close(); context.term(); } }
import time import zmq context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") while True: message = socket.recv() print("Received request: %s" % message) # Do some 'work' time.sleep(1) socket.send(b"World")
客户端代码以下:网络
import org.zeromq.ZMQ; public class hwclient { public static void main(String[] args) { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket requester = context.socket(ZMQ.REQ); requester.connect("tcp://localhost:5555"); for (int requestNbr = 0; requestNbr != 10; requestNbr++) { String request = "Hello"; System.out.println("Sending Hello" + requestNbr); requester.send(request.getBytes(),0); byte[] reply = requester.recv(0); System.out.println("Reveived " + new String(reply) + " " + requestNbr); } requester.close(); context.term(); } }
import zmq context = zmq.Context() print("Connecting to hello world server...") socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:5555") for request in range(10): print("Sending request %s ..." % request) socket.send(b"Hello") message = socket.recv() print("Received reply %s [ %s ]" % (request,message))
从以上的过程,咱们能够了解到使用 ZMQ 写基本的程序的方法,须要注意的是:dom
下面以一个天气预报的例子来介绍该模式。异步
服务端不断地更新各个城市的天气,客户端能够订阅本身感兴趣(经过一个过滤器)的城市的天气信息。socket
服务端代码以下:tcp
import org.zeromq.ZMQ; import java.util.Random; public class wuserver { public static void main(String[] args) { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket publisher = context.socket(ZMQ.PUB); publisher.bind("tcp://*:5556"); publisher.bind("icp://weather"); Random srandom = new Random(System.currentTimeMillis()); while (!Thread.currentThread().isInterrupted()) { int zipcode, temperature, relhumidity; zipcode = 10000 + srandom.nextInt(10000); temperature = srandom.nextInt(215) - 80 + 1; relhumidity = srandom.nextInt(50) + 10 + 1; String update = String.format("%05d %d %d", zipcode, temperature, relhumidity); } publisher.close(); context.term(); } }
from random import randrange import zmq context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://*:5556") while True: zipcode = randrange(1, 100000) temperature = randrange(-80, 135) relhumidity = randrange(10, 60) socket.send_string("%i %i %i" % (zipcode, temperature, relhumidity))
客户端代码以下:
import org.zeromq.ZMQ; import java.util.StringTokenizer; public class wuclient { public static void main(String[] args) { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket suscriber = context.socket(ZMQ.SUB); suscriber.connect("tcp://localhost:5556"); String filter = (args.length > 0) ? args[0] : "10001"; suscriber.suscribe(filter.getBytes()); //过滤条件 int update_nbr; long total_temp = 0; for (update_nbr = 0; update_nbr < 100; update_nbr++) { String string = suscriber.recvStr(0).trim(); StringTokenizer sscanf = new StringTokenizer(string, " "); int zipcode = Integer.valueOf(sscanf.nextToken()); int temperature = Integer.valueOf(sscanf.nextToken()); int relhumidity = Integer.valueOf(sscanf.nextToken()); total_temp += temperature; } System.out.println("Average temperature for zipcode '" + filter + "' was " + (int) (total_temp / update_nbr)); suscriber.close(); context.term(); } }
import sys import zmq context = zmq.Context() socket = context.socket(zmq.SUB) print("Collecting updates from weather server...") socket.connect("tcp://localhost:5556") zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001" if isinstance(zip_filter, bytes): zip_filter = zip_filter.decode('ascii') socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter) total_temp = 0 for update_nbr in range(5): string = socket.recv_string() zipcode, temperature, relhumidity = string.split() total_temp += int(temperature) print("Average temperature for zipcode '%s' was %dF" % (zip_filter, total_temp / (update_nbr + 1)))
服务器端生成随机数 zipcode、temperature、relhumidity 分别表明城市代码、温度值和湿度值,而后不断地广播信息。而客户端经过设置过滤参数,接受特定城市代码的信息,最终将收集到的温度求平均值。
须要注意的是:
Parallel Pipeline 处理模式以下:
import org.zeromq.ZMQ; import java.io.IOException; import java.util.Random; import java.util.StringTokenizer; public class taskvent { public static void main(String[] args) throws IOException { ZMQ.Context context = new ZMQ.context(1); ZMQ.Socket sender = context.socket(ZMQ.PUSH); sender.bind("tcp://*:5557"); ZMQ.Socket sink = context.socket(ZMQ.PUSH); sink.connect("tcp://localhost:5558"); System.out.println("Please enter when the workers are ready: "); System.in.read(); System.out.println("Sending task to workes\n"); sink.send("0",0); Random srandom = new Random(System.currentTimeMillis()); int task_nbr; int total_msec = 0; for (task_nbr = 0; task_nbr < 100; task_nbr++) { int workload; workload = srandom.nextInt(100) + 1; total_msec += workload; System.out.print(workload + "."); String string = String.format("%d", workload); sender.send(string, 0); } System.out.println("Total expected cost: " + total_msec + " msec"); sink.close(); sender.close(); context.term(); } }
import zmq import time import random try: raw_input except NameError: raw_input = input context = zmq.Context() sender = context.socket(zmq.PUSH) sender.bind("tcp://*:5557") sink = context.socket(zmq.PUSH) sink.connect("tcp://localhost:5558") print("Please enter when workers are ready: ") _ = raw_input() print("Sending tasks to workers...") sink.send(b'0') random.seed() total_msec = 0 for task_nbr in range(100): workload = random.randint(1, 100) total_msec += workload sender.send_string(u'%i' % workload) print("Total expected cost: %s msec" % total_msec) time.sleep(1)
import org.zeromq.ZMQ; public class taskwork { public static void main(String[] args) { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket receiver = context.socket(ZMQ.PULL); receiver.connect("tcp://localhost:5557"); ZMQ.Socket sender = context.socket(ZMQ.PUSH); sender.connect("tcp://localhost:5558"); while (!Thread.currentThread().isInterrupted()) { String string = receiver.recv(0).trim(); Long mesc = Long.parseLong(string); System.out.flush(); System.out.print(string + "."); Sleep(mesc); sender.send("".getBytes(), 0); } sender.close(); receiver.close(); context.term(); } }
import zmq import time import sys context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.connect("tcp://localhost:5557") sender = context.socket(zmq.PUSH) sender.connect("tcp://localhost:5558") while True: s = receiver.recv() sys.stdout.write('.') sys.stdout.flush() time.sleep(int(s) * 0.001) sender.send(b'')
import org.zeromq.ZMQ; public class tasksink { public static void main(String[] args) { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket receiver = context.socket(ZMQ.PULL); receiver.bind("tcp://*:5558"); String string = new String(receiver.recv(0)); long tstart = System.currentTimeMillis(); int task_nbr; int total_mesc = 0; for (task_nbr = 0; task_nbr < 100; task_nbr++) { string = new String(receiver.recv(0).trim()); if ((task_nbr / 10) * 10 == task_nbr) { System.out.print(":"); } else { System.out.print("."); } } long tend = System.currentTimeMillis(); System.out.println("\nTotal elapsed time: " + (tend - tstart) + "msec"); receiver.close(); context.term(); } }
import time import zmq import sys context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.bind("tcp://*:5558") s = receiver.recv() tstart = time.time() for task_nbr in range(1, 100): s = receiver.recv() if task_nbr % 10 == 0: sys.stdout.write(':') else: sys.stdout.write('.') sys.stdout.flush() tend = time.time() print("Total elapsed time: %d msec" % ((tend - tstart) * 1000))
如下两点须要注意:
欢迎进入博客 :linbingdong.com 获取最新文章哦~
欢迎关注公众号: FullStackPlan 获取更多干货哦~