####一、线程池的数量 Thread_num=CPU_num * U * (1 + W/C)java
U: 目标CPU使用率 [0,1] W: wait time C: compute time数组
####二、任务独立时,设置线程池的工做队列界限才合理,若是任务之间存在依赖性,则可能致使线程“饥饿死锁”,应使用无界线程池,如newCachedThreadPool缓存
####三、单线程的Executor可能发生死锁,newSingleThreadExecutor 对象同时执行父任务与子任务安全
/** * Created with IntelliJ IDEA. * User: pingansheng * Date: 2016/6/6 * Time: 15:43 */ public class SingleThreadExecutorDeadLock { ExecutorService exec = Executors.newSingleThreadExecutor() ; class MyCall implements Callable<String> { @Override public String call () throws Exception { Future<String> f1 = exec.submit( new MyThread()) ; Future<String> f2 = exec.submit(new MyThread()) ; System. out.println(" 任务提交结束,等待两个子任务返回 "); // 主线程在单线程线程池中,因此 f1与f2 均在等待队列中,永远没法执行。 return f1.get() + f2.get(); } } class MyThread implements Callable<String> { @Override public String call () throws Exception { System.out.println( "子任务结束") ; return "RES"; } } public static void main(String[] args) throws Exception { SingleThreadExecutorDeadLock lock = new SingleThreadExecutorDeadLock() ; Future<String> f3 = lock.exec .submit(lock.new MyCall()) ; try { System.out.println(f3.get()) ; } finally { lock.exec.shutdown() ; } } }
####四、一种带有缓存的计算工具,线程安全的并发
/** * 一种带有缓存的计算工具 * Created with IntelliJ IDEA. * User: pingansheng * Date: 2016/6/1 * Time: 14:29 */ public class CachedCompute { interface Computable<A, V> { V compute(A args) throws InterruptedException , ExecutionException; } class CacheComputer<A, V> implements Computable< A, V > { // 缓存 private final Map<A, Future<V>> cache = new ConcurrentHashMap<>() ; private Computable< A, V > computer; public CacheComputer(Computable< A, V > c) { this .computer = c ; } @Override public V compute (A args) throws InterruptedException , ExecutionException { Future<V> f = cache.get(args); if (null == f) { Callable<V> callable = new Callable< V>() { @Override public V call() throws Exception { return computer .compute(args) ; } }; FutureTask<V > ft = new FutureTask< V>(callable); f = cache .putIfAbsent(args, ft) ; if (null == f) { System. out.println(" 缓存放置成功 "); f = ft; ft.run(); } } try { return f.get() ; } catch (CancellationException e) { cache.remove(args , f); } catch (ExecutionException e) { throw e ; } return null; } } CacheComputer cache = new CacheComputer<String , String>( new Computable<String, String>() { @Override public String compute (String args) throws InterruptedException , ExecutionException { return "计算结果"; } }); public static void main (String[] args) throws Throwable { CachedCompute compute = new CachedCompute(); System. out.println(compute.cache .compute("key")) ; } }
####五、一种This指针逃逸ide
/** * Created with IntelliJ IDEA. * User: pingansheng * Date: 2016/5/11 * Time: 17:08 */ public class ThisEscape { private String name; public ThisEscape() throws Throwable{ new Thread(new EscapeRunnable()).start() ; Thread. sleep( 1000); name ="123"; } private class EscapeRunnable implements Runnable { @Override public void run() { // 经过ThisEscape.this就能够引用外围类对象 , 可是此时外围类对象可能尚未构造完成 , 即发生了外围类的this引用的逃逸,构造函数未完成以前不该该暴露this指针 System. out.println(ThisEscape.this. name); //可能会出现使人疑惑的错误 name=null } } public static void main(String[] args) throws Throwable{ new ThisEscape(); } }
/** * 使用 semaphore控制任务的提交速度 * Created with IntelliJ IDEA. * User: pingansheng * Date: 2016/6/7 * Time: 10:24 */ public class BoundedExecutor { private final Executor executor; private final Semaphore semaphore; public BoundedExecutor(Executor exe , int bound) { this .executor = exe ; this. semaphore = new Semaphore(bound); } public void submitTask(final Runnable command) throws InterruptedException { semaphore .acquire(); System. out.println(" 信号量获取成功,当前剩余数: " + semaphore .availablePermits()) ; try { executor .execute(new Runnable() { @Override public void run() { try { command.run(); } finally { semaphore.release(); } } }); } catch (RejectedExecutionException e) { semaphore .release(); } } public static void main(String[] args) throws Exception { ExecutorService es=Executors.newCachedThreadPool() ; BoundedExecutor exe = new BoundedExecutor(es , 100) ; for ( int i = 0 ; i < 50 ; i++) { exe.submitTask(new Runnable() { @Override public void run() { try { System. out.println(" 任务执行 "); Thread.sleep(1000) ; } catch (Throwable e) { } } }); } System.out.println( "提交50 个任务结束 "); es.shutdown() ; } }
####七、修改标准工厂建立的executor函数
/** * 强制类型转换从新设置 Executor的线程池参数 * newSingleThreadExecutor 除外,不是线程工厂直接建立,而是经过包装类 * public static ExecutorService newSingleThreadExecutor() { * return new FinalizableDelegatedExecutorService * (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, * new LinkedBlockingQueue<Runnable>())); * } * Created with IntelliJ IDEA. * User: pingansheng * Date: 2016/6/7 * Time: 10:24 */ public class ExecutorForceSet { private static final ExecutorService executor = Executors.newFixedThreadPool( 1); // 使用此方法包装后能够避免被修改 // private static final ExecutorService executor =Executors.unconfigurableExecutorService(Executors.newFixedThreadPool(1)); public static void main(String[] args) throws Exception { if (executor instanceof ThreadPoolExecutor) { ((ThreadPoolExecutor) executor ).setCorePoolSize(100) ; ((ThreadPoolExecutor) executor ).setMaximumPoolSize( 100); } else { System.out.println( "转换出错,非线程工厂建立 "); } for (int i = 0; i < 50; i++) { //lambda executor.execute(() -> { try { System. out.println(" 任务执行 "); Thread.sleep (2000 ); } catch (Throwable e) { } }); } System.out.println( "提交50 个任务结束 "); executor.shutdown() ; } }
####八、Amdahl定律 串行部分越小加速比越大(单核运行时间/多核运行时间)工具
Speedup=1/(F+(1-F)/N)ui
F: 必须串行部分的比例 N: CPU个数this
####九、增长系统的伸缩性(增长计算资源时,吞吐量和处理能力相应增长)
####十、一种非阻塞的计数器
/** * CasCounter * <p/> * Nonblocking counter using CAS * * @author Brian Goetz and Tim Peierls */ @ThreadSafe public class CasCounter { private SimulatedCAS value; public int getValue() { return value.get(); } public int increment() { int v; do { v = value.get(); } while (v != value.compareAndSwap(v, v + 1)); //非阻塞通常使用底层的并发原语操做 return v + 1; } }