咱们知道线程安全的问题就是出在多个线程同时修改共享变量,不可变对象的策略彻底规避了对对象的修改,因此在多线程中使用必定是线程安全的。java
不可变对象须要知足的条件:数组
下面来复习一下final关键字的做用安全
修饰类:多线程
修饰方法:并发
修饰对象:app
除了final修饰的方法来使对象不可变,还能够用Collections类
中的unmodifiable
为前缀的方法,包括Collection、List、Set、Map等,只需把对应集合的对象传入这个方法这个集合就不容许修改了。ide
一样地,在Guava
中也有相似的方法immutableXXX
能够达到相同的效果。高并发
下面来验证一下测试
@Slf4j
public class ImmutableExample1 {
private static Map<Integer, Integer> map = Maps.newHashMap();
static {
map.put(1, 2);
map.put(3, 4);
map.put(5, 6);
map = Collections.unmodifiableMap(map);
}
public static void main(String[] args) {
map.put(1, 3);
log.info("{}", map.get(1));
}
}
复制代码
运行结果优化
能够看到程序报了一个不支持操做的异常,说明当map通过Collections.unmodifiableMap
方法后就不支持更新操做了。
下面咱们进入Collections.unmodifiableMap
看一下它的实现
/** * Returns an unmodifiable view of the specified map. This method * allows modules to provide users with "read-only" access to internal * maps. Query operations on the returned map "read through" * to the specified map, and attempts to modify the returned * map, whether direct or via its collection views, result in an * <tt>UnsupportedOperationException</tt>.<p> * * The returned map will be serializable if the specified map * is serializable. * * @param <K> the class of the map keys * @param <V> the class of the map values * @param m the map for which an unmodifiable view is to be returned. * @return an unmodifiable view of the specified map. */
public static <K,V> Map<K,V> unmodifiableMap(Map<? extends K, ? extends V> m) {
return new UnmodifiableMap<>(m);
}
复制代码
能够看到这个方法返回了一个新的不能被修改的map,咱们来看一下这个map的实现。
/** * @serial include */
private static class UnmodifiableMap<K,V> implements Map<K,V>, Serializable {
private static final long serialVersionUID = -1034234728574286014L;
private final Map<? extends K, ? extends V> m;
UnmodifiableMap(Map<? extends K, ? extends V> m) {
if (m==null)
throw new NullPointerException();
this.m = m;
}
public int size() {return m.size();}
public boolean isEmpty() {return m.isEmpty();}
public boolean containsKey(Object key) {return m.containsKey(key);}
public boolean containsValue(Object val) {return m.containsValue(val);}
public V get(Object key) {return m.get(key);}
public V put(K key, V value) {
throw new UnsupportedOperationException();
}
public V remove(Object key) {
throw new UnsupportedOperationException();
}
public void putAll(Map<? extends K, ? extends V> m) {
throw new UnsupportedOperationException();
}
public void clear() {
throw new UnsupportedOperationException();
}
private transient Set<K> keySet;
private transient Set<Map.Entry<K,V>> entrySet;
private transient Collection<V> values;
public Set<K> keySet() {
if (keySet==null)
keySet = unmodifiableSet(m.keySet());
return keySet;
}
public Set<Map.Entry<K,V>> entrySet() {
if (entrySet==null)
entrySet = new UnmodifiableEntrySet<>(m.entrySet());
return entrySet;
}
public Collection<V> values() {
if (values==null)
values = unmodifiableCollection(m.values());
return values;
}
public boolean equals(Object o) {return o == this || m.equals(o);}
public int hashCode() {return m.hashCode();}
public String toString() {return m.toString();}
// Override default methods in Map
@Override
@SuppressWarnings("unchecked")
public V getOrDefault(Object k, V defaultValue) {
// Safe cast as we don't change the value
return ((Map<K, V>)m).getOrDefault(k, defaultValue);
}
@Override
public void forEach(BiConsumer<? super K, ? super V> action) {
m.forEach(action);
}
@Override
public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
throw new UnsupportedOperationException();
}
@Override
public V putIfAbsent(K key, V value) {
throw new UnsupportedOperationException();
}
@Override
public boolean remove(Object key, Object value) {
throw new UnsupportedOperationException();
}
@Override
public boolean replace(K key, V oldValue, V newValue) {
throw new UnsupportedOperationException();
}
@Override
public V replace(K key, V value) {
throw new UnsupportedOperationException();
}
@Override
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
throw new UnsupportedOperationException();
}
@Override
public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
throw new UnsupportedOperationException();
}
@Override
public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
throw new UnsupportedOperationException();
}
@Override
public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
throw new UnsupportedOperationException();
}
复制代码
从上面的实现中能够看到UnmodifiableMap
对于不少操做都是直接抛出不支持操做的异常。
Guava
中的immutable
方法也是相似原理。
线程封闭就是把对象封装到一个线程里,只有一个线程能够看到这个对象,这样就算这个对象不是线程安全也不会有线程安全问题。
实现线程封闭主要有三种方式
下面主要来看ThreadLocal线程封闭方法。
ThreadLocal是为每个线程都提供了一个线程内的局部变量,每一个线程只能访问到属于它的副本。
咱们来看一下ThreadLocal的源码中的get和set方法
/** * Returns the value in the current thread's copy of this * thread-local variable. If the variable has no value for the * current thread, it is first initialized to the value returned * by an invocation of the {@link #initialValue} method. * * @return the current thread's value of this thread-local */
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
/** * Sets the current thread's copy of this thread-local variable * to the specified value. Most subclasses will have no need to * override this method, relying solely on the {@link #initialValue} * method to set the values of thread-locals. * * @param value the value to be stored in the current thread's copy of * this thread-local. */
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
复制代码
从源码中能够看出:每个线程拥有一个ThreadLocalMap
,这个map
存储了该线程拥有的全部局部变量。
set
时先经过Thread.currentThread()
获取当前线程,进而获取到当前线程的ThreadLocalMap
,而后以ThreadLocal
本身为key
,要存储的对象为值,存到当前线程的ThreadLocalMap
中。
get
时也是先得到当前线程的ThreadLocalMap
,以ThreadLocal
本身为key
,取出和该线程的局部变量。
一个线程内能够设置多个ThreadLocal
,这样该线程就拥有了多个局部变量。好比当前线程为t1
,在t1
内建立了两个ThreadLocal
分别是tl1
和tl2
,那么t1
的ThreadLocalMap
就有两个键值对。
t1.threadLocals.set(tl1, obj1) // 等价于在t1线程中调用tl1.set(obj1)
t1.threadLocals.set(tl2, obj2) // 等价于在t1线程中调用tl2.set(obj1)
t1.threadLocals.getEntry(tl1) // 等价于在t1线程中调用tl1.get()得到obj1
t1.threadLocals.getEntry(tl2) // 等价于在t1线程中调用tl2.get()得到obj2
复制代码
因为不少常见的容器都是线程不安全的,这就要求开发人员在任何访问到这些容器的地方进行同步处理,致使使用很是不便,所以Java提供了同步容器。
常见的同步容器有如下几种:
ArrayList -> Vector, Stack
HashMap -> HashTable(key,value不能为null)
Collections.synchronizedXXX(List, Set, Map)
注意:同步容器不是绝对的线程安全。
@Slf4j
public class VectorExample1 {
/** * 请求总数 */
public static int clientTotal = 5000;
/** * 同时并发执行线程数 */
public static int threadTotal = 200;
private static Vector<Integer> list = new Vector<>();
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++){
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e){
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", list.size());
}
private static void update(int i){
list.add(i);
}
}
复制代码
运行结果
在这里Vector
是线程安全的。
下面来看一个线程不安全的例子
public class VectorExample2 {
private static Vector<Integer> vector = new Vector<>();
public static void main(String[] args) {
for (int i = 0; i < 10; i++){
vector.add(i);
}
Thread thread1 = new Thread() {
public void run() {
for (int i = 0; i < 10; i++){
vector.remove(i);
}
}
};
Thread thread2 = new Thread() {
public void run() {
for (int i = 0; i < 10; i++){
vector.get(i);
}
}
};
thread1.start();
thread2.start();
}
}
复制代码
运行结果
能够看到抛出了数组越界的异常。这是由于thread2
中可能会get
到已经被thread1
移除的下标。
@Slf4j
public class HashTableExample {
/** * 请求总数 */
public static int clientTotal = 5000;
/** * 同时并发执行线程数 */
public static int threadTotal = 200;
private static Map<Integer, Integer> map = new Hashtable<>();
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++){
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e){
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", map.size());
}
private static void update(int i){
map.put(i, i);
}
}
复制代码
运行结果
将以前例子中的容器类改为
private static List<Integer> list = Collections.synchronizedList(Lists.newArrayList());
复制代码
运行结果始终是5000
,线程安全。
将容器换成Set
,Map
也是同样。
public class VectorExample3 {
private static void test1(Vector<Integer> v1) {
for (Integer i : v1) {
if (i.equals(3)){
v1.remove(i);
}
}
}
private static void test2(Vector<Integer> v1) {
Iterator<Integer> iterator = v1.iterator();
while (iterator.hasNext()) {
Integer i = iterator.next();
if (i.equals(3)){
v1.remove(i);
}
}
}
private static void test3(Vector<Integer> v1) {
for (int i = 0; i < v1.size(); i++){
if (v1.get(i).equals(3)) {
v1.remove(i);
}
}
}
public static void main(String[] args) {
Vector<Integer> vector = new Vector<>();
vector.add(1);
vector.add(2);
vector.add(3);
test1(vector);
}
}
复制代码
这里定义了3种对Vector
遍历后删除指定值的方法,依次对每一个方法进行测试。
测试结果:
test1
和test2
都抛出java.util.ConcurrentModificationException
异常
test3
运行正常
下面来看一下异常产生的缘由
从第一个报错处点进去能够看到
final void checkForComodification() {
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}
复制代码
咱们在对一个集合进行遍历操做的同时对它进行了增删的操做,致使了modCount != expectedModCount
从而抛出异常。
所以当咱们用for-each
或迭代器
遍历集合时不要对集合进行更新操做。若是须要对集合进行增删操做,推荐的作法是在遍历过程当中标记好要增删的位置,遍历结束后再进行相关的操做。
核心思想:
相比于ArrayList
,CopyOnWriteArrayList
是线程安全的。
当有新元素添加到CopyOnWriteArrayList
时,它先从原有的数组中拷贝一份出来,而后在新数组上作写操做,写完后再将原有的数组指向到新的数组。CopyOnWriteArrayLis
t的整个add
操做都是在锁的保护下进行的。
缺点:
CopyOnWriteArrayList
只能保证最终的一致性,不能知足实时性的要求。CopyOnWriteArrayList
的读操做都是在原数组上读的,不须要加锁。
下面来coding测试一下
public class CopyOnWriteArrayListExample {
/** * 请求总数 */
public static int clientTotal = 5000;
/** * 同时并发执行线程数 */
public static int threadTotal = 200;
private static List<Integer> list = new CopyOnWriteArrayList<>();
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++){
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e){
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", list.size());
}
private static void update(int i){
list.add(i);
}
}
复制代码
运行结果始终是5000,线程安全。
下面咱们进入CopyOnWriteArrayList
的add
方法看一下
/** * Appends the specified element to the end of this list. * * @param e element to be appended to this list * @return {@code true} (as specified by {@link Collection#add}) */
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
复制代码
能够看到整个方法是加了锁的,添加新元素时是把整个数组复制到一个新的数组中。
HashSet
对应的线程安全类。
底层实现是基于CopyOnWriteArrayList
,所以它符合CopyOnWriteArrayList
的特色和适用场景。
迭代器不支持可变的remove
操做。
TreeSet
对应的线程安全类。
基于Map
集合,在多线程环境下add
、remove
等操做都是线程安全的,可是批量操做如addAll
、removeAll
等并不能保证以原子方式执行。缘由是它们的底层调用的仍是add
、remove
等方法,须要手动作同步操做。
不能存储null
值。
HashMap
的线程安全类。
不能存储null
值。
对读操做作了大量优化,后面会详细介绍。
TreeMap
的线程安全类。
内部使用SkipList
来实现。
key
有序,相比于ConcurrentHashMap
支持更高并发,存取数与线程没有关系,也就是说在相同条件下并发线程越多ConcurrentSkipListMap
优点越大。
Written by Autu
2019.7.19