多线程(三)

并发工具类和并发容器

 

为何要使用ConcurrentHashMap

在多线程环境下,使用HashMap进行put操做会引发死循环,致使CPU利用率接近100%,HashMap在并发执行put操做时会引发死循环,是由于多线程会致使HashMap的Entry链表php

造成环形数据结构,一旦造成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。java

HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的状况下HashTable的效率很是低下。由于当一个线程访问HashTable的同步方法,其余线程也访问HashTable的同步方法时,会进入阻塞或轮询状态。如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,因此竞争越激烈效率越低。算法

一些有用的方法

不少时候咱们但愿在元素不存在时插入元素,咱们通常会像下面那样写代码数据库

synchronized(map){
   if (map.get(key) == null){
      return map.put(key, value);
   } else{
      return map.get(key);
   }
}

putIfAbsent(key,value)方法原子性的实现了一样的功能

V putIfAbsent(K key, V value)  

若是key对应的value不存在,则put进去,返回null。不然不put,返回已存在的value。  
boolean remove(Object key, Object value)  

若是key对应的值是value,则移除K-V,返回true。不然不移除,返回false。
boolean replace(K key, V oldValue, V newValue)  

若是key对应的当前值是oldValue,则替换为newValue,返回true。不然不替换,返回false。
public class UseChm {

	HashMap<String, String> hashMap = new HashMap<>();
	ConcurrentHashMap<String, String> chm = new ConcurrentHashMap<>();

	public String putIfAbsent(String key, String value) {
		int a;
		synchronized (hashMap) {
			if (hashMap.get(key) == null) {
				return hashMap.put(key, value);
			} else {
				return hashMap.get(key);
			}
		}
	}

	// 存值
	public String useChm(String key, String value) {
		return chm.putIfAbsent(key, value);
	}

}

Hash的解释

散列,任意长度的输入,经过一种算法,变换成固定长度的输出。属于压缩的映射。编程

Md5,Sha,取余都是散列算法,ConcurrentHashMap中是wang/jenkins算法数组

ConcurrentHashMap在1.7下的实现

分段锁的设计思想。缓存

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment实际是一种可重入锁(ReentrantLock),HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组。Segment的结构和HashMap相似,是一种数组和链表结构。一个Segment里包含一个HashEntry数组,每一个HashEntry是一个链表结构的元素,每一个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先得到与它对应的Segment锁。安全

 

ConcurrentHashMap初始化方法是经过initialCapacity、loadFactor和concurrencyLevel(参数concurrencyLevel是用户估计的并发级别,就是说你以为最多有多少线程共同修改这个map,根据这个来肯定Segment数组的大小concurrencyLevel默认是DEFAULT_CONCURRENCY_LEVEL = 16;)。数据结构

ConcurrentHashMap彻底容许多个读操做并发进行,读操做并不须要加锁。ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的。HashEntry表明每一个hash链中的一个节点,能够看到其中的对象属性要么是final的,要么是volatile的。多线程

ConcurrentHashMap在1.8下的实现

改进一:取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存数据,采用table数组元素做为锁,从而实现了对每一行数据进行加锁,进一步减小并发冲突的几率。

改进二:将原先table数组+单向链表的数据结构,变动为table数组+单向链表+红黑树的结构。对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度能够下降到O(logN),能够改进性能。

ConcurrentSkipListMap  和ConcurrentSkipListSet

Skiplist跳表 能够提升链表的访问速度,达到红黑树的性能

ConcurrentSkipListMap    TreeMap的并发实现

ConcurrentSkipListSet     TreeSet的并发实现

了解什么是SkipList?

二分查找和AVL树查找

二分查找要求元素能够随机访问,因此决定了须要把元素存储在连续内存。这样查找确实很快,可是插入和删除元素的时候,为了保证元素的有序性,就须要大量的移动元素了。

若是须要的是一个可以进行二分查找,又能快速添加和删除元素的数据结构,首先就是二叉查找树,二叉查找树在最坏状况下可能变成一个链表。

因而,就出现了平衡二叉树,根据平衡算法的不一样有AVL树,B-Tree,B+Tree,红黑树等,可是AVL树实现起来比较复杂,平衡操做较难理解,这时候就能够用SkipList跳跃表结构。

传统意义的单链表是一个线性结构,向有序的链表中插入一个节点须要O(n)的时间,查找操做须要O(n)的时间。

若是咱们使用上图所示的跳跃表,就能够减小查找所需时间为O(n/2),由于咱们能够先经过每一个节点的最上面的指针先进行查找,这样子就能跳过一半的节点。

好比咱们想查找19,首先和6比较,大于6以后,在和9进行比较,而后在和12进行比较......最后比较到21的时候,发现21大于19,说明查找的点在17和21之间,从这个过程当中,咱们能够看出,查找的时候跳过了三、七、12等点,所以查找的复杂度为O(n/2)。

跳跃表其实也是一种经过“空间来换取时间”的一个算法,经过在每一个节点中增长了向前的指针,从而提高查找的效率。

跳跃表又被称为几率,或者说是随机化的数据结构,目前开源软件 Redis 和 lucence都有用到它。

 

ConcurrentLinkedQueue  无界非阻塞队列

ConcurrentLinkedQueue   LinkedList 并发版本

Add,offer:添加元素

Peek:get头元素并不把元素拿走

poll():get头元素把元素拿走

多用isEmpty()尽可能少用size()

/**
 * isEmpty()和size()的性能差别
 */
public class ConcurrentLinkedQueueTest {
	private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
	private static int count = 50000;
	private static int count2 = 2;
	private static CountDownLatch cd = new CountDownLatch(count2);

	public static void dothis() {
		for (int i = 0; i < count; i++) {
			queue.offer(i);
		}
	}

	public static void main(String[] args) throws InterruptedException {
		long timeStart = System.currentTimeMillis();
		ExecutorService es = Executors.newFixedThreadPool(4);
		ConcurrentLinkedQueueTest.dothis();
		// 启用两个线程取数据
		for (int i = 0; i < count2; i++) {
			es.submit(new Poll());
		}
		cd.await();
		System.out.println("cost time "
				+ (System.currentTimeMillis() - timeStart) + "ms");
		es.shutdown();
	}

	static class Poll implements Runnable {
		@Override
		public void run() {
			// while (queue.size() > 0) { //cost time 3648ms
			while (!queue.isEmpty()) {	  //cost time 412ms
				System.out.println(queue.poll());
			}
			cd.countDown();
		}
	}
}

 

CopyOnWriteArrayList和CopyOnWriteArraySet

写的时候进行复制,能够进行并发的读。

适用读多写少的场景:好比白名单,黑名单,商品类目的访问和更新场景,假如咱们有一个搜索网站,用户在这个网站的搜索框中,输入关键字搜索内容,可是某些关键字不容许被搜索。这些不能被搜索的关键字会被放在一个黑名单当中,黑名单天天晚上更新一次。当用户搜索时,会检查当前关键字在不在黑名单当中,若是在,则提示不能搜索。

弱点:内存占用高,数据一致性弱

 

什么是阻塞队列

取数据和读数据不知足要求时,会对线程进行阻塞

方法

抛出异常

返回值

一直阻塞

超时退出

插入

Add

offer

put

offer

移除

remove

poll

take

poll

检查

element

peek

没有

没有


经常使用阻塞队列

ArrayBlockingQueue: 数组结构组成有界阻塞队列。

先进先出原则,初始化必须传大小,take和put时候用的同一把锁

LinkedBlockingQueue:链表结构组成的有界阻塞队列

先进先出原则,初始化能够不传大小,put,take锁分离

PriorityBlockingQueue:支持优先级排序的无界阻塞队列,

排序,天然顺序升序排列,更改顺序:类本身实现compareTo()方法,初始化PriorityBlockingQueue指定一个比较器Comparator

DelayQueue: 使用了优先级队列的无界阻塞队列

支持延时获取,队列里的元素要实现Delay接口。DelayQueue很是有用,能够将DelayQueue运用在如下应用场景。

缓存系统的设计:能够用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。

还有订单到期,限时支付等等。

SynchronousQueue:不存储元素的阻塞队列

每一个put操做必需要等take操做

LinkedTransferQueue:链表结构组成的界阻塞队列

Transfer,tryTransfer,生产者put时,当前有消费者take,生产者直接把元素传给消费者

LinkedBlockingDeque:链表结构组成的双向阻塞队列

能够在队列的两端插入和移除,xxxFirst头部操做,xxxLast尾部操做。工做窃取模式。

public class User {
    private final String name;

    public User(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}
public class CacheBean<T> implements Delayed {

	private String id;
	private String name;
	private T data;
	private long activeTime;// 到期时间

	public CacheBean(String id, String name, T data, long activeTime) {
		this.id = id;
		this.name = name;
		this.data = data;
		this.activeTime = TimeUnit.NANOSECONDS.convert(activeTime,
				TimeUnit.MILLISECONDS) + System.nanoTime();// 纳秒级别转换
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public T getData() {
		return data;
	}

	public void setData(T data) {
		this.data = data;
	}

	public long getActiveTime() {
		return activeTime;
	}

	public void setActiveTime(long activeTime) {
		this.activeTime = activeTime;
	}

	@Override
	public long getDelay(TimeUnit unit) {
		return unit.convert(this.activeTime - System.nanoTime(),
				TimeUnit.NANOSECONDS); // 检查当前还剩多少时间
	}

	@Override
	public int compareTo(Delayed o) {
		long d = getDelay(TimeUnit.NANOSECONDS)
				- o.getDelay(TimeUnit.NANOSECONDS);
		return (d == 0) ? 0 : (d < 0) ? -1 : 1;
	}
}
public class GetFromCache implements Runnable {

	private DelayQueue<CacheBean<User>> queue;

	public GetFromCache(DelayQueue<CacheBean<User>> queue) {
		this.queue = queue;
	}

	@Override
	public void run() {
		while (true) {
			try {
				CacheBean<User> item = queue.take();// 阻塞地拿
				System.out.println("GetFromCache" + item.getId() + ":"
						+ item.getName() + "data:"
						+ ((User) item.getData()).getName());
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}
public class PutInCache implements Runnable {

    private DelayQueue<CacheBean<User>> queue;

    public PutInCache(DelayQueue<CacheBean<User>> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        CacheBean cacheBean = new CacheBean("1","5秒",
                new User("Mark"),5000);
        CacheBean cacheBean2 = new CacheBean("1","3秒",
                new User("Mike"),3000);
        queue.offer(cacheBean);
        System.out.println("put in cache:"+cacheBean.getId()+":"+cacheBean.getName());
        queue.offer(cacheBean2);
        System.out.println("put in cache:"+cacheBean2.getId()+":"+cacheBean2.getName());

    }
}
public class Test {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<CacheBean<User>> queue = new DelayQueue<CacheBean<User>>();
        new Thread(new PutInCache(queue)).start();
        new Thread(new GetFromCache(queue)).start();

        for(int i=1;i<20;i++){
            Thread.sleep(500);
            System.out.println(i*500);
        }
    }
}
运行结果
put in cache:1:5秒
put in cache:1:3秒
500
1000
1500
2000
2500
3000
GetFromCache1:3秒data:Mike
3500
4000
4500
5000
GetFromCache1:5秒data:Mark
5500
6000
6500
7000
7500
8000
8500
9000
9500

了解阻塞队列的实现原理

使用了Condition实现。

生产者消费者模式

在并发编程中使用生产者和消费者模式可以解决绝大多数并发问题。该模式经过平衡生

产线程和消费线程的工做能力来提升程序总体处理数据的速度。

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发

中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理

完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。

生产者和消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是经过阻塞队列来进行通讯,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。

什么是Fork/Join框架

并行执行任务的框架,把大任务拆分红不少的小任务,汇总每一个小任务的结果获得大任务的结果。

工做窃取算法

工做窃取(work-stealing)算法是指某个线程从其余队列里窃取任务来执行。

那么,为何须要使用工做窃取算法呢?假如咱们须要作一个比较大的任务,能够把这个任务分割为若干互不依赖的子任务,为了减小线程间的竞争,把这些子任务分别放到不一样的队列里,并为每一个队列建立一个单独的线程来执行队列里的任务,线程和队列一一对应。

好比A线程负责处理A队列里的任务。可是,有的线程会先把本身队列里的任务干完,而其余线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其余线程干活,因而它就去其余线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,因此为了减小窃取任务线程和被窃取任务线程之间的竞争,一般会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

Fork/Join框架的使用

Fork/Join使用两个类来完成以上两件事情。

①ForkJoinTask:咱们要使用ForkJoin框架,必须首先建立一个ForkJoin任务。它提供在任务

中执行fork()和join()操做的机制。一般状况下,咱们不须要直接继承ForkJoinTask类,只须要继承它的子类,Fork/Join框架提供了如下两个子类。

·RecursiveAction:用于没有返回结果的任务。

·RecursiveTask:用于有返回结果的任务。

②ForkJoinPool:ForkJoinTask须要经过ForkJoinPool来执行。

Fork/Join有同步和异步两种方式。

public class PanTao {

    private final Color color;
    private final Size size;
    private final int Year;

    public PanTao(Color color, Size size, int year) {
        this.color = color;
        this.size = size;
        Year = year;
    }

    public Color getColor() {
        return color;
    }

    public Size getSize() {
        return size;
    }

    public int getYear() {
        return Year;
    }

    @Override
    public String toString() {
        return "PanTao{" +
                "color=" + color +
                ", size=" + size +
                ", Year=" + Year +
                '}';
    }
}
public enum Color {
    RED,GREEN
}
public enum Size {
    BIG,SMALL
}
public class MakePanTaoArray {

    //数组长度
    public static final int ARRAY_LENGTH  = 40000;
    //做为基准的值
    public static final int STANDARD_VAL  = 66694523;

    public static PanTao[] makeArray() {

        //new三个随机数发生器
        Random rColor = new Random();
        Random rSize = new Random();
        Random rYear = new Random();
        PanTao[] result = new PanTao[ARRAY_LENGTH];
        for(int i=0;i<ARRAY_LENGTH;i++){
            //填充数组
            PanTao panTao = new PanTao(
                    rColor.nextBoolean() ? Color.RED:Color.GREEN,
                    rSize.nextBoolean() ? Size.BIG:Size.SMALL,
                    rYear.nextInt(9001));
            result[i] =  panTao;
        }
        return result;
    }
}
public interface IPickTaoZi {
    boolean pick(PanTao[] src, int index);
}
public class WuKongPickImpl implements IPickTaoZi {

    private IProcessTaoZi processTaoZi;

    public WuKongPickImpl(IProcessTaoZi processTaoZi) {
        this.processTaoZi = processTaoZi;
    }

    @Override
    public boolean pick(PanTao[] src, int index) {
        if(src[index].getColor()== Color.RED&&
                src[index].getSize()== Size.BIG&&
                src[index].getYear()>=6000){
            processTaoZi.processTaoZi(src[index]);
            return true;
        }else{
            return false;
        }
    }
}
public interface IProcessTaoZi {
    void processTaoZi(PanTao taoZi);
}
public class WuKongProcessImpl implements IProcessTaoZi {
    @Override
    public void processTaoZi(PanTao taoZi) {
        //看看桃子,放到口袋里
        inBag();
    }

    private void inBag(){
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class ForkJoinWuKong {

  private static class XiaoWuKong extends RecursiveTask<Integer>{

      private final static int THRESHOLD = 100;//阈值,数组多小,进行具体的业务操做
      private PanTao[] src;
      private int fromIndex;
      private int toIndex;
      private IPickTaoZi pickTaoZi;

      public XiaoWuKong(PanTao[] src, int fromIndex, int toIndex, IPickTaoZi pickTaoZi) {
          this.src = src;
          this.fromIndex = fromIndex;
          this.toIndex = toIndex;
          this.pickTaoZi = pickTaoZi;
      }

      @Override
      protected Integer compute() {
          if (toIndex-fromIndex<THRESHOLD){
              int count =0 ;
              for(int i=fromIndex;i<toIndex;i++){
                  if (pickTaoZi.pick(src,i)) count++;
              }
              return count;
          }else{
              //fromIndex....mid......toIndex
              int mid = (fromIndex+toIndex)/2;
              XiaoWuKong left = new XiaoWuKong(src,fromIndex,mid,pickTaoZi);
              XiaoWuKong right = new XiaoWuKong(src,mid,toIndex,pickTaoZi);
              invokeAll(left,right);
              return left.join()+right.join();

          }
      }
  }

    public static void main(String[] args) {

        ForkJoinPool pool = new ForkJoinPool();
        PanTao[] src = MakePanTaoArray.makeArray();
        IProcessTaoZi processTaoZi = new WuKongProcessImpl();
        IPickTaoZi pickTaoZi = new WuKongPickImpl(processTaoZi);

        long start = System.currentTimeMillis();

        XiaoWuKong xiaoWuKong = new XiaoWuKong(src,0,
                src.length-1,pickTaoZi);
        //同步执行
        pool.invoke(xiaoWuKong);
        //System.out.println("Task is Running.....");

        System.out.println("The count is "+ xiaoWuKong.join()
                +" spend time:"+(System.currentTimeMillis()-start)+"ms");

    }

}
运行结果:
The count is 3294 spend time:511ms

 

public class BaJieProcessImpl implements IProcessTaoZi {
	@Override
	public void processTaoZi(PanTao taoZi) {
		// 看看桃子,一口吃了
		eat();
	}

	// 看看桃子,一口吃了
	private void eat() {
		try {
			Thread.sleep(2);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}
public class BaJiePickImpl implements IPickTaoZi {

    private IProcessTaoZi processTaoZi;

    public BaJiePickImpl(IProcessTaoZi processTaoZi) {
        this.processTaoZi = processTaoZi;
    }

    @Override
    public boolean pick(PanTao[] src, int index) {
        if(src[index].getColor()== Color.RED&&
                src[index].getSize()== Size.BIG){
            processTaoZi.processTaoZi(src[index]);
            return true;
        }else{
            return false;
        }
    }
}
public class ForkJoinBaJieAsyn {

	private static class XiaoBaJie extends RecursiveAction {

		private final static int THRESHOLD = 100;
		private PanTao[] src;
		private int fromIndex;
		private int toIndex;
		private IPickTaoZi pickTaoZi;

		public XiaoBaJie(PanTao[] src, int fromIndex, int toIndex,
				IPickTaoZi pickTaoZi) {
			this.src = src;
			this.fromIndex = fromIndex;
			this.toIndex = toIndex;
			this.pickTaoZi = pickTaoZi;
		}

		@Override
		protected void compute() {
			if (toIndex - fromIndex < THRESHOLD) {
				System.out.println(" from index = " + fromIndex + " toIndex="
						+ toIndex);
				int count = 0;
				for (int i = fromIndex; i <= toIndex; i++) {
					if (pickTaoZi.pick(src, i))
						count++;
				}
			} else {
				// fromIndex....mid.....toIndex
				int mid = (fromIndex + toIndex) / 2;
				XiaoBaJie left = new XiaoBaJie(src, fromIndex, mid, pickTaoZi);
				XiaoBaJie right = new XiaoBaJie(src, mid + 1, toIndex,
						pickTaoZi);
				invokeAll(left, right);
			}
		}
	}

	public static void main(String[] args) throws InterruptedException {

		ForkJoinPool pool = new ForkJoinPool();
		PanTao[] src = MakePanTaoArray.makeArray();
		IProcessTaoZi processTaoZi = new BaJieProcessImpl();
		IPickTaoZi pickTaoZi = new BaJiePickImpl(processTaoZi);

		long start = System.currentTimeMillis();

		XiaoBaJie xiaoBaJie = new XiaoBaJie(src, 0, src.length - 1, pickTaoZi);
		// 异步执行
		pool.execute(xiaoBaJie);
		System.out.println("BaJie is picking.....");

		Thread.sleep(2);
		System.out.println("Please waiting.....");

		while (!xiaoBaJie.isDone()) {
			showLog(pool);
			TimeUnit.MILLISECONDS.sleep(100);
		}
		// 关闭链接池
		pool.shutdown();
		pool.awaitTermination(1, TimeUnit.DAYS);
		showLog(pool);

		xiaoBaJie.join();
		System.out.println(" spend time:"
				+ (System.currentTimeMillis() - start) + "ms");

	}

	// 监控Fork/Join池相关方法
	private static void showLog(ForkJoinPool pool) {
		System.out.printf("**********************\n");

		System.out.printf("线程池的worker线程们的数量:%d\n", pool.getPoolSize());
		System.out.printf("当前执行任务的线程的数量:%d\n", pool.getActiveThreadCount());
		System.out.printf("没有被阻塞的正在工做的线程:%d\n", pool.getRunningThreadCount());
		System.out.printf("已经提交给池尚未开始执行的任务数:%d\n",
				pool.getQueuedSubmissionCount());
		System.out.printf("已经提交给池已经开始执行的任务数:%d\n", pool.getQueuedTaskCount());
		System.out.printf("线程偷取任务数:%d\n", pool.getStealCount());
		System.out.printf("池是否已经终止 :%s\n", pool.isTerminated());
		System.out.printf("**********************\n");
	}

}
设置数组长度为400后打印结果
BaJie is picking.....
 from index = 100 toIndex=199
 from index = 0 toIndex=99
 from index = 200 toIndex=299
 from index = 300 toIndex=399
Please waiting.....
**********************
线程池的worker线程们的数量:5
当前执行任务的线程的数量:4
没有被阻塞的正在工做的线程:0
已经提交给池尚未开始执行的任务数:0
已经提交给池已经开始执行的任务数:0
线程偷取任务数:0
池是否已经终止 :false
**********************
**********************
线程池的worker线程们的数量:0
当前执行任务的线程的数量:0
没有被阻塞的正在工做的线程:0
已经提交给池尚未开始执行的任务数:0
已经提交给池已经开始执行的任务数:0
线程偷取任务数:4
池是否已经终止 :true
**********************
 spend time:247ms

 

CountDownLatch

容许一个或多个线程等待其余线程完成操做。CountDownLatch的构造函数接收一个int类型的参数做为计数器,若是你想等待N个点完成,这里就传入N。当咱们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零。因为countDown方法能够用在任何地方,因此这里说的N个点,能够是N个线程,也能够是1个线程里的N个执行步骤。用在多个线程时,只须要把这个CountDownLatch的引用传递到线程里便可。

public class CountDownLatchCase {

	static CountDownLatch c = new CountDownLatch(7);

	private static class SubThread implements Runnable {

		@Override
		public void run() {
			System.out.println(Thread.currentThread().getId());
			c.countDown();
			System.out.println(Thread.currentThread().getId() + " is done");
		}
	}

	public static void main(String[] args) throws InterruptedException {

		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println(Thread.currentThread().getId());
				c.countDown();
				System.out.println("sleeping...");
				try {
					Thread.sleep(1500);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("sleep is completer");
				c.countDown();
			}
		}).start();

		for (int i = 0; i <= 4; i++) {
			Thread thread = new Thread(new SubThread());
			thread.start();
		}

		c.await();
		System.out.println("Main will gone.....");
	}
}
运行结果
11
sleeping...
12
13
12 is done
13 is done
14
14 is done
16
16 is done
15
15 is done
sleep is completer
Main will gone.....

CyclicBarrier

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要作的事情是,让一组线程到达一个屏障(也能够叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,全部被屏障拦截的线程才会继续运行。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每一个线程调用await方法告诉CyclicBarrier我已经到达了屏障,而后当前线程被阻塞。

CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。

CyclicBarrier能够用于多线程计算数据,最后合并计算结果的场景。

public class CyclicBarrriesBase {

	static CyclicBarrier c = new CyclicBarrier(2);

	public static void main(String[] args) throws InterruptedException,
			BrokenBarrierException {
		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println(Thread.currentThread().getId());
				try {
					System.out.println(Thread.currentThread().getId()+ " will await");
					c.await();// 等待主线程完成
					System.out.println(Thread.currentThread().getId()+ " is going");
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (BrokenBarrierException e) {
					e.printStackTrace();
				}
				System.out.println("sleeping...");

			}
		}).start();

		System.out.println("main will sleep.....");
		Thread.sleep(2000);
		c.await();// //等待子线程完成
		System.out.println("All are complete.");
	}

}
运行结果
11
main will sleep.....
11 will await
All are complete.
11 is going
sleeping...
public class CyclicBarrierSum {

    static CyclicBarrier c = new CyclicBarrier(5,new SumThread());
    //子线程结果存放的缓存
    private static ConcurrentHashMap<String,Integer> resultMap =
            new ConcurrentHashMap<>();

    //全部子线程达到屏障后,会执行这个Runnable的任务
    private static class SumThread implements Runnable{

        @Override
        public void run() {
            int result =0;
            for(Map.Entry<String,Integer> workResult:resultMap.entrySet()){
                result = result+workResult.getValue();
            }
            System.out.println("result = "+result);
            System.out.println("彻底能够作与子线程,统计无关的事情.....");
        }
    }

    //工做线程,也就是子线程
    private static class WorkThread implements Runnable{

        private Random t = new Random();

        @Override
        public void run() {
            int r = t.nextInt(1000)+1000;
            System.out.println(Thread.currentThread().getId()+":r="+r);
            resultMap.put(Thread.currentThread().getId()+"",r);
            try {
                Thread.sleep(1000+r);
                c.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

        }
    }

    public static void main(String[] args) {
        for(int i=0;i<=4;i++){
            Thread thread = new Thread(new WorkThread());
            thread.start();
        }
    }
}
运行结果
11:r=1277
14:r=1144
15:r=1818
13:r=1000
12:r=1253
result = 6492
彻底能够作与子线程,统计无关的事情.....

CyclicBarrier和CountDownLatch的区别

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可使用reset()方法重

置,CountDownLatch.await通常阻塞主线程,全部的工做线程执行countDown,而CyclicBarrierton经过工做线程调用await从而阻塞工做线程,直到全部工做线程达到屏障。

控制并发线程数的Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它经过协调各个线程,以保证合理的使用公共资源。应用场景Semaphore能够用于作流量控制,特别是公用资源有限的应用场景,好比数据库链接。假若有一个需求,要读取几万个文件的数据,由于都是IO密集型任务,咱们能够启动几十个线程并发地读取,可是若是读到内存后,还须要存储到数据库中,而数据库的链接数只有10个,这时咱们必须控制只有10个线程同时获取数据库链接保存数据,不然会报错没法获取数据库链接。这个时候,就可使用Semaphore来作流量控制。。Semaphore的构造方法Semaphore(int permits)接受一个整型的数字,表示可用的许可证数量。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完以后调用release()方法归还许可证。还能够用tryAcquire()方法尝试获取许可证。

Semaphore还提供一些其余方法,具体以下。

·intavailablePermits():返回此信号量中当前可用的许可证数。

·intgetQueueLength():返回正在等待获取许可证的线程数。

·booleanhasQueuedThreads():是否有线程正在等待获取许可证。

·void reducePermits(int reduction):减小reduction个许可证,是个protected方法。

·Collection getQueuedThreads():返回全部等待获取许可证的线程集合,是个protected方

法。

public class SemaphporeCase<T> {

    private final Semaphore items;//有多少元素可拿
    private final Semaphore space;//有多少空位可放元素
    private List queue = new LinkedList<>();

    public SemaphporeCase(int itemCounts){
        this.items = new Semaphore(0);
        this.space = new Semaphore(itemCounts);
    }

    //放入数据
    public void put(T x) throws InterruptedException {
        space.acquire();//拿空位的许可,没有空位线程会在这个方法上阻塞
        synchronized (queue){
            queue.add(x);
        }
        items.release();//有元素了,能够释放一个拿元素的许可
    }

    //取数据
    public T take() throws InterruptedException {
        items.acquire();//拿元素的许可,没有元素线程会在这个方法上阻塞
        T t;
        synchronized (queue){
            t = (T)queue.remove(0);
        }
        space.release();//有空位了,能够释放一个存在空位的许可
        return t;
    }
}

Exchanger

Exchanger(交换者)是一个用于线程间协做的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程能够交换彼此的数据。这两个线程经过exchange方法交换数据,若是第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就能够交换数据,将本线程生产出来的数据传递给对方。

public class ExchangeCase {

	static final Exchanger<List<String>> exgr = new Exchanger<>();

	public static void main(String[] args) {

		new Thread(new Runnable() {

			@Override
			public void run() {
				try {
					List<String> list = new ArrayList<>();
					list.add(Thread.currentThread().getId() + " insert A1");
					list.add(Thread.currentThread().getId() + " insert A2");
					System.out.println(Thread.currentThread().getId()+ " exchange");
					list = exgr.exchange(list);// 交换数据
					for (String item : list) {
						System.out.println(Thread.currentThread().getId() + ":"+ item);
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}).start();

		new Thread(new Runnable() {

			@Override
			public void run() {
				try {
					List<String> list = new ArrayList<>();
					list.add(Thread.currentThread().getId() + " insert B1");
					list.add(Thread.currentThread().getId() + " insert B2");
					list.add(Thread.currentThread().getId() + " insert B3");
					System.out.println(Thread.currentThread().getId()+ " will sleep");
					Thread.sleep(3500);
					System.out.println(Thread.currentThread().getId()+ " exchange");
					list = exgr.exchange(list);// 交换数据
					for (String item : list) {
						System.out.println(Thread.currentThread().getId() + ":"
								+ item);
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}).start();

	}

}
运行结果
11 exchange
12 will sleep
12 exchange
12:11 insert A1
12:11 insert A2
11:12 insert B1
11:12 insert B2
11:12 insert B3
相关文章
相关标签/搜索