自定义消息队列

  传统的网统统信,通常是请求---响应式,以TCP模式为例,在高并发状况下,每每伴随大量的客户端Sokcet请求,服务器要不断处理来自客户端的请求,ServerSocket要不断产生新的子线程去响应客户端的请求,会给服务器带来很大的访问压力。java

  在这种状况下,消息队列可谓为咱们提供了一种新的思路。队列是数据结构中的一种线性表,队列中存储的元素遵照FIFO(First In First Out,先进先出)的规则,这使得队列中的元素是有序的。咱们能够将队列中的元素入队与出队视为生产和消费,此外,再结合发布订阅模式(监听模式),就能够经过队列在不一样的线程之间进行通讯。服务器

  我在这里为你们提供一个简单的自定义消息队列,谨供参考。数据结构

package com.itszt.mq;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 一个消息队列监听器,只要生产者生产出消息并推入队列,就会通知处理器执行消费操做
 */
public class PushBlockQueue extends LinkedBlockingQueue<Object> {
    //多线程执行,采用线程池
    private static ExecutorService es = Executors.newFixedThreadPool(10);
    //单例中的饿汉模式,实例化一个队列单例
    private static PushBlockQueue pbq = new PushBlockQueue();
    //状态标识位
    private boolean flag = false;

    private PushBlockQueue() {
    }

    public static PushBlockQueue getInstance() {
        return pbq;
    }

    /**
     * 队列监听启动
     */
    public void start() {
        if (!this.flag) {
            flag = true;
        } else {
            throw new IllegalArgumentException("队列已启动,不可重复启动!");
        }
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (flag) {
                        //从队列中取消息
                        Object obj = take();
                        //线程池派出线程来消费取出的消息
                        es.execute(new PushBlockQueueHandler(obj));
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    /**
     * 中止队列监听
     */
    public void stop(){
        this.flag=false;
    }
}
----------------------------------------------------
package com.itszt.mq;

/**
 *至关于队列消息的消费者
 */
public class PushBlockQueueHandler implements Runnable{
    //消费的对象
    private Object obj;

    public PushBlockQueueHandler(Object obj){
        this.obj=obj;
    }
    //消费线程
    @Override
    public void run() {
        doBusiness();
    }
    //消费行为
    private void doBusiness() {
        System.out.println(Thread.currentThread().getName()+"-收到消息:"+obj);
    }
}
-----------------------------------------------
package com.itszt.mq;

import java.util.Scanner;

/**
 * 自定义消息队列测试类
 */
public class MQTest {
    public static void main(String[] args) {
        //获取消息队列的单例,并启动队列监听器
        PushBlockQueue.getInstance().start();
        //循环向队列写入数据
        /**
         * 生产者----生产消息----》入队列----监听器----通知消费者---》消费
         */
        Scanner sc=new Scanner(System.in);
        try {
            while (true){
                String content = sc.nextLine();
                if(content.trim().equals("stop")){
                    System.exit(1);
                }
                PushBlockQueue.getInstance().put(content);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

   启动程序,你(主线程)就能够在控制台里经过消息队列向其余线程发布消息了!多线程

中国梦,你们的梦!
pool-1-thread-1-收到消息:中国梦,你们的梦!
为中华民族的伟大复兴而不懈奋斗!
pool-1-thread-2-收到消息:为中华民族的伟大复兴而不懈奋斗! 
相关文章
相关标签/搜索