多线程(四)之并发容器

1、并发容器

ConcurrentHashMap

为何使用ConcurrentHashMapphp

在多线程环境下,使用HashMap进行put操做会引发死循环,致使CPU利用率接近100%,HashMap在并发执行put操做时,出发rehash时,可能会引发链表成环的现象,一旦造成环形数据结构,Entry的next结点永远不为空,就会产生死循环获取Entry。java

HshTable容器使用synchronized来保证线程安全,但在线程竞争激烈的状况下HashTable的效率很是低下。由于当一个线程访问HashTable的同步方法,其余的线程也访问H啊是Table的同步方法时,会进入阻塞或者轮询状态。如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,因此竞争越激烈效率越低。算法

常见的方法数据库

putIfAbsent(K key,V value)编程

若是key对应的value不存在,则put进去,返回null。不然不put,返回已存在的value.数组

boolean remove(Object key , Object value)缓存

若是key对应的值是value,则移除k-v,返回true。不然不移除,返回false安全

boolean replace(K key, V oldValue, V newValue)数据结构

若是key对应的当前值是oldValue,则替换为newValue,返回true。不然不替换,返回false。多线程

Hash的解释

散列,任意长度的输入,经过一种算法,变换成固定长度的输出。属于压缩的映射。Md5,Sha,取余都是散列算法,ConcurrentHashMap中是wang/jenkins算法

ConcurrentHashMap在1.7下的实现

分段锁的设计思想。

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment实际是一种可重入锁(ReentrantLock),HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组。Segment的结构和HashMap相似,是一种数组和链表结构。一个Segment里包含一个HashEntry数组,每一个HashEntry是一个链表结构的元素,每一个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先得到与它对应的Segment锁。

ConcurrentHashMap初始化方法是经过initialCapacity、loadFactor和concurrencyLevel(参数concurrencyLevel是用户估计的并发级别,就是说你以为最多有多少线程共同修改这个map,根据这个来肯定Segment数组的大小concurrencyLevel默认是DEFAULT_CONCURRENCY_LEVEL = 16;)。

ConcurrentHashMap彻底容许多个读操做并发进行,读操做并不须要加锁。ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的。HashEntry表明每一个hash链中的一个节点,能够看到其中的对象属性要么是final的,要么是volatile的。

ConcurrentHashMap1.8版本实现

改进一:取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存数据,采用table数组元素做为锁,从而实现了对每一行数据进行加锁,进一步减小并发冲突的几率。

改进二:将原先table数组+单向链表的数据结构,变动为table数组+单向链表+红黑树的结构。对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度能够下降到O(logN),能够改进性能。

 

 

ConcurrentSkipListMap  和ConcurrentSkipListSet

ConcurrentSkipListMap    TreeMap的并发实现

ConcurrentSkipListSet     TreeSet的并发实现

了解什么是SkipList(跳表)?

二分查找和AVL树查找

二分查找要求元素能够随机访问,因此决定了须要把元素存储在连续内存。这样查找确实很快,可是插入和删除元素的时候,为了保证元素的有序性,就须要大量的移动元素了。

若是须要的是一个可以进行二分查找,又能快速添加和删除元素的数据结构,首先就是二叉查找树,二叉查找树在最坏状况下可能变成一个链表。

因而,就出现了平衡二叉树,根据平衡算法的不一样有AVL树,B-Tree,B+Tree,红黑树等,可是AVL树实现起来比较复杂,平衡操做较难理解,这时候就能够用SkipList跳跃表结构。

传统意义的单链表是一个线性结构,向有序的链表中插入一个节点须要O(n)的时间,查找操做须要O(n)的时间。

若是咱们使用上图所示的跳跃表,就能够减小查找所需时间为O(n/2),由于咱们能够先经过每一个节点的最上面的指针先进行查找,这样子就能跳过一半的节点。

好比咱们想查找19,首先和6比较,大于6以后,在和9进行比较,而后在和17进行比较......最后比较到21的时候,发现21大于19,说明查找的点在17和21之间,从这个过程当中,咱们能够看出,查找的时候跳过了三、七、12等点,所以查找的复杂度为O(n/2)。

跳跃表其实也是一种经过“空间来换取时间”的一个算法,经过在每一个节点中增长了向前的指针,从而提高查找的效率。

跳跃表又被称为几率,或者说是随机化的数据结构,目前开源软件 Redis 和 lucence都有用到它。

 

 

ConcurrentLinkedQueue  无界非阻塞队列

能够看做是LinkedList的并发版本

add,offer:添加元素

Peek:get头元素并不把元素拿走

poll():get头元素把元素拿走

 

CopyOnWriteArrayList和CopyOnWriteArraySet

写的时候进行复制,能够进行并发的读。

适用读多写少的场景:好比白名单,黑名单,商品类目的访问和更新场景,假如咱们有一个搜索网站,用户在这个网站的搜索框中,输入关键字搜索内容,可是某些关键字不容许被搜索。这些不能被搜索的关键字会被放在一个黑名单当中,黑名单天天晚上更新一次。当用户搜索时,会检查当前关键字在不在黑名单当中,若是在,则提示不能搜索。

弱点:内存占用高,数据一致性弱

 

什么是阻塞队列

取数据和读数据不知足要求时,会对线程进行阻塞

方法

抛出异常

返回值

一直阻塞

超时退出

插入

Add

offer

put

offer

移除

Remove

poll

take

poll

检查

element

peek

没有

没有

 

经常使用阻塞队列

ArrayBlockingQueue: 数组结构组成有界阻塞队列。

先进先出原则,初始化必须传大小,take和put时候用的同一把锁

LinkedBlockingQueue:链表结构组成的有界阻塞队列

先进先出原则,初始化能够不传大小,put,take锁分离

PriorityBlockingQueue:支持优先级排序的无界阻塞队列,

排序,天然顺序升序排列,更改顺序:类本身实现compareTo()方法,初始化PriorityBlockingQueue指定一个比较器Comparator

DelayQueue: 使用了优先级队列的无界阻塞队列

支持延时获取,队列里的元素要实现Delay接口。DelayQueue很是有用,能够将DelayQueue运用在如下应用场景。

缓存系统的设计:能够用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。

还有订单到期,限时支付等等。

假如对User进行缓存:

public class User {
	
	private String name;

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public User(String name) {
		super();
		this.name = name;
	}
}

CacheBean:

由于DelayQueue中存放的是Delayed接口,因此CacheBean要实现Delayed接

public class CacheBean<T> implements Delayed {

	private String id;
	
	private String name;
	
	private T data;
	
	
	//数据的到期时间
	private Long activeTime;
	
	
	//要求传入的activeTime为毫秒,在构造函数中会自动转换成纳秒
	public CacheBean(String id, String name, T data, Long activeTime) {
		super();
		this.id = id;
		this.name = name;
		this.data = data;
		this.activeTime = TimeUnit.NANOSECONDS.convert(activeTime, TimeUnit.MILLISECONDS)+System.nanoTime();
	}
	
	
	

	public String getId() {
		return id;
	}




	public void setId(String id) {
		this.id = id;
	}




	public String getName() {
		return name;
	}




	public void setName(String name) {
		this.name = name;
	}




	public T getData() {
		return data;
	}




	public void setData(T data) {
		this.data = data;
	}



	

	public Long getActiveTime() {
		return activeTime;
	}




	public void setActiveTime(Long activeTime) {
		this.activeTime = activeTime;
	}




	@Override
	public int compareTo(Delayed o) {
		long d = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
		return (d == 0) ? 0 : (d >0 ? 1 : -1);
	}

	//返回还有多少纳秒的剩余时间
	@Override
	public long getDelay(TimeUnit unit) {

		return unit.convert(this.activeTime - System.nanoTime(), TimeUnit.NANOSECONDS);
	}

}

定义两个任务类,分别是向DelayQueue中存数据和取数据。

public class PutUserWork implements Runnable{
	private DelayQueue<CacheBean<User>> delayQueue;
	private List<CacheBean<User>> list;
	public PutUserWork(DelayQueue<CacheBean<User>> delayQueue, List<CacheBean<User>> list) {
		super();
		this.delayQueue = delayQueue;
		this.list = list;
	}
	@Override
	public void run() {
		list.forEach(cacheBean->{
			delayQueue.put(cacheBean);
			System.out.println("放入:"+cacheBean.getData());
		});
	}
}



public class GetUserWork implements Runnable{

	private DelayQueue<CacheBean<User>> delayque;

	public GetUserWork(DelayQueue<CacheBean<User>> delayque) {
		super();
		this.delayque = delayque;
	}

	@Override
	public void run() {
		while(true)
		{
			try {
				CacheBean<User> element = delayque.take();
				System.out.println("get element:"+element+"  id:"+element.getId()+", name:"+element.getName()
				+" data:"+element.getName());
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		
	}

}

测试类:

public class MainTest {
	public static void main(String[] args) throws InterruptedException {
		User u1 = new User("张三");
		CacheBean<User> c1 = new CacheBean<User>("1", "张三", u1, 5000L);
		
		User u2 = new User("李四");
		CacheBean<User> c2 = new CacheBean<User>("2", "李四", u2, 3000L);
		
		List<CacheBean<User>> list = new ArrayList<>();
		list.add(c1);
		list.add(c2);
		
		DelayQueue<CacheBean<User>> delayQueue = new DelayQueue<>();
		
		new Thread(new PutUserWork(delayQueue,list)).start();
		
		new Thread(new GetUserWork(delayQueue)).start();
		
		
		CountDownLatch countDownLatch = new CountDownLatch(1);
		countDownLatch.await();
	}
}

 

生产者消费者模式

在并发编程中使用生产者和消费者模式可以解决绝大多数并发问题。该模式经过平衡生产线程和消费线程的工做能力来提升程序总体处理数据的速度。在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。生产者和消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是经过阻塞队列来进行通讯,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。

 

Fork/Join框架

并行执行任务的框架,把大任务拆分红不少的小任务,汇总每一个小任务的结果获得大任务的结果。

工做窃取算法

工做窃取(work-stealing)算法是指某个线程从其余队列里窃取任务来执行。那么,为何须要使用工做窃取算法呢?假如咱们须要作一个比较大的任务,能够把这个任务分割为若干互不依赖的子任务,为了减小线程间的竞争,把这些子任务分别放到不一样的队列里,并为每一个队列建立一个单独的线程来执行队列里的任务,线程和队列一一对应。好比A线程负责处理A队列里的任务。可是,有的线程会先把本身队列里的任务干完,而其余线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其余线程干活,因而它就去其余线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,因此为了减小窃取任务线程和被窃取任务线程之间的竞争,一般会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

 

Fork/Join框架的使用

Fork/Join使用两个类来完成以上两件事情。

①ForkJoinTask:咱们要使用ForkJoin框架,必须首先建立一个ForkJoin任务。它提供在任务

中执行fork()和join()操做的机制。一般状况下,咱们不须要直接继承ForkJoinTask类,只须要继承它的子类,Fork/Join框架提供了如下两个子类。

·RecursiveAction:用于没有返回结果的任务。

·RecursiveTask:用于有返回结果的任务。

②ForkJoinPool:ForkJoinTask须要经过ForkJoinPool来执行。

Fork/Join有同步和异步两种方式。

例子:计算硬盘中.txt文件的个数。。。。

public class MyRunnable2 {
	static class CountWork extends RecursiveTask<Integer>
	{

		private String filePath;
		public CountWork(String filePath) {
			super();
			this.filePath = filePath;
		}
		@Override
		protected Integer compute() {
			File file = new File(filePath);

			int count = 0;
			if (file.isDirectory()) {
				File[] files = file.listFiles();
				if (files == null || files.length == 0)
					return 0;


				List<CountWork> taskList = new ArrayList<CountWork>();

				for (File f : files) {
					if (f.isDirectory()) {
						String lastName = f.getName();
						String newFilePath = filePath + File.separator + lastName;
						//	 count+= count(newFilePath);
						CountWork countWork = new CountWork(newFilePath);
						taskList.add(countWork);
						
//						invokeAll(countWork);
//						count += countWork.join();
					} else {
						try {
							Thread.currentThread().sleep(1);
						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
						count++;
					}


				}

				if (!taskList.isEmpty()) {

					//方式1
					for (CountWork mtask : invokeAll(taskList)) {
						count += mtask.join();
					}
					
					//方式2,
					
//					for (CountWork mtask : taskList) {
//						invokeAll(mtask);						
//						
//						//Returns the result of the computation when it {@link #isDone is 一直等待到计算完成才返回结果
//						count += mtask.join();
//					}
				}
			}else {
				count=1;
			}
			
			//System.out.println(Thread.currentThread().getName());
			return count;
		}
	}


	//监控Fork/Join池相关方法
	private static void showLog(ForkJoinPool pool) {
		System.out.printf("**********************\n");

		System.out.printf("线程池的worker线程们的数量:%d\n",
				pool.getPoolSize());
		System.out.printf("当前执行任务的线程的数量:%d\n",
				pool.getActiveThreadCount());
		System.out.printf("没有被阻塞的正在工做的线程:%d\n",
				pool.getRunningThreadCount());
		System.out.printf("已经提交给池尚未开始执行的任务数:%d\n",
				pool.getQueuedSubmissionCount());
		System.out.printf("已经提交给池已经开始执行的任务数:%d\n",
				pool.getQueuedTaskCount());
		System.out.printf("线程偷取任务数:%d\n",
				pool.getStealCount());
		System.out.printf("池是否已经终止 :%s\n",
				pool.isTerminated());
		System.out.printf("**********************\n");
	}
	public static void main(String[] args) throws InterruptedException {
		
		ForkJoinPool forkJoinPool = new ForkJoinPool(15);
				
		File [] roots = File.listRoots();
		Long start = System.currentTimeMillis();
		//File e=new File("D:\\");
		for(File e:roots)
		{
			CountWork countWork = new CountWork(e.getAbsolutePath());
			forkJoinPool.invoke(countWork);
//			Thread.sleep(1000);
//			forkJoinPool.execute(countWork);
//			showLog(forkJoinPool);
			System.out.println(countWork.join());


		}

		Long end = System.currentTimeMillis();
		System.out.println("用时:"+(end-start)/1000);
	}
}

 

 

2、CountDownLatch、CyclicBarrier、Semaphore(控制线程并发数)、Exchanger

CountDownLatch

容许一个或多个线程等待其余线程完成操做。CountDownLatch的构造函数接收一个int类型的参数做为计数器,若是你想等待N个点完成,这里就传入N。当咱们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零。因为countDown方法能够用在任何地方,因此这里说的N个点,能够是N个线程,也能够是1个线程里的N个执行步骤。用在多个线程时,只须要把这个CountDownLatch的引用传递到线程里便可。

 

CyclicBarrier

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要作的事情是,让一组线程到达一个屏障(也能够叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,全部被屏障拦截的线程才会继续运行。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每一个线程调用await方法告诉CyclicBarrier我已经到达了屏障,而后当前线程被阻塞。CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。CyclicBarrier能够用于多线程计算数据,最后合并计算结果的场景。

CyclicBarrier和CountDownLatch的区别

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可使用reset()方法重置,CountDownLatch.await通常阻塞主线程,全部的工做线程执行countDown,而CyclicBarrierton经过工做线程调用await从而阻塞工做线程,直到全部工做线程达到屏障。

 

Semaphore(控制线程并发数)

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它经过协调各个线程,以保证合理的使用公共资源。应用场景Semaphore能够用于作流量控制,特别是公用资源有限的应用场景,好比数据库链接。假若有一个需求,要读取几万个文件的数据,由于都是IO密集型任务,咱们能够启动几十个线程并发地读取,可是若是读到内存后,还须要存储到数据库中,而数据库的链接数只有10个,这时咱们必须控制只有10个线程同时获取数据库链接保存数据,不然会报错没法获取数据库链接。这个时候,就可使用Semaphore来作流量控制。。Semaphore的构造方法Semaphore(int permits)接受一个整型的数字,表示可用的许可证数量。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完以后调用release()方法归还许可证。还能够用tryAcquire()方法尝试获取许可证。

Semaphore还提供一些其余方法,具体以下。

·int        availablePermits():返回此信号量中当前可用的许可证数。

·int        getQueueLength():返回正在等待获取许可证的线程数。

·boolean       hasQueuedThreads():是否有线程正在等待获取许可证。

·void    reducePermits(int reduction):减小reduction个许可证,是个protected方法。

·Collection   getQueuedThreads():返回全部等待获取许可证的线程集合,是个protected方法。

用法:用用信号量实现有界缓存

public class SemaphporeCase<T> {

    private final Semaphore items;//有多少元素可拿
    private final Semaphore space;//有多少空位可放元素
    private List queue = new LinkedList<>();

    public SemaphporeCase(int itemCounts){
        this.items = new Semaphore(0);
        this.space = new Semaphore(itemCounts);
    }

    //放入数据
    public void put(T x) throws InterruptedException {
        space.acquire();//拿空位的许可,没有空位线程会在这个方法上阻塞
        synchronized (queue){
            queue.add(x);
        }
        items.release();//有元素了,能够释放一个拿元素的许可
    }

    //取数据
    public T take() throws InterruptedException {
        items.acquire();//拿元素的许可,没有元素线程会在这个方法上阻塞
        T t;
        synchronized (queue){
            t = (T)queue.remove(0);
        }
        space.release();//有空位了,能够释放一个存在空位的许可
        return t;
    }
}

 

Exchanger

Exchanger(交换者)是一个用于线程间协做的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程能够交换彼此的数据。这两个线程经过exchange方法交换数据,若是第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就能够交换数据,将本线程生产出来的数据传递给对方。

例子:

public class ExchangeCase {
	static final Exchanger<List<String>> exgr = new Exchanger<>();

	public static void main(String[] args) {

		new Thread(new Runnable() {

			@Override
			public void run() {
				try {
					List<String> list = new ArrayList<>();
					list.add(Thread.currentThread().getId()+" insert A1");
					list.add(Thread.currentThread().getId()+" insert A2");
					list = exgr.exchange(list);//交换数据
					for(String item:list){
						System.out.println(Thread.currentThread().getId()+":"+item);
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}).start();

		new Thread(new Runnable() {

			@Override
			public void run() {
				try {
					List<String> list = new ArrayList<>();
					list.add(Thread.currentThread().getId()+" insert B1");
					list.add(Thread.currentThread().getId()+" insert B2");
					list.add(Thread.currentThread().getId()+" insert B3");
					System.out.println(Thread.currentThread().getId()+" will sleep");
					Thread.sleep(1500);
					list = exgr.exchange(list);//交换数据
					for(String item:list){
						System.out.println(Thread.currentThread().getId()+":"+item);
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}).start();

	}

}

结果:

14 will sleep
14:13 insert A1
13:14 insert B1
13:14 insert B2
13:14 insert B3
14:13 insert A2
相关文章
相关标签/搜索