前言:首先简单模拟一个场景,前端有一个输入框,有一个按钮,点击这个按钮能够实现搜索输入框中的相关的文本和图片(相似于百度、谷歌搜索).看似一个简单的功能,后端处理也不难,前端发起一个请求,后端接受到这个请求,获取前端输入的内容,而后用搜索服务查找相关的数据返回给前端。可是问题来了,可能不是一个用户在搜索,假若有一万个用户同时发起请求呢?后端如何处理?若是按照单机的 处理方式,很容易线程堵死,程序崩溃、数据库崩塌。本文来介绍一下如何经过线程池来处理前端的请求。html
本篇博客的目录前端
一:线程池的优势java
二:定义一个线程池web
三: 线程池实现类数据库
四:执行任务编程
五:总结后端
本篇博客技术总架构图:安全
一:线程池的好处服务器
1.1:好处多线程
1.1.1 线程池能够异步的执行任务,当任务进来的时候线程池首先会判断当前是否有存活可用的线程,若是有的话,线程会执行这个任务。可是任务此时能够马上返回,并不必定必须等待任务执行完毕才会返回。假如是同步阻塞的话,当一个线程遇到Exception的时候,假如这个线程没有获得处理,那么就会形成线程堵塞,资源囤积,最终的结果只能是cpu资源耗尽,全部的任务没法处理。以前咱们的线上就出现了不少dubbo服务访问超时问题,最后发现就是cpu资源耗尽,报了一个unable to create new Thread,这样就没法处理任务(最后咱们进行了物理扩容而且合理限定了线程池的最大线程数量才解决这个问题)
1.1.2:线程池能够集中管理线程,能够控制线程的运行周期,这里包括动态添加线程或者移除线程。有一个很重要的点是这样的:线程的上下文切换是很是消耗性能的;假如来了一个任务,线程执行一次,而后马上销毁;再来一个任务,再建立一个任务,用完再销毁这个线程。那么为何不能对这个线程进行复用呢?
1.1.3:线程池的优点只有在高请求量才会体现出来,若是请求量比较好,须要处理的任务不多,那么使用线程池的做用并不明显。可是并非线程数量越多越好,具体的数量须要评估每一个任务的处理时间以及当前计算机的处理能力和数量,这个是有具体的数据体现的,咱们来看一下实际数据比较:
二:定义一个线程池
2.1:首先咱们来定义一个线程池的接口,其中包含线程池开始任务、关闭线程池,增长线程、减小线程,线程池的使命就是管理线程的生命周期,包括加、减小和删除线程,还有让线程开始执行任务,自身的关闭和开启!
public interface ThreadPool<Job extends Runnable> { /** * 线程池开始 */ void execute(Job job); /** * 关闭线程池 */ void shutDown(); /** * 添加线程 * @param num */ void addWorkers(int num); /** * 减小线程 * @param num */ void removeWorker(int num); /** * 获取正在等待的线程数量 * @return */ int getJobSize(); }
三:线程池实现类
解释: 线程池的实现类,首先就是定义线程池的默认数量,为2*cpu核心数+1,这是比较合理的计算公式。最小数量定义为1,最大数量定义为10。还用一个LinkedList来做为工做线程的集合容器。这里为何要用linkedList而不是ArrayList呢?由于linkedList是一个双向链表,双向链表能够实现先进先出或者后进先出等集合。而后咱们定义了worker来封装具体执行任务的线程,用Job来封装要执行的任务。而后在构造方法里用initWorkers方法来初始化线程池,建立指定的默认数量的线程,指定名称(用AtomicLong:原子线程安全的)并添加到管理线程的集合workers中(这个list通过synchronizedList修饰它已经成为了一个同步的集合,所作的操做都是线程安全的)。在execute中,首先获取须要指定的任务(Job),为了保证线程安全,会锁住全部的任务集合(放心synchronized这个关键字的做用,它通过jdk1.7已经优化过了,性能消耗有质的提高)。这里为何要锁住jobs这个集合呢,答案是:为了防止在多线程环境下,有多个job同时添到这个jobs里面,任务要一个个的执行,防止没法执行任务。接着再用addLast方法将任务添加到链表的最后一个,这里就是一个先进先出的队列(先进入的线程会优先被执行)再调用jobs的notify方法唤醒其余job。而在下面的添加线程或者移除线程的方法,都必需要锁住整个工做队列,这里为了防止,执行的时候忽然发现job不见了,或者添加的时候取不到最新的job等多线程下的安全问题,而且在worker线程中增长了一个running字段,用于控制线程的运行或者中止(run方法是否执行的控制条件)
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> { private static final int MAX_WORKER_NUMBERS = 10; private static final int DEFAULT_WORKERS_NUMBERS = 2 * (Runtime.getRuntime().availableProcessors()) + 1; private static final int MIN_WORDER_NUMBERS = 1; private final LinkedList<Job> jobs = new LinkedList<Job>(); //管理工做线程的集合 private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>()); private int workerNum = DEFAULT_WORKERS_NUMBERS; private AtomicLong threadNum = new AtomicLong(); /** * 线程开始运行 */ public DefaultThreadPool() { initWorkers(DEFAULT_WORKERS_NUMBERS); } public DefaultThreadPool(int num) { workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORDER_NUMBERS ? MIN_WORDER_NUMBERS : num; } /** * 初始化线程 * * @param defaultWorkersNumbers */ private void initWorkers(int defaultWorkersNumbers) { for (int i = 0; i < defaultWorkersNumbers; i++) { Worker worker = new Worker(); workers.add(worker); Thread thread = new Thread(worker, "ThreadPool-worker" + threadNum.incrementAndGet()); thread.start(); } } /** * 执行任务 * * @param job */ @Override public void execute(Job job) { if (job != null) { synchronized (jobs) { jobs.addLast(job); jobs.notify(); } } } /** * 关闭线程 */ @Override public void shutDown() { for (Worker worker : workers) { if (worker != null) { worker.shutDown(); } } } /** * 添加线程 * * @param num */ @Override public void addWorkers(int num) { synchronized (jobs) { if (num + this.workerNum > MAX_WORKER_NUMBERS) { num = MAX_WORKER_NUMBERS - this.workerNum; } initWorkers(num); this.workerNum += num; } } /** * 移除线程 * * @param num */ @Override public void removeWorker(int num) { synchronized (jobs) { if (num > workerNum) { throw new IllegalArgumentException("much workNum"); } int count = 0; while (count < num) { Worker worker = workers.get(count); if (workers.remove(worker)) { worker.shutDown(); count++; } } this.workerNum -= count; } } /** * 获取工做线程的数量 * * @return */ @Override public int getJobSize() { return jobs.size(); } /** * 工做线程 */ public class Worker implements Runnable { private volatile boolean running = false; @Override public void run() { while (running) { Job job = null; synchronized (jobs) { while (jobs.isEmpty()) { try { jobs.wait(); } catch (InterruptedException ex) { ex.printStackTrace(); Thread.currentThread().interrupt(); return; } } job = jobs.removeFirst(); } if (job != null) { try { job.run(); } catch (Exception ex) { ex.printStackTrace(); } } } } public void shutDown() { this.running = false; } } }
四:简易的web http处理线程池
4.1:定义一个类,叫作SimlpeHttpHandler,其中维护着一个叫作HttpRequestHandler的job,这个job的做用就是经过socket监听固定的端口(8080),而后经过流读取web目录中的文件,根据不一样的文件格式封装打印返回
public class SimleHttpHandler { static ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<HttpRequestHandler>(1); public String basePath; private ServerSocket serverSocket; @Resource private HttpRequestHandler httpRequstHandler; int port = 8080; /** * 设置端口 * * @param port */ public void setPort(int port) { if (port > 0) { this.port = port; } } /** * 设置基本路径 * * @param basePath */ public void setBasePath(String basePath) { if (basePath != null) { boolean exist = new File(basePath).exists(); boolean directory = new File(basePath).isDirectory(); if (exist && directory) { this.basePath = basePath; } } } /** * 开始线程 * * @throws Exception */ public void start() throws Exception { serverSocket = new ServerSocket(port); Socket socket = null; while ((socket = serverSocket.accept()) != null) { try { threadPool.execute(new HttpRequestHandler(socket, basePath)); } catch (Exception ex) { ex.printStackTrace(); } finally { serverSocket.close(); } } } }
4.2:定义一个类叫作HttpRequestHandler实现Runnable接口,而后构造进入socket和路径,在run方法中调用具体的处理方法:我将具体的业务封装到
ServerRequestManager中,而后调用它的dealRequest方法进行具体的业务处理:
@Component public class HttpRequestHandler implements Runnable { private Socket socket; private String basePath; @Resource private ServerRequestManager serverRequestManager; public HttpRequestHandler(Socket socket, String basePath) { this.basePath = basePath; this.socket = socket; } @Override public void run() { serverRequestManager.dealRequest(basePath); } }
4.3:服务器的具体处理逻辑,这里就是根据当前的路径用流读取路径中的文件,一旦检测到文件的后缀是.jpg或者ico,就将其输出为http的内容类型为img类型的图片,不然输出为text类型。最后用colse方法来关闭流
*/ @Component public class ServerRequestManager { private Socket socket; public static final String httpOK = "HTTP/1.1 200 ok"; public static final String molly = "Server:Molly"; public static final String contentType = "Content-Type:"; /** * 处理请求 * * @param basePath */ public void dealRequest(String basePath) { String content = null; BufferedReader br = null; BufferedReader reader = null; PrintWriter out = null; InputStream in = null; try { reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); String header = reader.readLine(); String filePath = basePath + header.split(" ")[1]; out = new PrintWriter(socket.getOutputStream()); if (filePath.endsWith("jpg") || filePath.endsWith("ico")) { in = new FileInputStream(filePath); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); int index = 0; while ((index = in.read()) != -1) { byteArrayOutputStream.write(index); } byte[] array = byteArrayOutputStream.toByteArray(); out.print(httpOK); out.print(molly); out.println(contentType + " image/jpeg"); out.println("Content-Length" + array.length); out.print(""); socket.getOutputStream().write(array, 0, array.length); } else { br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath))); out = new PrintWriter(socket.getOutputStream()); out.print(httpOK); out.print(molly); out.print(contentType + "text/html; Charset =UTF-8"); out.print(""); while ((content = br.readLine()) != null) { out.print(content); } } out.flush(); } catch (final Exception ex) { ex.printStackTrace(); out.println("HTTP/1.1 500"); out.println(""); out.flush(); } finally { close(br, in, out, socket); } } /** * 关闭流 * * @param closeables */ public static void close(Closeable... closeables) { if (closeables != null) { for (Closeable closeable : closeables) { try { closeable.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
五:总结
本篇博客总结了如何开发一个简单的线程池,固然功能不够齐全,比不上jdk的线程池,没有阻塞队列和超时时间和拒绝策略等;而后会用socket监听8080端口,获取web根目录读取目录下的文件,而后输出对应的格式内容。实现的功能很简单,没有什么复杂的,不过我觉的这篇这篇博客能让我学习的地方就是线程池的使用方法,在处理高并发的请求时,线程池技术基本是必不可少的。
参考资料《java并发编程的艺术》
*假如你想学习java,或者看本篇博客有任务问题,能够添加java群:618626589