面试过程当中,各面试官通常都会教科书式的问你几个多线程的问题,但又不知从何问起。因而就来一句,你了解多线程吗?拜托,这个好伤自尊的!java
相信老司机们对于java的多线程问题处理,稳如老狗了。你问我了解不?都懒得理你。面试
不过,既然是面对的是面试官,那你还得一一说来。redis
今天咱们就从多个角度来领略下多线程技术吧!算法
其实有的语言是没有多线程的概念的,而java则是从一出生便有了多线程天赋。为何?
多线程技术通常又被叫作并发编程,目的是为了程序运行得更快。
其基本原理是,是由cpu进行不一样线程的调度,从而实现多个线程的同时运行效果。
多进程和多线程相似,只是多进程不会共享内存资源,切换开销更大,因此多线程是更明智的选择。
而在计算机出现早期,或者也许你也能找到单核的cpu,这时候的多线程是经过不停地切换惟一一个能够运行的线程来实现的,因为切换速度比较快,因此感受就是多线程同时在运行了。在这种状况下,多线程与多进程等同的。可是,至少也让用户有了能够同时处理多任务的能力了,也是颇有用的。
而当下的多核cpu时代,则是真正能够同时运行多个线程的时代,什么四核八线程,八核八线程.... 意味着能够同时并行n个线程。若是咱们能让全部可用的线程都利用起来,那么咱们的程序运行速度或者说总体性能将会获得极大提高。这是咱们技术人员的目标。spring
看起来,多线程确实挺好,可是凡事皆有度。过尤不及。编程
若是只运行与cpu能力范围内的n线程,那是绝对ok的。但当你线程数超过这个n时,就会涉及到cpu的调度问题,调度时即会涉及一个上下文切换问题,这是要耗费时间和资源的东西。当cpu疲于奔命调度切换时,则多线程就是一个负担了。api
多线程要注意的问题多了去了,毕竟这是一门不简单的学问,可是咱们也能够总结下:tomcat
1. 线程安全性问题;若是连正确性都没法保障,谈性能有何意义?
2. 资源隔离问题;是你就是你的,不是你的就不是你的。
3. 可读性问题;若是为了多线程,将代码搞得一团糟,是否值得?
4. 外部环境问题;若是外部环境很糟糕,那么你内部性能再好,你能把压力给外部吗?
安全
这个问题确实有点low, 不过也是一个体现真实实践的地方!服务器
1. 继承Thread类,而后 new MyThread.start();
2. 继承Runnable类, 而后 new Thread(runnable).start();
3. 继承Callable类,而后使用 ExecutorService.submit(callable);
4. 使用线程池技术,直接建立n个线程,将上面的方法再来一遍,new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); 简化版: Executors.newFixedThreadPool(n).submit(runnable);
理论始终太枯燥,不如来点实际的。
有同窗说,我平时就写写业务代码,而业务代码基本由用户触发,一条线程走到底,哪来的多线程实践?
好,咱们能够就这个问题来讲下,这种业务的多线程:
1. 好比一个http请求,对应一个响应,若是不使用多线程,会怎么样?咱们能够简单地写一个socket服务器,进行处理业务,可是这绝对不是你想看到的。好比咱们经常使用的 spring+tomcat, 哪里没有用到多线程技术?
http-nio-8080-exec-xxx #就是一个线程池中例子。
2. 任何一个java应用,启动起来以后,都会有不少的GC线程运行,这难道不是多线程?如:
"G1 Main Concurrent Mark GC Thread" os_prio=0 tid=0x00007fb91008f000 nid=0x40e7 runnabl "Gang worker#0 (Parallel GC Threads)" os_prio=0 tid=0x00007fb910061800 nid=0x40de runnable
如上这些多线程场景吧,面试官说,就算你了解其原理,那也不算是你的。你有真正使用过多线程吗?
接下来,咱们就来讲道说道,实际业务场景中,有哪些是咱们可能会用上的,供你们参考:
看下多线程中几个有趣或者经典的场景用法!
场景1. 我有一个发邮件的功能,用户操做成功后,我给他发送邮件,如何高效稳定地完成?
场景2. 我有m个线程在循环执行主方法,为实现高效处理,将分离n*m个子线程执行相关联流程,要求子线程必须等到主线程执行完成后才能执行,如何保证?
场景3. 某合做公司要求请求其api的qps不得大于n,如何保证?
场景4. 一个大任务如何提升响应速度?
场景5. 我有n个线程同时开始处理一个事务,要求至少等到一个线程执行完毕后,才能进行响应返回,如何高效处理?
场景6. 抽象任务,后台运行处理任务多线程?
你们应该已经见过世面了,这点问题还不至于,对吧。那你能够拿出你的方案了。
下面是个人解决方案:
场景1. 我有一个发邮件的功能,用户操做成功后,我给他发送邮件,如何高效稳定地完成?
场景1解决:(常规型)
这个能够说最实用最简单的多线程应用场景了,不过如今进行微服务化以后,可能会有一些不一样。换汤不换药。
针对C端用户的多线程,咱们是不建议使用 new Thread() 这种方式的,线程池是个经常使用伎俩。
ExecutorService mailExecutors = Executors.newFixedThreadPool(20); public void sendMail() { mailExecutors.submit(() -> { // do send mail biz, http, rpc,... System.out.println("sending mail"); }); }
场景2. 我有m个线程在循环执行主方法,为实现高效处理,将分离n*m个子线程执行相关联流程,要求子线程必须等到主线程执行完成后才能执行,如何保证?
场景2解决:(全部等待型)
主任务,只管调度子线程,在子线程使用闭锁在适当的地方进行等待,主线程循环分配完成后,打开闭锁,放行全部子线程便可。
具体代码以下:
private void mainWork() { try { resetRedisZsetLockGate(); for (String linkTraceCacheKey : expiredKeys) { subWork(linkTraceCacheKey); } } finally { releaseRedisZsetLock(); } } private void subWork(String linkTraceCacheKey) { deleteService.execute(new Runnable() { @Override public void run() { // do other biz blockingWaitRedisZsetLock(); postSth(linkTraceCacheKey); } }); } /** * 重置锁网关,每次主方法的调度都将获得一个私有的锁 */ private void resetRedisZsetLockGate() { redisZsetScanLockGate = new CountDownLatch(1); } /** * 阻塞等待 锁 */ private void blockingWaitRedisZsetLock() { final CountDownLatch myGate = redisZsetScanLockGate; try { myGate.await(); } catch (InterruptedException e) { logger.error("等待锁中断异常", e); Thread.currentThread().interrupt(); } } /** * 释放锁 */ private void releaseRedisZsetLock() { final CountDownLatch myGate = redisZsetScanLockGate; myGate.countDown(); }
场景3. 某合做公司要求请求其api的qps不得大于n,如何保证?
场景3解决:(流量控制型、有限资源型)
这种问题准确的说,使用单机的多线程仍是有点难控制的,可是咱们只是为了讲清道理,具体(集群)作法只要稍作变通便可。
简单点说,就是做用一个 Semphore 信号量进行数量控制,当数量未到时,直接多线程并发请求,到达限制后,则等待有空闲位置再进行!
public class AbstractConcurrentSimpleLiteJobBase { /** * 并发查询:5 , 动态配置化 */ private final Semaphore maxConcurrentQueryLock; /** * 同步等待结束锁,视状况使用,同一个线程可能提交屡次任务,由同一个 holder 管理 */ private final ThreadLocal<List<Future<?>>> endGateTaskFutureContainer = new ThreadLocal<>(); @Resource private ThreadPoolTaskExecutor threadPoolTaskExecutor; public AbstractConcurrentSimpleLiteJobBase() { maxConcurrentQueryLock = new Semaphore(getMaxConcurrentThreadNum()); } /** * 获取最大容许的并发数,子类可自定义, 默认:5 * * @return 最大并发数 */ protected int getMaxConcurrentThreadNum() { return 5; } /** * 提交一个任务到线程池执行 * * @param task 任务 */ protected void submitTask(Runnable task) { // 考虑是否要阻塞等待结果 Future<?> future1 = threadPoolTaskExecutor.submit(() -> { try { maxConcurrentQueryLock.acquire(); } catch (InterruptedException ie) { // ignore... log.error("【任务运行】异常,中断", ie); Thread.currentThread().interrupt(); return; } try { task.run(); } finally { maxConcurrentQueryLock.release(); } }); endGateCountDown(future1); } /** * 等待线程结果完成,并清理 gate 信息 */ private void awaitForComplete() { try { // 同步等待执行完成,防止并发任务执行 for(Future<?> future1 : endGateTaskFutureContainer.get()) { future1.get(); } endGateTaskFutureContainer.remove(); } catch (ExecutionException e) { log.error("【任务执行】异常,抛出异常", e); } catch (InterruptedException e) { log.error("【任务执行】异常,中断", e); } } }
场景4. 一个大任务如何提升响应速度?
场景4解决:(大任务拆分型)
针对大任务的处理,基本想到的都是相似于分布式计算之类的东西(map/reduce),在java单机操做来讲,标准的解决方案是 Fork/Join 框架。
public class MyForkJoinTask extends RecursiveTask<Integer> { //原始数据 private List<Integer> records; public MyForkJoinTask(List<Integer> records) { this.records = records; } @Override protected Integer compute() { //任务拆分到可接受程度后,运行处理逻辑 if (records.size() < 3) { return doRealCompute(); } // 不然一直往下拆分任务 int size = records.size(); MyForkJoinTask aTask = new MyForkJoinTask(records.subList(0, size / 2)); MyForkJoinTask bTask = new MyForkJoinTask(records.subList(size / 2, records.size())); //两个任务并发执行 invokeAll(aTask, bTask); //结果合并 return aTask.join() + bTask.join(); } /** * 真正任务处理逻辑 */ private int doRealCompute() { try { Thread.sleep((long) (records.size() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("计算任务:" + Arrays.toString(records.toArray())); return records.size(); } // 测试任务 public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool(5); List<Integer> originalData = new ArrayList<>(); originalData.add(1); originalData.add(2); originalData.add(3); originalData.add(4); originalData.add(5); originalData.add(6); originalData.add(7); originalData.add(8); originalData.add(9); originalData.add(10); originalData.add(11); originalData.add(12); originalData.add(13); MyForkJoinTask myForkJoinTask = new MyForkJoinTask(originalData); long t1 = System.currentTimeMillis(); ForkJoinTask<Integer> affectNums = forkJoinPool.submit(myForkJoinTask); System.out.println("affect nums: " + affectNums.get()); long t2 = System.currentTimeMillis(); System.out.println("cost time: " + (t2-t1)); } }
其实若是不用Fork/join 框架,也是能够的,好比我就只开n个线依次从数据源处取数据进行处理,最后将结果合并到另外一个队列中。只是,这期间你得多付出多少努力才能作到 Fork/Join 相同的效果呢!
固然了,Fork/Join 的重要特性是: 使用了work-stealing算法。Worker线程跑完任务后,能够从其余还在忙着的线程去窃取任务。
你要愿意造轮子,也是能够的。
场景5. 我有n个线程同时开始处理一个事务,要求至少等到一个线程执行完毕后,才能进行响应返回,如何高效处理?
场景5解决:(至少一个返回型)
初步思路: 主任务中,使用一个闭锁,CountDownLatch(1); 全部子线程执行完成,调用 latch.countDown(); 开启一次闭锁。主任务执行完成后,调用 latch.await(); 阻塞等待,当有任意一个子线程打开闭锁后,就能够返回了。
可是这个是有问题的,即这个锁只会有一次生效机会,后续的完成动做并不会有实际意义,所以只能换一个方式。
使用回调实现,就容易多了,只要一个任务完成,就作一次回调,主任务若是分配完成后,发现有空闲的任务槽,就当即进行下一次分配便可,没有则等到有再进行分配工做。
具体代码以下:
public class TaskDispatcher { /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting assign */ private final Condition finishedTaskNotEmpty; /** * 正在运行的任务计数器 */ private final AtomicInteger runningTaskCounter = new AtomicInteger(0); /** * 新完成的任务计数器,当被从新分派后,此计数将会被置0 */ private Integer newFinishedTaskCounter = 0; private void consumLogHub(String shards) throws InterruptedException { resetConsumeCounter(); String[] shardList = shards.split(","); for (int i = 0; i < shardList.length; i++) { String shard = shardList[i]; int shardId = Integer.parseInt(shard); LogHubConsumer consuemr = getConsuemer(shardId); if(consuemr.startNewConsumeTask(this)) { runningTaskCounter.incrementAndGet(); } } cleanConsumer(Arrays.asList(shardList)); // 没有一个任务已完成,阻塞等待一个完成 if(runningTaskCounter.get() > 0) { if(newFinishedTaskCounter == 0) { waitAtLeastOnceTaskFinish(); } } } /** * 重置消费者计数器 */ private void resetConsumeCounter() { newFinishedTaskCounter = 0; } /** * 阻塞等待至少一个任务执行完成 * * @throws InterruptedException 中断 */ private void waitAtLeastOnceTaskFinish() throws InterruptedException { lock.lockInterruptibly(); try { while (newFinishedTaskCounter == 0) { finishedTaskNotEmpty.await(); } } finally { lock.unlock(); } } /** * 通知任务完成(回调) * * @throws InterruptedException 中断 */ private void notifyTaskFinished() throws InterruptedException { lock.lockInterruptibly(); try { runningTaskCounter.decrementAndGet(); // 此处计数不可能小于0 newFinishedTaskCounter += 1; finishedTaskNotEmpty.signal(); } finally { lock.unlock(); } } /** * 通知任务完成(回调) * * @throws InterruptedException 中断 */ public void taskFinishCallback() throws InterruptedException { notifyTaskFinished(); } } public class ConsumerWorker { private Future<?> future; @Resource private ExecutorService consumerService; /** * 当查询结果为时的等待延时, 每次查询结果都会为空时,加大该延时, 直到达到设定的最大值为准 */ private Long baseEmptyQueryDelayMills = 200L; private Long emptyQueryDelayMills = baseEmptyQueryDelayMills; /** * 调置最大延时为1秒 */ private static final Long maxEmptyQueryDelayMills = 1000L; /** * 记数 */ private void encounterEmptyQueryDelay() { if(emptyQueryDelayMills < maxEmptyQueryDelayMills) { emptyQueryDelayMills += 100L; } } private void resetEmptyQueryDelay() { emptyQueryDelayMills = baseEmptyQueryDelayMills; } // 开启一个消费者线程 public boolean startNewConsumeTask(LogHubClientWork callback) { if(future==null || future.isCancelled() || future.isDone()) { //没有任务或者任务已取消或已完成 提交任务 future = consumerService.submit(new Runnable() { @Override public void run() { try { Integer dealCount = doBizData(); if(dealCount == 0) { SleepUtil.millis(emptyQueryDelayMills); encounterEmptyQueryDelay(); } else { resetEmptyQueryDelay(); } } finally { try { callback.taskFinishCallback(); } catch (InterruptedException e) { logger.error("处理完成通知失败,中断", e); Thread.currentThread().interrupt(); } } } }); return true; } return false; } }
场景6. 抽象任务,后台运行处理任务多线程?
场景6解决:(业务相关类)
最简单也是最难的一种,根据具体业务类型作相应处理就好,主要考虑读写的安全性问题。
如上几个多线程的应用场景,是我在工做中切实用上的场景(所言非虚)。不过它们都有一个特色,即任务都是很独立的,即基本上不用太关心线程安全问题,这也是咱们编写多线程代码时尽可能要作的事。固然不少场景共享数据是必定的,这时候就更要注意线程安全了。
要作到线程安全也不是难事,好比足够好的封装,可让你把关注点锁定在很小的范围内。
固然,为了线程安全,咱们可能每每又会牺牲性能,这就看咱们如何把握这些度了!互斥锁是最容易使用的锁,可是也是性能最差的锁。分段锁可以解决锁性能问题,可是又会给编写带来更大的困难。
多线程,不止要会写,还要会给本身填坑。
唠叨: 去追天边的那束光!