小伙伴们,咱们认识一下。java
俗世游子:专一技术研究的程序猿面试
最近在作新项目的数据库设计,目前为止一共出了80张表,预计只作了一半,心好累o(╥﹏╥)o数据库
前一节咱们聊过了多线程的基础问题,可是还漏掉一个知识点:缓存
这里咱们补上安全
咱们老是在说多线程操做,会出现线程不安全的问题,那么该怎么解释这个线程安全
呢?服务器
通俗的来说,当多个线程操做同一份共享数据的时候,数据的一致性被破坏,这就是线程不安全的。数据结构
举个例子:多线程
循环的数值调大才能看出效果,本人试了好久并发
public class ThreadSafe { public static void main(String[] args) throws InterruptedException { ShareObj shareObj = new ShareObj(); new Thread(() -> { for (int i = 0; i < 20_0000; i++) { shareObj.num += 1; } System.out.println(shareObj.num); }, "线程A").start(); new Thread(() -> { for (int i = 0; i < 20_0000; i++) { shareObj.num += 1; } System.out.println(shareObj.num); }, "线程B").start(); } } class ShareObj { int num = 0; }
两个线程同时操做共享变量的话,就会出现数据不一致的问题:框架
那怎么解决这个问题呢,其实也就是加锁:synchronized
,可是咱们要注意加锁的资源
synchronized
是对共享变量进行加锁,只有线程抢占到锁以后,该线程才能继续操做,操做完成以后释放锁资源那么,上面的小例子咱们就能够进行调整:
public class ThreadSafe { public static void main(String[] args) throws InterruptedException { ShareObj shareObj = new ShareObj(); new Thread(() -> { synchronized(shareObj) { for (int i = 0; i < 20_0000; i++) { shareObj.num += 1; } System.out.println(shareObj.num); } }, "线程A").start(); new Thread(() -> { synchronized(shareObj) { for (int i = 0; i < 20_0000; i++) { shareObj.num += 1; } System.out.println(shareObj.num); } }, "线程B").start(); } } class ShareObj { int num = 0; }
这样就解决了问题,达到了咱们预想的结果
那么咱们来聊一聊synchronized
:
同步锁,监视共享资源或共享对象(同步监视器),须要的是Object的子类。能够经过同步代码块
或者同步方法
的方法来加锁
同步方法也就是将业务逻辑抽离成一个普通方法,使用
synchronized
进行修饰,是同样的效果
public synchronized void update() { // 业务逻辑 }
这种状况下执行效率可见通常
也就是说:
除了这个问题以外,在线程中中还会出现很是严重的问题:死锁
死锁,通常状况下表示互相等待,是程序运行是出现的一种状态,简单一点理解:
就是说两个线程,各自须要对方的资源,可是本身又不释放本身的资源,就形成了死锁现象
死锁没办法解决,只能在编写代码的过程时刻注意
很是经典的一个案例,不知道大家在面试的时候有没有被它支配过
前提条件:
关键点:
定义的产品类
public class Goods { // 品牌 public String brand; // 名称 public String name; public Goods() { } public Goods(String brand, String name) { this.brand = brand; this.name = name; } }
生产者
public class Producer implements Runnable { private Goods goods; public Producer(Goods goods) { this.goods = goods; } @Override public void run() { for (int i = 0; i < 10; i++) { if (i % 2 == 0) { goods.brand = "农夫山泉"; try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } goods.name = "矿泉水"; } else { goods.brand = "旺仔"; try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } goods.name = "小馒头"; } System.out.println(String.format("生产者生产了:%s---%s", goods.brand, goods.name)); } } }
消费者
public class Consumer implements Runnable { private Goods goods; public Consumer(Goods goods) { this.goods = goods; } @Override public void run() { System.out.println("消费者开始消费"); for (int i = 0; i < 10; i++) { try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("消费者消费产品:%s--%s", goods.brand, goods.name)); } } }
测试方法
public class Main { public static void main(String[] args) { Goods goods = new Goods(); new Thread(new Producer(goods)).start(); new Thread(new Consumer(goods)).start(); } }
这里出现两个问题,看一下结果
- 出现先消费后生产的问题
- 出现品牌和名称不一致的问题
下面咱们来第二版解决
在产品类中定义生产和消费的方法
public class Goods { // 品牌 public String brand; // 名称 public String name; // 标志位 public boolean flag; public Goods() { } public Goods(String brand, String name) { this.brand = brand; this.name = name; } public synchronized void set(String brand, String name) { /** * 若是生产者抢占到CPU资源,那么先判断当前有没有产品,若是有产品,那么就进入等待状态,等待消费者消费完以后再次生产 */ if (flag) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this.brand = brand; try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } this.name = name; flag = true; // 唤醒消费者消费 notify(); } public synchronized void get() { /** * 若是flag=false,说明生产者没有生产商品,那么消费者进入等待状态,等待生产者生产产品以后,而后再次消费 */ if (!flag) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("消费者消费产品:%s--%s", this.brand, this.name)); flag = false; // 唤醒生产者进行生产 notify(); } }
生产者
public class Producer implements Runnable { private Goods goods; public Producer(Goods goods) { this.goods = goods; } @Override public void run() { for (int i = 0; i < 10; i++) { if (i % 2 == 0) { goods.set("农夫山泉", "矿泉水"); } else { goods.set("旺仔", "小馒头"); } System.out.println(String.format("生产者生产了:%s---%s", goods.brand, goods.name)); } } }
消费者
public class Consumer implements Runnable { private Goods goods; public Consumer(Goods goods) { this.goods = goods; } @Override public void run() { for (int i = 0; i < 10; i++) { goods.get(); } } }
其余不变
这样就解决了上面的两个问题,
其实还有一种解决方式是采用BlockingQueue
(队列)来解决,等待和唤醒的操做就不用咱们来进行,BlockingQueue
会帮咱们来完成
就是提一下,不懂的等以后学过了队列,就清楚了
BlockingQueue
的版本,该类位于java.util.concurrent
包下(JUC),后续咱们详细聊
public class Main { public static void main(String[] args) { BlockingQueue<Goods> queue = new ArrayBlockingQueue<>(5); new Thread(new Producer(queue)).start(); new Thread(new Consumer(queue)).start(); } } class Producer implements Runnable { private BlockingQueue<Goods> queue; public Producer(BlockingQueue<Goods> queue) { this.queue = queue; } @Override public void run() { for (int i = 0; i < 10; i++) { if (i % 2 == 0) { try { queue.put(new Goods("农夫山泉", "矿泉水")); } catch (InterruptedException e) { e.printStackTrace(); } } else { try { queue.put(new Goods("旺仔", "小馒头")); } catch (InterruptedException e) { e.printStackTrace(); } } } } } class Consumer implements Runnable { private BlockingQueue<Goods> queue; public Consumer(BlockingQueue<Goods> queue) { this.queue = queue; } @Override public void run() { for (int i = 0; i < 10; i++) { try { Goods take = queue.take(); System.out.println(String.format("消费者消费产品:%s--%s", take.brand, take.name)); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Goods { // 品牌 public String brand; // 名称 public String name; public Goods() { } public Goods(String brand, String name) { this.brand = brand; this.name = name; System.out.println(String.format("生产者生产了:%s---%s", brand, name)); } }
在实际的使用中,线程是很是消耗系统资源的,并且若是对线程管理不善,很容易形成系统资源的浪费,
并且在实际开发中,会形成线程的不可控,好比:
所以咱们推荐在实际开发中采用线程池来进行开发,拥有如下优势:
了解到这一点以后,咱们来看一看其具体的实现方式,在Java中,建立线程池主要是经过ThreadPoolExecutor
来构造,下面咱们来具体了解一下
咱们看参数最多的构造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
在了解这些参数以前,咱们先来聊一个知识点,就是线程池的工做原理,否则下面聊着有点生硬。画个图:
简单用文字描述一下就是这样的过程:
这是总体处理的一个过程,下面咱们去实际源码中看看:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
在
execute()
的注释中也有至关详细的说明
好了,下面看详细的参数,这里很是重要
corePoolSize
表示核心线程数,maximumPoolSize
表示线程池中容许存在的最大线程数。
那么
keepAliveTime
:当线程数大于核心时,多余的空闲线程在终止以前等待新任务的最长时间
unit
表示空闲线程存活时间的表示单位
简单一点理解:
阻塞队列或者说是等待队列,用于存放等待执行的任务。
队列在这里先了解一下,等到后面聊 数据结构 的时候再详细介绍
数据结构很重要,这里简单聊一下
队列通常会和栈一块儿作对比,二者都是动态集合。栈中删除的元素都是最近插入的元素,遵循的是后进先出的策略(LIFO);而队列删除的都是在集合中存在时间最长的元素,遵循的是先进先出的策略(FIFO)。
这里我罗列出队列的类和说明,你们查看一下
建立新线程时须要用到的工厂,该参数,若是没有另外指定,则默认使用Executors.defaultThreadFactory()
,该工厂建立的线程所有位于同一ThreadGroup
而且具备相同的NORM_PRIORITY
优先级和非守护程序状态。 经过提供其余ThreadFactory
,能够更改线程的名称,线程组,优先级,守护程序状态等。
/** * The default thread factory */ static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
Java还为咱们提供了一种工厂方式:`Executors.privilegedThreadFactory()
。返回用于建立具备与当前线程相同权限的新线程的线程工厂
/** * Thread factory capturing access control context and class loader */ static class PrivilegedThreadFactory extends DefaultThreadFactory { private final AccessControlContext acc; private final ClassLoader ccl; PrivilegedThreadFactory() { super(); SecurityManager sm = System.getSecurityManager(); if (sm != null) { // Calls to getContextClassLoader from this class // never trigger a security check, but we check // whether our callers have this permission anyways. sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION); // Fail fast sm.checkPermission(new RuntimePermission("setContextClassLoader")); } this.acc = AccessController.getContext(); this.ccl = Thread.currentThread().getContextClassLoader(); } public Thread newThread(final Runnable r) { return super.newThread(new Runnable() { public void run() { AccessController.doPrivileged(new PrivilegedAction<Void>() { public Void run() { Thread.currentThread().setContextClassLoader(ccl); r.run(); return null; } }, acc); } }); } }
还有一点,若是咱们想自定义线程工厂的话,那么咱们能够参考上面两种的写法
饱和策略,也能够称为拒绝策略。也就是当线程池中线程数都占满了没法再继续添加执行任务,最后就会交给饱和策略来处理
在线程池中饱和策略分为四种:
这是Java提供的默认策略,也就是说当前策略会丢弃任务并抛出RejectedExecutionException
异常
public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
当前策略是经过调用线程处理该任务,只要线程池不关闭,那么就会执行该任务
public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
什么都不作,直接将任务丢弃
public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
也就是说,若是线程池没有关闭,那么将阻塞队列中的头任务丢弃,而后再经过execute()
从新执行当前任务
public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
在Java中,提供三种类型的线程池,下面咱们一一来聊一聊
线程池执行器,为咱们提供了如下几种:
建立一个可根据须要建立新线程的线程池,可是在之前构造的线程可用时将重用它们,并在须要时使用提供的 ThreadFactory, 可用于业务逻辑处理时间短的操做
该方法是无参或者参数为ThreadFactory
,其构建参数以下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
拥有如下特性:
Integer
最大值写个小案例
private static void newCachePoolExecutor() { ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 20; i++) { newCachedThreadPool.execute(() -> System.out.println("anc")); } newCachedThreadPool.shutdown(); }
建立一个可重用固定线程数的线程池,已***队列方式来运行这些线程
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
拥有如下特性:
maximumPoolSize
无效写个小案例:
private static void newFixPoolExecutor() { ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(12); for (int i = 0; i < 20; i++) { newFixedThreadPool.execute(() -> System.out.println("anc")); } newFixedThreadPool.shutdown(); }
建立一个使用单个worker线程的Executor,已***队列方式来运行该线程。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
拥有如下特性:
写个小案例:
private static void newSingleThreadPoolExecutor() { ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(); for (int i = 0; i < 20; i++) { newSingleThreadExecutor.execute(() -> System.out.println("anc")); } newSingleThreadExecutor.shutdown(); }
这是一种可调度的执行器,也就是说能够执行定时任务,常常有面试会被问到
除了使用定时任务框架和Timer以外,还有什么技术能够实现定时任务?
其中一种就是采用该线程池技术
该方法建立了一个单线程池的可调度执行器,
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } // ScheduledThreadPoolExecutor 继承自ThreadPoolExecutor public ScheduledThreadPoolExecutor(int corePoolSize) { // 这里调用父类的构造方法 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
拥有以下特性:
写个小案例:
private static void newSingleScheduledPoolExecutor() { ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor(); // 延迟1s执行,每一个1s执行一次 newSingleThreadScheduledExecutor.scheduleAtFixedRate(() -> System.out.println("kk"), 1L, 1L, TimeUnit.SECONDS); // 延迟1s执行 newSingleThreadScheduledExecutor.schedule(() -> System.out.println("kk"), 1L, TimeUnit.SECONDS); }
建立一个线程池,可安排在给定延迟后运行命令或者按期执行,和上面线程池同样,最终调用的是同一个类,可是不一样点在于:
该线程池能够指定核心线程数
写个小案例:
private static void newScheduledThreadPool() { ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(6); newScheduledThreadPool.scheduleAtFixedRate(() -> System.out.println("kk"), 1L, 1L, TimeUnit.SECONDS); newScheduledThreadPool.schedule(() -> System.out.println("aa"), 1L, TimeUnit.SECONDS); }
该线程池是JDK1.7以后添加进来的,采用了分而治之
的思想,在大数据
中不少地方都用到了这种思想。
建立一个带并行级别的线程池,并行级别决定了同一个时刻作多有多少线程在执行,如不传并行级别参数,将默认为当前系统的CPU个数
我直接给个案例吧,你们看看,毕竟这种方式本人在实际的开发中基本没有用过
这是计算总和的例子
public class SumTask extends RecursiveTask<Integer> { private static final int THRESHOLD = 20; private int[] arry; private int start; private int end; public SumTask(int[] arry, int start, int end) { this.arry = arry; this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; if (end - start < THRESHOLD) { for (int i = start; i < end; i++) { sum += arry[i]; } return sum; } else { int middle = (start + end) / 2; SumTask left = new SumTask(arry, start, middle); SumTask right = new SumTask(arry, middle, end); // left.fork(); // right.fork(); invokeAll(left, right); return left.join() + right.join(); } } } int[] arry = new int[100]; for (int i = 0; i < 100; i++) { arry[i] = new Random().nextInt(20); } // 实际调用 SumTask sumTask = new SumTask(arry, 0, arry.length); ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); System.out.println("多线程执行结果:" + forkJoinPool.submit(sumTask).get());
实现类:
经过泛型能够指定其执行的返回结果
无返回值
线程池生命周期只有两种:
线程池在RUNNING
状态下,可以接收新的任务,而且也可以处理阻塞队列中的任务
线程池正式进入到已终止的状态
在这两种状态中间,还包含三种过分状态:
当线程池调用shutdown()
方法的时候,会进入到SHUTDOWN
状态,该状态下,线程池再也不接收新的任务,可是阻塞队列中的任务却能够继续执行
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); // SHUTDOWN状态 interruptIdleWorkers(); // 中断可能正在等待任务的线程 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); // 若是(SHUTDOWN状态和线程池和队列为空)或(STOP和线程池为空),则转换为TERMINATED状态 }
当线程池调用shutdownNow()
方法的时候,会进入到STOP
状态,该状态下,线程池再也不接收新的任务,也不会执行阻塞队列中的任务,同时还会中断如今执行的任务
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); // STOP状态 interruptWorkers(); // 中断全部线程,即便处于活动状态也是如此 tasks = drainQueue(); // 将没有执行的任务从队列中remove(),并添加到List中 } finally { mainLock.unlock(); } tryTerminate(); // 若是(SHUTDOWN状态和线程池和队列为空)或(STOP和线程池为空),则转换为TERMINATED状态 return tasks; } private List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); q.drainTo(taskList); if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; }
上面也就是
shutdown()
和shutdownNow()
的区别,更多的是推荐使用shutdown()
当线程池中全部任务都已终止,而且 工做线程 为0,那么线程池就会调用terminated()
方法进入到TERMINATED状态
用图来表示:
多线程的基础知识就聊到这里,欢迎你们在评论区积极互动,提出本身的看法