本文主要整理自《Java并发编程实战》java
当某个操做没法执行下去时,就会发生活跃性问题,如死锁,饥饿,活锁等ios
Timer, servlet/JSP,RMI,swing/AWT算法
最好将一个有关联的同步操做放在同一个线程安全类中,由一个类提供入口,在类中作好同步措施,客户端调用类时不须要再考虑同步问题,好比concurrent包下的类数据库
当计算的正确性取决于多个线程的交替执行顺序时,就会发生竞态条件
常见的竞态条件是“先检查后执行”(check then act),检查和执行的间隙被其余线程隔断,发生错误
应尽可能使用线程安全的类来管理类的状态,如原子类(经过CAS方式实现,CAS算法有ABA问题)
当状态变量有多个且相互关联时,单纯的原子类已经不够用了,应使用同步代码管理,此时能够不用原子变量了编程
每一个java对象能够用做同步的锁,称为内置锁
内置锁是可重入的,所以,若是某线程试图获取一个已经由它本身持有的锁,那这个请求会成功.重入意味着获取锁操做的粒度是线程,而不是调用windows
注意, synchronized实例方法锁住的都是调用者实例数组
class Widget {
public synchronized void doSomething() {
}
}
class LoggingWidget extends Widget {
/* * 实例方法上的synchronized锁住的都是调用实例 * 这里确定是用LoggingWidget实例去调用,锁住LoggingWidget实例 */
public synchronized void doSomething() {
//这里依旧是LoggingWidget实例去调用父类的synchronized方法
//锁住的依然是调用者LoggingWidget实例
super.doSomething();
}
}
复制代码
每一个锁都关联一个请求计数器和一个占有他的线程,当请求计数器为0时,这个锁能够被认为是unhled的,当一个线程请求一个unheld的锁时,JVM记录锁的拥有者,并把锁的请求计数加1,若是同一个线程再次请求这个锁时,请求计数器就会增长,当该线程退出syncronized块时,计数器减1,当计数器为0时,锁被释放(这就保证了锁是可重入的,不会发生死锁的状况)。缓存
线程执行互斥代码的过程安全
Lock -> 主内存 -> 工做内存 -> 主内存 -> unlock
bash
对于可能被多个线程同时访问的可变状态变量,在访问时须要持有同一个锁,状态变量由这个锁保护
若原子变量的操做已经在同步代码块内,则可放弃使用原子变量,普通变量更好——不一样的同步机制容易形成混乱,一个同步机制已经足够时,就不要加入其它同步机制,多余
某个线程在得到对象的锁以后,只能阻止其余线程得到同一个锁。之因此每一个对象都有一个内置锁,只是为了免去显示建立锁对象。
Synchronized修饰实例方法,得到的就是实例锁(对象锁),修饰静态方法,就得到类锁,代码块同理
锁分为对象锁和类锁
Java内存模型要求,变量读写必须是原子操做,当无线程同步时,读取到的变量值必为某个线程设置的值,而不是一个随机值,注意这里读写的概念,这是jvm中的读写,并非代码中的读写
但对于非volatile类型的long和double变量,jvm容许将64位的读/写操做分解为两个32位的操做,这就有可能形成高32位和低32位不是原组合的问题
解决方法:用volatile修饰或者用锁保护起来
volatile修饰的变量操做不会与其余内存操做一块儿重排序,volatile变量不会被缓存在寄存器或者其余处理器不可见的地方(直接读写主内存上的值,无副本),所以在读取volatile类型的变量时总会返回最新写入的值
Example
volatile boolean asleep;
…
while(asleep)
countSomeSleep()
复制代码
若不使用volatile,可能当asleep被一个线程修改时,执行判断的线程修改不了,此时用volatile比锁机制简单方便
可是,volatile只保证可见性,而不保证原子性,如volatile不保证count++的原子性(count++比存在读取和写入两步),但锁机制能够
可使用volatile类型来发布不可变对象P40
对于服务器应用程序,在开发和测试阶段,启动jvm时都要指定-server命令行选项,server模式的jvm将比client模式的jvm进行更多的优化,例如将循环中未被修改的变量提高到循环外部,可能致使开发环境(client模式的jvm)正常的代码到部署环境(server模式的jvm)发生异常
在volatile示例代码中,若asleep未声明为volatile类型,那么server模式的jvm会将asleep的判断条件提高到循环体外部,而client模式的jvm不会这么作
当在对象构造完成以前发布该对象到其余线程,就会破坏线程安全性,特别的,当从对象的构造函数中发布对象时,只是发布了一个还没有构造完成的对象(即this未初始化完成,但你this却能够被构造函数中新发布(实例化)的对象引用)
不要在构造过程当中使用this引用逸出
Example
//Wrong
public class ThisEscape {
public ThisEscape(EventSource source) {
source.registerListener(new EventListener() {
public void onEvent(Event e) {
doSomething(e);
}
});
}
//Correct
public class SafeListener {
private final EventListener listener;
private SafeListener() {
listener = new EventListener() {
public void onEvent(Event e) {
doSomething(e);
}
};
}
public static SafeListener newInstance(EventSource source) {
SafeListener safe = new SafeListener(); //先构造完成
source.registerListener(safe.listener); //再发布
return safe;
}
复制代码
指线程封闭性的职责彻底由程序实现来承担,不共享数据,仅在单线程内访问数据,将对象封闭到目标线程上;因其脆弱性,应该尽可能少用它,应使用更强的线程封闭技术,如栈封闭或threadlocal类
变量只存在于执行线程栈中,只在线程内部使用
若是在线程内部上下文中使用非线程安全的对象,那么该对象仍然是线程安全的
类能够将ThreadLoad<T>
视为Map<Thread ,T>
Threadlocad提供了set get方法,这些方法为使用该变量的线程都保存一份独立的副本,所以get老是返回当前执行线程在调用set时设置的最新值
最好不要放在线程池中,避免复用
public class Holder {
// private int n;
private final int n;
public Holder(int n) {
this.n = n;
}
public void assertSanity() {
if (n != n)
throw new AssertionError("This statement is false.");
}
}
复制代码
设为final就能够保证正确地构造对象,就是线程安全的了
1. 在静态初始化函数中初始化一个对象引用:单例饿汉模式
2. 将对象的引用保存到volatile类型的域或者AtomicReferance对象中:原子类
3. 将对象引用保存到某个正确构造对象的final域中:不可变
4. 将对象引用保存到一个由锁保护的域中:锁
能够将对象放入到线程安全的容器中:
复制代码
Hashtable synchronizedMap concurrentMap vector copyOnWriterArrayList copyOnWriterSet synchronizedList synchronizedSet blockingQueue concurrentLinkedQueue
静态初始化器由jvm在类的初始化阶段执行,因为在jvm内部存在着同步机制,故对象能够被安全地发布
public static Hodlder holder = new Hodeler(42);
不可变对象能够经过任意机制发布
事实不可变对象必须经过安全方式发布
可变对象必须经过安全方式发布,而且必须是线程安全的或由锁保护起来
一些java基础同步类并非线程安全的,但能够经过包装器工厂方法collections.synchronizedList()
,将容器类封装在一个同步的包装器对象中
把对象的全部可变状态都封装起来,并用对象本身的内置锁来保护
如vector和hashtable
使用对象私有的锁(private)可能更有优势
同时,在获取被保护的对象时,能够返回复制对象,修改对象时经过保护对象共有方法修改便可(不是直接修改返回的复制对象)
copyonwrite是修改返回的集合,而后修改引用
若是一个类是由多个独立且线程安全的状态变量组成,而且在全部的操做中都不包括无效状态转换,则可将线程安全性委托给底层的状态变量
线程安全能够将状态变量赋予线程安全的类来管理,好比线程安全容器,不可变容器,原子类等
涉及线程安全的变量,尽可能设为final类型
返回引用时,特别须要注意是否会形成逸出,能够返回复制对象,或者不可变对象(对象自己不可变(注意是否能够修改引用),不可变容器,同步容器)
须要同步的对象能够放到客户端中同步,须要注意同步时加锁同一个锁
如vector为同步类,其内部方法操做是同步的,但涉及几个操做按序同步执行时,能够在客户端加锁实现,此时,所加的锁应与vector对象本来的锁一致,即vector对象自身 synchronized(vector){ … }
同步容器
Vector、Hashtable、同步封装类,能够由Collections.synchronizedXxxx
等方法建立
同步容器类虽然都是线程安全的,可是在某些状况下(复合操做),仍然须要加锁来保护;
同步容器对全部容器状态的访问都串行化,严重下降了并发性;当多个线程竞争锁时,吞吐量严重降低;
并发容器
java5.0以后提供了多种并发容器来改善同步容器的性能,如ConcurrentHashMap、CopyOnWriteArrayList、CopyOnWriteArraySet、ConcurrentSkipListMap等
以ConcurrentHashMap为例
采用分离锁技术,同步容器中,是一个容器一个锁,但在ConcurrentHashMap中,会将hash表的数组部分分红若干段,每段维护一个锁,以达到高效的并发访问;
迭代器弱一致性,迭代期间不会抛出ConcurrentModificationException异常;
size()
、isEmpty()
等方法返回的是一个近似值;
如size操做,就保存了一个last值用于记录上次循环时统计的总数,只有先后两次统计值相等时才会返回
增长了若干原子操做方法,如putIfAbsent(没有该key,则添加)
注意,此时不能再经过客户端加锁新建新的原子操做了,客户端只能对并发容器自身加锁,但并发容器内部使用的并非自身锁
写入时复制容器:Copyonwrite,在每次修改时都会加锁并建立并从新发布一个新的容器副本,直接修改容器引用,从而实现可见性,但在读取时不加锁,直接读取原值,致使的问题就是写入时虽然加锁,但仍能够读取,可能读到失效值.其访问和写入的操做最终必定会经过对应的final方法,好比setArray()
,getArray()
读多写少时使用Copyonwrite
总结
只有在应用程序须要对容器加锁进行独占式访问时,才用同步容器,不然使用非并发容器以保证更优性能
但在代码中调用一个能够抛出InterruptedException的方法时,本身的方法就变成了一个阻塞方法,而且必须处理中断的响应
public void run() {
try {
processTask(queue.take());
} catch (InterruptedException e) {
// 恢复中断状态
Thread.currentThread().interrupt();
}
}
复制代码
同步工具类能够根据自身状态来协调线程的控制流
put()
方法为例)lock = new ReentrantLock(fair);
Condition notEmpty = lock.newCondition();
Condition notFull = lock.newCondition();
复制代码
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
复制代码
注意,一定要有循环,当被唤醒时,须要回到循环中再次作判断是否符合条件
//提供统一入口&出口
public class TestHarness {
public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
// 全部线程等在这里,直到计数为0,即调用了startGate.countDown();
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException ignored) {
}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
// 全部线程等在这里,直到计数为0,即调用了endGate.countDown() nThreads次;
endGate.await();
long end = System.nanoTime();
return end - start;
}
}
复制代码
FutureTask:可生成结果的runnable,包括3种状态:等待运行、正在运行和运行完成。若任务已经完成,则future.get()
会当即返回结果,不然阻塞直至完成状态.一旦完成就永远中止 FutureTask使用场景:用ConcurrentMap <A, Future<V>>
缓存计算,vaule值是Future,P89
// 使用FutureTask来提早加载稍后须要的数据
public class Preloader {
ProductInfo loadProductInfo() {
return null; // 这里执行复杂计算or等待
}
private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(
new Callable<ProductInfo>() {
public ProductInfo call() throws InterruptedException {
return loadProductInfo();
}
});
private final Thread thread = new Thread(future);
public void start() {
thread.start();
}
public ProductInfo get() throws InterruptedException {
try {
return future.get(); // 阻塞直到有结果
} catch (ExecutionException e) {
throw e;
}
}
interface ProductInfo {
}
}
复制代码
public class BoundedHashSet <T> {
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}
public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded)
sem.release();
}
}
public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved)
sem.release();
return wasRemoved;
}
}
复制代码
用于等待其余线程,全部线程必须同时到达栅栏位置,才能继续执行
CyclicBarier可使必定数量的参与方反复地在栅栏位置聚集,在并行迭代算法中很是有用
Exchanger 是一种两方栅栏,各方在栅栏位置上交换数据,用于双方执行不对称操做
private final CyclicBarrier barrier;
//barrier.await()调用了count次就执行内部线程mainBoard.commitNewValues()方法
this.barrier = new CyclicBarrier(count,
new Runnable() {
public void run() {
mainBoard.commitNewValues();
}});
public void run() {
while (!board.hasConverged()) {
//当循环走完,即表示计算完成
for (int x = 0; x < board.getMaxX(); x++)
for (int y = 0; y < board.getMaxY(); y++)
board.setNewValue(x, y, computeValue(x, y));
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
复制代码
P84线程CPU数与吞吐量
在不涉及I/O操做或共享数据访问时,当线程数量为cpu数或CPU数+1 时,将得到最优的吞吐量。一个进程下的线程是如此,若是有多个进程呢?进程间的时间分配?
提供了一种标准方法,将任务的提交过程和执行过程解耦,还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制
名称 | 线程数 | 异常 | 特性 | 实现队列 |
---|---|---|---|---|
newfixedThreadPool | 固定 | 出现异常而结束则补充一个线程 | 逐步增长,直到最大 | LinkedBlockingQueue |
newCachedThreadPool | 最大Interger. MAX_VALUE | 可缓存线程池,线程池规模多于当前需求,则回收空闲线程,线程池可无限扩大 | SynchronousQueue | |
newSingleThreadExecutor | 1 | 出现异常而结束则另起一个线程 | 单线程按优先级等顺序执行 | LinkedBlockingQueue |
newScheduledThreadPool | 固定 | 以延迟或定时的方式执行 | RunnableScheduledFuture[] 数组 |
ExecutorService exec = Executors.newSingleThreadExecutor();
三种状态:运行、关闭和已终止
四个生命周期阶段:建立,提交,开始和完成
已提交但还没有开始的任务能够取消,而已开始执行的任务,只有当它们能响应中断时才能取消
JVM只有在全部非守护线程所有终止后才会退出,若是没法正确关闭executor,那么jvm则没法结束
而关闭时有两种方式
1. 平缓关闭shutdown:中止接受新任务,执行完全部正在执行和在等待队列中的任务
2. 强制关闭shutdownNow:取消全部运行中的任务,再也不启动等待队列中的任务,返回全部已提交但未开始的任务,能够将任务记入日志etc
java.util.Timer类在执行全部定时任务时只会建立一个线程,若某个任务执行时间过长,那么将破坏其余TimerTask的定时精确性
Timer若抛出异常,则会取消全部timer类下的定时线程,不会恢复执行
ExecutorService中的全部submit方法都将放回一个future,可得到任务结果
Future有cancle方法,能够取消任务
CompletionService:将executor和blockingqueue的功能融合在一块儿,将callable任务交给提交给他来执行,而后使用相似于队列操做的take和poll等方法得到future,再future.get()
返回结果.这里是能够应付一组计算结果,一旦有返回就能够得到
如ExecutorCompletionService实际上就是将计算完成后的结果放在blockingqueue中
void renderPage(CharSequence source) {
final List<ImageInfo> info = scanForImageInfo(source);
CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executor);
for (final ImageInfo imageInfo : info)
completionService.submit(new Callable<ImageData>() {
public ImageData call() {
return imageInfo.downloadImage();
}
});
renderText(source);
try {
for (int t = 0, n = info.size(); t < n; t++) {
Future<ImageData> f = completionService.take();
ImageData imageData = f.get();
renderImage(imageData);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
复制代码
Future为一个任务设计时限:时限内有结果,get当即返回,超过期限抛出TimeOutException Future.get(long,timeType)
提交一组任务
InvokeAll:将多个任务提交到一个ExecutorService并得到结果,InvokeAll按照任务集合中迭代器的顺序添加到返回集合,由此可关联各个future与callable
当任务执行完成/调用者线程中断/超时,invokeAll将返回,能够经过get或者isCancle判断是何种状况
List<QuoteTask> tasks = new ArrayList<QuoteTask>();
for (TravelCompany company : companies)
tasks.add(new QuoteTask(company, travelInfo));
List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit);
List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size());
for (Future<TravelQuote> f : futures) {
try {
quotes.add(f.get());
} catch (ExecutionException e) {
quotes.add(...); //按序放回关联,须要放入对象
} catch (CancellationException e) {
quotes.add(...); //按序放回关联,须要放入对象
}
}
复制代码
class QuoteTask implements Callable<TravelQuote> {
public TravelQuote call() throws Exception {
return company.solicitQuote(travelInfo);
}
}
复制代码
Java没有提供任何机制来安全地终止线程,而是提供了中断(interrupion),能使一个线程终止另外一个线程的当前工做
Callable认为主入口点将返回一个值,并可能抛出一个异常
无返回值,可以使用Callable
调用interrupt并不意味着当即中止目标线程正在进行的工做,而只是传递了请求中断的消息,而后由线程在下一个合适的时刻中断本身
一般,中断是实现取消最合理的方式,而不是设置标志位:若使用标志位,I/O阻塞就会一直卡住,中断请求只能设置线程的中断状态,同样也卡住,只能关闭I/O接口
得到中断状态,并清除当前线程的中断状态.
在调用interrupted时返回了true,则会清除线程中断状态,下次再调用interrupted时就已经不是中断状态了,故须要对中断作处理—抛出interruptException或者再次调用interrupt恢复中断状态:Thread.currentThread().interrupt();
最合理的取消操做是某种形式的线程级取消操做或服务级取消操做:尽快退出,在必要时进行清理,通知某个全部者线程已经退出
线程应该只能由其全部者中断,全部者能够将线程的中断策略信息封装到某个合适的取消机制中,例如shutdown方法
public void run() {
try {
processTask(queue.take());
} catch (InterruptedException e) {
// restore interrupted status
Thread.currentThread().interrupt();
}
}
复制代码
当尝试取消某个任务时,不宜直接中断线程池,只能经过任务的future来实现取消Future<?> task = taskExec.submit(r);
try {
task.get(timeout, unit);
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
} finally {
task.cancel(true);
}
复制代码
在如socket I/O或者等待得到内置锁而阻塞时,那么中断请求只能设置线程的中断状态,除此以外并没有多大做用。此时应该中断底层的阻塞操做,抛出异常,以此响应中断
Example:
Socket读取阻塞
改写thread的中断方法
public void interrupt() {
try {
socket.close();
} catch (IOException ignored) {
} finally {
super.interrupt();
}
}
复制代码
能够中断线程,也能够取消底层阻塞方法
注意,在取消生产者-消费者操做时,须要同时取消生产者和消费者
public abstract class SocketUsingTask <T> implements CancellableTask<T> {
@GuardedBy("this") private Socket socket;
protected synchronized void setSocket(Socket s) {
socket = s;
}
//自定义的取消方法
public synchronized void cancel() {
try {
if (socket != null)
socket.close();
} catch (IOException ignored) {
}
}
public RunnableFuture<T> newTask() {
return new FutureTask<T>(this) {
public boolean cancel(boolean mayInterruptIfRunning) {
try {
//先调用自身取消方法
SocketUsingTask.this.cancel();
} finally {
return super.cancel(mayInterruptIfRunning);
}
}
};
}
}
//新增两个方法
interface CancellableTask <T> extends Callable<T> {
void cancel();
RunnableFuture<T> newTask();
}
@ThreadSafe
class CancellingExecutor extends ThreadPoolExecutor {
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
if (callable instanceof CancellableTask)
return ((CancellableTask<T>) callable).newTask(); //返回扩展对象
else
return super.newTaskFor(callable);
}
}
复制代码
能够设置一个Boolean flag标识是否取消。同时设置一个计数器统计当前任务队列中任务数量,关闭时设置flag,中断线程,而底层的生产者方法就判断flag是否已关闭,抛出异常,消费者则只在flag和计数器值为0时取消,不然一直处理任务队列,直到完成全部任务。
public class LogService {
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
private final PrintWriter writer;
@GuardedBy("this") private boolean isShutdown;
@GuardedBy("this") private int reservations;
public LogService(Writer writer) {
this.queue = new LinkedBlockingQueue<String>();
this.loggerThread = new LoggerThread();
this.writer = new PrintWriter(writer);
}
public void start() {
loggerThread.start();
}
public void stop() {
synchronized (this) {
isShutdown = true;
}
loggerThread.interrupt();
}
public void log(String msg) throws InterruptedException {
synchronized (this) {
if (isShutdown)
throw new IllegalStateException(/*...*/);
++reservations;
}
queue.put(msg);
}
private class LoggerThread extends Thread {
public void run() {
try {
while (true) {
try {
synchronized (LogService.this) {
//要把对象消费完
if (isShutdown && reservations == 0)
break;
}
String msg = queue.take();
synchronized (LogService.this) {
--reservations;
}
writer.println(msg);
} catch (InterruptedException e) { /* retry */
}
}
} finally {
writer.close();
}
}
}
}
复制代码
往任务队列中添加约定的对象,消费者每次都查看对象,判断是否退出
也能够再作一个统计,达到数量才退出,这样就能够确保取消多个线程
已知生产者消费者时才有用,要确认生产的毒丸对象数量
注意:只有在无界队列中,毒丸对象才能可靠地工做
public class IndexingService {
private static final int CAPACITY = 1000;
private static final File POISON = new File("");
private final IndexerThread consumer = new IndexerThread();
private final CrawlerThread producer = new CrawlerThread();
private final BlockingQueue<File> queue;
private final FileFilter fileFilter;
private final File root;
public IndexingService(File root, final FileFilter fileFilter) {
this.root = root;
this.queue = new LinkedBlockingQueue<File>(CAPACITY);
this.fileFilter = new FileFilter() {
public boolean accept(File f) {
return f.isDirectory() || fileFilter.accept(f);
}
};
}
private boolean alreadyIndexed(File f) {
return false;
}
public void start() {
producer.start();
consumer.start();
}
public void stop() { //中断机制
producer.interrupt();
}
public void awaitTermination() throws InterruptedException {
consumer.join();
}
}
复制代码
消费者
class IndexerThread extends Thread {
public void run() {
try {
while (true) {
File file = queue.take();
if (file == POISON)
break;
else
indexFile(file);
}
} catch (InterruptedException consumed) {
}
}
public void indexFile(File file) {
/*...*/
};
}
复制代码
生产者
class CrawlerThread extends Thread {
public void run() {
try {
crawl(root);
} catch (InterruptedException e) { /* 被打断就放入毒丸对象 */
} finally {
while (true) {
try {
queue.put(POISON);
break;
} catch (InterruptedException e1) { /* retry */
}
}
}
}
private void crawl(File root) throws InterruptedException {
File[] entries = root.listFiles(fileFilter);
if (entries != null) {
for (File entry : entries) {
if (entry.isDirectory())
crawl(entry);
else if (!alreadyIndexed(entry))
queue.put(entry);
}
}
}
}
复制代码
平缓关闭shutdown:中止接受新任务,执行完全部正在执行和在等待队列中的任务
public void stop() {
try {
exec.shutdown();
exec.awaitTermination(3000, TimeUnit);//等待执行完成,这里不是冗余吗?
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
...
}
}
复制代码
致使线程提早死亡的最主要缘由就是runtimeException,在线程代码中可使用try-catch代码块捕获异常并进行处理
未捕获异常
UncaughtExceptionHandler,Thread API中提供的处理异常类,能检测出某个线程因为未捕获的异常而终结的状况,至少将异常信息打印到日志表中。须要为ThreadPoolExecutor的构造函数提供一个ThreadFactory
public class MyAppThread extends Thread {
public MyAppThread(Runnable runnable, String name) {
super(runnable, name);
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
}
});
}
}
复制代码
只有经过execute提交的任务,才能将它抛出的异常交给未捕获异常处理器,而经过submit提交的任务,不管是抛出的未检查异常仍是已检查异常,都将被认为是任务返回状态的一部分.
若一个由submit提交的任务因为抛出了异常而结束,那么这个异常将被future.get封装在ExecutionException中从新抛出
若但愿在任务中因为发生异常而失败时得到通知,而且执行一些特定于任务的恢复操做,那么能够将任务封装在能捕获异常的runnable或callable中,或者改写ThreadPoolExecutor的afterExecute方法
Shutdown hook:正常关闭jvm时,jvm首先调用全部已经注册好的关闭钩子,指经过Runtime.addShutdownHook()
注册但未开始的线程
Jvm并不会中止或者中断任何在关闭时仍然运行的应用程序线程,当jvm最终结束时,守护线程将被强行结束
Runtime.getRuntime().addShutdownHook(new Thread(){...});
普通线程:主线程建立的全部线程都是普通线程,普通线程继承了建立它的线程的守护状态
守护线程:非普通线程,当一个线程退出时,jvm会检查正在运行的线程,若都是守护线程,则jvm退出,当jvm中止时,全部的守护线程将被抛弃,直接退出
守护线程最好执行“内部任务”
只有当线程本地值的生命周期受限于任务的生命周期时,在线程池的线程中使用threadlocal才有意义,而在线程池的线程中不该该使用threadlocal在任务之间传递值
只要线程池中的任务须要无限期地等待一些必须由池中其余任务才能提供的资源或条件,除非线程池足够大,不然将发生线程饥饿死锁
每当提交一个有依赖性的executor任务时,须要知道可能会出现线程饥饿死锁,故而须要在代码或配置executor的配置文件中记录线程池的大小限制或配置限制
只有当任务相互独立时,为线程池工做队列设置界限才是合理的,若是任务之间存在依赖性,那么有界的线程池或队列就可能致使线程”饥饿死锁”问题,此时应该使用无界的线程池,例如newCachePool
可阻塞方法大都定义了限时版本和不限时版本,如Thread.join
, blockingQueue.put
, countDownLatch.await
, selector.select
等。若等待超时,能够把任务标识为失败,而后终止任务或者将任务从新放回队列以便随后执行
线程池大小不该该固定,应该经过配置机制提供,或者根据Runtime.getRuntime().availableProcessors()
来动态计算
若是须要执行不一样类别的任务,而且它们之间的行为相差很大,那么应该考虑使用多个线程池,从而使每一个线程池能够根据各自的工做负载来调整
计算密集型:线程池大小为CPU数+1
I/O操做或其余阻塞操做:线程并不会一直执行,规模应该更大,须要估算任务的等待时间与计算时间的比值
N_cpu=number of CPUs
U_cpu=指望CPU利用率,0≤U_cpu≤1
W/C=等待时间/计算时间
复制代码
要使处理器达到指望的使用率,线程池的最优大小等于:
N_threads =N_cpu*U_cpu*(1+W/C)
复制代码
能够经过runtime来得到CPU数目 int cpu = Runtime.getRuntime().availableProcessors();
threadPoolExecutor容许提供一个BlockingQueue来保存等待执行的任务,基本的任务排队方法有3种:无界队列,有界队列和同步移交
工厂方法newFixedPoolExecutor , newSingleThreadExecutor在默认状况下使用一个无界的linkedBlockedQueue
ArrayBlockingQueue,有界的linkedBlockingQueue、PriorityBlockingQueue,有界队列有助于避免资源耗尽的状况发生
对于很是大或者无界的线程池,能够经过synchronousQueue来避免任务排队,以及直接将任务从生产者交给工做者线程
若没有线程正在等待接受任务,而且线程池未满,则新建立线程接受任务,不然根据饱和策略,这个任务将被拒绝
NewCachedThreadPool工厂方法使用了synchronousQueue
只有当线程池是无界的或者能够拒绝任务时, synchronousQueue才有价值
当有界队列被填满后,或者Executor已关闭,饱和策略开始发挥做用,ThreadPoolExextor的饱和策略能够经过调用setRejectedExcutionHandler修改
不一样的RejectedExcutionHandler实现
threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
复制代码
能够控制对某些特殊代码库的访问权限,所建立的线程将与建立PrivilegedThreadFactory的线程拥有相同的访问权限、AccessControlContext和contextClassLoader
若不使用PrivilegedThreadFactory,新建的线程将从调用exectute或submit的客户程序中继承访问权限
提供子类改写方法beforeExecute, afterExecute, terminated
Run方法返回或抛出异常,afterExecute都会执行,如有error,则不会执行afterExecute
若beforeExecute抛出RuntimeException异常,则任务不被执行,且afterExecute也不会被调用
线程须要以固定一致的顺序获取锁
须要注意的是,虽然对象引用顺序是固定的,但在两次加锁时其实际对象是交换的,这实际上就不是固定顺序加锁,容易致使死锁
加锁时能够以惟一,不可变的值做为加锁的排序依据,好比帐号,id等
在制定锁的顺序时,可使用system.identityHashCode()
获取对象hashcode值,以hashcode值为顺序加锁,又对象可能有相同hashcode值,那么可使用加时赛锁,即当判断hashcode值同样时,就对加时赛锁上锁,而后再以一个固定顺序上锁
若是在持有锁的状况下调用某个外部方法,要检查被调用外部方法是否也有同步操做,避免出现死锁问题
开放调用:在调用某个方法时不须要持有锁
在程序中应该尽可能使用开放调用,更加容易进行死锁分析
死锁
//Class A
public synchronized void setLocation(Point location) {
...
if (location.equals(destination))
dispatcher.notifyAvailable(this); //方法调用也加锁
}
//class B
public synchronized void notifyAvailable(Taxi taxi) {
availableTaxis.add(taxi);
}
复制代码
开放调用
public void setLocation(Point location) {
boolean reachedDestination; //新增中间变量
synchronized (this) {
…
reachedDestination = location.equals(destination);
}
if (reachedDestination)
dispatcher.notifyAvailable(this);
}
复制代码
能够指定一个超时时限,在超时后会返回一个失败信息
Jvm经过线程转储帮助识别死锁的发生,线程转储包括各个运行中的线程的栈追踪信息,加锁信息(每一个线程有哪些锁,那些帧栈得到这些锁,被阻塞的线程正在等待获取哪个锁) 线程转储前,jvm将在等待关系图中经过搜索循环来找出死锁,若发现死锁,则获取相应死锁信息
线程因为没法访问它所须要的资源而不能继续执行时,就发生了饥饿,引起饥饿最多见的资源就是CPU时钟周期
更改线程优先级且使用不当,或者在持有锁时执行一些没法结束的结构(无限循环,无线等待etc)
要避免使用线程优先级,由于这会增长平台依赖性,并可能致使活跃性危险
线程不断重复执行相同操做,但老是失败。如在事务消息处理中,若不能成功地处理某个消息,那么消息处理机制将回滚整个事务,并将它从新放到队列的开头
当多个相互协做的线程都对彼此进行响应从而修改各自的状态,并让线程没法继续执行,如行人同时互相让路,此时引入随机数,能够避免问题
避免不成熟的优化,首先使程序正确,而后再提升运行速度——若是运行不够快
在增长计算资源的状况下,程序在理论上可以实现的最高加速比,取决于程序中可并行组件与串行组件的比重
F: 串行执行比率
N:处理器个数
Speedup≤1/(F+(1-F)/N)
复制代码
线程调度过程当中须要访问由操做系统和jvm共享的数据结构,其开销包括jvm和操做系统代码执行开销,同时,因为新线程切换进来时,它所须要的数据结构可能不在当前处理器本地缓存中,故还有缓存切换开销
jvm能够将阻塞的线程挂起并容许它被交换出去,当线程频发发生阻塞,则CPU密集型的程序就会发生越多的上下文切换,从而增长调度开销,下降吞吐量
在大多数通用的处理器中,上下文切换的开销至关于5000~10000个时钟周期,几微秒
Unix系统的vmstat命令和windows perform工具可以报告上下文切换次数及在内核中执行时间所占比例等信息。若内核占用率超过10%,那么一般表示调度活动频繁,多是由I/O或者竞争锁致使的阻塞形成的。
Jvm在实现阻塞行为时,能够
其效率取决于上下文切换的开销以及在成功得到锁以前须要等待的时间。 等待时间短,则选择自旋等待,等待时间长,则选择线程挂起
阻塞时,有两次上下文切换,包括两次必要的系统操做和缓存操做:
在ConcurrentHashMap中的size函数,并非直接返回一个储存在map中的全局计数值,由于这会致使这个值成为热点值(每次增删操做都会修改,即便不是同一线程,会致使锁竞争),而是每一个分段独立维护各自分段的计数值,计算map size值是直接枚举分段计数值相加便可
使用并发容器,读写锁,不可变对象、原子变量
工具命令UNIX vmstat/mpstat, windows perfmom
CPU没有被充分利用缘由
线程从线程池中请求对象时若被阻塞,其阻塞开销将是内存分配操做(新建对象)的数百倍
另外,须要确保从新使用对象时要将对象重置到正确状态
可使用中断来解除阻塞,在主线程中启动含有阻塞操做的测试线程,此时测试线程阻塞中,在主线程中中断测试线程,测试线程抛出InterruptException,测试线程执行join操做,确保测试线程走完,而后当测试线程.isAlive()==false则表示阻塞测试成功
使用Thread.getState来验证线程可否在一个条件等待上阻塞,这并不可靠
测试因为数据竞争而引起的错误,须要多个线程分别执行put和take操做
关键问题是:找出容易检查的属性,且这些属性在发生错误的状况下极有可能失败,同时又不能使得错误检查代码人为地限制并发性
可使用校验和计算函数来计算入列和出列的元素校验和,若是两者相等,代码正确。须要考虑是否顺序敏感
测试时,可使用CyclicBarrier或者CountDownLatch来统一运行多线程测试程序,同时执行到同一位置,避免建立线程时致使的不一样步问题
当抛出异常,或者无限循环时,测试可能永远不会结束,此时测试程序能够设置最大等待时间,过期不执行,后期再排查问题
测试线程数量应该多于CPU数量,则任意时刻都有线程在运行和被交换出去,增长交替行为
不须要对象时,销毁对象引用
Thread.yield()
或 Thread.sleep()
. (sleep 会好一些) 使用AOP提升方便性–verbose:gc
当某个类第一次被加载时,JVM经过解释字节码的方式执行,而热点代码在运行中可能会被动态编器编译成机器代码,则代码将热点代码变为直接执行;代码也可能被退回解释执行,从新编译
-XX:+PrintCompilation
,当动态编译时会输出信息,验证动态编译是在测试运行前动态编译器可能针对一个单线程测试程序进行一些专门优化,但只要在真实的应用程序中包含一些并行,都会使这些优化不复存在——将单线程性能测试与多线程性能测试结合在一块儿
HotSpot中,-server
模式比-client
模式更好,-server
模式编译器能产生更有效的代码,并且这种模式更易于经过优化消除无用代码
System.nanoTime
的当前值,若相等,则输出无用且可被忽略的消息if ( f.x.hashCode() == System.nanoTime() ) {
System.out.println(" ");
}
复制代码
若是在构造函数中启动一个线程,那么将可能带来子类化问题,同时还会致使this引用从构造函数中逸出
当在一个条件队列上等待时,object.wait
和condition.await
方法应该在检查了状态谓词以后,在某个循环之中调用,同时须要持有正确的锁,若是在调用object.wait
和 condition.await
方法时没有持有锁,或者不在某个循环中,或者没有检查某些状态谓词,那么一般都是一个错误
若是在调用thread.sleep时持有一个锁,那么将致使其余线程在很长一段时间内没法执行,所以可能致使严重的活跃性问题.若是在调用object.wait
或condition.await
时持有两个锁,那么也可能致使一样的问题
必须在finally块中释放锁unlock
在synchronized内置锁中,出现死锁时,恢复程序的惟一方式是重启程序,而防止死锁的惟一方式是在构造程序时避免出现不一致的锁顺序
特性:可定时,可轮询,可中断的锁获取操做,公平队列, 非块结构 reentrantLock.lockInterruptibly();
可中断的锁获取操做
提供另外一种选择来避免死锁的发生
若是不能获取锁,会释放已经得到的锁,而后从新尝试获取全部锁
定时锁能根据剩余时间来提供一个时限,若是操做不能在指定时限内完成,则程序提早结束
竞争激烈时,非公平锁的性能高于公平锁的性能的一个缘由是:在恢复一个被挂起的线程与该线程真正开始运行以前存在着严重的延迟
当持有锁的时间较长,或者请求锁的平均时间间隔较长,应该使用公平锁
仅当内置锁没法知足需求的状况下,才使用ReentrantLock
使用ReentrantLock场景:可定时的,可轮询的与可中断的锁获取操做,公平队列,以及非块结构的锁
读写锁能提升读取多处理器系统上的数据结构的速度,而在其余状况下,读写锁的性能比较差
当锁由读线程持有,而由写线程请求锁时,其余读线程只能等到写线程使用完并释放了写入锁后才能持有读取锁
写线程拥有更高的优先级,写线程能够降级为读线程,而读线程不能升级为写线程,不然容易致使死锁:若是两个读线程试图同时升级为写入锁,那么两者都不会释放读取锁
某些操做是基于状态的,如不能从空队列删除元素,要获取还没有结束的任务的计算结果,必须等到队列进入“非空”状态或者任务进入已完成状态
依赖状态的操做能够一直阻塞直到能够继续执行,能够经过轮询(循环)与休眠来实现简单的阻塞,其思路是使用循环方式,重试直到成功,这并非一种好的实现
使用基于 LinkedBlockingQueue latch Semaphore FutureTask的条件队列
循环判断:
public synchronized void put(V v) throws InterruptedException {
while (isFull())
wait();
doPut(v);
notifyAll();
}
复制代码
每当在等待一个条件时,必定要确保在条件谓词变成真时经过某种方式发出通知
如条件谓词 中,每当put一个元素后,都执行notifyAll(放后,尽快退出, notify和notifyAll方法都不释放锁,只是通知wait状态的线程准备获取锁),通知唤醒在take上等待的线程
注意是唤醒哪一个锁上的对象
对于状态依赖的类,要么将其等待和通知等协议彻底向子类公开,而且写入正式文档,要么彻底阻止子类参与到等待和通知等过程当中
用于描述wait和notify方法的正确使用
对于每一个依赖状态依赖的操做,与每一个修改其余操做依赖状态的操做,都应该定义一个入口协议和出口协议
入口协议即操做的条件谓词,出口协议则包括检查被该操做修改的全部状态变量,并确认他们是否使某个其余的条件谓词为真,如果,则通知相关的条件队列
private final Condition notEmpty = lock.newCondition();
复制代码
AbstractQueuedSynchronizer(AQS)是一个用于构建锁和同步器的框架,许多同步器能够经过AQS很容易并高效地构造出来,如ReentrantLock,Semaphore,FutureTask, CountDownLatch
CAS包含3个操做数——须要读写的内存位置V,进行比较的值A和拟写入的新值B
当且仅当V的值等于A时,CAS才会经过原子方式用新值B来更新V的值,不然不执行任何操做
CAS是一项乐观的技术,它但愿能成功地执行更新操做,而且若是有另外一个线程在最近一次检查后更新了该变量,那么CAS能检测到这个错误,在本次更新时不执行更新操做
因为CAS能检测到来自其余线程的干扰,所以即便不用锁也能实现原子的读-改-写操做序列
经验法则:在大多数处理器上,在无竞争的锁获取和释放的“快捷代码路径”上的开销,大约是CAS开销的两倍
12个原子变量类,分红4组
非阻塞算法:一个线程的失败或挂起不会致使其余线程也失败或挂起
无锁算法:在算法的每一个步骤中都存在某个线程可以执行下去
构建非阻塞算法的技巧在于:将执行原子修改的范围缩小到单个变量上
在算法执行中,值改变后又改变回原来的值,在CAS判断时就有误判
解决方案:不是更新某个引用的值,而是更新两个值,包括一个引用和一个版本号
若是两个操做之间缺少Happens-before关系,那么JVM能够任意地重排序
volatile变量规则:对volatile变量的写入操做必须在对该变量的读操做以前执行
线程启动规则:在线程上对Thread.start()的调用必须在该线程中执行任何操做以前执行
传递性:A在B前完成,B在C前完成,则A在C前完成
当缺乏Happens-before关系时,就可能出现重排序问题,因此才会出如今没有充分同步的状况下发布一个对象会致使另外一个线程看到一个只被部分构造的对象
同步集合类,Hashtable 和 Vector 还有同步集合包装类,Collections.synchronizedMap()
和Collections.synchronizedList()
提供了一个基本的有条件的线程安全的Map和List的实现。
并发集合像ConcurrentHashMap,不只提供线程安全还用锁分离和内部分区等现代技术提升了可扩展性
死锁的发生必须知足如下四个条件:
volatile变量能够确保先行关系,即写操做会发生在后续的读操做以前, 但它并不能保证原子性。例如用volatile修饰count变量那么 count++
操做就不是原子性的
AtomicInteger类提供的atomic方法可让这种操做具备原子性
没有找到好的解释
是对全部的操做都封装了同步方法
public int size() {
synchronized (mutex) {return m.size();}
}
复制代码