【Java并发编程实战】– 修改锁的公平性 lock_3

1、概述

一、fairjava

ReentrantLock 和 ReentrantReadWriterLock 类的 构造器都含有一个布尔参数 fair,这个参数能够容许你控制这两个类的行为。多线程

默认 fair 值是 false; 称之为 非公平模式(Non-Fair-Mode),在 非公平模式下,有不少线程组 等待锁(ReentrantLock 和 ReentrantReadWriterLock)时,锁将选择它们中的一个来访问临界区,这个选择是没有任何约束的。ide

若 fair 值是 true,则称为公平模式(Fair Mode),在公平模式下,有不少线程组 等待锁(ReentrantLock 和 ReentrantReadWriterLock)时,锁将选择它们中的一个来访问临界区,并且选择的是 等待时间最长的。this

上面的 fair 2种模式只适用 lock() 和 unlock() 方法。而 Lock 接口的 tryLock() 方法没有将线程置于休眠,fair 属性并不影响这个方法。spa

二、Condition线程

Condition 将 Object的通讯方法(wait、notify 和 notifyAll)分解成大相径庭的对象,以便经过将这些对象与任意 Lock 实现组合使用,为每一个对象提供多个等待 set (wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 通讯方法的使用。code

在Condition中,用 await() 替换wait(),用 signal() 替换 notify(),用 signalAll()替换 notifyAll(),传统线程的通讯方式,Condition均可以实现,这里注意,Condition是被绑定到Lock上的,要建立一个Lock的Condition必须用 newCondition() 方法。对象

Condition 的强大之处在于它能够为多个线程间创建不一样的 Condition, 使用 synchronized/wait() 只有一个阻塞队列,notifyAll会唤起全部阻塞队列下的线程,而使用 lock/condition,能够实现多个阻塞队列,signalAll 只会唤起某个阻塞队列下的阻塞线程。接口

一个锁可能关联一个或者多个条件,这些条件经过 Condition 接口声明。目的是容许线程 获取锁而且查看等待的某一个条件是否知足,若是不知足就挂起直到某个线程唤醒它们。队列

2、实现

import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 使用lock/condition实现生产者消费者模式
 * Condition 
 * Condition 将 Object的通讯方法(wait、notify 和 notifyAll)分解成大相径庭的对象,
 * 以便经过将这些对象与任意 Lock 实现组合使用,为每一个对象提供多个等待 set (wait-set)。
 * 其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 通讯方法的使用。
 */
public class Buffer {

	private Lock lock;
    private Condition notFull;
    private Condition notEmpty;
    private int maxSize;
    private List<Date> storage;
    public Buffer(int size){
        //使用锁lock,而且建立两个condition,至关于两个阻塞队列
        lock = new ReentrantLock();
        notFull = lock.newCondition();
        notEmpty = lock.newCondition();
        maxSize = size;
        storage = new LinkedList<>();
    }
    public void put()  {
        lock.lock();
        try {   
            while (storage.size() == maxSize ){//若是队列满了
                System.out.print(Thread.currentThread().getName()+": wait \n");;
                notFull.await(); // 阻塞生产线程  
            }
            storage.add(new Date());
            System.out.print(Thread.currentThread().getName()+": put:"+storage.size()+ "\n");
            Thread.sleep(1000);         
            //当生产者执行put方法时,调用 notEmpty.signalAll()只会唤醒  notEmpty.await()下的消费者线程。 
            notEmpty.signalAll(); // 唤醒消费线程
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally{   
            lock.unlock();
        }
    }

    public  void take() {       
        lock.lock();
        try {  
            while (storage.size() == 0 ){//若是队列满了
                System.out.print(Thread.currentThread().getName()+": wait \n");;
                notEmpty.await(); // 阻塞消费线程
            }
            ((LinkedList<Date>)storage).poll();
            System.out.print(Thread.currentThread().getName()+": take:"+storage.size()+ "\n");
            Thread.sleep(1000); 
            //当消费者执行塔克方法时,调用notFull.signalAll()只会唤醒notFull.await()下的消费者线程。
            notFull.signalAll(); // 唤醒生产线程
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally{
            lock.unlock();
        }
    }
	
}
import java.util.concurrent.TimeUnit;

/**
 * 生产者
 */
public class Producer implements Runnable{

	private Buffer buffer;
	
	public Producer(Buffer buffer) {
		this.buffer = buffer;
	}
	
	@Override
	public void run() {
	    while(true){
	    	buffer.put();
	    	try {
				TimeUnit.SECONDS.sleep(1);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
	    }	
	}
}
import java.util.concurrent.TimeUnit;

/**
 * 消费者
 */
public class Consumer implements Runnable{

	private Buffer buffer;
	
	public Consumer(Buffer buffer) {
		this.buffer = buffer;
	}
	
	@Override
	public void run() {
		while(true){
			buffer.take();
			try {
				TimeUnit.SECONDS.sleep(1);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}
public class BufferTest {

	public static void main(String[] args) {
		Buffer buffer = new Buffer(5);
		Producer producer = new Producer(buffer);
		Consumer consumer = new Consumer(buffer);
		for(int i=0;i<3;i++){
			Thread producerThread = new Thread(producer, "producer_" + i);
			producerThread.start();
		}
		
		for(int i=0;i<5;i++){
			Thread consumerThread = new Thread(consumer, "consumer_" + i);
			consumerThread.start();
		}
	}
	
}
相关文章
相关标签/搜索