线程编程一直是老生常谈的问题,在Java中,随着JDK的逐渐发展,JDK提供给咱们的并发模型也愈来愈多,本文摘取三例使用不一样原理的模型,分析其大体原理。node
cow是copy-on-write的简写,这种模型来源于linux系统fork命令,Java中一种使用cow模型来实现的并发类是CopyOnWriteArrayList。相比于Vector,它的读操做是无需加锁的:linux
1
2
3
|
public
E get(
int
index) {
return
(E) elements[index];
}
|
之因此有如此神奇功效,其采起的是空间换取时间的方法,查看其add方法:算法
1
2
3
4
5
6
7
|
public
synchronized
boolean
add(E e) {
Object[] newElements =
new
Object[elements.length +
1
];
System.arraycopy(elements,
0
, newElements,
0
, elements.length);
newElements[elements.length] = e;
elements = newElements;
return
true
;
}
|
咱们注意到,CopyOnWriteArrayList的add方法是须要加锁的,但其内部并无直接对elements数组作操做,而是先copy一份当前的数据到一个新的数组,而后对新的数组进行赋值操做。这样作就让get操做从同步中解脱出来。由于更改的数据并无发生在get所需的数组中。而是放生在新生成的副本中,因此不须要同步。但应该注意的是,尽管如此,get操做仍是可能会读取到脏数据的。编程
CopyOnWriteArrayList的另外一特色是容许多线程遍历,且其它线程更改数据并不会致使遍历线程抛出ConcurrentModificationException
异常,来看下iterator()
,数组
1
2
3
4
|
public
Iterator<E> iterator() {
Object[] snapshot = elements;
return
new
CowIterator<E>(snapshot,
0
, snapshot.length);
}
|
这个CowIterator 是 ListIterator的子类,这个Iterator的特色是它并不支持对数据的更改操做:安全
1
2
3
4
5
6
7
8
9
|
public
void
add(E object) {
throw
new
UnsupportedOperationException();
}
public
void
remove() {
throw
new
UnsupportedOperationException();
}
public
void
set(E object) {
throw
new
UnsupportedOperationException();
}
|
这样作的缘由也很容易理解,咱们能够简单地的认为CowIterator中的snapshot是不可变数组,由于list中有数据更新都会生成新数组,而不会改变snapshot, 因此此时Iterator没办法再将更改的数据写回list了。同理,list数据有更新也不会反映在CowIterator中。CowIterator只是保证其迭代过程不会发生异常。微信
CAS是Compare and Swap的简写,即比较与替换,CAS造做将比较和替换封装为一组原子操做,不会被外部打断。这种原子操做的保证每每由处理器层面提供支持。多线程
在Java中有一个很是神奇的Unsafe类来对CAS提供语言层面的接口。但类如其名,此等神器若是使用不当,会形成武功尽失的,因此Unsafe不对外开放,想使用的话须要经过反射等技巧。这里不对其作展开。介绍它的缘由是由于它是JDK1.8中ConcurrentHashMap的实现基础。并发
ConcurrentHashMap
与HashMap
对数据的存储有着类似的地方,都采用数组+链表+红黑树的方式。基本逻辑是内部使用Node来保存map中的一项key, value结构,对于hash不冲突的key,使用数组来保存Node数据,而每一项Node都是一个链表,用来保存hash冲突的Node,当链表的大小达到必定程度会转为红黑树,这样会使在冲突数据较多时也会有比较好的查询效率。函数
了解了ConcurrentHashMap
的存储结构后,咱们来看下在这种结构下,ConcurrentHashMap
是如何实现高效的并发操做,这得益于ConcurrentHashMap
中的以下三个函数。
1
2
3
4
5
6
7
8
9
10
|
static
final
<K,V> Node<K,V> tabAt(Node<K,V>[] tab,
int
i) {
return
(Node<K,V>)U.getObjectVolatile(tab, ((
long
)i << ASHIFT) + ABASE);
}
static
final
<K,V>
boolean
casTabAt(Node<K,V>[] tab,
int
i,
Node<K,V> c, Node<K,V> v) {
return
U.compareAndSwapObject(tab, ((
long
)i << ASHIFT) + ABASE, c, v);
}
static
final
<K,V>
void
setTabAt(Node<K,V>[] tab,
int
i, Node<K,V> v) {
U.putOrderedObject(tab, ((
long
)i << ASHIFT) + ABASE, v);
}
|
其中的U就是咱们前文提到的Unsafe的一个实例,这三个函数都经过Unsafe的几个方法保证了是原子性:
有了这三个函数就能够保证ConcurrentHashMap
的线程安全吗?并非的,ConcurrentHashMap
内部也使用比较多的synchronized,不过与HashTable这种对全部操做都使用synchronized不一样,ConcurrentHashMap
只在特定的状况下使用synchronized,来较少锁的定的区域。来看下putVal方法(精简版):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
final
V putVal(K key, V value,
boolean
onlyIfAbsent) {
if
(key ==
null
|| value ==
null
)
throw
new
NullPointerException();
int
hash = spread(key.hashCode());
int
binCount =
0
;
for
(Node<K,V>[] tab = table;;) {
Node<K,V> f;
int
n, i, fh;
if
(tab ==
null
|| (n = tab.length) ==
0
)
tab = initTable();
else
if
((f = tabAt(tab, i = (n -
1
) & hash)) ==
null
) {
if
(casTabAt(tab, i,
null
,
new
Node<K,V>(hash, key, value,
null
)))
break
;
// no lock when adding to embin
}
else
if
((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else
{
V oldVal =
null
;
synchronized
(f) {
....
}
}
}
addCount(1L, binCount);
return
null
;
}
|
整个put流程大体以下:
ConcurrentHashMap
是能够多线程同时扩容的。这里说协助的缘由在于,对于数组扩容,通常分为两步:1.新建一个更大的数组;2.将原数组数据copy到新数组中。对于第一步,ConcurrentHashMap
经过CAW来控制一个int变量保证新建数组这一步只会执行一次。对于第二步,ConcurrentHashMap
采用CAW + synchronized + 移动后标记 的方式来达到多线程扩容的目的。感兴趣能够查看transfer
函数。黑科技
的流程已尝试无效,目标Node已经存在值,只能锁住当前Node来进行put操做,固然,这里省略了不少代码,包括链表转红黑树的操做等等。相比于put,get的代码更好理解一下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public
V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p;
int
n, eh; K ek;
int
h = spread(key.hashCode());
if
((tab = table) !=
null
&& (n = tab.length) >
0
&&
(e = tabAt(tab, (n -
1
) & h)) !=
null
) {
if
((eh = e.hash) == h) {
if
((ek = e.key) == key || (ek !=
null
&& key.equals(ek)))
return
e.val;
}
else
if
(eh <
0
)
return
(p = e.find(h, key)) !=
null
? p.val :
null
;
while
((e = e.next) !=
null
) {
if
(e.hash == h &&
((ek = e.key) == key || (ek !=
null
&& key.equals(ek))))
return
e.val;
}
}
return
null
;
}
|
还有一种实现线程安全的方式是经过将读写进行分离,这种方式的一种实现是LinkedBlockingQueue
。LinkedBlockingQueue
总体设计的也十分精巧,它的全局变量分为三类:
final型变量因为声明后就不会被修改,因此天然线程安全,Atomic型内部采用了cas模型来保证线程安全。对于普通型变量,LinkedBlockingQueue
中只包含head与last两个表示队列的头与尾。而且私有,外部没法更改,因此,LinkedBlockingQueue
只须要保证head与last的安全便可保证真个队列的线程安全。而且LinkedBlockingQueue
属于FIFO型队列,通常状况下,读写会在不一样元素上工做,因此, LinkedBlockingQueue
定义了两个可重入锁,巧妙的经过对head与last分别加锁,实现读写分离,来实现良好的安全并发特性:
1
2
3
4
5
6
7
8
|
/** Lock held by take, poll, etc */
private
final
ReentrantLock takeLock =
new
ReentrantLock();
/** Wait queue for waiting takes */
private
final
Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private
final
ReentrantLock putLock =
new
ReentrantLock();
/** Wait queue for waiting puts */
private
final
Condition notFull = putLock.newCondition();
|
首先看下它的offer 方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public
boolean
offer(E e) {
if
(e ==
null
)
throw
new
NullPointerException();
final
AtomicInteger count =
this
.count;
if
(count.get() == capacity)
return
false
;
int
c = -
1
;
Node<E> node =
new
Node<E>(e);
final
ReentrantLock putLock =
this
.putLock;
putLock.lock();
try
{
if
(count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if
(c +
1
< capacity)
notFull.signal();
}
}
finally
{
putLock.unlock();
}
if
(c ==
0
)
signalNotEmpty();
return
c >=
0
;
}
|
可见,在对队列进行添加元素时,只须要对putLock进行加锁便可,保证同一时刻只有一个线程能够对last进行插入。一样的,在从队列进行提取元素时,也只须要获取takeLock锁来对head操做便可:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public
E poll() {
final
AtomicInteger count =
this
.count;
if
(count.get() ==
0
)
return
null
;
E x =
null
;
int
c = -
1
;
final
ReentrantLock takeLock =
this
.takeLock;
takeLock.lock();
try
{
if
(count.get() >
0
) {
x = dequeue();
c = count.getAndDecrement();
if
(c >
1
)
notEmpty.signal();
}
}
finally
{
takeLock.unlock();
}
if
(c == capacity)
signalNotFull();
return
x;
}
|
LinkedBlockingQueue
总体仍是比较好理解的,但有几个点须要特殊注意:
LinkedBlockingQueue
是一个阻塞队列,当队列无元素为空时,全部取元素的线程会经过notEmpty 的await()方法进行等待,直到再次有数据enqueue时,notEmpty发出signal信号。对于队列达到上限时也是同理。