并发编程之:并行程序设计模式

1.Future模式

实现方式:java

首先实现Data接口服务器

public interface Data {
    String getResult();
}

实现真正执行业务的实现类并发

public class RealData implements Data{

    private final String result;

    public RealData(String param){
        StringBuffer sb = new StringBuffer();
        for (int i=0;i<10;i++){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            sb.append(param).append(i);
        }
        result = sb.toString();
    }

    @Override
    public String getResult() {
        return result;
    }
}

实现真实业务类的代理类app

public class FutureData implements Data{

    protected RealData realData;
    protected boolean isReady = false;

    public synchronized void setRealData(RealData realData){
        if(isReady){
            return;
        }
        isReady = true;
        this.realData = realData;
        notifyAll();
    }

    public synchronized String getResult() {
        while (!isReady){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return realData.getResult();
    }

}

经过client开启线程,获取futureDataide

public class Client {

    public Data request(final String name){

        final FutureData futureData = new FutureData();
        new Thread(){
            @Override
            public void run() {
                futureData.setRealData(new RealData(name));
            }
        }.start();

        return futureData;
    }
}

测试工具

public static void main(String[] str){

        Client client = new Client();
        System.out.println("请求开始");
        Data data = client.request("jok");
        try {
            System.out.println("作其余事情");
            Thread.sleep(6000);
            System.out.println("其余事情完成");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(data.getResult());
    }

JDK实现方式测试

首先实现业务类this

public class RealData implements Callable<String> {

    private String name;

    public RealData(String name) {
        this.name = name;
    }

    @Override
    public String call() throws Exception {
        //只是业务实现
        StringBuffer stringBuffer = new StringBuffer();
        for (int i = 0; i<10;i++){
            stringBuffer.append(name).append(i);
            Thread.sleep(1000);
        }
        return stringBuffer.toString();
    }
}

执行方法spa

public static void main(String[] args) throws InterruptedException, ExecutionException {
    FutureTask<String> future = new FutureTask<String>(new RealData("jok"));
    ExecutorService service = Executors.newCachedThreadPool();
    //开始执行
    System.out.println("开始执行");
    service.execute(future);
    //作其余事情
    System.out.println("执行其余事情");
    Thread.sleep(6000);
    System.out.println("执行其余事情完成");
    System.out.println(future.get());
}

2.Master-Worker模式

master-worker模式的核心思想是,系统由两类进程协做工做:Master进程和Worker进程。Master进程负责接收和分配任务,Worker进程负责处理子任务。线程

由5个worker线程执行100个任务的实现例子

worker工人类

public abstract class Worker implements Runnable{
    //任务子队列,用于取得子任务
    private Queue<Object>  workQueue;
    //子任务结果集
    private Map<String,Object> resultMap;

    public void setWorkQueue(Queue<Object> workQueue) {
        this.workQueue = workQueue;
    }

    public void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    /**
     * 子任务处理逻辑,在子类中实现具体逻辑
     * @param input
     * @return
     */
    public abstract Object handle(Object input);

    @Override
    public void run() {
        while (true){
            //从任务列表获取子任务
            Object input = workQueue.poll();
            if(input == null)
                break;
            //处理子任务
            Object result = handle(input);
            //将结果存入到map中
            resultMap.put(Integer.toString(input.hashCode()),result);
        }
    }
}

Master类实现

public class Master {
    //任务队列
    protected Queue<Object> workerQueue = new ConcurrentLinkedDeque<Object>();
    //Worker进程队列
    protected Map<String, Thread> threadMap = new HashMap<String, Thread>();
    //子任务处理结果集
    protected Map<String,Object> resultMap = new ConcurrentHashMap<String,Object>();

    /**
     * 判断任务是否已经所有结束
     * @return
     */
    public boolean isComplete(){
        for (Map.Entry<String,Thread> entry : threadMap.entrySet()) {
            if(entry.getValue().getState()!=Thread.State.TERMINATED){
                return false;
            }
        }
        return true;
    }

    public Master(Worker worker,int workerCount){
        worker.setWorkQueue(workerQueue);
        worker.setResultMap(resultMap);
        for (int i=0;i<workerCount;i++){
            threadMap.put(Integer.toString(i),new Thread(worker,Integer.toString(i)));
        }
    }

    /**
     * 提交任务
     * @param o
     */
    public void submit(Object o){
        workerQueue.add(o);
    }

    /**
     * 获取子任务结果集
     * @return
     */
    public Map<String,Object> getResultMap(){
        return resultMap;
    }

    /**
     * 开始执行因此子任务
     */
    public void execute(){
        for (Thread thread : threadMap.values()){
            thread.start();
        }
    }

}

实现业务逻辑的worker类

/**
 * 计算传入integer数据立方
 * @authorAdministrator
 * @date2017/3/19
 */
public class PlusWorker extends Worker {

    @Override
    public Object handle(Object input) {
        Integer in = (Integer)input;
        return in*in*in;
    }
}

执行任务和结果处理代码

public static void main(String[] args){
    //5个工做线程
    Master master = new Master(new PlusWorker(),5);
    //提交100个任务
    for (int i=0;i<100;i++)
        master.submit(i);
    //开始执行,由5个线程执行100个任务
    master.execute();

    String key ;
    int result = 0;
    while (master.resultMap.size()>0 || !master.isComplete()){//当map中还存在结果,或者任务没有执行完时进入循环取结果
        for (Map.Entry<String,Object> entry :master.resultMap.entrySet()) {
            key = entry.getKey();
            result += (Integer) entry.getValue();
            master.resultMap.remove(key);//从resultMap中获取结果,取出一个就从map中移除一个
        }
    }
    //执行结束
    System.out.println(result);
}

3.Guarded Suspension模式

该模式为保护暂停,其核心思想就是当有多个请求过来时,会先将请求放入一个列表中,而后有单独的线程从列表中取出请求进行处理。这样的模式能够避免当请求并发过大的时候不会给服务器形成过大压力。

下面是有返回值的请求实例:

定义response接口

public interface Response {
    String getResult();
}

response实际实现类

public class RealResponse implements Response {

    private String result;

    public void setResult(String result){
        this.result = result;
    }

    @Override
    public String getResult() {
        return result;
    }
}

代理实现类

public class FutureResponse implements Response {

    private RealResponse realResponse;

    private boolean isReady = false;

    public synchronized void setRealResponse(RealResponse realResponse){
        if(isReady){
            return;
        }
        this.realResponse = realResponse;
        isReady = true;
        notifyAll();
    }

    @Override
    public synchronized String getResult() {
        if(!isReady){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return realResponse.getResult();
    }
}

request请求对象

public class Request {

    private String name;

    private FutureResponse futureResponse;

    public void setFutureResponse(FutureResponse futureResponse) {
        this.futureResponse = futureResponse;
    }

    public void setName(String name) {
        this.name = name;
    }

    public FutureResponse getFutureResponse() {
        return futureResponse;
    }

    public String getName() {
        return name;
    }
}

存放request对象的列表工具

public class RequestQueue {
    private Queue<Request> requestQueue = new ConcurrentLinkedQueue<Request>();

    public synchronized Request getRequest(){
        if(requestQueue.size()==0){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return requestQueue.poll();
    }
    public synchronized void setRequest(Request request){
        requestQueue.add(request);
        notifyAll();
    }

}

客户端程序

public class ClientThread extends Thread {

    private RequestQueue requestQueue;

    private List<Request>  myRequest = new ArrayList<Request>();

    public ClientThread(RequestQueue requestQueue){
        this.requestQueue = requestQueue;
    }

    @Override
    public void run() {
        for (int i=0;i<10;i++){
            Request request = new Request();
            FutureResponse futureResponse = new FutureResponse();
            //构造请求request
            request.setName("jok"+i);
            request.setFutureResponse(futureResponse);
            requestQueue.setRequest(request);
            myRequest.add(request);
        }
        System.out.println("请求执行构造发送完成");

    }

    public void showResult(){
        for (Request request:myRequest) {
            System.out.println(request.getName()+":执行结果="+request.getFutureResponse().getResult());
        }
    }
}

服务端程序

public class ServerThread extends Thread {

    private RequestQueue requestQueue = new RequestQueue();

    public RequestQueue getRequestQueue() {
        return requestQueue;
    }

    @Override
    public void run() {
        while (true){
            final Request request = requestQueue.getRequest();
            String result = "";
            try {
                //处理业务
                result = request.getName()+"---do";
                System.out.println("请求处理开始"+request.getName());
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            RealResponse realResponse = new RealResponse();
            realResponse.setResult(result);
            request.getFutureResponse().setRealResponse(realResponse);
            System.out.println("处理请求"+request.getName()+"结束"+result);
        }
    }
}

测试方法

public class Test {

    public static void main(String[] args){
        //服务器启动
        ServerThread serverThread = new ServerThread();
        serverThread.start();
        System.out.println("服务器启动完毕");

        //客户端开始
        System.out.println("客户端开始执行");
        ClientThread clientThread = new ClientThread(serverThread.getRequestQueue());
        clientThread.start();
        System.out.println("客户端发送完毕");

        clientThread.showResult();
    }
}

4.生产者-消费者模式

首先定义任务数据

public final class PCData {
    private final int data;

    public PCData(int data){
        this.data = data;
    }

    public int getData() {
        return data;
    }

    @Override
    public String toString() {
        return "data="+data;
    }
}

生产者代码

public class Producer implements Runnable {

    private volatile boolean isRunning = true;

    private BlockingQueue<PCData> blockingQueue;//共享空间

    private static AtomicInteger count = new AtomicInteger();

    public Producer(BlockingQueue<PCData> blockingQueue){
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {

        PCData data = null;
        try {
            while (isRunning){
                Thread.sleep(2000);
                data = new PCData(count.incrementAndGet());
                System.out.println("生产者线程"+Thread.currentThread().getId()+"生产数据:"+count);
                blockingQueue.offer(data,2, TimeUnit.SECONDS);//添加数据到阻塞队列
            }
            System.out.println("生产者线程中止"+Thread.currentThread().getId());
        }catch (Exception e){
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
    public void stop(){
        isRunning = false;
    }
}

消费者代码

public class Consumer implements Runnable {

    private BlockingQueue<PCData> blockingQueue;

    public Consumer(BlockingQueue<PCData> blockingQueue){
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        try {
            PCData data = null;
            while (true){
                data = blockingQueue.take();//若是队列里面没有会wait等待数据放入
                if(data != null){
                    int n = data.getData()*data.getData();
                    System.out.println("消费者线程"+Thread.currentThread().getId()+"消费数据"+ MessageFormat.format("{0}*{1}={2}",data.getData(),data.getData(),n));
                    Thread.sleep(4000);
                }
            }
        }catch (Exception e){
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}

测试程序代码

//定义阻塞队列做为共享数据空间
BlockingQueue<PCData> blockingQueue = new LinkedBlockingQueue<PCData>();
//生产者
Producer p = new Producer(blockingQueue);
Producer p2 = new Producer(blockingQueue);
//消费者
Consumer c = new Consumer(blockingQueue);
Consumer c2 = new Consumer(blockingQueue);
//线程池执行
ExecutorService service = Executors.newCachedThreadPool();
service.execute(p);
service.execute(p2);
service.execute(c);
service.execute(c2);
Thread.sleep(6000);
p.stop();
p2.stop();
Thread.sleep(2000);
service.shutdown();
相关文章
相关标签/搜索