在讲ZeroMQ前先给你们讲一下什么是消息队列。java
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺乏的中间件。目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。其实简单点说,消息队列就是如何使各分载器如何实现负载均衡使得完成分布式目标。服务器
ZeroMQ是一种基于消息队列的多线程网络库,其对套接字类型、链接处理、帧、甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字。ZeroMQ是网络通讯中新的一层,介于应用层和传输层之间(按照TCP/IP划分),其是一个可伸缩层,可并行运行,分散在分布式系统间。ZeroMQ几乎全部的I/O操做都是异步的,主线程不会被阻塞。ZeroMQ会根据用户调用zmq_init函数时传入的接口参数,建立对应数量的I/O Thread。每一个I/O Thread都有与之绑定的Poller,Poller采用经典的Reactor模式实现,Poller根据不一样操做系统平台使用不一样的网络I/O模型(select、poll、epoll、devpoll、kequeue等)。主线程与I/O线程经过Mail Box传递消息来进行通讯。Server开始监听或者Client发起链接时,在主线程中建立zmq_connecter或zmq_listener,经过Mail Box发消息的形式将其绑定到I/O线程,I/O线程会把zmq_connecter或zmq_listener添加到Poller中用以侦听读/写事件。Server与Client在第一次通讯时,会建立zmq_init来发送identity,用以进行认证。认证结束后,双方会为这次链接建立Session,之后双方就经过Session进行通讯。每一个Session都会关联到相应的读/写管道, 主线程收发消息只是分别从管道中读/写数据。Session并不实际跟kernel交换I/O数据,而是经过plugin到Session中的Engine来与kernel交换I/O数据。网络
【1】Request-Response多线程
由请求端发起请求,而后等待回应端应答。一个请求必须对应一个回应,从请求端的角度来看是发-收配对,从回应端的角度是收-发对。跟一对一结对模型的区别在于请求端能够是1~N个。该模型主要用于远程调用及任务分配等。Echo服务就是这种经典模型的应用。架构
下面经过Java实现这一模型:负载均衡
server port异步
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;socket
public class Server {tcp
public static void main(String[] args) throws InterruptedException { //实现服务器端的上下文及套接字 Context context = ZMQ.context(1); Socket responder = context.socket(ZMQ.REP); //使服务器端经过tcp协议通讯,监听5555端口 responder.bind("tcp://*:5555"); while (!Thread.currentThread().isInterrupted()) { byte[] request = responder.recv(0); System.out.println("Received Hello"); Thread.sleep(1000); String reply = "World"; responder.send(reply.getBytes(), 0); } //关闭服务器端的上下文及套接字 responder.close(); context.close(); }
}分布式
client port
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public class Client {
public static void main(String[] args) { //创立客户端的上下文捷套接字 Context context = ZMQ.context(1); System.out.println("Connecting to hello world server…"); Socket requester = context.socket(ZMQ.REQ); //讲客户端绑定在5555端口 requester.connect("tcp://localhost:5555"); for (int requestNbr = 0; requestNbr != 100; requestNbr++) { String request = "Hello"; System.out.println("Sending Hello " + requestNbr); requester.send(request.getBytes(), 0); byte[] reply = requester.recv(0); System.out.println("Received " + new String(reply) + " " + requestNbr); } //关闭客户端的上下文套接字 requester.close(); context.term(); }
}
【2】Publisher/Subscriber model
发布端单向分发数据,且不关心是否把所有信息发送给订阅端。若是发布端开始发布信息时,订阅端还没有链接上来,则这些信息会被直接丢弃。订阅端未链接致使信息丢失的问题,能够经过与请求回应模型组合来解决。订阅端只负责接收,而不能反馈,且在订阅端消费速度慢于发布端的状况下,会在订阅端堆积数据。该模型主要用于数据分发。天气预报、微博明星粉丝能够应用这种经典模型。
Server Port
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public class ZMQ_PUB {
public static void main(String[] args) throws InterruptedException { Context context = ZMQ.context(1); Socket publisher = context.socket(ZMQ.PUB); publisher.bind("tcp://*:5555"); Thread.sleep(3000); for(int i=0;i<100;i++){ publisher.send(("admin " + i).getBytes(), ZMQ.NOBLOCK); System.out.println("pub msg " + i); Thread.sleep(1000); } context.close(); publisher.close(); }
}
Client Port
import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; public class ZMQ_SUB { public static void main(String[] args) { Context context = ZMQ.context(1); Socket subscriber = context.socket(ZMQ.SUB); subscriber.connect("tcp://localhost:5555"); subscriber.subscribe("".getBytes()); for (int i=0;i<100;i++) { //Receive a message. String string = new String(subscriber.recv(0)); System.out.println("recv 1" + string); } //关闭套接字和上下文 subscriber.close(); context.term(); } }
【3】push/pull
push port import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; public class Push { public static void main(String[] args) { Context context = ZMQ.context(1); Socket push = context.socket(ZMQ.PUSH); push.bind("ipc://fjs"); for (int i = 0; i < 10000000; i++) { push.send("hello".getBytes(), i); } push.close(); context.term(); } }
pull port
import java.util.concurrent.atomic.AtomicInteger;
import org.zeromq.ZMQ;
public class Pull {
public static void main(String args[]) { final AtomicInteger number = new AtomicInteger(0); for (int i = 0; i < 5; i++) { new Thread(new Runnable(){ private int here = 0; public void run() { // TODO Auto-generated method stub ZMQ.Context context = ZMQ.context(1); ZMQ.Socket pull = context.socket(ZMQ.PULL); pull.connect("ipc://fjs"); //pull.connect("ipc://fjs"); while (true) { String message = new String(pull.recv()); int now = number.incrementAndGet(); here++; if (now % 1000000 == 0) { System.out.println(now + " here is : " + here); } } } }).start(); } }
}
备注说明:
【1】如何利用Java使用ZeroMQ
首先下载zmq所需的zip包,解压之后将libzmq.dll和jzmq.dll文件放到本身电脑中的jdk安装路径中的bin文件夹下,最后须要将以前解压后的zmq.jar包放在项目的lib中或者
zeromq资源下载:
连接:http://pan.baidu.com/s/1miuvSfQ 密码:ttss
项目源码下载连接:
连接:http://pan.baidu.com/s/1dE5Plr7 密码:vqze