多线程与高并发09-并发容器(二)

ConcurrentSkipList系列

  • ConcurrentSkipListMap:跳表实现有序Map
  • ConcurrentSkipListSet:跳表实现有序Set
TreeMap和TreeSet使用 红黑树按照key的顺序(天然顺序、自定义顺序)来使得键值对有序存储,可是只能在单线程下安全使用;多线程下想要使键值对按照key的顺序来存储,则须要使用ConcurrentSkipListMap和ConcurrentSkipListSet,分别用以代替TreeMap和TreeSet,存入的数据按key排序。在实现上,ConcurrentSkipListSet 本质上就是ConcurrentSkipListMap

image.png

了解什么是SkipList

二分查找和AVL树查找

连续数组的局限:二分查找要求元素能够随机访问,因此决定了须要把元素存储在连续内存。这样查找确实很快,可是插入和删除元素的时候,为了保证元素的有序性,就须要大量的移动元素了前端

二叉查找树:若是须要的是一个可以进行二分查找,又能快速添加和删除元素的数据结构,首先就是二叉查找树,二叉查找树在最坏状况下可能变成一个链表java

平衡二叉树:出现了平衡二叉树,根据平衡算法的不一样有AVL树,B-Tree,B+Tree,红黑树等,可是AVL树实现起来比较复杂,平衡操做较难理解,这时候就能够用SkipList跳跃表结构算法

什么是跳表

传统意义的单链表是一个线性结构,向有序的链表中插入一个节点须要O(n)的时间,查找操做须要O(n)的时间编程

image.png

若是咱们使用上图所示的跳跃表,就能够减小查找所需时间为O(n/2),由于咱们能够先经过每一个节点的最上面的指针先进行查找,这样子就能跳过一半的节点后端

好比咱们想查找50,首先和20比较,大于20以后,在和40进行比较,而后在和70进行比较,发现70大于50,说明查找的点在40和50之间,从这个过程当中,咱们能够看出,查找的时候跳过了30数组

跳跃表其实也是一种经过“空间来换取时间”的一个算法,令链表的每一个结点不只记录next结点位置,还能够按照level层级分别记录后继第level个结点。此法使用的就是“先大步查找肯定范围,再逐渐缩小迫近”的思想进行的查找。跳跃表在算法效率上很接近红黑树缓存

跳跃表又被称为几率,或者说是随机化的数据结构,目前开源软件 Redislucence都有用到它安全

都是线程安全的Map实现,ConcurrentHashMap的性能和存储空间要优于ConcurrentSkipListMap,可是ConcurrentSkipListMap有一个功能:它会按照键的顺序进行排序数据结构

ConcurrentLinkedQueue

无界非阻塞队列:它是一个基于链表的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最早加入的,尾是最近加入的。插入元素是追加到尾上。提取一个元素是从头提取多线程

你们能够当作是LinkedList的并发版本,经常使用方法:

  • concurrentLinkedQueue.add("c")
  • concurrentLinkedQueue.offer("d"):将指定元素插入到此队列的尾部
  • concurrentLinkedQueue.peek():检索并不移除此队列的头,若是此队列为空,则返回 null
  • concurrentLinkedQueue.poll():检索并移除此队列的头,若是此队列为空,则返回 null

写时复制容器

什么是写时复制容器

CopyOnWriteArrayList和CopyOnWriteArraySet

CopyOnWrite:容器即写时复制的容器。通俗的理解是当咱们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,而后新的容器里添加元素,添加完元素以后,再将原容器的引用指向新的容器

这样作的好处是咱们能够对CopyOnWrite容器进行并发的读,而不须要加锁,由于当前容器不会添加任何元素。因此CopyOnWrite容器也是一种读写分离的思想,读和写不一样的容器。若是读的时候有多个线程正在向CopyOnWriteArrayList添加数据,读仍是会读到旧的数据,由于写的时候不会锁住旧的CopyOnWriteArrayList

CopyOnWrite并发容器用于对于(读多写少)绝大部分访问都是读,且只是偶尔写的并发场景。好比白名单,黑名单,商品类目的访问和更新场景,假如咱们有一个搜索网站,用户在这个网站的搜索框中,输入关键字搜索内容,可是某些关键字不容许被搜索。这些不能被搜索的关键字会被放在一个黑名单当中,黑名单天天晚上更新一次。当用户搜索时,会检查当前关键字在不在黑名单当中,若是在,则提示不能搜索

使用CopyOnWriteMap须要注意两件事情:

  • 减小扩容开销。根据实际须要,初始化CopyOnWriteMap的大小,避免写时扩容的开销
  • 使用批量添加。由于每次添加,容器每次都会进行复制,因此减小添加次数,能够减小容器的复制次数

写时复制容器的问题

性能问题

每次修改都建立一个新数组,而后复制全部内容,若是数组比较大,修改操做又比较频繁,能够想象,性能是很低的,并且内存开销会很大

数据一致性问题

CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。因此若是你但愿写入的的数据,立刻能读到,不要使用CopyOnWrite容器

阻塞队列BlockingQueue


队列

image.png

队列是一种特殊的线性表,特殊之处在于它只容许在表的前端(front)进行删除操做,而在表的后端(rear)进行插入操做,和栈同样,队列是一种操做受限制的线性表。进行插入操做的端称为队尾,进行删除操做的端称为队头

在队列中插入一个队列元素称为入队,从队列中删除一个队列元素称为出队。由于队列只容许在一端插入,在另外一端删除,因此只有最先进入队列的元素才能最早从队列中删除,故队列又称为先进先出(FIFO—first in first out)线性表

什么是阻塞队列

  • 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
  • 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

在并发编程中使用生产者和消费者模式可以解决绝大多数并发问题。该模式经过平衡生产线程和消费线程的工做能力来提升程序总体处理数据的速度

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者

为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。生产者和消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是经过阻塞队列来进行通讯,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力

阻塞队列经常使用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器
image.png

  • 抛出异常:当队列满时,若是再往队列里插入元素,会抛出IllegalStateException("Queuefull")异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常
  • 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。若是是移除方法,则是从队列里取出一个元素,若是没有则返回null
  • 一直阻塞:当阻塞队列满时,若是生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,若是消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空
  • 超时退出:当阻塞队列满时,若是生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,若是超过了指定的时间,生产者线程就会退出

经常使用阻塞队列

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue:一个由链表结构组成的无界阻塞队列
  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列
  • DelayQueue:一个使用优先级队列实现执行定时任务的无界阻塞队列
  • SynchronousQueue:一个不存储元素的阻塞队列
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列

LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

以上的阻塞队列都实现了BlockingQueue接口,也都是线程安全的

有界与无界

有限队列就是长度有限,满了之后生产者会阻塞,无界队列就是里面能放无数的东西而不会由于队列长度限制被阻塞,固然空间限制来源于系统资源的限制,若是处理不及时,致使队列愈来愈大愈来愈大,超出必定的限制导致内存超限,操做系统或者JVM帮你解决烦恼,直接把你 OOM kill 省事了

无界也会阻塞,为什么?

由于阻塞不只仅体如今生产者放入元素时会阻塞,消费者拿取元素时,若是没有元素,一样也会阻塞

ArrayBlockingQueue

是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认状况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,能够按照阻塞的前后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程均可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。初始化时有参数能够设置
示例代码:

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class T06_ArrayBlockingQueue {

    static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);

    static Random r = new Random();

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            strs.put("a" + i);
        }
        
        strs.put("aaa"); //满了就会等待,程序阻塞
        strs.add("aaa"); //满了就会抛出异常 full queue
        strs.offer("aaa"); //满了会当即返回false
        strs.offer("aaa", 10, TimeUnit.SECONDS); //满了等待10秒,不成功返回false
        System.out.println(strs);
    }
}

LinkedBlockingQueue

是一个用链表实现的无界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序
示例代码:

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class T05_LinkedBlockingQueue {

    static BlockingQueue<String> strs = new LinkedBlockingQueue<>();

    static Random r = new Random();

    public static void main(String[] args) {
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    strs.put("a" + i); //若是满了,就会等待
                    TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "p1").start();

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                for (;;) {
                    try {
                        System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //若是空了,就会等待
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "c" + i).start();

        }
    }
}

运行结果:

c0 take -a0
c1 take -a1
c2 take -a2
c3 take -a3
c4 take -a4
c0 take -a5
c1 take -a6
c2 take -a7
c3 take -a8
c4 take -a9

Array实现和Linked实现的区别

队列中锁的实现不一样

ArrayBlockingQueue实现的队列中的锁是没有分离的,即生产和消费用的是同一个锁

LinkedBlockingQueue实现的队列中的锁是分离的,即生产用的是putLock,消费是takeLock

在生产或消费时操做不一样

ArrayBlockingQueue实现的队列中在生产和消费的时候,是直接将对象插入或移除的

LinkedBlockingQueue实现的队列中在生产和消费的时候,须要把对象转换为Node<E>进行插入或移除(由于须要增长链表指针),会影响性能

队列大小初始化方式不一样

ArrayBlockingQueue实现的队列中必须指定队列的大小

LinkedBlockingQueue实现的队列中能够不指定队列的大小,可是默认是Integer.MAX_VALUE

PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认状况下元素采起天然顺序升序排列。也能够自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。须要注意的是不能保证同优先级元素的顺序
示例代码:

import java.util.PriorityQueue;

public class T07_01_PriorityQueque {
    public static void main(String[] args) {
        PriorityQueue<String> q = new PriorityQueue<>();

        q.add("c");
        q.add("e");
        q.add("a");
        q.add("d");
        q.add("z");

        for (int i = 0; i < 5; i++) {
            System.out.println(q.poll());
        }

    }
}

运行结果:

a
c
d
e
z

DelayQueue

是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在建立元素时能够指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素

DelayQueue很是有用,能够将DelayQueue运用在如下应用场景:

缓存系统的设计能够用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。还有订单到期,限时支付等等
示例代码:

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class T07_DelayQueue {

    static BlockingQueue<MyTask> tasks = new DelayQueue<>();

    static Random r = new Random();
    
    static class MyTask implements Delayed {
        String name;
        long runningTime;
        
        MyTask(String name, long rt) {
            this.name = name;
            this.runningTime = rt;
        }

        @Override
        public int compareTo(Delayed o) {
            if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
                return -1;
            else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) 
                return 1;
            else 
                return 0;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            
            return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        
        
        @Override
        public String toString() {
            return name + " " + runningTime;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long now = System.currentTimeMillis();
        MyTask t1 = new MyTask("t1", now + 1000);
        MyTask t2 = new MyTask("t2", now + 2000);
        MyTask t3 = new MyTask("t3", now + 1500);
        MyTask t4 = new MyTask("t4", now + 2500);
        MyTask t5 = new MyTask("t5", now + 500);
        
        tasks.put(t1);
        tasks.put(t2);
        tasks.put(t3);
        tasks.put(t4);
        tasks.put(t5);
        
        System.out.println(tasks);
        
        for(int i=0; i<5; i++) {
            System.out.println(tasks.take());
        }
    }
}

运行结果:

[t5 1586608841726, t1 1586608842226, t3 1586608842726, t4 1586608843726, t2 1586608843226]
t5 1586608841726
t1 1586608842226
t3 1586608842726
t2 1586608843226
t4 1586608843726

SynchronousQueue

是一个不存储元素的阻塞队列。每个put操做必须等待一个take操做,不然不能继续添加元素。SynchronousQueue能够当作是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列自己并不存储任何元素,很是适合传递性场景。SynchronousQueue的吞吐量高于LinkedBlockingQueueArrayBlockingQueue
示例代码:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class T08_SynchronusQueue { //容量为0
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> strs = new SynchronousQueue<>();
        
        new Thread(()->{
            try {
                System.out.println(strs.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        strs.put("aaa"); //阻塞等待消费者消费
        //strs.put("bbb");
        //strs.add("aaa");
        System.out.println(strs.size());
    }
}

运行结果:

aaa
0

LinkedTransferQueue

多了tryTransfer和transfer方法

  • transfer方法

若是当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法能够把生产者传入的元素马上transfer(传输)给消费者。若是没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回

  • tryTransfer方法

tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。若是没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法不管消费者是否接收,方法当即返回,而transfer方法是必须等到消费者消费了才返回
示例代码:

package com.mashibing.juc.c_025;

import java.util.concurrent.LinkedTransferQueue;

public class T09_TransferQueue {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
        
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() +":"+ strs.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        
        strs.transfer("aaa");
        
        strs.put("bbb");


        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() +":"+strs.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();


    }
}

运行结果:

Thread-0:aaa
Thread-1:aaa

LinkedBlockingDeque

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是能够从队列的两端插入和移出元素。双向队列由于多了一个操做队列的入口,在多线程同时入队时,也就减小了一半的竞争

多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。可是take方法却等同于takeFirst,不知道是否是JDK的bug,使用时仍是用带有First和Last后缀的方法更清楚。在初始化LinkedBlockingDeque时能够设置容量防止其过分膨胀。另外,双向阻塞队列能够运用在“工做窃取”(ForkJoinPool)模式中

了解阻塞队列的实现原理

使用了wait/notify模式实现。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。经过查看JDK源码发现ArrayBlockingQueue使用了Condition来实现。其他队列的实现,你们能够自行查看,队列的实现的代码整体来讲,并不复杂

相关文章
相关标签/搜索