这一节做为上一节多线程的延续,先说一下java原生的阻塞队列(Blocking Queue),以后再说一下JMS(Java Messaging Service,java消息服务)以及它的实现之一ActiveMQ消息队列,因此都归并到消息服务中讨论。java
BlockingQueue也是java.util.concurrent下的接口,它解决了多线程中如何高效传输数据的问题,经过这些高效而且线程安全的类,咱们能够搭建高质量的多线程程序。 主要用来控制线程同步的工具。 BlockingQueue是一个接口,里面的方法以下:数据库
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit);
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit)
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
复制代码
public class Product implements Runnable{
BlockingQueue<String> queue;
public Product(BlockingQueue<String> queue) {
//建立对象时就传入一个阻塞队列
this.queue = queue;
}
@Override
public void run(){
try {
System.out.println(Thread.currentThread().getName()+"开始生产");
String temp = Thread.currentThread().getName()+":生产线程";
queue.put(temp);//向队列中放数据,若是队列是满的话,会阻塞当前线程
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
复制代码
消费者Consumer:apache
public class Consumer implements Runnable{
BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> queue) {
//使用有参构造函数的目的是我在建立这个消费者对象的时候就能够传进来一个队列
this.queue = queue;
}
@Override
public void run() {
Random random = new Random();
try {
while(true){
Thread.sleep(random.nextInt(10));
System.out.println(Thread.currentThread().getName()+ "准备消费...");
String temp = queue.take();//从队列中取任务消费,若是队列为空,会阻塞当前线程
System.out.println(Thread.currentThread().getName() + " 获取到工做任务==== " +temp);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
复制代码
测试类TestQueue:数组
public class TestQueue {
public static void main(String[] args) {
//新建一个阻塞队列,队列长度是5
BlockingQueue<String> queue = new LinkedBlockingDeque<String>(5);
//BlockingQueue<String> queue = new ArrayBlockingQueue<String>(5);
Consumer consumer = new Consumer(queue);
Product product = new Product(queue);
for(int i = 0;i<3;i++){
new Thread(product,"product"+i).start();
}
//for (int i = 0;i<5;i++){
new Thread(consumer,"consumer").start();
//}
}
}
复制代码
整套代码的意思就是初始化一个消息队列,里面放String类型,队列长度是5,使用生产者线程来模拟三个用户发出请求,把用户的请求数据暂时放在BlockingQueue队列里面,随后消费者线程不断的从队列里面取任务进行业务逻辑处理,直到队列里面消费的什么都不剩了。由此能够看出消息队列有两大特色:解耦和削峰填谷。生产者和消费者毛关系没有,生产者往队列里放数据,消费者从队列里取数据,它们都跟队列创建关系,解耦;生产者若是并发量很高也只不过是把数据先放到队列里,消费者能够慢慢吃,实际中不会立马拖垮服务端。 参考地址:http://blog.csdn.net/ghsau/article/details/8108292安全
JMS即Java消息服务(Java Message Service)用于在两个应用程序之间,或分布式系统中发送消息,进行异步通讯。JMS是一种与厂商(或者说是平台)无关的 API。相似于JDBC(Java Database Connectivity):这里,JDBC 是能够用来访问许多不一样关系数据库的 API,而 JMS 则提供一样与厂商无关的访问方法,以访问消息收发服务。 许多厂商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ等等。 JMS 可让你经过消息收发服务从一个 JMS 客户机向另外一个 JMS客户机发送消息。 消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成;消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,能够将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。bash
JMS由如下元素组成: JMS提供者provider:面向消息中间件的,JMS规范的一个实现。提供者能够是Java平台的JMS实现,也能够是非Java平台的面向消息中间件的适配器。 JMS客户:生产或消费基于消息的Java应用程序或对象(即生产者和消费者都统称JMS客户)。 JMS生产者:建立并发送消息的JMS客户。 JMS消费者:接收消息的JMS客户。 JMS消息:能够在JMS客户之间传递数据的对象 JMS队列:一个容纳被发送的正在等待阅读的消息的区域。一个消息若是被阅读,它将被从队列中移走。 JMS主题:一种支持发送消息给多个订阅者的机制。微信
ActiveMQ是JMS规范的一种实现,下面说如何使用session
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616"/>
<transportConnector name="ssl" uri="ssl://localhost:61617"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
<transportConnector uri="http://localhost:8081"/>
<transportConnector uri="udp://localhost:61618"/>
</transportConnectors>
复制代码
测试代码以下: 生产者Product:数据结构
public class Product {
private String username = ActiveMQConnectionFactory.DEFAULT_USER;
private String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
private String url = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
private Connection connection = null;
private Session session = null;
private String subject = "myQueue";
private Destination destination = null;
private MessageProducer producer = null;
/**
* @Description 初始化方法
* @Author 刘俊重
* @Date 2017/12/20
*/
private void init() throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(username,password,url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
public void productMessage(String message) throws JMSException {
this.init();
TextMessage textMessage = session.createTextMessage(message);
connection.start();
System.out.println("生产者准备发送消息:"+textMessage);
producer.send(textMessage);
System.out.println("生产者已发送完毕消息。。。");
}
public void close() throws JMSException {
System.out.println("生产者开始关闭链接");
if(null!=producer){
producer.close();
}
if(null!=session){
session.close();
}
if(null!=connection){
connection.close();
}
}
}
复制代码
消费者Consumer:多线程
public class Consumer implements MessageListener,ExceptionListener{
private String name = ActiveMQConnectionFactory.DEFAULT_USER;
private String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
private String url = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
private ActiveMQConnectionFactory connectionFactory = null;
private Connection connection = null;
private Session session = null;
private String subject = "myQueue";
private Destination destination = null;
private MessageConsumer consumer = null;
public static Boolean isconnection=false;
/**
* @Description 链接ActiveMQ
* @Author 刘俊重
* @Date 2017/12/20
*/
private void init() throws JMSException {
connectionFactory = new ActiveMQConnectionFactory(name,password,url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}
public void consumerMessage() throws JMSException {
this.init();
connection.start();
//设置消息监听和异常监听
consumer.setMessageListener(this);
connection.setExceptionListener(this);
System.out.println("消费者开始监听....");
isconnection = true;
//Message receive = consumer.receive();
}
public void close() throws JMSException {
if(null!=consumer){
consumer.close();
}
if(null!=session){
session.close();
}
if(null!=connection){
connection.close();
}
}
/**
* 异常处理函数
*/
@Override
public void onException(JMSException exception) {
//发生异常关闭链接
isconnection = false;
}
/**
* 消息处理函数
*/
@Override
public void onMessage(Message message) {
try {
if(message instanceof TextMessage){
TextMessage textMsg = (TextMessage) message;
String text = textMsg.getText();
System.out.println("消费者接收到的消息======="+text);
}else {
System.out.println("接收的消息不符合");
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
复制代码
注意:消费者须要实现MessageListener和ExceptionListener来监听收到消息和出错时的处理。 生产者测试类TestProduct:
public class TestProduct {
public static void main(String[] args) throws JMSException {
for(int i=0;i<100;i++){
Product product = new Product();
product.productMessage("Hello World!"+i);
product.close();
}
}
}
复制代码
TestProduct是用来模拟生成100条消息,写入到ActiveMQ队列中。 消费者测试类TestConsumer:
public class TestConsumer implements Runnable {
static Thread thread = null;
public static void main(String[] args) throws InterruptedException {
thread = new Thread(new TestConsumer());
thread.start();
while (true){
//时刻监听消息队列,若是线程死了,则新建一个线程
boolean alive = thread.isAlive();
System.out.println("当前线程状态:"+alive);
if(!alive){
thread = new Thread(new TestConsumer());
thread.start();
System.out.println("线程重启完成");
}
Thread.sleep(1000);
}
}
@Override
public void run() {
try {
Consumer consumer = new Consumer();
consumer.consumerMessage();
while (Consumer.isconnection) {
//System.out.println(123);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
复制代码
TestConsumer这里用了多线程,保证时刻有个线程活着等着接收ActiveMQ的消息队列并调用消费者处理。 总结:个人理解是线程间通讯使用queue,如BlockingQueue,进程间通讯使用JMS,如ActiveMQ。 另附上一篇将58架构师沈剑老师写的消息队列的文章,能够做为参考:http://dwz.cn/78yLxL 须要强调的是任何一项技术的引用都要为解决业务问题服务,而不能是单纯的炫技。举个例子,就拿消息服务来讲,好比用户注册某个网站,注册完了以后我要调用邮件和短信服务给他发通知,我可能还要经过他填的信息,给他推荐一下可能认识的用户,那么这里核心业务是注册,其它的发通知和推荐用户就能够放在消息队列里处理,先响应注册信息,随后调用其它服务来处理发通知和推荐用户这两个业务。可是网站前期可能用户量比较少,不用消息队列就能知足个人需求了,引用消息队列反而会增长项目的复杂性,因此新技术的使用必定是须要解决业务的问题,而不是单纯的炫技。 参考文档: http://blog.csdn.net/fanzhigang0/article/details/43764121 http://blog.csdn.net/u010702229/article/details/18085263
附一下我的微信公众号,欢迎跟我交流。