java多线程设计模式

详见:http://blog.yemou.net/article/query/info/tytfjhfascvhzxcyt220java

java多线程设计模式算法

java语言已经内置了多线程支持,全部实现Runnable接口的类均可被启动一个新线程,新线程会执行该实例的run()方法,当run()方法执行完毕后,线程就结束了。一旦一个线程执行完毕,这个实例就不能再从新启动,只能从新生成一个新实例,再启动一个新线程。

Thread类是实现了Runnable接口的一个实例,它表明一个线程的实例,而且,启动线程的惟一方法就是经过Thread类的start()实例方法:

Thread t = new Thread();
t.start();

start()方法是一个native方法,它将启动一个新线程,并执行run()方法。Thread类默认的run()方法什么也不作就退出了。注意:直接调用run()方法并不会启动一个新线程,它和调用一个普通的java方法没有什么区别。

所以,有两个方法能够实现本身的线程:

方法1:本身的类extend Thread,并复写run()方法,就能够启动新线程并执行本身定义的run()方法。例如:

public class MyThread extends Thread {
    public run() {
        System.out.println("MyThread.run()");
    }
}

在合适的地方启动线程:new MyThread().start();

方法2:若是本身的类已经extends另外一个类,就没法直接extends Thread,此时,必须实现一个Runnable接口:

public class MyThread extends OtherClass implements Runnable {
    public run() {
        System.out.println("MyThread.run()");
    }
}

为了启动MyThread,须要首先实例化一个Thread,并传入本身的MyThread实例:

MyThread myt = new MyThread();
Thread t = new Thread(myt);
t.start();

事实上,当传入一个Runnable target参数给Thread后,Thread的run()方法就会调用target.run(),参考JDK源代码:

public void run() {
    if (target != null) {
        target.run();
    }
}

线程还有一些Name, ThreadGroup, isDaemon等设置,因为和线程设计模式关联不多,这里就很少说了。


因为同一进程内的多个线程共享内存空间,在Java中,就是共享实例,当多个线程试图同时修改某个实例的内容时,就会形成冲突,所以,线程必须实现共享互斥,使多线程同步。

最简单的同步是将一个方法标记为synchronized,对同一个实例来讲,任一时刻只能有一个synchronized方法在执行。当一个方法正在执行某个synchronized方法时,其余线程若是想要执行这个实例的任意一个synchronized方法,都必须等待当前执行 synchronized方法的线程退出此方法后,才能依次执行。

可是,非synchronized方法不受影响,无论当前有没有执行synchronized方法,非synchronized方法均可以被多个线程同时执行。

此外,必须注意,只有同一实例的synchronized方法同一时间只能被一个线程执行,不一样实例的synchronized方法是能够并发的。例如,class A定义了synchronized方法sync(),则不一样实例a1.sync()和a2.sync()能够同时由两个线程来执行。


多线程同步的实现最终依赖锁机制。咱们能够想象某一共享资源是一间屋子,每一个人都是一个线程。当A但愿进入房间时,他必须得到门锁,一旦A得到门锁,他进去后就马上将门锁上,因而B,C,D就不得不在门外等待,直到A释放锁出来后,B,C,D中的某一人抢到了该锁(具体抢法依赖于 JVM的实现,能够先到先得,也能够随机挑选),而后进屋又将门锁上。这样,任一时刻最多有一人在屋内(使用共享资源)。

Java语言规范内置了对多线程的支持。对于Java程序来讲,每个对象实例都有一把“锁”,一旦某个线程得到了该锁,别的线程若是但愿得到该锁,只能等待这个线程释放锁以后。得到锁的方法只有一个,就是synchronized关键字。例如:

public class SharedResource {
    private int count = 0;

    public int getCount() { return count; }

    public synchronized void setCount(int count) { this.count = count; }

}

同步方法public synchronized void setCount(int count) { this.count = count; } 事实上至关于:

public void setCount(int count) {
    synchronized(this) { // 在此得到this锁
         this.count = count;
    } // 在此释放this锁
}

红色部分表示须要同步的代码段,该区域为“危险区域”,若是两个以上的线程同时执行,会引起冲突,所以,要更改SharedResource的内部状态,必须先得到SharedResource实例的锁。

退出synchronized块时,线程拥有的锁自动释放,因而,别的线程又能够获取该锁了。

为了提升性能,不必定要锁定this,例如,SharedResource有两个独立变化的变量:

public class SharedResouce {
    private int a = 0;
    private int b = 0;

    public synchronized void setA(int a) { this.a = a; }

    public synchronized void setB(int b) { this.b = b; }
}

若同步整个方法,则setA()的时候没法setB(),setB()时没法setA()。为了提升性能,可使用不一样对象的锁:

public class SharedResouce {
    private int a = 0;
    private int b = 0;
    private Object sync_a = new Object();
    private Object sync_b = new Object();

    public void setA(int a) {
        synchronized(sync_a) {
            this.a = a;
        }
    }

    public synchronized void setB(int b) {
        synchronized(sync_b) {
            this.b = b;
        }
    }
}


一般,多线程之间须要协调工做。例如,浏览器的一个显示图片的线程displayThread想要执行显示图片的任务,必须等待下载线程 downloadThread将该图片下载完毕。若是图片尚未下载完,displayThread能够暂停,当downloadThread完成了任务后,再通知displayThread“图片准备完毕,能够显示了”,这时,displayThread继续执行。

以上逻辑简单的说就是:若是条件不知足,则等待。当条件知足时,等待该条件的线程将被唤醒。在Java中,这个机制的实现依赖于wait/notify。等待机制与锁机制是密切关联的。例如:

synchronized(obj) {
    while(!condition) {
        obj.wait();
    }
    obj.doSomething();
}

当线程A得到了obj锁后,发现条件condition不知足,没法继续下一处理,因而线程A就wait()。

在另外一线程B中,若是B更改了某些条件,使得线程A的condition条件知足了,就能够唤醒线程A:

synchronized(obj) {
    condition = true;
    obj.notify();
}

须要注意的概念是:

# 调用obj的wait(), notify()方法前,必须得到obj锁,也就是必须写在synchronized(obj) {} 代码段内。

# 调用obj.wait()后,线程A就释放了obj的锁,不然线程B没法得到obj锁,也就没法在synchronized(obj) {} 代码段内唤醒A。

# 当obj.wait()方法返回后,线程A须要再次得到obj锁,才能继续执行。

# 若是A1,A2,A3都在obj.wait(),则B调用obj.notify()只能唤醒A1,A2,A3中的一个(具体哪个由JVM决定)。

# obj.notifyAll()则能所有唤醒A1,A2,A3,可是要继续执行obj.wait()的下一条语句,必须得到obj锁,所以,A1,A2,A3只有一个有机会得到锁继续执行,例如A1,其他的须要等待A1释放obj锁以后才能继续执行。

# 当B调用obj.notify/notifyAll的时候,B正持有obj锁,所以,A1,A2,A3虽被唤醒,可是仍没法得到obj锁。直到B退出synchronized块,释放obj锁后,A1,A2,A3中的一个才有机会得到锁继续执行。



前面讲了wait/notify机制,Thread还有一个sleep()静态方法,它也能使线程暂停一段时间。sleep与wait的不一样点是: sleep并不释放锁,而且sleep的暂停和wait暂停是不同的。obj.wait会使线程进入obj对象的等待集合中并等待唤醒。

可是wait()和sleep()均可以经过interrupt()方法打断线程的暂停状态,从而使线程马上抛出InterruptedException。

若是线程A但愿当即结束线程B,则能够对线程B对应的Thread实例调用interrupt方法。若是此刻线程B正在 wait/sleep/join,则线程B会马上抛出InterruptedException,在catch() {} 中直接return便可安全地结束线程。

须要注意的是,InterruptedException是线程本身从内部抛出的,并非interrupt()方法抛出的。对某一线程调用 interrupt()时,若是该线程正在执行普通的代码,那么该线程根本就不会抛出InterruptedException。可是,一旦该线程进入到 wait()/sleep()/join()后,就会马上抛出InterruptedException。



GuardedSuspention模式主要思想是:

当条件不知足时,线程等待,直到条件知足时,等待该条件的线程被唤醒。

咱们设计一个客户端线程和一个服务器线程,客户端线程不断发送请求给服务器线程,服务器线程不断处理请求。当请求队列为空时,服务器线程就必须等待,直到客户端发送了请求。

先定义一个请求队列:Queue

package com.crackj2ee.thread;

import java.util.*;

public class Queue {
    private List queue = new LinkedList();

    public synchronized Request getRequest() {
        while(queue.size()==0) {
            try {
                this.wait();
            }
            catch(InterruptedException ie) {
                return null;
            }
        }
        return (Request)queue.remove(0);
    }

    public synchronized void putRequest(Request request) {
        queue.add(request);
        this.notifyAll();
    }

}

蓝色部分就是服务器线程的等待条件,而客户端线程在放入了一个request后,就使服务器线程等待条件知足,因而唤醒服务器线程。

客户端线程:ClientThread

package com.crackj2ee.thread;

public class ClientThread extends Thread {
    private Queue queue;
    private String clientName;

    public ClientThread(Queue queue, String clientName) {
        this.queue = queue;
        this.clientName = clientName;
    }

    public String toString() {
        return "[ClientThread-" + clientName + "]";
    }

    public void run() {
        for(int i=0; i<100; i++) {
            Request request = new Request("" + (long)(Math.random()*10000));
            System.out.println(this + " send request: " + request);
            queue.putRequest(request);
            try {
                Thread.sleep((long)(Math.random() * 10000 + 1000));
            }
            catch(InterruptedException ie) {
            }
        }
        System.out.println(this + " shutdown.");
    }
}

服务器线程:ServerThread

package com.crackj2ee.thread;
public class ServerThread extends Thread {
    private boolean stop = false;
    private Queue queue;

    public ServerThread(Queue queue) {
        this.queue = queue;
    }

    public void shutdown() {
        stop = true;
        this.interrupt();
        try {
            this.join();
        }
        catch(InterruptedException ie) {}
    }

    public void run() {
        while(!stop) {
            Request request = queue.getRequest();
            System.out.println("[ServerThread] handle request: " + request);
            try {
                Thread.sleep(2000);
            }
            catch(InterruptedException ie) {}
        }
        System.out.println("[ServerThread] shutdown.");
    }
}

服务器线程在红色部分可能会阻塞,也就是说,Queue.getRequest是一个阻塞方法。这和java标准库的许多IO方法相似。

最后,写一个Main来启动他们:

package com.crackj2ee.thread;

public class Main {

    public static void main(String[] args) {
        Queue queue = new Queue();
        ServerThread server = new ServerThread(queue);
        server.start();
        ClientThread[] clients = new ClientThread[5];
        for(int i=0; i<clients.length; i++) {
            clients[i] = new ClientThread(queue, ""+i);
            clients[i].start();
        }
        try {
            Thread.sleep(100000);
        }
        catch(InterruptedException ie) {}
        server.shutdown();
    }
}

咱们启动了5个客户端线程和一个服务器线程,运行结果以下:

[ClientThread-0] send request: Request-4984
[ServerThread] handle request: Request-4984
[ClientThread-1] send request: Request-2020
[ClientThread-2] send request: Request-8980
[ClientThread-3] send request: Request-5044
[ClientThread-4] send request: Request-548
[ClientThread-4] send request: Request-6832
[ServerThread] handle request: Request-2020
[ServerThread] handle request: Request-8980
[ServerThread] handle request: Request-5044
[ServerThread] handle request: Request-548
[ClientThread-4] send request: Request-1681
[ClientThread-0] send request: Request-7859
[ClientThread-3] send request: Request-3926
[ServerThread] handle request: Request-6832
[ClientThread-2] send request: Request-9906


能够观察到ServerThread处理来自不一样客户端的请求。

思考

Q: 服务器线程的wait条件while(queue.size()==0)可否换成if(queue.size()==0)?

A: 在这个例子中能够,由于服务器线程只有一个。可是,若是服务器线程有多个(例如Web应用程序有多个线程处理并发请求,这很是广泛),就会形成严重问题。

Q: 可否用sleep(1000)代替wait()?

A: 绝对不能够。sleep()不会释放锁,所以sleep期间别的线程根本没有办法调用getRequest()和putRequest(),致使全部相关线程都被阻塞。

Q: (Request)queue.remove(0)能够放到synchronized() {}块外面吗?

A: 不能够。由于while()是测试queue,remove()是使用queue,二者是一个原子操做,不能放在synchronized外面。

总结

多线程设计看似简单,实际上必须很是仔细地考虑各类锁定/同步的条件,稍不当心,就可能出错。而且,当线程较少时,极可能发现不了问题,一旦问题出现又难以调试。

所幸的是,已有一些被验证过的模式能够供咱们使用,咱们会继续介绍一些经常使用的多线程设计模式。


前面谈了多线程应用程序能极大地改善用户相应。例如对于一个Web应用程序,每当一个用户请求服务器链接时,服务器就能够启动一个新线程为用户服务。

然而,建立和销毁线程自己就有必定的开销,若是频繁建立和销毁线程,CPU和内存开销就不可忽略,垃圾收集器还必须负担更多的工做。所以,线程池就是为了不频繁建立和销毁线程。

每当服务器接受了一个新的请求后,服务器就从线程池中挑选一个等待的线程并执行请求处理。处理完毕后,线程并不结束,而是转为阻塞状态再次被放入线程池中。这样就避免了频繁建立和销毁线程。

Worker Pattern实现了相似线程池的功能。首先定义Task接口:

package com.crackj2ee.thread;
public interface Task {
    void execute();
}

线程将负责执行execute()方法。注意到任务是由子类经过实现execute()方法实现的,线程自己并不知道本身执行的任务。它只负责运行一个耗时的execute()方法。

具体任务由子类实现,咱们定义了一个CalculateTask和一个TimerTask:

// CalculateTask.java
package com.crackj2ee.thread;
public class CalculateTask implements Task {
    private static int count = 0;
    private int num = count;
    public CalculateTask() {
        count++;
    }
    public void execute() {
        System.out.println("[CalculateTask " + num + "] start");
        try {
            Thread.sleep(3000);
        }
        catch(InterruptedException ie) {}
        System.out.println("[CalculateTask " + num + "] done.");
    }
}

// TimerTask.java
package com.crackj2ee.thread;
public class TimerTask implements Task {
    private static int count = 0;
    private int num = count;
    public TimerTask() {
        count++;
    }
    public void execute() {
        System.out.println("[TimerTask " + num + "] start");
        try {
            Thread.sleep(2000);
        }
        catch(InterruptedException ie) {}
        System.out.println("[TimerTask " + num + "] done.");
    }
}

以上任务均简单的sleep若干秒。

TaskQueue实现了一个队列,客户端能够将请求放入队列,服务器线程能够从队列中取出任务:

package com.crackj2ee.thread;
import java.util.*;
public class TaskQueue {
    private List queue = new LinkedList();
    public synchronized Task getTask() {
        while(queue.size()==0) {
            try {
                this.wait();
            }
            catch(InterruptedException ie) {
                return null;
            }
        }
        return (Task)queue.remove(0);
    }
    public synchronized void putTask(Task task) {
        queue.add(task);
        this.notifyAll();
    }
}

终于到了真正的WorkerThread,这是真正执行任务的服务器线程:

package com.crackj2ee.thread;
public class WorkerThread extends Thread {
    private static int count = 0;
    private boolean busy = false;
    private boolean stop = false;
    private TaskQueue queue;
    public WorkerThread(ThreadGroup group, TaskQueue queue) {
        super(group, "worker-" + count);
        count++;
        this.queue = queue;
    }
    public void shutdown() {
        stop = true;
        this.interrupt();
        try {
            this.join();
        }
        catch(InterruptedException ie) {}
    }
    public boolean isIdle() {
        return !busy;
    }
    public void run() {
        System.out.println(getName() + " start.");        
        while(!stop) {
            Task task = queue.getTask();
            if(task!=null) {
                busy = true;
                task.execute();
                busy = false;
            }
        }
        System.out.println(getName() + " end.");
    }
}

前面已经讲过,queue.getTask()是一个阻塞方法,服务器线程可能在此wait()一段时间。此外,WorkerThread还有一个shutdown方法,用于安全结束线程。

最后是ThreadPool,负责管理全部的服务器线程,还能够动态增长和减小线程数:

package com.crackj2ee.thread;
import java.util.*;
public class ThreadPool extends ThreadGroup {
    private List threads = new LinkedList();
    private TaskQueue queue;
    public ThreadPool(TaskQueue queue) {
        super("Thread-Pool");
        this.queue = queue;
    }
    public synchronized void addWorkerThread() {
        Thread t = new WorkerThread(this, queue);
        threads.add(t);
        t.start();
    }
    public synchronized void removeWorkerThread() {
        if(threads.size()>0) {
            WorkerThread t = (WorkerThread)threads.remove(0);
            t.shutdown();
        }
    }
    public synchronized void currentStatus() {
        System.out.println("-----------------------------------------------");
        System.out.println("Thread count = " + threads.size());
        Iterator it = threads.iterator();
        while(it.hasNext()) {
            WorkerThread t = (WorkerThread)it.next();
            System.out.println(t.getName() + ": " + (t.isIdle() ? "idle" : "busy"));
        }
        System.out.println("-----------------------------------------------");
    }
}

currentStatus()方法是为了方便调试,打印出全部线程的当前状态。

最后,Main负责完成main()方法:

package com.crackj2ee.thread;
public class Main {
    public static void main(String[] args) {
        TaskQueue queue = new TaskQueue();
        ThreadPool pool = new ThreadPool(queue);
        for(int i=0; i<10; i++) {
            queue.putTask(new CalculateTask());
            queue.putTask(new TimerTask());
        }
        pool.addWorkerThread();
        pool.addWorkerThread();
        doSleep(8000);
        pool.currentStatus();
        pool.addWorkerThread();
        pool.addWorkerThread();
        pool.addWorkerThread();
        pool.addWorkerThread();
        pool.addWorkerThread();
        doSleep(5000);
        pool.currentStatus();
    }
    private static void doSleep(long ms) {
        try {
            Thread.sleep(ms);
        }
        catch(InterruptedException ie) {}
    }
}

main()一开始放入了20个Task,而后动态添加了一些服务线程,并按期打印线程状态,运行结果以下:

worker-0 start.
[CalculateTask 0] start
worker-1 start.
[TimerTask 0] start
[TimerTask 0] done.
[CalculateTask 1] start
[CalculateTask 0] done.
[TimerTask 1] start
[CalculateTask 1] done.
[CalculateTask 2] start
[TimerTask 1] done.
[TimerTask 2] start
[TimerTask 2] done.
[CalculateTask 3] start
-----------------------------------------------
Thread count = 2
worker-0: busy
worker-1: busy
-----------------------------------------------
[CalculateTask 2] done.
[TimerTask 3] start
worker-2 start.
[CalculateTask 4] start
worker-3 start.
[TimerTask 4] start
worker-4 start.
[CalculateTask 5] start
worker-5 start.
[TimerTask 5] start
worker-6 start.
[CalculateTask 6] start
[CalculateTask 3] done.
[TimerTask 6] start
[TimerTask 3] done.
[CalculateTask 7] start
[TimerTask 4] done.
[TimerTask 7] start
[TimerTask 5] done.
[CalculateTask 8] start
[CalculateTask 4] done.
[TimerTask 8] start
[CalculateTask 5] done.
[CalculateTask 9] start
[CalculateTask 6] done.
[TimerTask 9] start
[TimerTask 6] done.
[TimerTask 7] done.
-----------------------------------------------
Thread count = 7
worker-0: idle
worker-1: busy
worker-2: busy
worker-3: idle
worker-4: busy
worker-5: busy
worker-6: busy
-----------------------------------------------
[CalculateTask 7] done.
[CalculateTask 8] done.
[TimerTask 8] done.
[TimerTask 9] done.
[CalculateTask 9] done.

仔细观察:一开始只有两个服务器线程,所以线程状态都是忙,后来线程数增多,6个线程中的两个状态变成idle,说明处于wait()状态。

思考:本例的线程调度算法其实根本没有,由于这个应用是围绕TaskQueue设计的,不是以Thread Pool为中心设计的。所以,Task调度取决于TaskQueue的getTask()方法,你能够改进这个方法,例如使用优先队列,使优先级高的任务先被执行。

若是全部的服务器线程都处于busy状态,则说明任务繁忙,TaskQueue的队列愈来愈长,最终会致使服务器内存耗尽。所以,能够限制 TaskQueue的等待任务数,超过最大长度就拒绝处理。许多Web服务器在用户请求繁忙时就会拒绝用户:HTTP 503 SERVICE UNAVAILABLE


多线程读写同一个对象的数据是很广泛的,一般,要避免读写冲突,必须保证任什么时候候仅有一个线程在写入,有线程正在读取的时候,写入操做就必须等待。简单说,就是要避免“写-写”冲突和“读-写”冲突。可是同时读是容许的,由于“读-读”不冲突,并且很安全。

要实现以上的ReadWriteLock,简单的使用synchronized就不行,咱们必须本身设计一个ReadWriteLock类,在读以前,必须先得到“读锁”,写以前,必须先得到“写锁”。举例说明:

DataHandler对象保存了一个可读写的char[]数组:

package com.crackj2ee.thread;

public class DataHandler {
    // store data:
    private char[] buffer = "AAAAAAAAAA".toCharArray();

    private char[] doRead() {
        char[] ret = new char[buffer.length];
        for(int i=0; i<buffer.length; i++) {
            ret[i] = buffer[i];
            sleep(3);
        }
        return ret;
    }

    private void doWrite(char[] data) {
        if(data!=null) {
            buffer = new char[data.length];
            for(int i=0; i<buffer.length; i++) {
                buffer[i] = data[i];
                sleep(10);
            }
        }
    }

    private void sleep(int ms) {
        try {
            Thread.sleep(ms);
        }
        catch(InterruptedException ie) {}
    }
}

doRead()和doWrite()方法是非线程安全的读写方法。为了演示,加入了sleep(),并设置读的速度大约是写的3倍,这符合一般的状况。

为了让多线程能安全读写,咱们设计了一个ReadWriteLock:

package com.crackj2ee.thread;
public class ReadWriteLock {
    private int readingThreads = 0;
    private int writingThreads = 0;
    private int waitingThreads = 0; // waiting for write
    private boolean preferWrite = true;

    public synchronized void readLock() throws InterruptedException {
        while(writingThreads>0 || (preferWrite && waitingThreads>0))
            this.wait();
        readingThreads++;
    }

    public synchronized void readUnlock() {
        readingThreads--;
        preferWrite = true;
        notifyAll();
    }

    public synchronized void writeLock() throws InterruptedException {
        waitingThreads++;
        try {
            while(readingThreads>0 || writingThreads>0)
                this.wait();
        }
        finally {
            waitingThreads--;
        }
        writingThreads++;
    }

    public synchronized void writeUnlock() {
        writingThreads--;
        preferWrite = false;
        notifyAll();
    }
}

readLock()用于得到读锁,readUnlock()释放读锁,writeLock()和writeUnlock()同样。因为锁用完必须释放,所以,必须保证lock和unlock匹配。咱们修改DataHandler,加入ReadWriteLock:

package com.crackj2ee.thread;
public class DataHandler {
    // store data:
    private char[] buffer = "AAAAAAAAAA".toCharArray();
    // lock:
    private ReadWriteLock lock = new ReadWriteLock();

    public char[] read(String name) throws InterruptedException {
        System.out.println(name + " waiting for read");
        lock.readLock();
        try {
            char[] data = doRead();
            System.out.println(name + " reads data: " + new String(data));
            return data;
        }
        finally {
            lock.readUnlock();
        }
    }

    public void write(String name, char[] data) throws InterruptedException {
        System.out.println(name + " waiting for write");
        lock.writeLock();
        try {
            System.out.println(name + " wrote data: " + new String(data));
            doWrite(data);
        }
        finally {
            lock.writeUnlock();
        }
    }

    private char[] doRead() {
        char[] ret = new char[buffer.length];
        for(int i=0; i<buffer.length; i++) {
            ret[i] = buffer[i];
            sleep(3);
        }
        return ret;
    }
    private void doWrite(char[] data) {
        if(data!=null) {
            buffer = new char[data.length];
            for(int i=0; i<buffer.length; i++) {
                buffer[i] = data[i];
                sleep(10);
            }
        }
    }
    private void sleep(int ms) {
        try {
            Thread.sleep(ms);
        }
        catch(InterruptedException ie) {}
    }
}

public方法read()和write()彻底封装了底层的ReadWriteLock,所以,多线程能够安全地调用这两个方法:

// ReadingThread不断读取数据:
package com.crackj2ee.thread;
public class ReadingThread extends Thread {
    private DataHandler handler;
    public ReadingThread(DataHandler handler) {
        this.handler = handler;
    }
    public void run() {
        for(;;) {
            try {
                char[] data = handler.read(getName());
                Thread.sleep((long)(Math.random()*1000+100));
            }
            catch(InterruptedException ie) {
                break;
            }
        }
    }
}

// WritingThread不断写入数据,每次写入的都是10个相同的字符:
package com.crackj2ee.thread;
public class WritingThread extends Thread {
    private DataHandler handler;
    public WritingThread(DataHandler handler) {
        this.handler = handler;
    }
    public void run() {
        char[] data = new char[10];
        for(;;) {
            try {
                fill(data);
                handler.write(getName(), data);
                Thread.sleep((long)(Math.random()*1000+100));
            }
            catch(InterruptedException ie) {
                break;
            }
        }
    }
    // 产生一个A-Z随机字符,填入char[10]:
    private void fill(char[] data) {
        char c = (char)(Math.random()*26+'A');
        for(int i=0; i<data.length; i++)
            data[i] = c;
    }
}

最后Main负责启动这些线程:

package com.crackj2ee.thread;
public class Main {
    public static void main(String[] args) {
        DataHandler handler = new DataHandler();
        Thread[] ts = new Thread[] {
                new ReadingThread(handler),
                new ReadingThread(handler),
                new ReadingThread(handler),
                new ReadingThread(handler),
                new ReadingThread(handler),
                new WritingThread(handler),
                new WritingThread(handler)
        };
        for(int i=0; i<ts.length; i++) {
            ts[i].start();
        }
    }
}

咱们启动了5个读线程和2个写线程,运行结果以下:

Thread-0 waiting for read
Thread-1 waiting for read
Thread-2 waiting for read
Thread-3 waiting for read
Thread-4 waiting for read
Thread-5 waiting for write
Thread-6 waiting for write
Thread-4 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-0 reads data: AAAAAAAAAA
Thread-5 wrote data: EEEEEEEEEE
Thread-6 wrote data: MMMMMMMMMM
Thread-1 waiting for read
Thread-4 waiting for read
Thread-1 reads data: MMMMMMMMMM
Thread-4 reads data: MMMMMMMMMM
Thread-2 waiting for read
Thread-2 reads data: MMMMMMMMMM
Thread-0 waiting for read
Thread-0 reads data: MMMMMMMMMM
Thread-4 waiting for read
Thread-4 reads data: MMMMMMMMMM
Thread-2 waiting for read
Thread-5 waiting for write
Thread-2 reads data: MMMMMMMMMM
Thread-5 wrote data: GGGGGGGGGG
Thread-6 waiting for write
Thread-6 wrote data: AAAAAAAAAA
Thread-3 waiting for read
Thread-3 reads data: AAAAAAAAAA


能够看到,每次读/写都是完整的原子操做,由于咱们每次写入的都是10个相同字符。而且,每次读出的都是最近一次写入的内容。

若是去掉ReadWriteLock:

package com.crackj2ee.thread;
public class DataHandler {

    // store data:
    private char[] buffer = "AAAAAAAAAA".toCharArray();

    public char[] read(String name) throws InterruptedException {
        char[] data = doRead();
        System.out.println(name + " reads data: " + new String(data));
        return data;
    }
    public void write(String name, char[] data) throws InterruptedException {
        System.out.println(name + " wrote data: " + new String(data));
        doWrite(data);
    }

    private char[] doRead() {
        char[] ret = new char[10];
        for(int i=0; i<10; i++) {
            ret[i] = buffer[i];
            sleep(3);
        }
        return ret;
    }
    private void doWrite(char[] data) {
        for(int i=0; i<10; i++) {
            buffer[i] = data[i];
            sleep(10);
        }
    }
    private void sleep(int ms) {
        try {
            Thread.sleep(ms);
        }
        catch(InterruptedException ie) {}
    }
}

运行结果以下:

Thread-5 wrote data: AAAAAAAAAA
Thread-6 wrote data: MMMMMMMMMM
Thread-0 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-4 reads data: AAAAAAAAAA
Thread-2 reads data: MAAAAAAAAA
Thread-3 reads data: MAAAAAAAAA
Thread-5 wrote data: CCCCCCCCCC
Thread-1 reads data: MAAAAAAAAA
Thread-0 reads data: MAAAAAAAAA
Thread-4 reads data: MAAAAAAAAA
Thread-6 wrote data: EEEEEEEEEE
Thread-3 reads data: EEEEECCCCC
Thread-4 reads data: EEEEEEEEEC
Thread-1 reads data: EEEEEEEEEE

能够看到在Thread-6写入EEEEEEEEEE的过程当中,3个线程读取的内容是不一样的。

思考

java的synchronized提供了最底层的物理锁,要在synchronized的基础上,实现本身的逻辑锁,就必须仔细设计ReadWriteLock。

Q: lock.readLock()为何不放入try{ } 内?
A: 由于readLock()会抛出InterruptedException,致使readingThreads++不执行,而readUnlock()在 finally{ } 中,致使readingThreads--执行,从而使readingThread状态出错。writeLock()也是相似的。

Q: preferWrite有用吗?
A: 若是去掉preferWrite,线程安全不受影响。可是,若是读取线程不少,上一个线程尚未读取完,下一个线程又开始读了,就致使写入线程长时间没法得到writeLock;若是写入线程等待的不少,一个接一个写,也会致使读取线程长时间没法得到readLock。preferWrite的做用是让读 /写交替执行,避免因为读线程繁忙致使写没法进行和因为写线程繁忙致使读没法进行。

Q: notifyAll()换成notify()行不行?
A: 不能够。因为preferWrite的存在,若是一个线程刚读取完毕,此时preferWrite=true,再notify(),若刚好唤醒的是一个读线程,则while(writingThreads>0 || (preferWrite && waitingThreads>0))可能为true致使该读线程继续等待,而等待写入的线程也处于wait()中,结果全部线程都处于wait ()状态,谁也没法唤醒谁。所以,notifyAll()比notify()要来得安全。程序验证notify()带来的死锁:

Thread-0 waiting for read
Thread-1 waiting for read
Thread-2 waiting for read
Thread-3 waiting for read
Thread-4 waiting for read
Thread-5 waiting for write
Thread-6 waiting for write
Thread-0 reads data: AAAAAAAAAA
Thread-4 reads data: AAAAAAAAAA
Thread-3 reads data: AAAAAAAAAA
Thread-2 reads data: AAAAAAAAAA
Thread-1 reads data: AAAAAAAAAA
Thread-5 wrote data: CCCCCCCCCC
Thread-2 waiting for read
Thread-1 waiting for read
Thread-3 waiting for read
Thread-0 waiting for read
Thread-4 waiting for read
Thread-6 wrote data: LLLLLLLLLL
Thread-5 waiting for write
Thread-6 waiting for write
Thread-2 reads data: LLLLLLLLLL
Thread-2 waiting for read
(运行到此不动了)

注意到这种死锁是因为全部线程都在等待别的线程唤醒本身,结果都没法醒过来。这和两个线程但愿得到对方已有的锁形成死锁不一样。所以多线程设计的难度远远高于单线程应用。设计模式

相关文章
相关标签/搜索