public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
final transient ReentrantLock lock = new ReentrantLock();
private transient volatile Object[] array;
// 添加元素,有锁
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); // 修改时加锁,保证并发安全
try {
Object[] elements = getArray(); // 当前数组
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1); // 建立一个新数组,比老的大一个空间
newElements[len] = e; // 要添加的元素放进新数组
setArray(newElements); // 用新数组替换原来的数组
return true;
} finally {
lock.unlock(); // 解锁
}
}
// 读元素,不加锁,所以可能读取到旧数据
public E get(int index) {
return get(getArray(), index);
}
}
复制代码
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 读写共用此锁,线程间经过下面两个Condition通讯
* 这两个Condition和lock有紧密联系(就是lock的方法生成的)
* 相似Object的wait/notify
*/
final ReentrantLock lock;
/** 队列不为空的信号,取数据的线程须要关注 */
private final Condition notEmpty;
/** 队列没满的信号,写数据的线程须要关注 */
private final Condition notFull;
// 一直阻塞直到有东西能够拿出来
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
// 在尾部插入一个元素,队列已满时等待指定时间,若是仍是不能插入则返回
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 锁住
try {
// 循环等待直到队列有空闲
while (count == items.length) {
if (nanos <= 0)
return false;// 等待超时,返回
// 暂时放出锁,等待一段时间(可能被提早唤醒并抢到锁,因此须要循环判断条件)
// 这段时间可能其余线程取走了元素,这样就有机会插入了
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);//插入一个元素
return true;
} finally {
lock.unlock(); //解锁
}
}
复制代码
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) {
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
new Thread(() -> {
try {
// 没有休息,疯狂写入
for (int i = 0; ; i++) {
System.out.println("放入: " + i);
queue.put(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
// 咸鱼模式取数据
while (true) {
System.out.println("取出: " + queue.take());
Thread.sleep((long) (Math.random() * 2000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
/* 输出:
放入: 0
取出: 0
放入: 1
取出: 1
放入: 2
取出: 2
放入: 3
取出: 3
*/
复制代码
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0, // 核心线程为0,没用的线程都被无情抛弃
Integer.MAX_VALUE, // 最大线程数理论上是无限了,还没到这个值机器资源就被掏空了
60L, TimeUnit.SECONDS, // 闲置线程60秒后销毁
new SynchronousQueue<Runnable>()); // offer时若是没有空闲线程取出任务,则会失败,线程池就会新建一个线程
}
复制代码
欢迎你们关注个人公众号【程序员追风】,文章都会在里面更新,整理的资料也会放在里面。
java