java concurrent包介绍及使用

说一说java的concurrent包1-concurrent包简介

前面一个系列的文章都在围绕hash展开,今天准备先说下concurrent包,这个系列可能会以使用场景说明为主,concurrent包自己的代码分析可能比较少; 我在这方面的实践经验较为有限,有错误欢迎批评指正 html

不过前一个系列并未结束,还有一些文章没有放出来,欢迎关注核桃博客 

concurrent包是jdk1.5引入的重要的包,主要代码由大牛Doug Lea完成,实际上是在jdk1.4时代,因为java语言内置对多线程编程的支持比较基础和有限,因此他写了这个,由于实在太过于优秀,因此被加入到jdk之中; 

一般所说的concurrent包基本有3个package组成 
java.util.concurrent:提供大部分关于并发的接口和类,如BlockingQueue,Callable,ConcurrentHashMap,ExecutorService, Semaphore等 
java.util.concurrent.atomic:提供全部原子操做的类, 如AtomicInteger, AtomicLong等; 
java.util.concurrent.locks:提供锁相关的类, 如Lock, ReentrantLock, ReadWriteLock, Condition等; 

concurrent包的优势: 
1. 首先,功能很是丰富,诸如线程池(ThreadPoolExecutor),CountDownLatch等并发编程中须要的类已经有现成的实现,不须要本身去实现一套; 毕竟jdk1.4对多线程编程的主要支持几乎就只有Thread, Runnable,synchronized等 

2. concurrent包里面的一些操做是基于硬件级别的CAS(compare and swap),就是在cpu级别提供了原子操做,简单的说就能够提供无阻塞、无锁定的算法; 而现代cpu大部分都是支持这样的算法的;java

说一说java的concurrent包2-等待多个线程完成执行的CountDownLatch 

前面一篇说了concurrent包的基本结构,接下来首先看一下一个很是有用的类,CountDownLatch, 能够用来在一个线程中等待多个线程完成任务的类; 算法

前面一篇说了concurrent包的基本结构,接下来首先看一下一个很是有用的类,CountDownLatch, 能够用来在一个线程中等待多个线程完成任务的类; 
一般的使用场景是,某个主线程接到一个任务,起了n个子线程去完成,可是主线程须要等待这n个子线程都完成任务了之后才开始执行某个操做; 

下面是一段演示代码 

Java代码  编程

@Test  
public void demoCountDown()  
{  
    int count = 10;  
  
    final CountDownLatch l = new CountDownLatch(count);  
    for(int i = 0; i < count; ++i)  
    {  
        final int index = i;  
        new Thread(new Runnable() {  
  
            @Override  
            public void run() {  
  
                try {  
                    Thread.currentThread().sleep(20 * 1000);  
                } catch (InterruptedException e) {  
  
                    e.printStackTrace();  
                }  
  
                System.out.println("thread " + index + " has finished...");  
  
                l.countDown();  
  
            }  
        }).start();  
    }  
  
    try {  
        l.await();  
    } catch (InterruptedException e) {  
  
        e.printStackTrace();  
    }  
  
    System.out.println("now all threads have finished");  
  
}
运行的结果 
thread 1 has finished... 
thread 3 has finished... 
thread 4 has finished... 
thread 6 has finished... 
thread 8 has finished... 
thread 0 has finished... 
thread 7 has finished... 
thread 9 has finished... 
thread 2 has finished... 
thread 5 has finished... 
now all threads have finished 

前面10个线程的执行完成顺序会变化,可是最后一句始终会等待前面10个线程都完成以后才会执行数组

说一说java的concurrent包3-线程安全而且无阻塞的Atomic类 

有了CountDownLatch,涉及到多线程同步的演示就比较容易了,接下来咱们看下Atomic相关的类, 好比AtomicLong, AtomicInteger等这些; 缓存

有了CountDownLatch,涉及到多线程同步的演示就比较容易了,接下来咱们看下Atomic相关的类, 好比AtomicLong, AtomicInteger等这些; 
简单的说,这些类都是线程安全的,支持无阻塞无锁定的 

Java代码  安全

set()  多线程

get()  并发

getAndSet()  jvm

getAndIncrement()  

getAndDecrement()  

getAndAdd()  


等操做 

下面是一个测试代码 

Java代码  

package com.hetaoblog.concurrent.test;  
  
import java.util.concurrent.CountDownLatch;  
import java.util.concurrent.atomic.AtomicLong;  
  
import org.junit.Test;  
/** 
 * 
 * by http://www.hetaoblog.com 
 * @author hetaoblog 
 * 
 */  
public class AtomicTest {  
  
    @Test  
    public void testAtomic()  
    {  
        final int loopcount = 10000;  
        int threadcount = 10;  
  
        final NonSafeSeq seq1 = new NonSafeSeq();  
        final SafeSeq seq2 = new SafeSeq();  
  
        final CountDownLatch l = new CountDownLatch(threadcount);  
  
        for(int i = 0; i < threadcount; ++i)  
        {  
            final int index = i;  
            new Thread(new Runnable() {  
  
                @Override  
                public void run() {  
                    for(int j = 0; j < loopcount; ++j)  
                    {  
  
                        seq1.inc();  
                        seq2.inc();  
                    }  
  
                    System.out.println("finished : " + index);  
                    l.countDown();  
  
                }  
            }).start();  
        }  
  
        try {  
            l.await();  
        } catch (InterruptedException e) {  
  
            e.printStackTrace();  
        }  
  
        System.out.println("both have finished....");  
  
        System.out.println("NonSafeSeq:" + seq1.get());  
        System.out.println("SafeSeq with atomic: " + seq2.get());  
  
    }  
}  
  
class NonSafeSeq{  
    private long count = 0;  
    public void inc()  
    {  
        count++;  
    }  
  
    public long  get()  
    {  
        return count;  
    }  
}  
  
class SafeSeq{  
    private AtomicLong count  = new AtomicLong(0);  
  
    public void inc()  
    {  
        count.incrementAndGet();  
    }  
  
    public long get()  
    {  
        return count.longValue();  
    }  
}
其中NonSafeSeq是做为对比的类,直接放一个private long count不是线程安全的,而SafeSeq里面放了一个AtomicLong,是线程安全的;能够直接调用incrementAndGet来增长 

运行代码,能够获得相似这样的结果 
finished : 1 
finished : 0 
finished : 3 
finished : 2 
finished : 5 
finished : 4 
finished : 6 
finished : 8 
finished : 9 
finished : 7 
both have finished.... 
NonSafeSeq:91723 
SafeSeq with atomic: 100000 

能够看到,10个线程,每一个线程运行了10,000次,理论上应该有100,000次增长,使用了普通的long是非线程安全的,而使用了AtomicLong是线程安全的; 

注意,这个例子也说明,虽然long自己的单个设置是原子的,要么成功要么不成功,可是诸如count++这样的操做就不是线程安全的;由于这包括了读取和写入两步操做;

说一说java的concurrent包4--能够代替synchronized关键字的ReentrantLock 

在jdk 1.4时代,线程间的同步主要依赖于synchronized关键字,本质上该关键字是一个对象锁,能够加在不一样的instance上或者class上,从使用的角度则分别能够加在非静态方法,静态方法,以及直接synchronized(MyObject)这样的用法; 

在jdk 1.4时代,线程间的同步主要依赖于synchronized关键字,本质上该关键字是一个对象锁,能够加在不一样的instance上或者class上,从使用的角度则分别能够加在非静态方法,静态方法,以及直接synchronized(MyObject)这样的用法; 
concurrent包提供了一个能够替代synchronized关键字的ReentrantLock, 
简单的说你能够new一个ReentrantLock, 而后经过lock.lock和lock.unlock来获取锁和释放锁;注意必须将unlock放在finally块里面, 
reentrantlock的好处 
1. 是更好的性能, 
2. 提供同一个lock对象上不一样condition的信号通知 
3. 还提供lockInterruptibly这样支持响应中断的加锁过程,意思是说你试图去加锁,可是当前锁被其余线程hold住,而后你这个线程能够被中断; 

简单的一个例子: 

Java代码  

package com.hetaoblog.concurrent.test;  
  
import java.util.concurrent.CountDownLatch;  
import java.util.concurrent.locks.ReentrantLock;  
  
import org.junit.Test;  
  
public class ReentrantLockDemo {  
  
    @Test  
    public void demoLock()  
    {  
        final int loopcount = 10000;  
        int threadcount = 10;  
  
        final SafeSeqWithLock seq = new SafeSeqWithLock();  
  
        final CountDownLatch l = new CountDownLatch(threadcount);  
  
        for(int i = 0; i < threadcount; ++i)  
        {  
            final int index = i;  
            new Thread(new Runnable() {  
  
                @Override  
                public void run() {  
                    for(int j = 0; j < loopcount; ++j)  
                    {  
  
                        seq.inc();  
  
                    }  
  
                    System.out.println("finished : " + index);  
                    l.countDown();  
  
                }  
            }).start();  
        }  
  
        try {  
            l.await();  
        } catch (InterruptedException e) {  
  
            e.printStackTrace();  
        }  
  
        System.out.println("both have finished....");  
  
        System.out.println("SafeSeqWithLock:" + seq.get());  
  
    }  
}  
  
class SafeSeqWithLock{  
    private long count = 0;  
  
    private ReentrantLock lock = new ReentrantLock();  
  
    public void inc()  
    {  
        lock.lock();  
  
        try{  
            count++;  
        }  
        finally{  
            lock.unlock();  
        }  
    }  
  
    public long get()  
    {  
        return count;  
    }  
}
一样之前面的相似Sequence的类举例,经过对inc操做加锁,保证了线程安全; 
固然,这里get()我没有加锁,对于这样直接读取返回原子类型的函数,我认为不加锁是没问题的,至关于返回最近成功操做的值; 

运行结果相似这样, 
finished : 7 
finished : 2 
finished : 6 
finished : 1 
finished : 5 
finished : 3 
finished : 0 
finished : 9 
finished : 8 
finished : 4 
both have finished.... 

SafeSeqWithLock:100000

说一说java的concurrent包5--读写锁ReadWriteLock 

concurrent包里面还提供了一个很是有用的锁,读写锁ReadWriteLock 

concurrent包里面还提供了一个很是有用的锁,读写锁ReadWriteLock 
下面是ReadWriteLock接口的说明: 
A ReadWriteLock maintains a pair of associated locks, one for read-only operations and one for writing. The read lock may be held simultaneously by multiple reader threads, so long as there are no writers. The write lock is exclusive. 

意思是说读锁能够有不少个锁同时上锁,只要当前没有写锁; 
写锁是排他的,上了写锁,其余线程既不能上读锁,也不能上写锁;一样,须要上写锁的前提是既没有读锁,也没有写锁; 
两个写锁不能同时得到无需说明,下面一段程序说明下上了读锁之后,其余线程须要上写锁也没法得到 

Java代码  

@Test  
public void testRWLock_getw_onr()  
{  
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();  
  
    final Lock rlock = lock.readLock();  
    final Lock wlock = lock.writeLock();  
  
    final CountDownLatch l  = new CountDownLatch(2);  
  
    // start r thread  
    new Thread(new Runnable() {  
  
        @Override  
        public void run() {  
  
            System.out.println(new Date() + "now to get rlock");  
            rlock.lock();  
  
            try {  
                Thread.currentThread().sleep(20 * 1000);  
            } catch (InterruptedException e) {  
  
                e.printStackTrace();  
            }  
  
            System.out.println(new Date() + "now to unlock rlock");  
            rlock.unlock();  
  
            l.countDown();  
        }  
    }).start();  
  
    // start w thread  
    new Thread(new Runnable() {  
  
        @Override  
        public void run() {  
  
            System.out.println(new Date() + "now to get wlock");  
            wlock.lock();  
  
            System.out.println(new Date() + "now to unlock wlock");  
            wlock.unlock();  
  
            l.countDown();  
        }  
    }).start();  
  
    try {  
        l.await();  
    } catch (InterruptedException e) {  
  
        e.printStackTrace();  
    }  
  
    System.out.println(new Date() + "finished");  
}
这代码在我机器上打印的结果是, 也就是试图得到写锁的线程只有当另一个线程将读锁释放了之后才能够得到 
Tue Feb 28 23:18:13 CST 2012now to get rlock 
Tue Feb 28 23:18:13 CST 2012now to get wlock 
Tue Feb 28 23:18:33 CST 2012now to unlock rlock 
Tue Feb 28 23:18:33 CST 2012now to unlock wlock 
Tue Feb 28 23:18:33 CST 2012finished 

ReadWriteLock的实现是ReentrantReadWriteLock, 
有趣的是,在一个线程中,读锁不能直接升级为写锁,可是写锁能够降级为读锁; 
这意思是,若是你已经有了读锁,再去试图得到写锁,将会没法得到, 一直堵住了; 
可是若是你有了写锁,再去试图得到读锁,没问题; 

下面是一段降级的代码, 

Java代码  

@Test  
public void testRWLock_downgrade()  
{  
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();  
  
    Lock rlock = lock.readLock();  
    Lock wlock = lock.writeLock();  
  
    System.out.println("now to get wlock");  
  
    wlock.lock();  
    System.out.println("now to get rlock");  
    rlock.lock();  
  
    System.out.println("now to unlock wlock");  
  
    wlock.unlock();  
  
    System.out.println("now to unlock rlock");  
    rlock.unlock();  
  
    System.out.println("finished");  
  
}
能够正常打印出 
now to get wlock 
now to get rlock 
now to unlock wlock 
now to unlock rlock 
finished 

下面是一段升级的代码, 

Java代码  

@Test  
    public void testRWLock_upgrade()  
    {  
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();  
  
        Lock rlock = lock.readLock();  
        Lock wlock = lock.writeLock();  
  
        System.out.println("now to get rlock");  
        rlock.lock();  
  
        System.out.println("now to get wlock");  
        wlock.lock();  
  
        System.out.println("now to unlock wlock");  
        wlock.unlock();  
  
        System.out.println("now to unlock rlock");  
        rlock.unlock();  
  
        System.out.println("finished");  
  
    }
只能打印出下面两句,后面就一直挂住了 
now to get rlock 

now to get wlock

说一说java的concurrent包6–java里面的线程基础类Thread 

有网友建议我在介绍concurrent包以前先介绍下jdk1.5以前的多线程知识,这是个至关不错的想法, 这篇就先介绍下Thread类; 

有网友建议我在介绍concurrent包以前先介绍下jdk1.5以前的多线程知识,这是个至关不错的想法, 这篇就先介绍下Thread类; 
Thread类是java中的线程,几乎全部的多线程都在Thread这个类的基础以后展开; 
下面介绍这个类的基本用法,Thread类的最基本函数就是run函数 
public void run() 
简单的说来,基本的建立一个完成本身功能的线程能够继承Thread类,而后override这个run方法, 以下所示 

Java代码  

public class ThreadDemo {  
  
    @Test  
    public void testThread()  
    {  
        SimpleThread t = new SimpleThread();  
        t.start();  
  
    }   
  
}  
class SimpleThread extends Thread{  
  
    @Override  
    public void run() {  
  
        System.out.println( Thread.currentThread().getName() + " is running  ");  
    }  
}
一般在run方法里面实现本身要作的功能,这里简单的打印了了一句话, 运行结果是 
Thread-0 is running 
启动一个线程就是new一个本身的Thread对象,而后调用其中的start方法启动这个线程;注意, run()方法运行结束以后这个线程的生命周期就结束了; 

上面举的例子是说启动一个线程就去完成一个任务,有的时候咱们须要一个线程始终在跑,按期执行一些任务,而后在某个时刻中止这个线程的运行; 那么能够有相似下面的一段代码: 

Java代码  

public class ThreadDemo {  
  
    public static void main(String[] args)  
    {  
        PeriodicalRunningThread t = new PeriodicalRunningThread();  
        t.start();  
  
        System.out.println("main thread is going to sleep...");  
        try {  
            Thread.currentThread().sleep(20 * 1000);  
  
        } catch (InterruptedException e) {  
  
            e.printStackTrace();  
        }  
  
        System.out.println(new Date() + " now to stop PeriodicalRunningThread");  
        t.setRunning(false);  
  
    }  
  
}   
  
class PeriodicalRunningThread extends Thread{  
  
    private volatile boolean running = true;  
  
    @Override  
    public void run() {  
  
        while(running)  
        {  
            System.out.println(new Date() + " " + Thread.currentThread().getName() +  " is running " + new Date());  
  
            try {  
                Thread.currentThread().sleep(5 * 1000);  
  
            } catch (InterruptedException e) {  
  
                e.printStackTrace();  
            }  
        }  
  
        System.out.println(new Date() + " " + Thread.currentThread().getName() + " will end");  
    }  
  
    public void setRunning(boolean running) {  
        this.running = running;  
    }  
  
}
这段代码的打印结果是: 
main thread is going to sleep… 
Wed Feb 29 21:10:39 CST 2012 Thread-0 is running Wed Feb 29 21:10:39 CST 2012 
Wed Feb 29 21:10:44 CST 2012 Thread-0 is running Wed Feb 29 21:10:44 CST 2012 
Wed Feb 29 21:10:49 CST 2012 Thread-0 is running Wed Feb 29 21:10:49 CST 2012 
Wed Feb 29 21:10:54 CST 2012 Thread-0 is running Wed Feb 29 21:10:54 CST 2012 
Wed Feb 29 21:10:59 CST 2012 now to stop PeriodicalRunningThread 
Wed Feb 29 21:10:59 CST 2012 Thread-0 will end 

这里经过一个volatile的boolean值来做为标识表示这个线程的中止; 
关于这里的volatile关键字的使用,若有兴趣能够先看这个,核桃博客也会在这个系列的后续文章中对这个关键字作说明 
http://www.ibm.com/developerworks/cn/java/j-jtp06197.html 

这样,在这个running标识为true的时候,该线程一直在跑,可是完成一段任务后会sleep一段时间,而后继续执行;

说一说java的concurrent包7–Thread和Runnable 

这篇仍是Thread和Runnable的基础 

这篇仍是Thread和Runnable的基础 
在前面一篇的代码里面已经介绍了Thread类的其余几个经常使用的方法, 
1. sleep函数,做用是让当前线程sleep一段时间,单位以毫秒计算; 
public static void sleep(long millis) 
2. 静态方法Thread.currentThread(), 获得当前线程 
public static Thread currentThread() 
3. getName方法,获得当前线程名称 
public final String getName() 

这个名称能够在构造Thread的时候传入, 也能够经过setName()方法设置;这个在多线程调试的时候是比较有用的,设置当前线程名,而后在log4j的输出字符串格式里面加入%t,就能够在日志中打印当前线程名称,方便看到当前的日志是从哪里来的; 

如今介绍下多线程里面另一个重要的接口Runnable, 这个接口表示能够被一个线程执行的任务,事实上Thread类也实现了这个Runnable接口; 
这个接口只有一个函数, 实现者只要在里面调用代码就能够了 
void run() 
同时, Thread类有个构造函数是传入一个Runnable实现的; 
经常使用的一个用法就是经过匿名内部类来建立线程执行简单任务,避免写太多的类,外部须要的变量能够经过加final修饰符后传入, 代码例子以下: 

Java代码  

public static void testThreadWithRunnable()  
{  
    final String word = "hello,world";  
    new Thread(new Runnable() {  
  
        @Override  
        public void run() {  
            System.out.println(word);  
  
        }  
    }).start();  
}  
  
public static void main(String[] args)  
{  
    //periodicalThreadTest();  
  
    testThreadWithRunnable();  
  
}
上面的代码会打印 

hello,world

说一说java的concurrent包8–用在一个lock上的多个Condition 

concurrent系列的前一篇说到说一说java的concurrent包7–thread和runnable,如今继续,今天介绍下Condtion这个接口,能够用在一个lock上的多个不一样的状况; 

在jdk的线程同步代码中,不管的synchronized关键字,或者是lock上的await/signal等,都只能在一个锁上作同步通知; 
假设有3个线程,要对一个资源作同步,通常只能有一个锁来作同步通知操做,那么通知的时候没法作到精确的通知3个线程中的某一个的; 
由于你调用了wait()/notify()的时候,具体的调度是jvm决定的; 

可是有的时候的确须要须要对一个锁作多种不一样状况的精确通知, 好比一个缓存,满了和空了是两种不一样的状况,能够分别通知取数据的线程和放数据的线程; 

Condition的基本使用以下: 
* Condition是个接口,基本的方法就是await()和signal()方法; 
* Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition() 
* 调用Condition的await()和signal()方法,都必须在lock保护以内,就是说必须在lock.lock()和lock.unlock之间才能够 
* 和Object.wait()方法同样,每次调用Condition的await()方法的时候,当前线程就自动释放了对当前锁的拥有权 

固然,Condition实际上是个接口,上面说的这几点,在实现Condition的时候能够自由控制一点;可是jdk的javadoc说了,若是有啥特别的实现,必需要清楚的说明的; 

下一节我会结合具体的代码来介绍下Condition的使用;

说一说java的concurrent包9–Condition的代码例子BoundedBuffer 

面说了Condition的基本含义,今天这篇说下Condition的一个代码例子; 
javadoc里面对Condition有一个绝佳的例子,BoundedBuffer类,就是一个线程安全的有界限的缓存;很是巧妙的利用了Condition,根据来通知不一样的线程作不一样的事情; 
下面先看下具体代码: 

Java代码  

class BoundedBuffer {  
  
   final Lock lock = new ReentrantLock();  
  
   final Condition notFull  = lock.newCondition();   
  
   final Condition notEmpty = lock.newCondition();   
  
  
  
   final Object[] items = new Object[100];  
  
   int putptr, takeptr, count;  
  
  
  
   public void put(Object x) throws InterruptedException {  
  
     lock.lock();  
  
     try {  
  
       while (count == items.length)   
  
         notFull.await();  
  
       items[putptr] = x;   
  
       if (++putptr == items.length) putptr = 0;  
  
       ++count;  
  
       notEmpty.signal();  
  
     } finally {  
  
       lock.unlock();  
  
     }  
  
   }  
  
  
  
   public Object take() throws InterruptedException {  
  
     lock.lock();  
  
     try {  
  
       while (count == 0)   
  
         notEmpty.await();  
  
       Object x = items[takeptr];   
  
       if (++takeptr == items.length) takeptr = 0;  
  
       --count;  
  
       notFull.signal();  
  
       return x;  
  
     } finally {  
  
       lock.unlock();  
  
     }  
  
   }   
  
 }
代码意思不复杂,一个有界的buffer,里面是个数组,能够往里面放数据和取数据; 
因为该buffer被多个线程共享,因此每次放和取操做的时候都用一个lock保护起来; 
每次取数据(take)的时候, 
a. 若是当前个数是0(用一个count计数), 那么就调用notEmpty.await等待,锁就释放了; 
b. 取数据的索引专门有一个,每次向前一步; 若是到头了就从0开始循环使用 
c.若是有数据,那就取一个数据,将count减1,同时调用notfull.signal(), 

每次放数据(put)的时候 
a.若是count和length相等,也就是满了,那就调用notFull.await等待,释放了锁; 等待有一些take()调用完成以后才会进入 
b. 放数据也有一个索引putptr, 放入数据; 若是到头了也从0开始循环使用 

c. 调用notempty.signal(); 若是有线程在take()的时候await住了,那么就会被通知到,能够继续进行操做

说一说java的concurrent包10–Condition和BoundedBuffer的测试代码

前面一篇说了Condition和BoundedBuffer的基本代码,下面写一个简单的程序测试下这个BoundedBuffer; 

前面一篇说了Condition和BoundedBuffer的基本代码,下面写一个简单的程序测试下这个BoundedBuffer; 

这段程序的目的是测试先put()后take()的操做, 
1. 我将BoundedBuffer的大小设置成5,同时在每次进入notFull和notEmpty的await()的时候打印一下表示当前线程正在等待; 
2. 先开启10个线程作put()操做,预计有5个线程能够完成,另外5个会进入等待 
3. 主线程sleep10秒中,而后启动10个线程作take()操做; 

这个时候,首先第一个take()必然成功完成,在这以前等待的5个put()线程都不会被唤醒, 接下来的事情就很差说了; 
剩下的5个put()线程和9个take()线程中的任何一个均可能会被jvm调度; 
好比可能出现 
a. 开始take()的时候,有5个连续的take()线程完成操做; 而后又进入put()和take()交替的状况 
b. 第一个take()以后,马上会有一个put()线程被notFull().signal()唤醒; 而后继续有take()和put()交替的状况; 

其中take()线程也可能进入notEmpty.await()操做; 
可是任什么时候候,未完成的take()线程始终>=未完成的put()线程, 这个也是很天然的; 



Java代码  

package com.hetaoblog.concurrent.test;  
  
  
  
import java.util.Date;  
  
import java.util.concurrent.CountDownLatch;  
  
import java.util.concurrent.locks.Condition;  
  
import java.util.concurrent.locks.Lock;  
  
import java.util.concurrent.locks.ReentrantLock;  
  
  
  
import org.junit.Test;  
  
  
  
public class BoundedBufferTest {  
  
      
  
      
  
      
  
    @Test  
  
    public void testPutTake()  
  
    {  
  
          
  
        final BoundedBuffer bb = new BoundedBuffer();  
  
          
  
          
  
        int count = 10;  
  
        final CountDownLatch c = new CountDownLatch(count * 2);  
  
          
  
          
  
        System.out.println(new Date() + " now try to call put for " + count );  
  
          
  
        for(int i = 0; i < count ; ++i)  
  
        {  
  
            final int index = i;  
  
            try {  
  
                Thread t = new Thread(new Runnable() {  
  
                      
  
                    @Override  
  
                    public void run() {  
  
                          
  
                        try {  
  
                            bb.put(index);  
  
                            System.out.println(new Date() + "  put finished:  " + index);  
  
                        } catch (InterruptedException e) {  
  
                              
  
                            e.printStackTrace();  
  
                        }  
  
                          
  
                        c.countDown();  
  
                    }  
  
                });  
  
  
  
                t.start();  
  
                  
  
                  
  
            } catch (Exception e) {  
  
                  
  
                e.printStackTrace();  
  
            }  
  
        }  
  
          
  
        try {  
  
            System.out.println(new Date() + " main thread is going to sleep for 10 seconds");  
  
            Thread.sleep(10 * 1000);  
  
        } catch (InterruptedException e1) {  
  
              
  
            e1.printStackTrace();  
  
        }  
  
          
  
        System.out.println(new Date() + " now try to take for count: " + count);  
  
          
  
          
  
        for(int i =0; i < count; ++i)  
  
        {  
  
              
  
            Thread t= new Thread(new Runnable() {  
  
                  
  
                @Override  
  
                public void run() {  
  
                      
  
                    try {  
  
                          
  
                          
  
                        Object o = bb.take();  
  
                          
  
                        System.out.println(new Date() + " take get: " + o);  
  
                    } catch (InterruptedException e) {  
  
                          
  
                        e.printStackTrace();  
  
                    }  
  
                      
  
                      
  
                      
  
                      
  
                      
  
                      
  
                    c.countDown();  
  
                      
  
                }  
  
            });  
  
              
  
            t.start();  
  
        }  
  
          
  
        try {  
  
              
  
            System.out.println(new Date() + ": main thread is to wait for all threads");  
  
            c.await();  
  
              
  
              
  
        } catch (InterruptedException e) {  
  
              
  
            e.printStackTrace();  
  
        }  
  
          
  
        System.out.println(new Date() + " all threads finished");  
  
          
  
    }  
  
  
  
}  
  
class BoundedBuffer {  
  
       final Lock lock = new ReentrantLock();  
  
       final Condition notFull  = lock.newCondition();   
  
       final Condition notEmpty = lock.newCondition();   
  
  
  
       final Object[] items = new Object[5];  
  
       int putptr, takeptr, count;  
  
  
  
       public void put(Object x) throws InterruptedException {  
  
         lock.lock();  
  
         try {  
  
           while (count == items.length)   
  
           {  
  
               System.out.println(new Date() + " put  is to wait....");  
  
             notFull.await();  
  
           }  
  
           items[putptr] = x;   
  
           if (++putptr == items.length) putptr = 0;  
  
           ++count;  
  
           notEmpty.signal();  
  
         } finally {  
  
           lock.unlock();  
  
         }  
  
       }  
  
  
  
       public Object take() throws InterruptedException {  
  
         lock.lock();  
  
         try {  
  
           while (count == 0)  
  
           {  
  
               System.out.println(new Date() + " take is going to wait..");  
  
             notEmpty.await();  
  
           }  
  
           Object x = items[takeptr];   
  
           if (++takeptr == items.length) takeptr = 0;  
  
           --count;  
  
           notFull.signal();  
  
           return x;  
  
         } finally {  
  
           lock.unlock();  
  
         }  
  
       }   
  
     }
下面是这段程序在我机器上的运行结果: 

这是其中一个执行结果,正好对应前面说的状况a, 5个take()先完成;这里出现了take()线程调用notEmpty.await()的状况 
Thu Mar 15 21:15:13 CST 2012 now try to call put for 10 
Thu Mar 15 21:15:13 CST 2012 put finished: 0 
Thu Mar 15 21:15:13 CST 2012 put finished: 2 
Thu Mar 15 21:15:13 CST 2012 put finished: 3 
Thu Mar 15 21:15:13 CST 2012 put finished: 1 
Thu Mar 15 21:15:13 CST 2012 main thread is going to sleep for 10 seconds 
Thu Mar 15 21:15:13 CST 2012 put finished: 4 
Thu Mar 15 21:15:13 CST 2012 put is to wait.... 
Thu Mar 15 21:15:13 CST 2012 put is to wait.... 
Thu Mar 15 21:15:13 CST 2012 put is to wait.... 
Thu Mar 15 21:15:13 CST 2012 put is to wait.... 
Thu Mar 15 21:15:13 CST 2012 put is to wait.... 
Thu Mar 15 21:15:23 CST 2012 now try to take for count: 10 
Thu Mar 15 21:15:23 CST 2012 take get: 3 
Thu Mar 15 21:15:23 CST 2012 take get: 2 
Thu Mar 15 21:15:23 CST 2012 take get: 1 
Thu Mar 15 21:15:23 CST 2012 take get: 0 
Thu Mar 15 21:15:23 CST 2012 take get: 4 
Thu Mar 15 21:15:23 CST 2012 put finished: 5 
Thu Mar 15 21:15:23 CST 2012: main thread is to wait for all threads 
Thu Mar 15 21:15:23 CST 2012 take is going to wait.. 
Thu Mar 15 21:15:23 CST 2012 take get: 5 
Thu Mar 15 21:15:23 CST 2012 put finished: 6 
Thu Mar 15 21:15:23 CST 2012 put finished: 8 
Thu Mar 15 21:15:23 CST 2012 put finished: 7 
Thu Mar 15 21:15:23 CST 2012 put finished: 9 
Thu Mar 15 21:15:23 CST 2012 take get: 6 
Thu Mar 15 21:15:23 CST 2012 take get: 7 
Thu Mar 15 21:15:23 CST 2012 take get: 8 
Thu Mar 15 21:15:23 CST 2012 take get: 9 
Thu Mar 15 21:15:23 CST 2012 all threads finished 

这是另外一个执行结果: 
Thu Mar 15 21:02:49 CST 2012 now try to call put for 10 
Thu Mar 15 21:02:49 CST 2012 put finished: 3 
Thu Mar 15 21:02:49 CST 2012 put finished: 1 
Thu Mar 15 21:02:49 CST 2012 put finished: 0 
Thu Mar 15 21:02:49 CST 2012 put finished: 2 
Thu Mar 15 21:02:49 CST 2012 put finished: 4 
Thu Mar 15 21:02:49 CST 2012 put is to wait.... 
Thu Mar 15 21:02:49 CST 2012 put is to wait.... 
Thu Mar 15 21:02:49 CST 2012 put is to wait.... 
Thu Mar 15 21:02:49 CST 2012 main thread is going to sleep for 10 seconds 
Thu Mar 15 21:02:49 CST 2012 put is to wait.... 
Thu Mar 15 21:02:49 CST 2012 put is to wait.... 
Thu Mar 15 21:02:59 CST 2012 now try to take for count: 10 
Thu Mar 15 21:02:59 CST 2012 take get: 1 
Thu Mar 15 21:02:59 CST 2012 take get: 0 
Thu Mar 15 21:02:59 CST 2012 take get: 3 
Thu Mar 15 21:02:59 CST 2012 take get: 4 
Thu Mar 15 21:02:59 CST 2012: main thread is to wait for all threads 
Thu Mar 15 21:02:59 CST 2012 take is going to wait.. 
Thu Mar 15 21:02:59 CST 2012 take is going to wait.. 
Thu Mar 15 21:02:59 CST 2012 put finished: 5 
Thu Mar 15 21:02:59 CST 2012 take get: 2 
Thu Mar 15 21:02:59 CST 2012 take get: 5 
Thu Mar 15 21:02:59 CST 2012 take is going to wait.. 
Thu Mar 15 21:02:59 CST 2012 take is going to wait.. 
Thu Mar 15 21:02:59 CST 2012 put finished: 7 
Thu Mar 15 21:02:59 CST 2012 put finished: 6 
Thu Mar 15 21:02:59 CST 2012 put finished: 8 
Thu Mar 15 21:02:59 CST 2012 put finished: 9 
Thu Mar 15 21:02:59 CST 2012 take get: 7 
Thu Mar 15 21:02:59 CST 2012 take get: 6 
Thu Mar 15 21:02:59 CST 2012 take get: 8 
Thu Mar 15 21:02:59 CST 2012 take get: 9 
Thu Mar 15 21:02:59 CST 2012 all threads finished 

执行结果2: 
Thu Mar 15 21:14:30 CST 2012 now try to call put for 10 
Thu Mar 15 21:14:30 CST 2012 main thread is going to sleep for 10 seconds 
Thu Mar 15 21:14:30 CST 2012 put finished: 8 
Thu Mar 15 21:14:30 CST 2012 put finished: 6 
Thu Mar 15 21:14:30 CST 2012 put finished: 2 
Thu Mar 15 21:14:30 CST 2012 put finished: 0 
Thu Mar 15 21:14:30 CST 2012 put finished: 4 
Thu Mar 15 21:14:30 CST 2012 put is to wait.... 
Thu Mar 15 21:14:30 CST 2012 put is to wait.... 
Thu Mar 15 21:14:30 CST 2012 put is to wait.... 
Thu Mar 15 21:14:30 CST 2012 put is to wait.... 
Thu Mar 15 21:14:30 CST 2012 put is to wait.... 
Thu Mar 15 21:14:40 CST 2012 now try to take for count: 10 
Thu Mar 15 21:14:40 CST 2012 take get: 8 
Thu Mar 15 21:14:40 CST 2012 take get: 6 
Thu Mar 15 21:14:40 CST 2012 take get: 4 
Thu Mar 15 21:14:40 CST 2012 take get: 2 
Thu Mar 15 21:14:40 CST 2012: main thread is to wait for all threads 
Thu Mar 15 21:14:40 CST 2012 take get: 0 
Thu Mar 15 21:14:40 CST 2012 take is going to wait.. 
Thu Mar 15 21:14:40 CST 2012 take is going to wait.. 
Thu Mar 15 21:14:40 CST 2012 take is going to wait.. 
Thu Mar 15 21:14:40 CST 2012 put finished: 1 
Thu Mar 15 21:14:40 CST 2012 put finished: 5 
Thu Mar 15 21:14:40 CST 2012 put finished: 3 
Thu Mar 15 21:14:40 CST 2012 put finished: 9 
Thu Mar 15 21:14:40 CST 2012 take get: 1 
Thu Mar 15 21:14:40 CST 2012 put finished: 7 
Thu Mar 15 21:14:40 CST 2012 take get: 5 
Thu Mar 15 21:14:40 CST 2012 take get: 3 
Thu Mar 15 21:14:40 CST 2012 take get: 7 
Thu Mar 15 21:14:40 CST 2012 take get: 9 
Thu Mar 15 21:14:40 CST 2012 all threads finished 

在几回不一样的执行中,始终能够观察到任什么时候候,未完成的take()线程数>= 未完成的put()线程; 在未完成的线程数相等的状况下,即便jvm首先调度到了take()线程,也会进入notEmpty.await()释放锁,进入等待

说一说java的concurrent包11–Condition和BoundedBuffer的测试代码2 

前面一篇说了一个Condition和BoundedBuffer的测试代码例子,前面测试的是先put()再take()的操做,这篇说一下先take()再put()的操做; 

前面一篇说了一个Condition和BoundedBuffer的测试代码例子,前面测试的是先put()再take()的操做,这篇说一下先take()再put()的操做; 
固然,必须先要说明的是,这篇和前面这篇在打印日志的时候实际上是有错误的,这个错误在前面一篇并不明显,不会致使明显的问题; 
可是一样的缘由致使如今这个先take()再put()的操做会出现明显的错误,看上去会显得难以想象; 
具体状况留到下一篇详细说明,这里先上测试目的,测试代码和运行结果; 
同时说明多线程编程须要很是谨慎,不然极易出错 

测试目的: 
1. 我将BoundedBuffer的大小设置成5,同时在每次进入notFull和notEmpty的await()的时候打印一下表示当前线程正在等待; 
2. 先开启10个线程作take()操做,因为开始BoundedBuffer里面没有东西,因此10个线程所有调用await进入等待 
3. 主线程sleep10秒中,而后启动10个线程作put()操做; 
在第一个put()完成以后,接下来应该会有部分put()线程和take()线程前后完成; 
理论上, 
a. 任何一个元素的put()都会发生在take()以前; 
b. 若是X表示某个操做成功的次数,在X(put)-X(take)<5的时候,put线程不会进入等待状态 


下面是测试代码: 



Java代码  

    @Test  
  
    public void testTakePut()  
  
    {  
  
  
  
        final BoundedBuffer bb = new BoundedBuffer();  
  
          
  
          
  
        int count = 10;  
  
        final CountDownLatch c = new CountDownLatch(count * 2);  
  
          
  
  
  
        System.out.println(new Date() + " first try to call take for count: " + count);  
  
        for(int i =0; i < count; ++i)  
  
        {  
  
            final int index = i;  
  
            Thread t= new Thread(new Runnable() {  
  
                  
  
                @Override  
  
                public void run() {  
  
                      
  
                    try {  
  
                          
  
                        Thread.currentThread().setName(" TAKE " + index);  
  
                          
  
                          
  
                        Object o = bb.take();  
  
                          
  
                        System.out.println(new Date() + " " + " take get: " + o );  
  
                    } catch (InterruptedException e) {  
  
                          
  
                        e.printStackTrace();  
  
                    }  
  
                      
  
                      
  
                      
  
                      
  
                      
  
                      
  
                    c.countDown();  
  
                      
  
                }  
  
            });  
  
              
  
            t.start();  
  
        }  
  
          
  
        try {  
  
            System.out.println(new Date() + " main thread is going to sleep for 10 seconds");  
  
            Thread.sleep(10 * 1000);  
  
        } catch (InterruptedException e1) {  
  
              
  
            e1.printStackTrace();  
  
        }  
  
          
  
        System.out.println(new Date() + " now try to call put for " + count );  
  
          
  
        for(int i = 0; i < count ; ++i)  
  
        {  
  
            final int index = i;  
  
            try {  
  
                Thread t = new Thread(new Runnable() {  
  
                      
  
                    @Override  
  
                    public void run() {  
  
                          
  
                          
  
                        Thread.currentThread().setName(" PUT " + index);  
  
                          
  
                        try {  
  
                            bb.put(index);  
  
                            System.out.println(new Date() + " " + "  put finished:  " + index );  
  
                        } catch (InterruptedException e) {  
  
                              
  
                            e.printStackTrace();  
  
                        }  
  
                          
  
                        c.countDown();  
  
                    }  
  
                });  
  
  
  
                t.start();  
  
                  
  
                  
  
            } catch (Exception e) {  
  
                  
  
                e.printStackTrace();  
  
            }  
  
        }  
  
          
  
          
  
        try {  
  
              
  
            System.out.println(new Date() + ": main thread is to wait for all threads");  
  
            c.await();  
  
              
  
              
  
        } catch (InterruptedException e) {  
  
              
  
            e.printStackTrace();  
  
        }  
  
          
  
        System.out.println(new Date() + " all threads finished");  
  
    }  
  
  
  
  
  
class BoundedBuffer {  
  
       final Lock lock = new ReentrantLock();  
  
       final Condition notFull  = lock.newCondition();   
  
       final Condition notEmpty = lock.newCondition();   
  
  
  
       final Object[] items = new Object[5];  
  
       int putptr, takeptr, count;  
  
  
  
       public void put(Object x) throws InterruptedException {  
  
         lock.lock();  
  
         try {  
  
           while (count == items.length)   
  
           {  
  
               System.out.println(new Date() + " " + Thread.currentThread().getName() + " put  is to wait....: " + System.currentTimeMillis());    
  
             notFull.await();  
  
               
  
           }  
  
           items[putptr] = x;   
  
           if (++putptr == items.length) putptr = 0;  
  
           ++count;  
  
             
  
           notEmpty.signal();  
  
         } finally {  
  
           lock.unlock();  
  
         }  
  
       }  
  
  
  
       public Object take() throws InterruptedException {  
  
         lock.lock();  
  
         try {  
  
           while (count == 0)  
  
           {  
  
               System.out.println(new Date() + " " + Thread.currentThread().getName() + " take is going to wait.. " + System.currentTimeMillis());    
  
             notEmpty.await();  
  
               
  
           }  
  
           Object x = items[takeptr];   
  
           if (++takeptr == items.length) takeptr = 0;  
  
           --count;  
  
             
  
             
  
             
  
           notFull.signal();  
  
           return x;  
  
         } finally {  
  
           lock.unlock();  
  
         }  
  
       }   
  
     }
运行结果1:  Fri Mar 16 20:50:10 CST 2012 first try to call take for count: 10  Fri Mar 16 20:50:10 CST 2012 TAKE 0 take is going to wait..  Fri Mar 16 20:50:10 CST 2012 TAKE 1 take is going to wait..  Fri Mar 16 20:50:10 CST 2012 TAKE 2 take is going to wait..  Fri Mar 16 20:50:10 CST 2012 TAKE 3 take is going to wait..  Fri Mar 16 20:50:10 CST 2012 TAKE 5 take is going to wait..  Fri Mar 16 20:50:10 CST 2012 main thread is going to sleep for 10 seconds  Fri Mar 16 20:50:10 CST 2012 TAKE 4 take is going to wait..  Fri Mar 16 20:50:10 CST 2012 TAKE 7 take is going to wait..  Fri Mar 16 20:50:10 CST 2012 TAKE 6 take is going to wait..  Fri Mar 16 20:50:10 CST 2012 TAKE 9 take is going to wait..  Fri Mar 16 20:50:10 CST 2012 TAKE 8 take is going to wait..  Fri Mar 16 20:50:20 CST 2012 now try to call put for 10  Fri Mar 16 20:50:20 CST 2012: main thread is to wait for all threads  Fri Mar 16 20:50:20 CST 2012 PUT 7 put finished: 7  Fri Mar 16 20:50:20 CST 2012 PUT 9 put finished: 9  Fri Mar 16 20:50:20 CST 2012 PUT 8 put finished: 8  Fri Mar 16 20:50:20 CST 2012 PUT 3 put is to wait....  Fri Mar 16 20:50:20 CST 2012 PUT 1 put is to wait....  Fri Mar 16 20:50:20 CST 2012 PUT 5 put finished: 5  Fri Mar 16 20:50:20 CST 2012 PUT 4 put is to wait....  Fri Mar 16 20:50:20 CST 2012 TAKE 0 take get: 8  Fri Mar 16 20:50:20 CST 2012 TAKE 2 take get: 9  Fri Mar 16 20:50:20 CST 2012 TAKE 3 take get: 0  Fri Mar 16 20:50:20 CST 2012 TAKE 5 take get: 6  Fri Mar 16 20:50:20 CST 2012 TAKE 4 take get: 5  Fri Mar 16 20:50:20 CST 2012 PUT 2 put finished: 2  Fri Mar 16 20:50:20 CST 2012 PUT 3 put finished: 3  Fri Mar 16 20:50:20 CST 2012 PUT 1 put finished: 1  Fri Mar 16 20:50:20 CST 2012 TAKE 7 take get: 2  Fri Mar 16 20:50:20 CST 2012 TAKE 6 take get: 3  Fri Mar 16 20:50:20 CST 2012 TAKE 9 take get: 1  Fri Mar 16 20:50:20 CST 2012 TAKE 8 take get: 4  Fri Mar 16 20:50:20 CST 2012 PUT 6 put finished: 6  Fri Mar 16 20:50:20 CST 2012 PUT 0 put finished: 0  Fri Mar 16 20:50:20 CST 2012 PUT 4 put finished: 4  Fri Mar 16 20:50:20 CST 2012 TAKE 1 take get: 7  Fri Mar 16 20:50:20 CST 2012 all threads finished  注意到红色部分:  第一个加为红色是由于按照打印结果,put()只完成了3次,就开始有put()进入等待了,而BoundedBuffer的大小是5,理论上应该没有满的!  第二个加为红色是由于元素4居然先被take,而后再被put! 显然程序有地方出错了!具体缘由分析,欢迎关注核桃博客:)
相关文章
相关标签/搜索