在Android开发的漫漫长途上的一点感想和记录,若是能给各位看官带来一丝启发或者帮助,那真是极好的。java
上一篇博文中,主要说了些线程以及锁的东西,咱们大多数的并发开发需求,基本上能够用synchronized或者volatile解决,虽然synchronized已经被JDK优化了,但有的时候咱们仍是以为synchronized过重了,node
好比说一个电影院卖票,这个票数是必定的并且共享的,我想尽快的卖票而且知道还有多少余票。在程序员看来这就是个票数自减以及获取最新票数的操做。程序员
private static Long sCount = 10000L; final Object obj = new Object(); //这里开了1000个线程对sCount并发操做 for (int i = 0; i < 1000; i++) { new Thread(new Runnable() { @Override public void run() { synchronized (obj) { //这里加锁保证同步,使用synchronized总以为没有 //必要,毕竟就是自减操做,若是不使用synchronized又有什么办法呢? sCount--; } } }).start(); } Thread.sleep(5000); System.out.println(sCount);
再有,咱们日常使用的容器类List以及Map,如ArrayList、HashMap这些容器是非线程安全的,那咱们若是须要支持并发的容器,咱们该怎么办呢??读者莫急,这正是本篇分享的内容。算法
咱们先来解决第一个问题,JDK1.5以后为咱们提供了一系列的原子操做类,位于java.util.concurrent.atomic包下。编程
以上3个类提供的方法几乎如出一辙,因此本篇仅以AtomicInteger为例进行讲解,
AtomicInteger的经常使用方法以下。数组
value)相加,并返回结果。安全
那按照上面的知识从新对上面的卖票问题编程以下数据结构
private static AtomicLong sAtomicLong = new AtomicLong(10000L); //这里开了1000个线程对sCount并发操做 for (int i = 0; i < 1000; i++) { new Thread(new Runnable() { @Override public void run() { sAtomicLong.decrementAndGet(); } }).start(); } Thread.sleep(5000); System.out.println(sAtomicLong);
上面的是原子更新基本类型,那对于对象呢,JDK也提供了原子更新对象引用的原子类多线程
型的标记位和引用类型。构造方法是AtomicMarkableReference(V initialRef,boolean
initialMark)。并发
以上几个类提供的方法几乎同样,因此本节仅以AtomicReference为例进行讲解
boolean compareAndSet(V expect, V update):若是当前对象(调用该函数的对象)等于预期对象(expect),则以原子方式将当前对象(调用该函数的对象)设置为更新的对象(update)。
V get():获取找对象
void set(V newValue):设置对象
V getAndSet(V newValue):以原子方式将当前对象(调用该函数的对象)设置为指定的对象(newValue),并返回原来的对象(设置以前)
那这个东西用在哪里呢,我在著名的Rxjava源码中看到了原子更新对象的用法。
CachedThreadScheduler.java
//原子引用AtomicReference AtomicReference<CachedWorkerPool> pool; static final CachedWorkerPool NONE; static { NONE = new CachedWorkerPool(0, null); NONE.shutdown(); } public CachedThreadScheduler() { this.pool = new AtomicReference<CachedWorkerPool>(NONE); start(); } @Override public void start() { CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT); //调用AtomicReference的compareAndSet方法 if (!pool.compareAndSet(NONE, update)) { update.shutdown(); } }
在建立线程调度器的时候把初始的工做线程池更新为新的工做线程池
AtomicReferenceFieldUpdater以原子方式更新一个对象的属性值
AtomicMarkableReference是带有标记的原子更新引用的类,能够有效解决ABA问题,什么是ABA问题,
咱们就以上面的代码为例
假设pool.compareAndSet调用以前,pool内的对象NONE被更新成了update,而后又更新成了NONE,那么在调用pool.compareAndSet的时候仍是会把pool内的对象更新为update,也就是说AtomicReference不关心对象的中间历程,这对于一些以当前对象是否被更改过为判断条件的特殊情境,AtomicReference就不适用了。
因此JDK提供了AtomicMarkableReference
那除了上面的原子更新引用类型以外,JDK还为咱们提供了原子更新数组
经过原子的方式更新数组里的某个元素,Atomic包提供了如下4个类。
方式将数组位置i的元素设置成update值。
以上几个类提供的方法几乎同样,因此本节仅以AtomicIntegerArray为例进行讲解
public class AtomicIntegerArrayTest { static int[] value = new int[]{2, 3}; static AtomicIntegerArray ai = new AtomicIntegerArray(value); public static void main(String[] args) { ai.getAndSet(0, 4); System.out.println(ai.get(0)); System.out.println(value[0]); } }
如下是输出的结果。
4
2
JDK1.8为咱们提供了更快的原子操做基本类LongAdder DouleAdder,
LongAdder的doc部分说明以下
This class is usually preferable to {@link AtomicLong} when
multiple threads update a common sum that is used for purposes such
as collecting statistics, not for fine-grained synchronization
control. Under low update contention, the two classes have similar
characteristics. But under high contention, expected throughput of
this class is significantly higher, at the expense of higher space
consumption
上面那段话翻译过来就是
当咱们的场景是为了统计计数,而不是为了更细粒度的同步控制时,而且是在多线程更新的场景时,LongAdder类比AtomicLong更好用。 在小并发的环境下,论更新的效率,二者都差很少。可是高并发的场景下,LongAdder有着明显更高的吞吐量,可是有着更高的空间复杂度。
从LongAdder的doc文档上咱们就能够知道LongAdder更适用于统计求和场景,而不是细粒度的同步控制。
咱们在开发中遇到比较简单的并发操做像自增自减,求和之类的问题,上一节原子类已经能比较好的解决了,但对于本篇文章来讲只是开胃小菜,下面正菜来喽
ConcurrentLinkedQueue是一个基于链表的无界线程安全队列,它采用先进先出的规则对节点进行排序,咱们添加一个元素的时候,它会添加到队列的尾部;当咱们获取一个元素时,它会返回队列头部的元素。
咱们先来看一下ConcurrentLinkedQueue的类图
ConcurrentLinkedQueue由head节点和tail节点组成,每一个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是经过这个next关联起来,从而组成一张链表结构的队列。默认状况下head节点存储的元素为空,tail节点等于head节点
如下源码来自JDK1.8
public ConcurrentLinkedQueue() { //默认状况下head节点存储的元素为空,tail节点等于head节点,哨兵节点 head = tail = new Node<E>(null); } private static class Node<E> { volatile E item; volatile Node<E> next; Node(E item) { //设置item值 //这种的设置方式相似于C++的指针,直接操做内存地址, //例如此行代码,就是以CAS的方式把值(item)赋值给当前对象即Node地址偏移itemOffset后的地址 //下面出现的casItem以及casNext也是同理 UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
看完了初始化,咱们来看一下这个线程安全队列的进队和出队方法
public boolean offer(E e) { checkNotNull(e);// 检查,为空直接异常 // 建立新节点,并将e 做为节点的item final Node<E> newNode = new Node<E>(e); // 这里操做比较多,将尾节点tail 赋给变量 t,p for (Node<E> t = tail, p = t;;) { // 并获取q 也就是 tail 的下一个节点 Node<E> q = p.next; // 若是下一个节点是null,说明tail 是处于尾节点上 if (q == null) { // 而后用cas 将下一个节点设置成为新节点 // 这里用cas 操做,若是多线程的状况,总会有一个先执行成功,失败的线程继续执行循环。 // <1> if (p.casNext(null, newNode)) { // 若是p.casNext有个线程成功了,p=newNode // 比较 t (tail) 是否是 最后一个节点 if (p != t) // 若是不等,就利用cas将,尾节点移到最后 // 若是失败了,那么说明有其余线程已经把tail移动过,也是OK的 casTail(t, newNode); return true; } // 若是<1>失败了,说明确定有个线程成功了, // 这时候失败的线程,又会执行for 循环,再次设值,直到成功。 } else if (p == q) // 有可能恰好插入一个,而后P 就被删除了,那么 p==q // 这时候在头结点须要重新定位。 p = (t != (t = tail)) ? t : head; else // 这里是为了当P不是尾节点的时候,将P 移到尾节点,方便下一次插入 // 也就是一直保持向前推动 p = (p != t && t != (t = tail)) ? t : q; } } private boolean casTail(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); }
从上述代码可知入队列过程可概括为3步
1.定位尾节点tail节点并不老是尾节点,因此每次入队都必须先经过tail节点来找到尾节点。
尾节点多是tail节点,也多是tail节点的next节点。2.设置入队节点为尾节点
p.casNext(null, newNode)方法用于将入队节点设置为当前队列尾节点的next节点,若是p是null,
表示p是当前队列的尾节点,若是不为null,表示有其余线程更新了尾节点,则须要从新获取当前队列的尾节点3.更新尾节点
casTail(t, newNode);将尾节点移到最后(即把tail指向新节点)
若是失败了,那么说明有其余线程已经把tail移动过,此时新节点newNode为尾节点,tail为其前驱结点
public E poll() { // 设置起始点 restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; // 利用cas 将第一个节点设置为null if (item != null && p.casItem(item, null)) { // 和上面相似,p的next被删了, // 而后而后判断一下,目的为了保证head的next不为空 if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { // 有可能已经被另外线程先删除了下一个节点 // 那么须要先设定head 的位置,并返回null updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else // 和offer 相似,保证下一个节点有值,才能删除 p = q; } } }
JDK1.7与JDK1.8 ConcurrentHashMap的实现仍是有不小的区别的
在JDK1.7版本中,ConcurrentHashMap的数据结构是由一个Segment数组和多个HashEntry组成。
Segment数组的意义就是将一个大的table分割成多个小的table来进行加锁,也就是上面的提到的锁分离技术,而每个Segment元素存储的是HashEntry数组+链表,这个和HashMap的数据存储结构同样.
1.8中放弃了Segment臃肿的设计,取而代之的是采用Node + CAS + Synchronized来保证并发安全进行实现,结构以下:
public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) //① 只有在执行第一次put方法时才会调用initTable()初始化Node数组 tab = initTable(); //② 若是相应位置的Node还未初始化,则经过CAS插入相应的数据; else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //③ 若是相应位置的Node不为空,且当前该节点处于移动状态 帮助转移数据 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); //④ 若是相应位置的Node不为空,且当前该节点不处于移动状态,则对该节点加synchronized锁, else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { //⑤ 若是该节点的hash不小于0,则遍历链表更新节点或插入新节点; if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //⑥ 若是该节点是TreeBin类型的节点,说明是红黑树结构,则经过putTreeVal方法往红黑树中插入节点; else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } /** *若是binCount不为0,说明put操做对数据产生了影响,若是当前链表的个数达到8个, *经过treeifyBin方法转化为红黑树, *若是oldVal不为空,说明是一次更新操做,没有对元素个数产生影响,则直接返回旧值; */ if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
JDK1.8中关于CopyOnWriteArrayList的官方介绍以下
A thread-safe variant of {@link java.util.ArrayList} in which all mutative
operations ({@code add}, {@code set}, and so on)
are implemented bymaking a fresh copy of the underlying array.中文翻译大体是
CopyOnWriteArrayList是一个线程安全的java.util.ArrayList的变体,
add,set等改变CopyOnWriteArrayList的操做是经过制做当前数据的副本实现的
其实意思很简单,假设有一个数组以下所示
多个线程并发读取是没有任何问题的
咱们来看add 源码
public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } }
有了前面的积淀,这段代码能够说没有任何难度
注 写入过程当中,如有其余线程读取数据,那么读取的依然是老数组的数据
由上面的结构以及源码分析就知道CopyOnWriteArrayList用在读多写少的多线程环境中。
本篇分享了一些原子操做类以及并发容器,这些在多线程开发中都颇有做用。但愿帮到你。
Android 并发工具类与线程池
此致,敬礼