特色:控制每次执行的线程数,达到控制线程并发的效果java
package com.zhiwei.thread; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * 信号灯:当线程空闲时自动去执行阻塞的线程,实现运行最优化 */ public class SemaphoreTest { public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); // 定义信号灯,一次最多能处理3个线程 Semaphore sp = new Semaphore(3); for (int i = 0; i < 20; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { // 获取信号,信号灯处理阻塞线程,最多容许3个线程访问 sp.acquire(); System.out.println(Thread.currentThread().getName() + ":进入信号灯,还有" + sp.availablePermits() + "个信号"); Thread.sleep(new Random().nextInt(2000)); // 回收信号灯信号,供别的线程使用 sp.release(); System.out.println(Thread.currentThread().getName() + ":离开信号灯,还有" + sp.availablePermits() + "个信号"); } catch (InterruptedException e) { e.printStackTrace(); } } }); } threadPool.shutdown(); } }
效果:缓存
做用:控制线程运行的任务总量同步,例如等待全部人完成工做才能够下班安全
测试代码多线程
package com.zhiwei.thread; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicBarrierTest { public static void main(String[] args) { // 思想:只有各个子任务都完成了采起执行下一步,若是有线程提早完成则等待 ExecutorService threadPool = Executors.newCachedThreadPool(); // 规定总的任务量:只有所有完成才会进行下一步处理 CyclicBarrier cb = new CyclicBarrier(3); for (int i = 0; i < 3; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { Thread.sleep((long) (Math.random() * 10000)); System.out.println(Thread.currentThread().getName() + ":完成分任务,剩余任务:" + (2 - cb.getNumberWaiting())); //若是前面2个线程阻塞 + 正在运行的线程 = 3,代表总任务完成 if (cb.getNumberWaiting() == 2) { System.out.println("恭喜,总任务已完成!"); } //分任务完成则等待,直到搜索的任务都完成,才执行await后面的代码 cb.await(); } catch (Exception e) { e.printStackTrace(); } } }); } threadPool.shutdown(); } }
效果:并发
CountDownLatch:可理解为全部线程都就绪以后就一块儿执行,相似旅游跟团,只有全部人都到了才能够触发dom
测试代码:ide
package com.zhiwei.thread; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CountDownLatchTest { public static void main(String[] args) { // 缓存线程池:自动建立线程执行任务,若是线程执行完成任务则保存,供下次使用,若是线程不够则动态建立 ExecutorService threadPool = Executors.newCachedThreadPool(); // 表示将完成3个任务量:任务计数器:多线程完成一些列操做 CountDownLatch ct = new CountDownLatch(3); for (int i = 0; i < 3; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { Thread.sleep((long) (Math.random() * 10000)); ct.countDown(); // 减1 System.out.println(Thread.currentThread().getName() + "准备分任务,剩余任务:" + ct.getCount()); // 若是ct计数器不为0则阻塞,为0 则一块儿执行 ct.await(); System.out.println(Thread.currentThread().getName() + "完成分任务"); } catch (Exception e) { e.printStackTrace(); } } }); } if (ct.getCount() == 0) { System.out.println("恭喜,总任务已完成!"); } threadPool.shutdown(); } }
效果:测试
ArrayBlockingQueue: JDK内部提供的阻塞队列,可以保证线程安全优化
主要同步方法:ui
put方法
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
take方法
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
测试代码:
package com.zhiwei.thread; import java.util.concurrent.ArrayBlockingQueue; /** * 阻塞队列:可用于处理生产消费的问题 * * 实现机制:put/take利用重入锁ReentrantLock实现同步效果 */ public class ArrayBlockingQueueTest { public static void main(String[] args) { ArrayBlockingQueue<String> abq = new ArrayBlockingQueue<String>(3); new Thread(new Runnable() { @Override public void run() { while(true){ try { Thread.sleep(1000); abq.put("Hello Java World"); System.out.println(Thread.currentThread().getName()+":放入数据,剩余数据:"+abq.size()); } catch (Exception e) { e.printStackTrace(); } } } }).start(); new Thread(new Runnable() { @Override public void run() { while(true){ try { Thread.sleep(10000); abq.take(); System.out.println(Thread.currentThread().getName()+":取出数据,剩余数据:"+abq.size()); } catch (Exception e) { e.printStackTrace(); } } } }).start(); } }
效果:
做用:两个线程之间交换数据,不过要两个线程都先拿出数据,而后才能进行数据交换
测试代码
package com.zhiwei.thread; import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** *特色: 必定要等双方将数据都拿出来后才能交换(只能是两个线程) * @author Yang ZhiWei * */ public class ExchangerTest { public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); Exchanger<String> exchanger = new Exchanger<String>(); threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+":交换数据:"+Thread.currentThread().getName()); try { String getDate = exchanger.exchange(Thread.currentThread().getName()); System.out.println(Thread.currentThread().getName()+":收到数据:"+getDate); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"交换数据:"+Thread.currentThread().getName()); try { String getDate = exchanger.exchange(Thread.currentThread().getName()); System.out.println(Thread.currentThread().getName()+"收到数据:"+getDate); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadPool.shutdown(); } }
效果: