答: ① sleep()方法给其余线程运行机会时不考虑线程的优先级,所以会给低优先级的线程以运行的机会;yield()方法只会给相同优先级或更高优先级的线程以运行的机会; ② 线程执行sleep()方法后转入阻塞(blocked)状态,而执行yield()方法后转入就绪(ready)状态; ③ sleep()方法声明抛出InterruptedException,而yield()方法没有声明任何异常; ④ sleep()方法比yield()方法(跟操做系统CPU调度相关)具备更好的可移植性。java
答:node
答:若是系统中存在临界资源(资源数量少于竞争资源的线程数量的资源),例如正在写的数据之后可能被另外一个线程读到,或者正在读的数据可能已经被另外一个线程写过了,那么这些数据就必须进行同步存取(数据库操做中的排他锁就是最好的例子)。当应用程序在对象上调用了一个须要花费很长时间来执行的方法,而且不但愿让程序等待方法的返回时,就应该使用异步编程,在不少状况下采用异步途径每每更有效率。事实上,所谓的同步就是指阻塞式操做,而异步就是非阻塞式操做。程序员
当run() 或者 call() 方法执行完的时候线程会自动结束,若是要手动结束一个线程,你能够用volatile 布尔变量来退出run()方法的循环或者是取消任务来中断线程。面试
使用自定义的标志位决定线程的执行状况算法
public class SafeStopThread implements Runnable{ private volatile boolean stop=false;//此变量必须加上volatile int a=0; @Override public void run() { // TODO Auto-generated method stub while(!stop){ synchronized ("") { a++; try { Thread.sleep(100); } catch (Exception e) { // TODO: handle exception } a--; String tn=Thread.currentThread().getName(); System.out.println(tn+":a="+a); } } //线程终止 public void terminate(){ stop=true; } public static void main(String[] args) { SafeStopThread t=new SafeStopThread(); Thread t1=new Thread(t); t1.start(); for(int i=0;i<5;i++){ new Thread(t).start(); } t.terminate(); } }
在语言层面有两种方式。java.lang.Thread 类的实例就是一个线程可是它须要调用java.lang.Runnable接口来执行,因为线程类自己就是调用的Runnable接口因此你能够继承java.lang.Thread 类或者直接调用Runnable接口来重写run()方法实现线程。数据库
Java不支持类的多重继承,但容许你调用多个接口。因此若是你要继承其余类,固然是调用Runnable接口好了。编程
Semaphore两个重要的方法就是semaphore.acquire() 请求一个信号量,这时候的信号量个数-1(一旦没有可以使用的信号量,也即信号量个数变为负数时,再次请求的时候就会阻塞,直到其余线程释放了信号量)semaphore.release()释放一个信号量,此时信号量个数+1数组
public class SemaphoreTest { private Semaphore mSemaphore = new Semaphore(5); public void run(){ for(int i=0; i< 100; i++){ new Thread(new Runnable() { @Override public void run() { test(); } }).start(); } } private void test(){ try { mSemaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 进来了"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 出去了"); mSemaphore.release(); } }
线程调度是指系统为线程分配处理器使用权的过程。 主要调度方式有两种,分别是协同式线程调度和抢占式线程调度。缓存
协同式线程调度:线程的执行时间由线程自己控制,当线程把本身的工做执行完了以后,主动通知系统切换到另外一个线程上。安全
抢占式线程调度:每一个线程由系统分配执行时间,不禁线程自己决定。线程的执行时间是系统可控的,不会有一直阻塞的问题。
Java使用抢占式调度
抢占式。一个线程用完CPU以后,操做系统会根据线程优先级、线程饥饿状况等数据算出一个总的优先级并分配下一个时间片给某个线程执行。
线程类的构造方法、静态块是被new这个线程类所在的线程所调用的,而run方法里面的代码才是被线程自身所调用的。
Thread t = Thread.currentThread(); String name = t.getName(); System.out.println("name=" + name);
建立线程要花费昂贵的资源和时间,若是任务来了才建立线程那么响应时间会变长,并且一个进程能建立的线程数有限。
为了不这些问题,在程序启动的时候就建立若干线程来响应处理,它们被称为线程池,里面的线程叫工做线程。
Executor框架让你能够建立不一样的线程池。好比单线程池,每次处理一个任务;数目固定的线程池或者是缓存线程池(一个适合不少生存期短的任务的程序的可扩展线程池)。
如下是Java自带的几种线程池: 一、newFixedThreadPool 建立一个指定工做线程数量的线程池。 每当提交一个任务就建立一个工做线程,若是工做线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。
二、newCachedThreadPool 建立一个可缓存的线程池。 这种类型的线程池特色是:
三、newSingleThreadExecutor建立一个单线程化的Executor,即只建立惟一的工做者线程来执行任务,若是这个线程异常结束,会有另外一个取代它,保证顺序执行(我以为这点是它的特点)。
单工做线程最大的特色是可保证顺序地执行各个任务,而且在任意给定的时间不会有多个线程是活动的。
四、newScheduleThreadPool 建立一个定长的线程池,并且支持定时的以及周期性的任务执行,相似于Timer。
Executor 和 ExecutorService 这两个接口主要的区别是:
Executors 类提供工厂方法用来建立不一样类型的线程池。
好比: newSingleThreadExecutor() 建立一个只有一个线程的线程池,newFixedThreadPool(int numOfThreads)来建立固定线程数的线程池,newCachedThreadPool()能够根据须要建立新的线程,但若是已有线程是空闲的会重用已有线程。
Java经过Executors提供四种线程池,分别为:
newCachedThreadPool建立一个可缓存线程池,若是线程池长度超过处理须要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 建立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 建立一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 建立一个单线程化的线程池,它只会用惟一的工做线程来执行任务,保证全部任务按照指定顺序(FIFO, LIFO, 优先级)执行。
start()方法被用来启动新建立的线程,并且start()内部调用了run()方法,这和直接调用run()方法的效果不同。
当你调用run()方法的时候,只会是在原来的线程中调用,没有新的线程启动,start()方法才会启动新线程。
两个方法均可以向线程池提交任务,execute()方法的返回类型是void,它定义在Executor接口中, 而submit()方法能够返回持有计算结果的Future对象,它定义在ExecutorService接口中,它扩展了Executor接口,其它线程池类像ThreadPoolExecutor和ScheduledThreadPoolExecutor都有这些方法。
notify()方法不能唤醒某个具体的线程,因此只有一个线程在等待的时候它才有用武之地。而notifyAll()唤醒全部线程并容许他们争夺锁确保了至少有一个线程能继续运行。
当有线程调用了对象的 notifyAll()方法(唤醒全部 wait 线程)或 notify()方法(只随机唤醒一个 wait 线程),被唤醒的的线程便会进入该对象的锁池中,锁池中的线程会去竞争该对象锁。也就是说,调用了notify后只要一个线程会由等待池进入锁池,而notifyAll会将该对象等待池内的全部线程移动到锁池中,等待锁竞争
优先级高的线程竞争到对象锁的几率大,倘若某线程没有竞争到该对象锁,它还会留在锁池中,惟有线程再次调用 wait()方法,它才会从新回到等待池中。
一个很明显的缘由是JAVA提供的锁是对象级的而不是线程级的,每一个对象都有锁,经过线程得到。
若是线程须要等待某些锁那么调用对象中的wait()方法就有意义了。若是wait()方法定义在Thread类中,线程正在等待的是哪一个锁就不明显了。
简单的说,因为wait,notify和notifyAll都是锁级别的操做,因此把他们定义在Object类中由于锁属于对象。
主要是由于Java API强制要求这样作,若是你不这么作,你的代码会抛出IllegalMonitorStateException异常。还有一个缘由是为了不wait和notify之间产生竞态条件。
最主要的缘由是为了防止如下这种状况
// 等待者(Thread1) while (condition != true) { // step.1 lock.wait() // step.4 } // 唤醒者(Thread2) condition = true; // step.2 lock.notify(); // step.3
在对以前的代码去掉 synchronized 块以后,若是在等待者判断 condition != true 以后而调用 wait() 以前,唤醒者**将 condition 修改为了 true 同时调用了 notify() **的话,那么等待者在调用了 wait() 以后就没有机会被唤醒了。
join() 的做用:让“主线程”等待“子线程”结束以后才能继续运行。
yield方法能够暂停当前正在执行的线程对象,让其它有相同优先级的线程执行。它是一个静态方法并且只保证当前线程放弃CPU占用而不能保证使其它线程必定能占用CPU,执行yield()的线程有可能在进入到暂停状态后立刻又被执行。
sleep()方法(休眠)是线程类(Thread)的静态方法,调用此方法会让当前线程暂停执行指定的时间,将执行机会(CPU)让给其余线程,可是对象的锁依然保持,所以休眠时间结束后会自动恢复。注意这里的恢复并非恢复到执行的状态,而是恢复到可运行状态中等待CPU的宠幸。
Java程序中wait和sleep都会形成某种形式的暂停,它们能够知足不一样的须要。
不能被重写,线程的不少方法都是由系统调用的,不能经过子类覆写去改变他们的行为。
Thread类的sleep()和yield()方法将在当前正在执行的线程上运行。
该代码只有在某个A线程执行时会被执行,这种状况下通知某个B线程yield是无心义的(由于B线程原本就没在执行)。所以只有当前线程执行yield才是有意义的。经过使该方法为static,你将不会浪费时间尝试yield 其余线程。
只能给本身喂安眠药,不能给别人喂安眠药。
阻塞式方法是指程序会一直等待该方法完成期间不作其余事情。
ServerSocket的accept()方法就是一直等待客户端链接。这里的阻塞是指调用结果返回以前,当前线程会被挂起,直到获得结果以后才会返回。
此外,还有异步和非阻塞式方法在任务完成前就返回。
在Java里面没有办法强制启动一个线程,它是被线程调度器控制着
简单的说,若是异常没有被捕获该线程将会中止执行。
Thread.UncaughtExceptionHandler是用于处理未捕获异常形成线程忽然中断状况的一个内嵌接口。
当一个未捕获异常将形成线程中断的时候JVM会使用Thread.getUncaughtExceptionHandler()来查询线程的UncaughtExceptionHandler并将线程和异常做为参数传递给handler的uncaughtException()方法进行处理。
在Java中有两种异常。
非运行时异常(Checked Exception):这种异常必须在方法声明的throws语句指定,或者在方法体内捕获。例如:IOException和ClassNotFoundException。
运行时异常(Unchecked Exception):这种异常没必要在方法声明中指定,也不须要在方法体中捕获。例如,NumberFormatException。
由于run()方法不支持throws语句,因此当线程对象的run()方法抛出非运行异常时,咱们必须捕获而且处理它们。当运行时异常从run()方法中抛出时,默认行为是在控制台输出堆栈记录而且退出程序。
好在,java提供给咱们一种在线程对象里捕获和处理运行时异常的一种机制。实现用来处理运行时异常的类,这个类实现UncaughtExceptionHandler接口而且实现这个接口的uncaughtException()方法。示例:
package concurrency; import java.lang.Thread.UncaughtExceptionHandler; public class Main2 { public static void main(String[] args) { Task task = new Task(); Thread thread = new Thread(task); thread.setUncaughtExceptionHandler(new ExceptionHandler()); thread.start(); } } class Task implements Runnable{ @Override public void run() { int numero = Integer.parseInt("TTT"); } } class ExceptionHandler implements UncaughtExceptionHandler{ @Override public void uncaughtException(Thread t, Throwable e) { System.out.printf("An exception has been captured\n"); System.out.printf("Thread: %s\n", t.getId()); System.out.printf("Exception: %s: %s\n", e.getClass().getName(),e.getMessage()); System.out.printf("Stack Trace: \n"); e.printStackTrace(System.out); System.out.printf("Thread status: %s\n",t.getState()); } }
当一个线程抛出了异常而且没有被捕获时(这种状况只多是运行时异常),JVM检查这个线程是否被预置了未捕获异常处理器。若是找到,JVM将调用线程对象的这个方法,并将线程对象和异常做为传入参数。
Thread类还有另外一个方法能够处理未捕获到的异常,即静态方法setDefaultUncaughtExceptionHandler()。这个方法在应用程序中为全部的线程对象建立了一个异常处理器。
当线程抛出一个未捕获到的异常时,JVM将为异常寻找如下三种可能的处理器。
处于等待状态的线程可能会收到错误警报和伪唤醒,若是不在循环中检查等待条件,程序就会在没有知足结束条件的状况下退出。
一、通常来讲,wait确定是在某个条件调用的,不是if就是while 二、放在while里面,是防止出于waiting的对象被别的缘由调用了唤醒方法,可是while里面的条件并无知足(也可能当时知足了,可是因为别的线程操做后,又不知足了),就须要再次调用wait将其挂起。 三、其实还有一点,就是while最好也被同步,这样不会致使错失信号。
while(condition){ wait(); }
忙循环就是程序员用循环让一个线程等待,不像传统方法wait()、 sleep() 或 yield(),它们都放弃了CPU控制,而忙循环不会放弃CPU,它就是在运行一个空循环。
这么作的目的是为了保留CPU缓存,在多核系统中,一个等待线程醒来的时候可能会在另外一个内核运行,这样会重建缓存。为了不重建缓存和减小等待重建的时间就可使用它了。
没有得到锁的线程一直循环在那里看是否该锁的保持者已经释放了锁,这就是自旋锁。
互斥锁:从等待到解锁过程,线程会从sleep状态变为running状态,过程当中有线程上下文的切换,抢占CPU等开销。
自旋锁不会引发调用者休眠,若是自旋锁已经被别的线程保持,调用者就一直循环在那里看是否该自旋锁的保持者释放了锁。因为自旋锁不会引发调用者休眠,因此自旋锁的效率远高于互斥锁。
虽然自旋锁效率比互斥锁高,但它会存在下面两个问题: 一、自旋锁一直占用CPU,在未得到锁的状况下,一直运行,若是不能在很短的时间内得到锁,会致使CPU效率下降。 二、试图递归地得到自旋锁会引发死锁。递归程序决不能在持有自旋锁时调用它本身,也决不能在递归调用时试图得到相同的自旋锁。
因而可知,咱们要慎重的使用自旋锁,自旋锁适合于锁使用者保持锁时间比较短而且锁竞争不激烈的状况。正是因为自旋锁使用者通常保持锁时间很是短,所以选择自旋而不是睡眠是很是必要的,自旋锁的效率远高于互斥锁。
同一个Runnable,使用全局变量。
第一种:将共享数据封装到一个对象中,把这个共享数据所在的对象传递给不一样的Runnable
第二种:将这些Runnable对象做为某一个类的内部类,共享的数据做为外部类的成员变量,对共享数据的操做分配给外部类的方法来完成,以此实现对操做共享数据的互斥和通讯,做为内部类的Runnable来操做外部类的方法,实现对数据的操做
class ShareData { private int x = 0; public synchronized void addx(){ x++; System.out.println("x++ : "+x); } public synchronized void subx(){ x--; System.out.println("x-- : "+x); } } public class ThreadsVisitData { public static ShareData share = new ShareData(); public static void main(String[] args) { //final ShareData share = new ShareData(); new Thread(new Runnable() { public void run() { for(int i = 0;i<100;i++){ share.addx(); } } }).start(); new Thread(new Runnable() { public void run() { for(int i = 0;i<100;i++){ share.subx(); } } }).start(); } }
Runnable和Callable都是接口, 不一样之处: 1.Callable能够返回一个类型V,而Runnable不能够 2.Callable可以抛出checked exception,而Runnable不能够。 3.Runnable是自从java1.1就有了,而Callable是1.5以后才加上去的 4.Callable和Runnable均可以应用于executors。而Thread类只支持Runnable.
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ThreadTestB { public static void main(String[] args) { ExecutorService e=Executors.newFixedThreadPool(10); Future f1=e.submit(new MyCallableA()); Future f2=e.submit(new MyCallableA()); Future f3=e.submit(new MyCallableA()); System.out.println("--Future.get()...."); try { System.out.println(f1.get()); System.out.println(f2.get()); System.out.println(f3.get()); } catch (InterruptedException e1) { e1.printStackTrace(); } catch (ExecutionException e1) { e1.printStackTrace(); } e.shutdown(); } } class MyCallableA implements Callable<String>{ public String call() throws Exception { System.out.println("开始执行Callable"); String[] ss={"zhangsan","lisi"}; long[] num=new long[2]; for(int i=0;i<1000000;i++){ num[(int)(Math.random()*2)]++; } if(num[0]>num[1]){ return ss[0]; }else if(num[0]<num[1]){ throw new Exception("弃权!"); }else{ return ss[1]; } } }
CountDownLatch和CyclicBarrier都可以实现线程之间的等待,只不过它们侧重点不一样:
CountDownLatch的用法:
public class Test { public static void main(String[] args) { final CountDownLatch latch = new CountDownLatch(2); new Thread(){ public void run() { try { System.out.println("子线程"+Thread.currentThread().getName()+"正在执行"); Thread.sleep(3000); System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕"); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }; }.start(); new Thread(){ public void run() { try { System.out.println("子线程"+Thread.currentThread().getName()+"正在执行"); Thread.sleep(3000); System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕"); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }; }.start(); try { System.out.println("等待2个子线程执行完毕..."); latch.await(); System.out.println("2个子线程已经执行完毕"); System.out.println("继续执行主线程"); } catch (InterruptedException e) { e.printStackTrace(); } } }
CyclicBarrier用法:
public class Test { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N,new Runnable() { @Override public void run() { System.out.println("当前线程"+Thread.currentThread().getName()); } }); for(int i=0;i<N;i++) new Writer(barrier).start(); } static class Writer extends Thread{ private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据..."); try { Thread.sleep(5000); //以睡眠来模拟写入数据操做 System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其余线程写入完毕"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println("全部线程写入完毕,继续处理其余任务..."); } } }
interrupt方法用于中断线程。调用该方法的线程的状态为将被置为”中断”状态。
注意:线程中断仅仅是置线程的中断状态位,不会中止线程。须要用户本身去监视线程的状态为并作处理。支持线程中断的方法(也就是线程中断后会抛出interruptedException的方法)就是在监视线程的中断状态,一旦线程的中断状态被置为“中断状态”,就会抛出中断异常。
isInterrupted 只是简单的查询中断状态,不会对状态进行修改。
ConcurrentHashMap的结构是比较复杂的,都深究去本质,其实也就是数组和链表而已。咱们由浅入深慢慢的分析其结构。
先简单分析一下,ConcurrentHashMap 的成员变量中,包含了一个 Segment 的数组(final Segment<K,V>[] segments;),而 Segment 是 ConcurrentHashMap 的内部类,而后在 Segment 这个类中,包含了一个 HashEntry 的数组(transient volatile HashEntry<K,V>[] table;)。而 HashEntry 也是ConcurrentHashMap 的内部类。HashEntry 中,包含了 key 和 value 以及 next 指针(相似于 HashMap 中 Entry),因此 HashEntry 能够构成一个链表。
因此通俗的讲,ConcurrentHashMap 数据结构为一个 Segment 数组,Segment 的数据结构为 HashEntry 的数组,而 HashEntry 存的是咱们的键值对,能够构成链表。
首先,咱们看一下 HashEntry 类。
HashEntry 用来封装散列映射表中的键值对。在 HashEntry 类中,key,hash 和 next 域都被声明为 final 型,value 域被声明为 volatile 型。其类的定义为:
static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; HashEntry(int hash, K key, V value, HashEntry<K,V> next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } ... ... }
HashEntry 的学习能够类比着 HashMap 中的 Entry。咱们的存储键值对的过程当中,散列的时候若是发生“碰撞”,将采用“分离链表法”来处理碰撞:把碰撞的 HashEntry 对象连接成一个链表。
以下图,咱们在一个空桶中插入 A、B、C 两个 HashEntry 对象后的结构图(其实应该为键值对,在这进行了简化以方便更容易理解):
Segment 的类定义为static final class Segment<K,V> extends ReentrantLock implements Serializable。其继承于 ReentrantLock 类,从而使得 Segment 对象能够充当锁的角色。Segment 中包含HashEntry 的数组,其能够守护其包含的若干个桶(HashEntry的数组)。Segment 在某些意义上有点相似于 HashMap了,都是包含了一个数组,而数组中的元素能够是一个链表。
table:table 是由 HashEntry 对象组成的数组若是散列时发生碰撞,碰撞的 HashEntry 对象就以链表的形式连接成一个链表table数组的数组成员表明散列映射表的一个桶每一个 table 守护整个 ConcurrentHashMap 包含桶总数的一部分若是并发级别为 16,table 则守护 ConcurrentHashMap 包含的桶总数的 1/16。
count 变量是计算器,表示每一个 Segment 对象管理的 table 数组(若干个 HashEntry 的链表)包含的HashEntry 对象的个数。之因此在每一个Segment对象中包含一个 count 计数器,而不在 ConcurrentHashMap 中使用全局的计数器,是为了不出现“热点域”而影响并发性。
/** * Segments are specialized versions of hash tables. This * subclasses from ReentrantLock opportunistically, just to * simplify some locking and avoid separate construction. */ static final class Segment<K,V> extends ReentrantLock implements Serializable { /** * The per-segment table. Elements are accessed via * entryAt/setEntryAt providing volatile semantics. */ transient volatile HashEntry<K,V>[] table; /** * The number of elements. Accessed only either within locks * or among other volatile reads that maintain visibility. */ transient int count; transient int modCount; /** * 装载因子 */ final float loadFactor; }
咱们经过下图来展现一下插入 ABC 三个节点后,Segment 的示意图:
其实从我我的角度来讲,Segment结构是与HashMap很像的。
ConcurrentHashMap 的结构中包含的 Segment 的数组,在默认的并发级别会建立包含 16 个 Segment 对象的数组。经过咱们上面的知识,咱们知道每一个 Segment 又包含若干个散列表的桶,每一个桶是由 HashEntry 连接起来的一个链表。若是 key 可以均匀散列,每一个 Segment 大约守护整个散列表桶总数的 1/16。
下面咱们还有经过一个图来演示一下 ConcurrentHashMap 的结构:
在 ConcurrentHashMap 中,当执行 put 方法的时候,会须要加锁来完成。咱们经过代码来解释一下具体过程: 当咱们 new 一个 ConcurrentHashMap 对象,而且执行put操做的时候,首先会执行 ConcurrentHashMap 类中的 put 方法,该方法源码为:
/** * Maps the specified key to the specified value in this table. * Neither the key nor the value can be null. * * <p> The value can be retrieved by calling the <tt>get</tt> method * with a key that is equal to the original key. * * @param key key with which the specified value is to be associated * @param value value to be associated with the specified key * @return the previous value associated with <tt>key</tt>, or * <tt>null</tt> if there was no mapping for <tt>key</tt> * @throws NullPointerException if the specified key or value is null */ @SuppressWarnings("unchecked") public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); }
咱们经过注释能够了解到,ConcurrentHashMap 不容许空值。该方法首先有一个 Segment 的引用 s,而后会经过 hash() 方法对 key 进行计算,获得哈希值;继而经过调用 Segment 的 put(K key, int hash, V value, boolean onlyIfAbsent)方法进行存储操做。该方法源码为:
final V put(K key, int hash, V value, boolean onlyIfAbsent) { //加锁,这里是锁定的Segment而不是整个ConcurrentHashMap HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; //获得hash对应的table中的索引index int index = (tab.length - 1) & hash; //找到hash对应的是具体的哪一个桶,也就是哪一个HashEntry链表 HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { //解锁 unlock(); } return oldValue; }
关于该方法的某些关键步骤,在源码上加上了注释。
须要注意的是:加锁操做是针对的 hash 值对应的某个 Segment,而不是整个 ConcurrentHashMap。由于 put 操做只是在这个 Segment 中完成,因此并不须要对整个 ConcurrentHashMap 加锁。因此,此时,其余的线程也能够对另外的 Segment 进行 put 操做,由于虽然该 Segment 被锁住了,但其余的 Segment 并无加锁。同时,读线程并不会由于本线程的加锁而阻塞。
正是由于其内部的结构以及机制,因此 ConcurrentHashMap 在并发访问的性能上要比Hashtable和同步包装以后的HashMap的性能提升不少。在理想状态下,ConcurrentHashMap 能够支持 16 个线程执行并发写操做(若是并发级别设置为 16),及任意数量线程的读操做。
在实际的应用中,散列表通常的应用场景是:除了少数插入操做和删除操做外,绝大多数都是读取操做,并且读操做在大多数时候都是成功的。正是基于这个前提,ConcurrentHashMap 针对读操做作了大量的优化。经过 HashEntry 对象的不变性和用 volatile 型变量协调线程间的内存可见性,使得 大多数时候,读操做不须要加锁就能够正确得到值。这个特性使得 ConcurrentHashMap 的并发性能在分离锁的基础上又有了近一步的提升。
ConcurrentHashMap 是一个并发散列映射表的实现,它容许彻底并发的读取,而且支持给定数量的并发更新。相比于 HashTable 和用同步包装器包装的 HashMap(Collections.synchronizedMap(new HashMap())),ConcurrentHashMap 拥有更高的并发性。在 HashTable 和由同步包装器包装的 HashMap 中,使用一个全局的锁来同步不一样线程间的并发访问。同一时间点,只能有一个线程持有锁,也就是说在同一时间点,只能有一个线程能访问容器。这虽然保证多线程间的安全并发访问,但同时也致使对容器的访问变成串行化的了。
ConcurrentHashMap 的高并发性主要来自于三个方面:
使用分离锁,减少了请求 同一个锁的频率。
经过 HashEntery 对象的不变性及对同一个 Volatile 变量的读 / 写来协调内存可见性,使得 读操做大多数时候不须要加锁就能成功获取到须要的值。因为散列映射表在实际应用中大多数操做都是成功的 读操做,因此 2 和 3 既能够减小请求同一个锁的频率,也能够有效减小持有锁的时间。经过减少请求同一个锁的频率和尽可能减小持有锁的时间 ,使得 ConcurrentHashMap 的并发性相对于 HashTable 和用同步包装器包装的 HashMap有了质的提升。
阻塞队列(BlockingQueue)是一个支持两个附加操做的队列。这两个附加的操做是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列经常使用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
1)add(E e): 添加元素,若是BlockingQueue能够容纳,则返回true,不然报异常
2)offer(E e): 添加元素,若是BlockingQueue能够容纳,则返回true,不然返回false.
3)put(E e): 添加元素,若是BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.
4)poll(long timeout, TimeUnit timeUnit): 取走BlockingQueue里排在首位的对象,若不能当即取出,则能够等timeout参数规定的时间,取不到时返回null
5)take(): 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
1)ArrayBlockingQueue: 有界的先入先出顺序队列,构造方法肯定队列的大小.
2)LinkedBlockingQueue: 无界的先入先出顺序队列,构造方法提供两种,一种初始化队列大小,队列即有界;第二种默认构造方法,队列无界(有界即Integer.MAX_VALUE)
4)SynchronousQueue: 特殊的BlockingQueue,没有空间的队列,即必须有取的方法阻塞在这里的时候才能放入元素。
3)PriorityBlockingQueue: 支持优先级的阻塞队列 ,存入对象必须实现Comparator接口 (须要注意的是 队列不是在加入元素的时候进行排序,而是取出的时候,根据Comparator来决定优先级最高的)。
BlockingQueue 实现主要用于生产者-使用者队列,BlockingQueue 实现是线程安全的。全部排队方法均可以使用内部锁或其余形式的并发控制来自动达到它们的目的
这是一个生产者-使用者场景的一个用例。注意,BlockingQueue 能够安全地与多个生产者和多个使用者一块儿使用 此用例来自jdk文档
//这是一个生产者类 class Producer implements Runnable { private final BlockingQueue queue; Producer(BlockingQueue q) { queue = q; } public void run() { try { while(true) { queue.put(produce()); } } catch (InterruptedException ex) { ... handle ... } } Object produce() { ... } } //这是一个消费者类 class Consumer implements Runnable { private final BlockingQueue queue; Consumer(BlockingQueue q) { queue = q; } public void run() { try { while(true) { consume(queue.take()); } } catch (InterruptedException ex) { ... handle ... } } void consume(Object x) { ... } } //这是实现类 class Setup { void main() { //实例一个非阻塞队列 BlockingQueue q = new SomeQueueImplementation(); //将队列传入两个消费者和一个生产者中 Producer p = new Producer(q); Consumer c1 = new Consumer(q); Consumer c2 = new Consumer(q); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); } }
合理利用线程池可以带来三个好处。第一:下降资源消耗。经过重复利用已建立的线程下降线程建立和销毁形成的消耗。第二:提升响应速度。当任务到达时,任务能够不须要等到线程建立就能当即执行。第三:提升线程的可管理性。线程是稀缺资源,若是无限制的建立,不只会消耗系统资源,还会下降系统的稳定性,使用线程池能够进行统一的分配,调优和监控。可是要作到合理的利用线程池,必须对其原理了如指掌。
咱们能够经过ThreadPoolExecutor来建立一个线程池。
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);
建立一个线程池须要输入几个参数:
咱们可使用execute提交的任务,可是execute方法没有返回值,因此没法判断任务是否被线程池执行成功。经过如下代码可知execute方法输入的任务是一个Runnable类的实例。
threadsPool.execute(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } });
咱们也可使用submit 方法来提交任务,它会返回一个future,那么咱们能够经过这个future来判断任务是否执行成功,经过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后当即返回,这时有可能任务没有执行完。
Future<Object> future = executor.submit(harReturnValuetask); try { Object s = future.get(); } catch (InterruptedException e) { // 处理中断异常 } catch (ExecutionException e) { // 处理没法执行任务异常 } finally { // 关闭线程池 executor.shutdown(); }
咱们能够经过调用线程池的shutdown或shutdownNow方法来关闭线程池,它们的原理是遍历线程池中的工做线程,而后逐个调用线程的interrupt方法来中断线程,因此没法响应中断的任务可能永远没法终止。可是它们存在必定的区别,shutdownNow首先将线程池的状态设置成STOP,而后尝试中止全部的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,而后中断全部没有正在执行任务的线程。
只要调用了这两个关闭方法的其中一个,isShutdown方法就会返回true。当全部的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于咱们应该调用哪种方法来关闭线程池,应该由提交到线程池的任务特性决定,一般调用shutdown来关闭线程池,若是任务不必定要执行完,则能够调用shutdownNow。
流程分析:线程池的主要工做流程以下图:
从上图咱们能够看出,当提交一个新任务到线程池时,线程池的处理流程以下:
上面的流程分析让咱们很直观的了解了线程池的工做原理,让咱们再经过源代码来看看是如何实现的。线程池执行任务的方法以下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //若是线程数小于基本线程数,则建立线程并执行当前任务 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { //如线程数大于等于基本线程数或线程建立失败,则将当前任务放到工做队列中。 if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } //若是线程池不处于运行中或任务没法放入队列,而且当前线程数量小于最大容许的线程数量, 则建立一个线程执行任务。 else if (!addIfUnderMaximumPoolSize(command)) //抛出RejectedExecutionException异常 reject(command); // is shutdown or saturated } }
工做线程。线程池建立线程时,会将线程封装成工做线程Worker,Worker在执行完任务后,还会无限循环获取工做队列里的任务来执行。咱们能够从Worker的run方法里看到这点:
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } }
要想合理的配置线程池,就必须首先分析任务特性,能够从如下几个角度来进行分析:
任务性质不一样的任务能够用不一样规模的线程池分开处理。CPU密集型任务配置尽量小的线程,如配置Ncpu+1个线程的线程池。IO密集型任务则因为线程并非一直在执行任务,则配置尽量多的线程,如2*Ncpu。混合型的任务,若是能够拆分,则将其拆分红一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,若是这两个任务执行时间相差太大,则不必进行分解。咱们能够经过Runtime.getRuntime().availableProcessors()方法得到当前设备的CPU个数。
优先级不一样的任务可使用优先级队列PriorityBlockingQueue来处理。它可让优先级高的任务先获得执行,须要注意的是若是一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
执行时间不一样的任务能够交给不一样规模的线程池来处理,或者也可使用优先级队列,让执行时间短的任务先执行。
依赖数据库链接池的任务,由于线程提交SQL后须要等待数据库返回结果,若是等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。
建议使用有界队列,有界队列能增长系统的稳定性和预警能力,能够根据须要设大一点,好比几千。有一次咱们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,经过排查发现是数据库出现了问题,致使执行SQL变得很是缓慢,由于后台任务线程池里的任务全是须要向数据库查询和插入数据的,因此致使线程池里的工做线程所有阻塞住,任务积压在线程池里。若是当时咱们设置成无界队列,线程池的队列就会愈来愈多,有可能会撑满内存,致使整个系统不可用,而不仅是后台任务出现问题。固然咱们的系统全部的任务是用的单独的服务器部署的,而咱们使用不一样规模的线程池跑不一样类型的任务,可是出现这样问题时也会影响到其余任务。
经过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可使用
经过扩展线程池进行监控。经过继承线程池并重写线程池的beforeExecute,afterExecute和terminated方法,咱们能够在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。如:
protected void beforeExecute(Thread t, Runnable r) { }
Java中的Semaphore是一种新的同步类,它是一个计数信号。
从概念上讲,信号量维护了一个许可集合。若有必要,在许可可用前会阻塞每个 acquire(),而后再获取该许可。每一个 release()添加一个许可,从而可能释放一个正在阻塞的获取者。
可是,不使用实际的许可对象,Semaphore只对可用许可的号码进行计数,并采起相应的行动。
信号量经常用于多线程的代码中,好比数据库链接池。
同步方法默认用this或者当前类class对象做为锁; 同步代码块能够选择以什么来加锁,比同步方法要更细颗粒度,咱们能够选择只同步会发生同步问题的部分代码而不是整个方法; 同步方法使用关键字 synchronized修饰方法,而同步代码块主要是修饰须要进行同步的代码,用 synchronized(object){代码内容}进行修饰;
使用多线程的时候,一种很是简单的避免死锁的方式就是:指定获取锁的顺序,并强制线程按照指定的顺序获取锁。所以,若是全部的线程都是以一样的顺序加锁和释放锁,就不会出现死锁了。
原文:Java架构笔记
免费Java高级资料须要本身领取,涵盖了Java、Redis、MongoDB、MySQL、Zookeeper、Spring Cloud、Dubbo高并发分布式等教程,一共30G。
传送门: https://mp.weixin.qq.com/s/JzddfH-7yNudmkjT0IRL8Q