实现线程有三种方式:使用内核线程实现、使用用户线程实现和使用用户线程加轻量级进程混合实现。内核线程是直接由操做系统内核支持的线程,经过内核完成线程切换,内核经过操纵调度器对线程进行调度,并负责将线程的任务映射到各个处理器上。
程序不会直接使用内核线程,而是去使用内核线程的高级接口-轻量级进程。每一个轻量级进程由一个内核线程支持。这种轻量级进程与内核线程之间1:1的关系称为一对一的线程模型。Sun JDK 的Windows版和Linux版都是使用一对一的线程模型,一条Java线程就映射到一条轻量级进程中。因为内核线程的支持,每一个轻量级进程成为一个独立的调度单元,一个轻量级进程的阻塞不会影响整个进程。但也是由于基于内核线程实现,各类线程操做,如建立、析构及同步都要进行系统调用,须要在用户态和内核态中来回切换,调用代价高,其次轻量级进程消耗必定的内核资源,所以一个系统支持轻量级进程的数量有限。java
线程调度是指系统为线程分配处理器使用权的过程,分为协同式调度和抢占式调度。协同式调度的多线程系统,线程执行时间由线程自己控制,线程完成本身的工做以后,主动通知系统切换到另外一个线程上。优势是实现简单,切换操做是由线程主动的,对线程可知,没有线程同步问题。缺点是线程执行时间不可控制,若是一个线程阻塞,可能致使整个系统奔溃。抢占式调度的多线程系统,每一个线程有系统分配执行时间,线程的切换不禁线程自己决定。(yield可让出执行时间,但线程自己没法获取执行时间)优势是线程执行时间系统可控。Java使用的线程调度方式就是抢占式调度。算法
Java线程的6种状态:编程
线程建立成功但还没有启动就是New;Runable状态的线程可能正在执行,也可能在等待CPU分配执行时间;当线程等待另外一个线程通知调度器一个条件时就进入等待状态,例如Object.wait、Thread.join;当这些方法指定时间参数时就成了限时等待;当一个线程试图获取一个内部的对象锁,而该锁被另外一线程持有时,该线程进入阻塞状态;当线程因run方法正常退出而天然死亡,或者由于没有捕获的异常死亡都会致使线程进入Terminated状态。安全
Java中断机制是一种协做机制,经过中断并不能直接终止另外一个线程,而须要被中断的线程本身处理中断。当对一个线程调用interrup方法时,线程的中断状态将被置位。这是每个线程都具备的boolean标志位。每一个线程都应该不时地检查这个标志,以判断线程是否被中断,并及时处理。服务器
public void interrupt() { if (this != Thread.currentThread()) checkAccess(); synchronized (blockerLock) { Interruptible b = blocker; if (b != null) { interrupt0(); // Just to set the interrupt flag b.interrupt(this); return; } } interrupt0(); } public static boolean interrupted() { return currentThread().isInterrupted(true); } public boolean isInterrupted() { return isInterrupted(false); } private native boolean isInterrupted(boolean ClearInterrupted);
能够看到,interrupt方法经过设置中断位来完成中断。interrupted方法和isInterrupted方法都是经过调用native方法来检测中断的,interrupted是一个静态方法,用来检测当前线程是否被中断,并且interrupted会清除该线程的中断状态;isInterrupted是一个实例方法,可用来检验是否有线程被中断,该方法不会改变中断状态。多线程
通常来讲,咱们中断线程的目的极可能是想中止线程执行。怎么中止线程执行呢?咱们能够在判断中断置位后,用return退出run方法。但这样设计并不优雅,另一种方式,就是抛出InterruptedException并在run方法里捕获。捕获后怎么处理也是件值得考虑的事,最好的方法是直接抛给调用者处理,但run方法是重写方法,结构已固定,没法抛出异常,咱们还能够在捕获InterruptedException后从新中断当前线程,让调用者检测。并发
wait方法是挂起当前线程,释放当前对象的控制权(释放锁),而后线程处于等待状态。notify是通知正在等待对象控制权(锁)的线程能够继续运行。notifyAll是通知全部等待对象控制权的线程继续运行。这几个方法是基于monitor监视器锁来实现的,因此必须在同步块内执行。app
sleep让当前线程暂停指定时间。wait方法依赖于同步,sleep能够直接调用。由于sleep只是暂时让出CPU的执行权,并不释放锁,而wait须要释放锁。举个简单的例子:异步
public class WaitTest { private static Object o = new Object(); static class Thread1 extends Thread{ @Override public void run() { try { synchronized(o){ System.out.println("Thread1--start"); //o.wait(); Thread.sleep(2000); System.out.println("Thread1--end"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.out.println("a"); } } } static class Thread2 extends Thread{ @Override public void run() { synchronized(o){ System.out.println("Thread2--start"); o.notify(); System.out.println("Thread2--end"); } } } public static void main(String[] args) throws InterruptedException{ Thread1 t1 = new Thread1(); Thread2 t2 = new Thread2(); t1.start(); Thread.sleep(100);//保证t1先得到锁 t2.start(); } }
在线程1里分别调用sleep和wait会有不一样的结果,调用sleep时线程1不会释放锁,因此会打印完“Thread1 start”、“Thread1 end”,再进入线程2的打印。调用wait时,打印完“Thread1-start”,就会释放锁,这时线程2的打印得以继续进行,会打印“Thread2 start”。ide
Thread.yield()方法会将当前线程从Running转为Runnable,让出当前对进程的使用,以便其余线程有机会执行,不过调度器能够忽虐该方法,也不能指定暂停时间,通常只用来调试和测试。
Thread.join()方法用于将异步的线程“合并”为同步的线程,父线程等待子线程执行完成后再执行。其实并不算合并,而是调用join的线程进入限时等待,不断检查子线程状态,在子线程执行完成后恢复执行。看一下它的实现原理:
public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { while (isAlive()) { wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
能够看到,join是经过子线程不断轮询本身状态一直到执行完毕才返回继续执行父线程。
在Java中,最基本的互斥同步手段就是synchronized关键字,synchronized自动提供一个锁以及相关的条件。synchronized同步块对于同一条线程来讲是可重入的,其次,同步块在已进入的线程执行完以前,会阻塞后面其余线程的进入。前面提到,Java的线程是映射到系统原生线程上,阻塞或唤醒一个线程,都须要操做系统帮忙完成,须要从用户态转为核心态,这须要耗费很长时间。所以synchronized是重量级操做,虚拟机自己会有一些优化手段,好比在阻塞以前加入自旋等待过程,避免频繁切入核心态之中。
重入锁(ReentrantLock)与synchronized类似,具有同样的线程重入性,一个表现为API层面的互斥锁,另外一个表现为原生语法层面的互斥锁。ReetrantLock增长了一些功能:等待可中断、公平锁和锁绑定多个条件。
公平锁是指多个线程等待同一个锁时,必须按照申请锁的时间顺序来依次得到锁,能够经过带布尔值的构造函数要求使用公平锁。
/** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
等待可中断是指持有锁的线程长期不释放锁的时候,正在等待的线程能够选择放弃等待:
import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockTestOne { static int count = 0; static ReentrantLock lock = new ReentrantLock(); public static void main(String[] args) throws Exception{ Thread a = new Thread(new CountThread("a")); Thread b = new Thread(new CountThread("b")); a.start(); Thread.sleep(100);//确保b线程后执行,a能先得到锁 b.start(); Thread.sleep(500);//等待0.5s后,a线程尚未释放锁,经过中断放弃等待 b.interrupt(); } static class CountThread extends Thread{ String name; CountThread(String name){ this.name = name; } @Override public void run() { /*try{ lock.lockInterruptibly(); }catch(InterruptedException e){ System.out.println("Thread "+name+" interrupted"); return; }*/ lock.lock(); System.out.println(Thread.currentThread().isInterrupted()); try{ System.out.println("Thread "+name+" begin"); for(int i=0; i<2000000; i++){ for(int j=0; j<100000; j++){ count++; } } System.out.println("Thread "+name+" end"); }finally{ lock.unlock(); } } } }
咱们先看lock.lock的执行结果:
能够看见a线程执行完后b才开始执行,且b线程的中断位已被置位。说明lock是阻塞式的获取锁,只有在成功获取到锁之后才处理中断信息,而且怎么处理由调用端决定,lock只负责给中断位置位。
再看一下lock.lockInterruptibly的执行结果:
能够看到,lockInterruptibly会当即处理中断信息,抛出InterruptedException,而不用等到获取锁。
锁绑定多个条件是指一个ReentrantLock对象能够同时绑定多个Condition对象。在synchronized中,锁对象的wait和notify其实实现一个隐含的条件,若是要和多个条件关联,必须额外添加锁。
public class ReentrantLockTestTwo { static ReentrantLock lock = new ReentrantLock(); static Condition productCondition = lock.newCondition(); static Condition customerCondition = lock.newCondition(); static Set<Object> set = new HashSet<Object>(8); public static void main(String[] args) { ProductThread pt = new ProductThread(); CustomerThread ct = new CustomerThread(); new Thread(pt).start(); new Thread(pt).start(); new Thread(ct).start(); new Thread(ct).start(); } static class ProductThread extends Thread{ @Override public void run() { lock.lock(); try{ System.out.println("进入生产线程"); for(;;){ Thread.sleep(1000); if(set.size()>=6){ customerCondition.signalAll(); productCondition.await(); }else{ System.out.println("开始生产"); Object o = new Object(); set.add(o); System.out.println("目前有"+set.size()+"个产品"); } } }catch(Exception e){ }finally{ lock.unlock(); } } } static class CustomerThread extends Thread{ @Override public void run() { lock.lock(); try{ System.out.println("进入使用线程"); for(;;){ Thread.sleep(1000); if(set.size()<=2){ productCondition.signalAll(); customerCondition.await(); }else{ System.out.println("开始使用"); Iterator it = set.iterator(); if(it.hasNext()){ Object o = it.next(); set.remove(o); } System.out.println("目前有"+set.size()+"个产品"); } } }catch(Exception e){ }finally{ lock.unlock(); } } } }
上面展现了ReentrantLock锁绑定多个条件。能够看到咱们在产品上加锁并在锁上新建了两个条件:生产条件和使用条件。当产品数量多于6时,让生产线程等待,小于2时,让使用线程等待。从执行结果能够看出,每次唤醒的线程只多是生产或使用线程的一种,而并无唤醒这个锁上的全部线程。
锁优化有几种措施:自旋锁与自适应锁、锁消除、锁粗化、轻量级锁和偏向锁。
前面提到同步块会阻塞其余线程,而线程的阻塞和恢复须要系统切换状态,耗费较长时间。因此若是持有锁的线程很快就会释放锁时,咱们并不须要让等待线程阻塞,而是让它执行一个忙循环,这就是所谓的自旋锁。但自旋锁虽然避免了线程切换的开销,却要占用处理器时间。当锁被长时间占用时,自旋锁除了浪费处理器资源就没有做用了。JDK1.6引入了自适应的自旋锁,有系统决定自旋时间,改善性能。
锁消除是指虚拟机即时编译器在运行时,对一些代码上要求同步,可是被检测到不可能存在共享数据竞争的锁进行消除。锁消除主要断定依据来源于逃逸分析的数据支持。
若是一系列的连续操做都对同一个对象反复加锁和解锁,甚至加锁操做是出如今循环体中,频繁地进行互斥同步操做也会致使没必要要的性能损耗。这时能够锁粗化。
偏向锁是消除数据在无竞争状况下的同步,所谓偏向,是指其偏向第一个得到它的线程。假设JVM启用了偏向锁,当锁对象第一次被线程得到的时候,虚拟机将会把对象头的标志位设为“01”,即偏向模式。同时使用CAS操做把获取到这个锁的线程的ID记录在对象的Mark Word中,若是CAS成功,持有偏向锁的线程之后每次进入这个锁相关的同步块时,虚拟机都再也不进行任何同步操做。第二个线程来访问时,检查原来持有对象锁线程是否存活,若已介素则偏向锁偏向第二个线程,不然第一个线程若是存活,经过线程栈检查对象是否处于锁定状态,若是无锁,则撤销偏向恢复到未锁定对象,若是仍然锁定,则升级为轻量级锁。
轻量级锁是在无竞争状况(我的认为是轻度竞争)下使用CAS操做去消除同步使用的互斥量,线程在执行同步块以前,虚拟机在当前线程的栈帧中创建Lock Record来存储对象目前Mark Word的拷贝,而后JVM经过CAS替换对象Mark Word为Lock Record的指针。若是成功,对象处于轻量级锁定,失败说明存在额外线程竞争锁,则尝试自旋,若是自旋时间内还未得到锁,则开始膨胀,修改MarkWord为重量级锁的指针,而且阻塞本身。
同步机制采用了“以时间换空间”的方式,而ThreadLocal采用了“以空间换时间”的方式。ThreadLocal会在每一个线程中为变量建立一个副本,即每一个线程内部都会有一个该变量,且在线程内部任何地方可使用,线程之间互不影响,这样须要在多线程使用的变量就不存在线程安全问题。
ThreadLocal自己并不存储变量值,它自己其实只是一个键值对的键,用来让线程从ThreadLocalMap中获取Value,ThreadLocalMap是每一个线程内部的容器。
能够看一下ThreadLocal的源码:
public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); } public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); }
同时,咱们看一下ThreadLocalMap的源码,会发现它的key使用的是ThreadLocal的弱引用。至于为何用弱引用,是由于从上图咱们能够看见一共有两条引用链到ThreadLocal变量,若是ThreadLocalRef置空,也就是程序再也不访问ThreadLocal变量了。此时若是key使用的是强引用,那么根据判断对象存亡的可达性分析算法,ThreadLocal并不会被回收,由于还有一条GC root的引用链到ThreadLocal上;若是使用的是弱引用,咱们知道弱引用只会存活到下一次JVM GC时,ThreadLocal就能够被回收。
static class Entry extends WeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } }
使用弱引用ThreadLocal当然能够被回收,可是带来新的问题。ThreadLocal被回收后ThreadLocalMap中会出现key为null的Entry,意味着没有办法访问这些key为null的Entry的value,若是当前线程迟迟不结束,value对应的对象不被回收,就会致使内存泄漏。从下面的代码看到,ThreadLocal的set、get、remove方法在一些时机下会清理这些value,但这不及时,仍是会有一些内存泄漏,最好的办法时咱们能够经过每次使用完ThreadLocal后,调用它的remove方法来避免这种状况。
public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); } private Entry getEntry(ThreadLocal<?> key) { int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; if (e != null && e.get() == key) return e; else return getEntryAfterMiss(key, i, e); } private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) { Entry[] tab = table; int len = tab.length; while (e != null) { ThreadLocal<?> k = e.get(); if (k == key) return e; if (k == null) expungeStaleEntry(i); else i = nextIndex(i, len); e = tab[i]; } return null; } private int expungeStaleEntry(int staleSlot) { Entry[] tab = table; int len = tab.length; // expunge entry at staleSlot tab[staleSlot].value = null; tab[staleSlot] = null; size--; // Rehash until we encounter null Entry e; int i; for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); if (k == null) { e.value = null; tab[i] = null; size--; } else { int h = k.threadLocalHashCode & (len - 1); if (h != i) { tab[i] = null; // Unlike Knuth 6.4 Algorithm R, we must scan until // null because multiple entries could have been stale. while (tab[h] != null) h = nextIndex(h, len); tab[h] = e; } } } return i; }
Collections做为集合的工具类,除了提供一些有效的算法以外,还能够对集合进行包装。其中一种就是非同步集合包装成同步集合。
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) { return new SynchronizedMap<>(m); } private static class SynchronizedMap<K,V> implements Map<K,V>, Serializable { private static final long serialVersionUID = 1978198479659022715L; private final Map<K,V> m; // Backing Map final Object mutex; // Object on which to synchronize SynchronizedMap(Map<K,V> m) { this.m = Objects.requireNonNull(m); mutex = this; } SynchronizedMap(Map<K,V> m, Object mutex) { this.m = m; this.mutex = mutex; } public int size() { synchronized (mutex) {return m.size();} } public boolean isEmpty() { synchronized (mutex) {return m.isEmpty();} } public boolean containsKey(Object key) { synchronized (mutex) {return m.containsKey(key);} } public boolean containsValue(Object value) { synchronized (mutex) {return m.containsValue(value);} } public V get(Object key) { synchronized (mutex) {return m.get(key);} } public V put(K key, V value) { synchronized (mutex) {return m.put(key, value);} } public V remove(Object key) { synchronized (mutex) {return m.remove(key);} } public void putAll(Map<? extends K, ? extends V> map) { synchronized (mutex) {m.putAll(map);} } public void clear() { synchronized (mutex) {m.clear();} } private transient Set<K> keySet; private transient Set<Map.Entry<K,V>> entrySet; private transient Collection<V> values; public Set<K> keySet() { synchronized (mutex) { if (keySet==null) keySet = new SynchronizedSet<>(m.keySet(), mutex); return keySet; } } public Set<Map.Entry<K,V>> entrySet() { synchronized (mutex) { if (entrySet==null) entrySet = new SynchronizedSet<>(m.entrySet(), mutex); return entrySet; } } public Collection<V> values() { synchronized (mutex) { if (values==null) values = new SynchronizedCollection<>(m.values(), mutex); return values; } } public boolean equals(Object o) { if (this == o) return true; synchronized (mutex) {return m.equals(o);} } public int hashCode() { synchronized (mutex) {return m.hashCode();} } public String toString() { synchronized (mutex) {return m.toString();} } // Override default methods in Map @Override public V getOrDefault(Object k, V defaultValue) { synchronized (mutex) {return m.getOrDefault(k, defaultValue);} } @Override public void forEach(BiConsumer<? super K, ? super V> action) { synchronized (mutex) {m.forEach(action);} } @Override public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) { synchronized (mutex) {m.replaceAll(function);} } @Override public V putIfAbsent(K key, V value) { synchronized (mutex) {return m.putIfAbsent(key, value);} } @Override public boolean remove(Object key, Object value) { synchronized (mutex) {return m.remove(key, value);} } @Override public boolean replace(K key, V oldValue, V newValue) { synchronized (mutex) {return m.replace(key, oldValue, newValue);} } @Override public V replace(K key, V value) { synchronized (mutex) {return m.replace(key, value);} } @Override public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) { synchronized (mutex) {return m.computeIfAbsent(key, mappingFunction);} } @Override public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { synchronized (mutex) {return m.computeIfPresent(key, remappingFunction);} } @Override public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { synchronized (mutex) {return m.compute(key, remappingFunction);} } @Override public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) { synchronized (mutex) {return m.merge(key, value, remappingFunction);} } private void writeObject(ObjectOutputStream s) throws IOException { synchronized (mutex) {s.defaultWriteObject();} } }
能够看见,其实包装的原理很简单,无非是对原来的全部操做加上同步锁,这样非同步集合就成了同步集合。
读写锁其实就是共享锁和排它锁。若是对资源加了写锁,其余线程没法再得到读锁或写锁,但持有写锁的线程,能够对资源加读锁(锁降级)。若是线程对资源加了读锁,其余线程能够继续加读锁。举个例子:几我的一块儿开发,SVN服务器上的代码你们能够同时查看,但对同一段代码的修改提交同时只能一我的操做。这里查看就须要读锁,提交就须要加写锁,以下代码。
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; public class ReentrantReadWriteLockTest { static int readCount = 0; static int writeCount = 0; static ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); static String code = "hello world"; static ReadLock readlock = lock.readLock(); static WriteLock writelock = lock.writeLock(); public static void main(String[] args) { ReadThread r = new ReadThread(); WriteThread w = new WriteThread(); for(int i=0; i<3; i++){ new Thread(r).start(); new Thread(w).start(); } } static class ReadThread extends Thread{ @Override public void run() { while(true){ readlock.lock(); try{ readCount ++; System.out.println("同时有"+readCount+"个线程同时读的内容: "+code); String temp = new String(code); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(code.equals(temp)); readCount --; }finally{ readlock.unlock(); } } } } static class WriteThread extends Thread{ @Override public void run() { while(true){ writelock.lock(); try{ writeCount ++; code = code + "a"; System.out.println("同时有"+writeCount+"个线程写的内容: "+code); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } writeCount --; }finally{ writelock.unlock(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
咱们来看一下运行结果:
能够看到,有多个线程同时读取代码,但任意时刻只有一个线程进行更改。且读的时候不容许更改(代码是经过比较先后两次读到的内容来验证读写锁不兼容的,这不够严谨,暂时没有想到更好例子)。至于锁降级,由于在修改数据后写线程没有再用到数据,因此上例中没有用锁降级,在此摘抄一段话来讲明其必要性。
参考:《深刻理解Java虚拟机》、《Java并发编程实战》、《Java核心技术》。