Java并发多线程 - 并发工具类JUC

安全共享对象策略

1.Java线程限制 : 一个被线程限制的对象,由线程独占,而且只能被占有它的线程修改
2.共享只读 : 一个共享只读的对象,在没有额外同步的状况下,能够被多个线程并发访问,
可是任何线程都不能修改它
3.线程安全对象 : 一个线程安全的对象或则容器,在内部经过同步机制来保证线程安全,
因此其余线程无需额外的同步就能够经过公共接口随意访问它
4.被守护对象 : 被守护对象只能经过获取特定的锁来访问html

线程安全 - 同步容器

采用synchronized关键字同步,缺点 :segmentfault

  1. 不能完成作到线程安全
  2. 性能差

ArrayLisy -> Vector, Stack
HashMap -> HashTable (key、value不能为null)
Collections.synchronizedXXX(List、Set、Map)安全

线程安全 - 并发容器 J.U.C

ArrayList -> CopyOnWriteArrayList
HashSet、TreeSet -> CopyOnWriteArraySet ConcurrentSkipListSet
HashMap、TreeMap -> ConcurrentHashMap ConcurrentSkipListMap并发

AbstractQueuedSynchronizer - AQS框架

  1. 使用Node实现FIFO队列,能够用于构建锁或则其余同步装置的基础框架
  2. 利用一个int类型表示状态
  3. 使用方法是基础
  4. 子类经过继承并经过实现它的方法管理其状态 { acquire 和 release} 的方法操纵状态
  5. 能够同时实现排他锁和共享锁模式(独占、共享)

经常使用类

CountDownLatch
Semaphore
CyclicBarrier
ReentrantLock
Condition
FutureTask工具

CountDownLacth

CountDownLatch是一个同步工具类,它容许一个或多个线程一直等待,直到其余线程执行完后再执行。例如,应用程序的主线程但愿在负责启动框架服务的线程已经启动全部框架服务以后执行。性能

CountDownLatch是经过一个计数器来实现的,计数器的初始化值为线程的数量。每当一个线程完成了本身的任务后,计数器的值就相应得减1。当计数器到达0时,表示全部的线程都已完成任务,而后在闭锁上等待的线程就能够恢复执行任务。ui

@Self4j
public class CountDownLatchExample {

    private final  static int threadCount = 200;
    
    public static void main(String[] arg) {
    
        ExecutorService exec = Executors.newCachedThreadPool(); 
        
        final CountDownLatch lacth = new CountDownLatch(5);
        
        for (int i = 0; i < 1000; i++) {
            exec.excute( () -> {
            final int threadNum  = i;
            try {
                test(threadNum);
            } catch (Exception e) {
                log.error("exception", e);
            } finally {
                // latch递减
                lacth.countDown();
            }
            });
        }
        // 等待latch计数器为0,则继续往下执行
        latch.await();
        // latch的await方法也能够传入等待时间,等到等待时间后无论有没完成计数都往下执行
        // latch.await( 10, TimeUnit.MILLISECONDS);
        log.info("finished");
        exec.shutdown();
    }

    public static void test(int i)  throw Exception{
        log.info("thread: {}", i);
    }
}

Semaphore

Semaphore(int permits):构造方法,建立具备给定许可数的计数信号量并设置为非公平信号量。
Semaphore(int permits,boolean fair):构造方法,当fair等于true时,建立具备给定许可数的计数信号量并设置为公平信号量。
void acquire():今后信号量获取一个许可前线程将一直阻塞。
void acquire(int n):今后信号量获取给定数目许可,在提供这些许可前一直将线程阻塞。
void release():释放一个许可,将其返回给信号量。就如同车开走返回一个车位。
void release(int n):释放n个许可。
int availablePermits():获取当前可用的许可数。
boolean tryAcquire():仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
boolean tryAcquire(int permits):仅在调用时此信号量中有给定数目的许可时,才今后信号量中获取这些许可。线程

boolean tryAcquire(int permits,
                          long timeout,
                          TimeUnit unit)
                   throws InterruptedException

若是在给定的等待时间内此信号量有可用的全部许可,而且当前线程未被 中断,则今后信号量获取给定数目的许可。调试

@Self4j
public class SemaphoreExample {

    private final  static int threadCount = 200;

    public static void main(String[] arg) {

        ExecutorService exec = Executors.newCachedThreadPool(); 
    
        final Semaphore semaphore = new Semaphore(3);
    
        for (int i = 0; i < threadCount; i++) {
            exec.excute( () )-> {
            final int threadNum  = i;
            try {
                // tryAcquire会尝试去获取一个信号量,若是获取不到
                // 则什么都不会发生,走接下来的逻辑
                // if (semaphore.tryAcquire(1)) {
                //    test(i);
                //    semaphore.release();//释放一个信号量
                // }
                semaphore.acquire();//获取一个信号量
                test(i);
                semaphore.release();//释放一个信号量
            } catch (Exception e) {
                log.error("exception", e);
            } 
            });
        }
        log.info("finished");
        exec.shutdown();
    }

    public static void test(int i)  throw Exception{
        log.info("thread: {}", i);
    }
}

CyclicBarrier

一个同步辅助类,它容许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。

CyclicBarrier(int parties, Runnable barrierAction)
建立一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操做,该操做由最后一个进入 barrier 的线程执行。

CyclicBarrier(int parties)
建立一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预约义的操做。

int await()
在全部 参与者都已经在此 barrier 上调用 await 方法以前,将一直等待。

int await(long timeout,
                 TimeUnit unit)
          throws InterruptedException,
                 BrokenBarrierException,
                 TimeoutException

在全部 参与者都已经在此屏障上调用 await 方法以前将一直等待,或者超出了指定的等待时间。

boolean isBroken() : 查询此屏障是否处于损坏状态。

void reset() :
将屏障重置为其初始状态。若是全部参与者目前都在屏障处等待,则它们将返回,同时抛出一个 BrokenBarrierException。注意,在因为其余缘由形成损坏 以后,实行重置可能会变得很复杂;此时须要使用其余方式从新同步线程,并选择其中一个线程来执行重置。与为后续使用建立一个新 barrier 相比,这种方法可能更好一些。

int getNumberWaiting() :返回当前在屏障处等待的参与者数目。此方法主要用于调试和断言。

@Self4j
public class CyclicBarrierExample {

    private final  static int threadCount = 200;
    
    private final static CyclicBarrier cyclicBarrier = new CyclicBarrier(7, 
        () -> {
        log.info("callback is running !");
        }
    );
    
    public static void main(String[] arg) {
    
        ExecutorService exec = Executors.newCachedThreadPool(); 
        
        for (int i = 0; i < threadCount; i++) {
            exec.excute( () -> {
                final int threadNum  = i;
                try {
                    race(i);
                } catch (Exception e) {
                    log.error("exception", e);
                } 
            });
        }
        log.info("finished");
        
        exec.shutdown();
    }
    
    public static void race(int i)  throw Exception{
        log.info("thread {} is ready", i);
        cyclicBarrier.await();
        log.info("thread {} is continue", i);
    }
}

来源:https://segmentfault.com/a/1190000017864607

相关文章
相关标签/搜索