责任链模式:使多个对象都有机会处理请求,从而避免请求的发送者和接收者之间的耦合关系。将这些对象连成一条链,并沿着这条链传递该请求,直到有一个对象处理它为止。
当业务逻辑比较复杂且请求需要异步处理时,我们可以使用责任链模式来划分职责,使分工明确,降低业务逻辑的复杂度;同时可以使用异步模型来优化责任链的处理性能。
import lombok.Data;

/**
 * Payload object that is passed from one processor to the next along the chain.
 *
 * <p>The class only declares its fields; it is annotated with Lombok's {@code @Data}.
 *
 * @author kancy
 * @date 2019/10/21 9:42
 */
@Data
public class Request {

    /**
     * Numeric identifier of the request.
     *
     * <p>NOTE(review): the capitalised field name is kept as-is; renaming it would presumably
     * alter the text of the Lombok-generated {@code toString()} — confirm before changing.
     */
    private long Id;

    /** Name carried by the request. */
    private String name;
}
/**
 * One link of the request-processing chain.
 *
 * @author kancy
 * @date 2019/10/21 9:40
 */
public interface ITaskProcesser {

    /**
     * Hands a request over to this processor.
     *
     * @param request the request to be handled
     */
    void process(Request request);
}
import java.util.concurrent.LinkedBlockingQueue; /** * <p> * AbstractThreadTaskProcesser * </p> * * @author: kancy * @date: 2019/10/21 9:40 **/ public abstract class AbstractThreadTaskProcesser extends Thread implements ITaskProcesser { /** * 任务队列 */ private LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue(); /** * 下一个处理器 */ private ITaskProcesser nextProcesser; /** * 是否中止线程 */ private volatile boolean isStop = false; /** * 线程是否运行 */ private boolean isRun = false; public AbstractThreadTaskProcesser() { setThreadTaskName(); } public AbstractThreadTaskProcesser(ITaskProcesser nextProcesser) { setThreadTaskName(); this.nextProcesser = nextProcesser; } /** * 责任链处理 * @param request */ @Override public void process(Request request) { // 添加请求任务 addRequestToQueue(request); // 启动线程 if (!isRun){ synchronized (this){ if (!isRun){ super.start(); isRun = true; } } } } /** * 异步线程处理:真正的在处理业务逻辑 */ @Override public void run() { while (!isStop){ try { // 取出一个任务 Request request = queue.take(); // 处理当前任务须要作的事情 boolean result = doRequest(request); // 处理成功后,将任务请求交给下一个任务处理器 if (result && nextProcesser != null){ nextProcesser.process(request); } } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 防止子类开启线程,有父类维护,使用时启动任务线程 */ @Override public synchronized void start() { // 关闭入口 throw new UnsupportedOperationException("自动启动线程,无需手动开启"); } /** * 子类处理请求 * @param request * return boolean 是否执行下一个处理器 */ protected abstract boolean doRequest(Request request); /** * 中止处理器 */ protected void stopProcesser(){ isStop = true; } /** * 获取任务队列 */ protected LinkedBlockingQueue<Request> getTaskQueue() { return queue; } /** * 获取下一个处理器 * @return */ protected ITaskProcesser getNextProcesser() { return nextProcesser; } /** * 添加请求到任务队列 * @param request */ private void addRequestToQueue(Request request){ queue.add(request); } /** * 线程线程名称 */ private void setThreadTaskName() { setName(String.format("%s_Thread_%s", getClass().getSimpleName(), getId())); } }
FirstThreadTaskProcesser
/**
 * Processor that prints a "processing started" line for every request.
 *
 * @author kancy
 * @date 2019/10/21 9:40
 */
public class FirstThreadTaskProcesser extends AbstractThreadTaskProcesser {

    /**
     * @param nextProcesser the processor that receives the request afterwards
     */
    public FirstThreadTaskProcesser(ITaskProcesser nextProcesser) {
        super(nextProcesser);
    }

    /**
     * Prints the name of the current thread followed by a start marker.
     *
     * @param request the request being handled (not inspected here)
     * @return always {@code true}
     */
    @Override
    protected boolean doRequest(Request request) {
        final String workerName = Thread.currentThread().getName();
        System.out.println(workerName + " 开始处理!");
        return true;
    }
}
PrintRequestThreadTaskProcesser
/**
 * Processor that prints each request to standard output.
 *
 * @author kancy
 * @date 2019/10/21 10:06
 */
public class PrintRequestThreadTaskProcesser extends AbstractThreadTaskProcesser {

    /**
     * @param nextProcesser the processor that receives the request afterwards
     */
    public PrintRequestThreadTaskProcesser(ITaskProcesser nextProcesser) {
        super(nextProcesser);
    }

    /**
     * Prints the request prefixed with {@code "request: "}.
     *
     * @param request the request to print
     * @return always {@code true}
     */
    @Override
    protected boolean doRequest(Request request) {
        final String line = "request: " + request;
        System.out.println(line);
        return true;
    }
}
BusinessThreadTaskProcesser
/** * <p> * BusinessThreadTaskProcesser * 针对耗时比较久的逻辑,能够用线程池多线程来优化性能 * </p> * * @author: kancy * @date: 2019/10/21 12:28 **/ public class BusinessThreadTaskProcesser implements ITaskProcesser { private final ExecutorService executorService = Executors.newFixedThreadPool(10); private final ITaskProcesser nextProcesser; public BusinessThreadTaskProcesser(ITaskProcesser nextProcesser) { this.nextProcesser = nextProcesser; } @Override public void process(Request request) { BusinessRequestHandler handler = new BusinessRequestHandler(request); executorService.submit(handler); // callNextProcesser(request); } class BusinessRequestHandler implements Runnable{ private Request request; public BusinessRequestHandler(Request request) { this.request = request; } @Override public void run() { try { TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + " 处理成功!"); callNextProcesser(request); } catch (InterruptedException e) { e.printStackTrace(); } } } private void callNextProcesser(Request request) { if (nextProcesser != null){ nextProcesser.process(request); } } }
LastThreadTaskProcesser
/**
 * Processor that prints a "processing finished" line for every request.
 *
 * <p>It declares no constructor, so it is created without a next processor.
 *
 * @author kancy
 * @date 2019/10/21 9:40
 */
public class LastThreadTaskProcesser extends AbstractThreadTaskProcesser {

    /**
     * Prints the name of the current thread followed by a completion marker.
     *
     * @param request the request being handled (not inspected here)
     * @return always {@code true}
     */
    @Override
    protected boolean doRequest(Request request) {
        final String workerName = Thread.currentThread().getName();
        System.out.println(workerName + " 处理完成!");
        return true;
    }
}
import java.util.concurrent.TimeUnit;

/**
 * Demo entry point: wires four processors into a chain and feeds it from three producer threads.
 *
 * @author kancy
 * @date 2019/10/21 9:39
 */
public class ChainTests {

    /**
     * Builds the chain first → print → business → last and starts three producers on its head.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // The chain is assembled back to front because each processor takes its successor.
        ITaskProcesser tail = new LastThreadTaskProcesser();
        BusinessThreadTaskProcesser business = new BusinessThreadTaskProcesser(tail);
        ITaskProcesser printer = new PrintRequestThreadTaskProcesser(business);
        ITaskProcesser head = new FirstThreadTaskProcesser(printer);

        // Each producer is created and started before the next one is created.
        for (int producer = 0; producer < 3; producer++) {
            Thread worker = new CreateRequestThread(head);
            worker.start();
        }
    }

    /** Producer thread that sends ten new requests, pausing 500 ms after each one. */
    static class CreateRequestThread extends Thread {

        /** Processor that receives the generated requests. */
        ITaskProcesser iTaskProcesser;

        /**
         * @param iTaskProcesser the processor to send requests to
         */
        public CreateRequestThread(ITaskProcesser iTaskProcesser) {
            this.iTaskProcesser = iTaskProcesser;
        }

        /**
         * Sends the requests. An interruption during the pause is printed and the loop goes on
         * with the next request.
         */
        @Override
        public void run() {
            int sent = 0;
            while (sent < 10) {
                try {
                    iTaskProcesser.process(new Request());
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                sent++;
            }
        }
    }
}
注:本文仅作抛砖引玉,在实际开发过程中,可以根据需要自行扩展。