【Thread】- JUC 5种线程同步工具

Semaphore:信号灯

特色:控制每次执行的线程数,达到控制线程并发的效果java

  • 测试代码
package com.zhiwei.thread;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * 信号灯:当线程空闲时自动去执行阻塞的线程,实现运行最优化
 */
public class SemaphoreTest {

public static void main(String[] args) {

    ExecutorService threadPool = Executors.newCachedThreadPool();

    // 定义信号灯,一次最多能处理3个线程
    Semaphore sp = new Semaphore(3); 

    for (int i = 0; i < 20; i++) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    
                    // 获取信号,信号灯处理阻塞线程,最多容许3个线程访问
                    sp.acquire(); 
                    
                    System.out.println(Thread.currentThread().getName() + ":进入信号灯,还有" +  sp.availablePermits() + "个信号");
                    
                    Thread.sleep(new Random().nextInt(2000));
                    
                    // 回收信号灯信号,供别的线程使用
                    sp.release();
                    System.out.println(Thread.currentThread().getName() + ":离开信号灯,还有" +  sp.availablePermits() + "个信号");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    threadPool.shutdown();
}
}

效果:缓存


CyclicBarrier

做用:控制线程运行的任务总量同步,例如等待全部人完成工做才能够下班安全

测试代码多线程

package com.zhiwei.thread;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierTest {

	public static void main(String[] args) {
		
    // 思想:只有各个子任务都完成了采起执行下一步,若是有线程提早完成则等待
    ExecutorService threadPool = Executors.newCachedThreadPool();
    
    // 规定总的任务量:只有所有完成才会进行下一步处理
    CyclicBarrier cb = new CyclicBarrier(3); 

    for (int i = 0; i < 3; i++) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep((long) (Math.random() * 10000));
                    System.out.println(Thread.currentThread().getName() + ":完成分任务,剩余任务:" + (2 - cb.getNumberWaiting()));
                    
                    //若是前面2个线程阻塞 + 正在运行的线程 = 3,代表总任务完成
                    if (cb.getNumberWaiting() == 2) {
                        System.out.println("恭喜,总任务已完成!");
                    }

                    //分任务完成则等待,直到搜索的任务都完成,才执行await后面的代码
                    cb.await(); 
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
    threadPool.shutdown();
}
}

效果:并发


CountDownLatch 发令枪

CountDownLatch:可理解为全部线程都就绪以后就一块儿执行,相似旅游跟团,只有全部人都到了才能够触发dom

测试代码ide

package com.zhiwei.thread;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchTest {

	public static void main(String[] args) {

    // 缓存线程池:自动建立线程执行任务,若是线程执行完成任务则保存,供下次使用,若是线程不够则动态建立
    ExecutorService threadPool = Executors.newCachedThreadPool();

    // 表示将完成3个任务量:任务计数器:多线程完成一些列操做
    CountDownLatch ct = new CountDownLatch(3);

    for (int i = 0; i < 3; i++) {

        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {

                    Thread.sleep((long) (Math.random() * 10000));
                    ct.countDown(); // 减1
 					System.out.println(Thread.currentThread().getName() + "准备分任务,剩余任务:" + ct.getCount());
                   

                    // 若是ct计数器不为0则阻塞,为0 则一块儿执行
                    ct.await(); 
                     System.out.println(Thread.currentThread().getName() + "完成分任务");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
	if (ct.getCount() == 0) {
                        System.out.println("恭喜,总任务已完成!");
                    }
    threadPool.shutdown();
}
}

效果:测试


ArrayBlockingQueue

ArrayBlockingQueue: JDK内部提供的阻塞队列,可以保证线程安全优化

主要同步方法:ui

put方法

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

take方法

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

测试代码:

package com.zhiwei.thread;
import java.util.concurrent.ArrayBlockingQueue;

/**
 * 阻塞队列:可用于处理生产消费的问题
 * 
 * 实现机制:put/take利用重入锁ReentrantLock实现同步效果
 */
public class ArrayBlockingQueueTest {

	public static void main(String[] args) {
		
        ArrayBlockingQueue<String> abq = new ArrayBlockingQueue<String>(3);

        new Thread(new Runnable() {
	
			@Override
			public void run() {
				while(true){
					try {
						Thread.sleep(1000);
						abq.put("Hello Java World");
						System.out.println(Thread.currentThread().getName()+":放入数据,剩余数据:"+abq.size());
					} catch (Exception e) {
						e.printStackTrace();
					}	
				}
			}
		}).start();
        
        new Thread(new Runnable() {
			@Override
			public void run() {
				while(true){
					try {
						Thread.sleep(10000);
						abq.take();
						System.out.println(Thread.currentThread().getName()+":取出数据,剩余数据:"+abq.size());
					} catch (Exception e) {
						e.printStackTrace();
					}	
				}
			}
		}).start();
	}
}

效果:


Exchanger

做用:两个线程之间交换数据,不过要两个线程都先拿出数据,而后才能进行数据交换

测试代码

package com.zhiwei.thread;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 *特色: 必定要等双方将数据都拿出来后才能交换(只能是两个线程)
 * @author Yang ZhiWei
 *
 */
public class ExchangerTest {

public static void main(String[] args) {
    
    ExecutorService threadPool = Executors.newCachedThreadPool();
    
    Exchanger<String> exchanger = new Exchanger<String>();
    
    threadPool.execute(new Runnable() {
        @Override
        public void run() {

            System.out.println(Thread.currentThread().getName()+":交换数据:"+Thread.currentThread().getName());
            
            try {
                String getDate = exchanger.exchange(Thread.currentThread().getName());
                
                System.out.println(Thread.currentThread().getName()+":收到数据:"+getDate);
            
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    
    threadPool.execute(new Runnable() {
        @Override
        public void run() {
            
            System.out.println(Thread.currentThread().getName()+"交换数据:"+Thread.currentThread().getName());
            try {
                
                String getDate = exchanger.exchange(Thread.currentThread().getName());
                
                System.out.println(Thread.currentThread().getName()+"收到数据:"+getDate);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    threadPool.shutdown();
}
}

效果:

相关文章
相关标签/搜索