对java并发的一些学习,以前总结在幕布上,为了加深印象,这里从新学习下。java
阻塞 - 对临界区的代码串行化执行数组
无饥饿 - 在优先级并发中,可能低优先级的永远不能执行的状况,须要避免就叫无饥饿并发
无障碍 - 读写锁控制力度不一样,这里主要是乐观读app
无锁 - 比较交换,不是真正的不加锁,而是用java本地Unsaft 的cas方法ide
无等待 - 读写不等待,在写的时候,从新复制一个对象对其操做,操做完以后在合适的时机赋值回去,这种只适合极少许写,大量读的状况函数
public class RunnableTest implements Runnable { @Override public void run() {} }
public class ThreadTest extends Thread { @Override public void run() {} }
public class CallableTest implements Callable { @Override public Object call() throws Exception { return null; } } FutureTask futureTask = new FutureTask(new CallableTest());
synchronized Object.wait() Object.notify
synchronized同步关键字,对类,方法,对象进行修饰,修饰后获取对应监视器才能进入修饰的临界区
Object.wait(),Object.notify(),Object.notifyAll()方法必须在获取到监视器对象后才能调用,即要写在synchronized内部
Object.wait()以后会将当前线程放入对象的wait set中,释放获取的监视器对象
Object.notify()会从wait set随机对一线程唤醒,Object.notifyAll()会从wait set全部线程唤醒,不会当即释放获取的监视器,待结束临界区才释放,唤醒不表明被唤醒线程继续执行,其还须要争抢获取监视器才能继续执行工具
/** * @author tomsun28 * @date 19:49 2020-02-19 */ public class Demo { private static final Logger LOGGER = LoggerFactory.getLogger(Demo.class); private final static Demo demo = new Demo(); public static void waitDemo() { synchronized (demo) { try { // 调用wait方法必须获取对应的监视器,在wait以后,释放监视器 LOGGER.info("获取demo对象监视器1"); demo.wait(); LOGGER.info("释放demo对象监视器1"); Thread.sleep(2000); LOGGER.info("释放demo对象监视器2"); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void notifyDemo() { // 从等待队列中唤醒一个 synchronized (demo) { LOGGER.info("获取demo对象监视器2"); demo.notify(); LOGGER.info("释放demo对象监视器3"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } LOGGER.info("释放demo对象监视器4"); } } public static void main(String[] args) { new Thread(() -> waitDemo()).start(); new Thread(() -> notifyDemo()).start(); } }
ReentrantLock Condition
重入锁(可重复获取的锁),重入锁内部的Condition
ReentrantLock lock几回,就须要unlock几回,lock以后须要紧跟tay finally代码,finally第一行就要unlock
Condition功能用法和Object.wait notify差很少,只不过其搭配重入锁使用,不用在使用前获取对象监视器
Condition.await()会释放当前拥有的锁,并挂起当前线程
Conditon.singal()会随机唤醒当前await的线程,Condition.singalAll()会唤醒全部,固然同Object.notify()唤醒不表明被唤醒线程能够继续执行,其还须要争抢获取到锁后继续执行性能
/** * @author tomsun28 * @date 22:15 2019-10-27 */ public class ReenterLockCondition { public static void main(String[] args) throws InterruptedException{ ReentrantLock reentrantLock = new ReentrantLock(); Condition condition = reentrantLock.newCondition(); new Thread(() -> { // 先获取重入锁 System.out.println("0 - 获取锁"); reentrantLock.lock(); try { System.out.println("0 - 释放锁,挂起线程"); // 同 Object.wait类似 其和ReentrantLock绑定,这里就是释放锁,挂起线程 condition.await(); System.out.println("0 - 等待锁释放,thread is fin"); } catch (InterruptedException e) { e.printStackTrace(); } finally { reentrantLock.unlock(); } }).start(); System.out.println("1 - 睡4秒"); Thread.sleep(4000); System.out.println("1 - 等待获取锁"); reentrantLock.lock(); try { System.out.println("1 - condition 唤醒线程"); condition.signal(); Thread.sleep(4000); System.out.println("1 - 睡4秒释放锁"); } finally { reentrantLock.unlock(); } } }
Semaphore
信号量-不一样于锁,其能够控制进入临界区的线程数量,锁只能是一个线程进入临界区
构造函数能够初始化信号数量,semaphore.acquire()消耗一个信号,semaphore.release()释放一个信号,当所剩的信号为0时则不容许进入临界区学习
/** * @author tomsun28 * @date 22:53 2019-10-27 */ public class SemaphoreDemo { public static void main(String[] args) { final Semaphore semaphore = new Semaphore(5); final AtomicInteger atomicInteger = new AtomicInteger(1); Runnable thread1 = () -> { try { semaphore.acquire(); Thread.sleep(2000); System.out.println(atomicInteger.getAndAdd(1)); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } }; ExecutorService executorService = Executors.newFixedThreadPool(20); IntStream.range(0,20).forEach(i -> executorService.execute(thread1)); executorService.shutdown(); } }
ReetrantReadWriteLock
读写锁,以前的重入锁仍是synchronized,对于读写都是一样的加锁
ReadWriteLock把读和写的加锁分离,读读不互斥,读写互斥,写写互斥提升了锁性能,
须要注意,当有读锁存在时,写锁不能被获取,在大量一直在进行读锁的时候,会形成写锁的饥饿,即写锁一直获取不到锁ui
/** * @author tomsun28 * @date 21:35 2020-02-19 */ public class ReadWriteLockDemo { public static void main(String[] args) { ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); new Thread(() -> { readWriteLock.readLock().lock(); readWriteLock.readLock().lock(); try { System.out.println("获取2读锁"); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); readWriteLock.readLock().unlock(); System.out.println("释放2读锁"); } }).start(); new Thread(() -> { readWriteLock.readLock().lock(); try { System.out.println("获取3读锁"); Thread.sleep(6000); } catch (InterruptedException e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); System.out.println("释放3读锁"); } }).start(); new Thread(() -> { readWriteLock.writeLock().lock(); try { System.out.println("获取1写锁"); } finally { readWriteLock.writeLock().unlock(); System.out.println("释放1写锁"); } }).start(); } }
StampedLock
上面的读写锁有一个缺点,便可能写饥饿,StampedLock解决这个问题
使用乐观读 - 使用标记戳来判断此次读的过程是否有写的过程,如有则从新读取或转化为悲观读
/** * @author tomsun28 * @date 21:57 2020-02-19 */ public class StampedLockDemo { public static void main(String[] args) { StampedLock stampedLock = new StampedLock(); AtomicInteger flag = new AtomicInteger(0); new Thread(() -> { long writeStamped = stampedLock.writeLock(); try { System.out.println("获取写锁,休息1.006秒"); Thread.sleep(1006); System.out.println("当前值为: " + flag.getAndAdd(1)); } catch (InterruptedException e) { e.printStackTrace(); } finally{ stampedLock.unlockWrite(writeStamped); System.out.println("释放写锁"); } }).start(); new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } long stamp = stampedLock.tryOptimisticRead(); int readValue = flag.intValue(); while (!stampedLock.validate(stamp)) { System.out.println("重试乐观读"); stamp = stampedLock.tryOptimisticRead(); readValue = flag.intValue(); } System.out.println("乐观读为:" + readValue); }).start(); } }
CountDownLatch
并发减数拦截器,await()拦截阻塞当前线程,当减数器为0时通行
/** * @author tomsun28 * @date 23:35 2019-10-27 */ public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(10); AtomicInteger atomicInteger = new AtomicInteger(1); Thread threadd = new Thread(() -> { try { Thread.sleep(1000); System.out.println(" thred check complete" + atomicInteger.getAndIncrement()); countDownLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }); ExecutorService executorService = Executors.newFixedThreadPool(2); IntStream.range(0, 10).forEach(i -> executorService.submit(threadd)); countDownLatch.await(); System.out.println("fin"); } }
CyclicBarrier
栅栏,功能相似于ConutDownLatch,不过能够循环重置处理
ConutDownLatch是减数器为0时触发,并重置减数器循环等待下一次触发,而且能够提供触发方法
/** * @author tomsun28 * @date 23:59 2019-10-27 */ public class CyclicBarrierDemo { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(10, () -> { System.out.println("this cyclic ok"); }); AtomicInteger atomicInteger = new AtomicInteger(1); ExecutorService executorService = Executors.newFixedThreadPool(10); IntStream.range(0, 100).forEach(i -> { executorService.execute(() -> { try { Thread.sleep(1000); System.out.println(atomicInteger.getAndIncrement()); cyclicBarrier.await(); } catch (BrokenBarrierException | InterruptedException e) { e.printStackTrace(); } }); }); executorService.shutdown(); } }
LockSupport
同步工具类,比起wait notify要方便不少
LockSupport.park()是在当前线程阻塞挂起
LockSupport.unpark(thread)唤醒指定线程
/** * @author tomsun28 * @date 00:39 2019-10-28 */ public class LockSupportDemo { public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(() -> { try { LockSupport.park(); LockSupport.park(); System.out.println("fin"); } catch (Exception e) { e.printStackTrace(); } }); thread.start(); Thread.sleep(3000); System.out.println("begin"); LockSupport.unpark(thread); Thread.sleep(3000); LockSupport.unpark(thread); } }
Executors 提供的线程池
newFixedThreadPool(threadNum)
最大与核心线程数量为threadNum的linked队列线程池 newSingleThreadExecutor()
最大与核心线程数量为1的linked队列线程池 newCachedThreadPool
核心数量为0,最大线程数量为Integer.MAX_VALUE的线程池,线程数量会跟插入的任务同步增加 newSingleThreadScheduledExecutor()
核心数量为1,最大线程数量为Integer.MAX_VALUE的延迟队列线程池 newWorkStealingPool()
ForkJoin线程池 ThreadPoolExecutor
自定义构造线程池
corePoolSize
核心线程数量 maxPoolSize
最大线程数量 keepAliveTime
超出核心线程数的线程的空闲存活时间 unit
时间单位 blockingQueue
存放未被执行的任务线程的阻塞队列,当队列满时,才会去从核心数量开始往最大线程数方向新增运行线程 threadFactory
线程建立工厂 rejectedHandler
任务满拒绝策略 ForkJoinPool
ForkJoinTask
RecursiveTask<T>
RecursiveAction
Collections.synchronziedXXX()
其本质是在容器外层加了synchronized(mutex)
map - CpncurrentHashMap - 1.7 segment 1.8 synchronzied cas
ArrayList - RCU - read copy update - CopyOnWriteArrayList
Queue - ConcurrentLinkedQueue
非阻塞队列阻塞队列 BlockingQueue - ReentrantLock Condition
ArrayBlockingQueue
数组阻塞队列LinkedBlockingQueue
列表阻塞队列SynchronousQueue
此阻塞队列无容量当即转发PriorityBlockingQueue
优先级阻塞队列DelayedWorkQueue
延迟阻塞队列AtomicBoolean
AtomicInteger
AtomicLong
LongAddr
AtomicRefrence
AtomicStampedRefreence
ThreadLocal
线程本地变量volatile
可见关键字Future
非阻塞模式CompletableFuture
非阻塞转载请注明 from tomsun28