多线程(四)

1.等待/通知机制(wait/notify)java

wait使线程中止,notify使线程继续运行.数组

当方法wait()执行后,锁被自动释放,但执行完notify()方法,锁却不自动释放.dom

验证:当方法wait()执行后,锁被自动释放ide

public class Service {
    public void testMethod(Object lock){
        synchronized (lock){
            try {
                System.out.println("begin wait()");
                lock.wait();
                System.out.println(" end wait()");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
public class ThreadA extends Thread{
    private Object lock;
    public ThreadA(Object lock){
        this.lock=lock;
    }

    @Override
    public void run() {
        Service service=new Service();
        service.testMethod(lock);
    }
}
public class ThreadB extends Thread{
    private Object lock;
    public ThreadB(Object lock){
        this.lock=lock;
    }

    @Override
    public void run() {
        Service service=new Service();
        service.testMethod(lock);
    }
}
public class Test {
    public static void main(String[] args) {
        Object lock=new Object();
        ThreadA a= new ThreadA(lock);
        a.start();
        ThreadB b=new ThreadB(lock);
        b.start();
    }
/*output:
begin wait()
begin wait()
*/
}

总结:线程a wait()以后释放锁,线程b进入,验证成立this

public class Service {
    public void testMethod(Object lock){
        synchronized (lock){
            try {
                System.out.println("begin wait() ThreadName="+Thread.currentThread().getName());
                lock.wait();
                System.out.println("end wait() ThreadName="+Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public void synNotifyMethod(Object lock){
        try {
            synchronized (lock){
                System.out.println("begin notify ThreadName="+Thread.currentThread().getName()+" time="+System.currentTimeMillis());
                lock.notify();
                Thread.sleep(5000);
                System.out.println("end notify ThreadName="+Thread.currentThread().getName()+" time="+System.currentTimeMillis());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
public class ThreadA extends Thread{
    private Object lock;
    public ThreadA(Object lock){
        this.lock=lock;
    }

    @Override
    public void run() {
        Service service=new Service();
        service.testMethod(lock);
    }
}
public class NotifyThread extends Thread{
    private Object lock;
    public NotifyThread(Object lock){
        super();
        this.lock=lock;
    }

    @Override
    public void run() {
        Service service=new Service();
        service.synNotifyMethod(lock);
    }
}
public class SynNotifyThread extends Thread{
    private Object lock;
    public SynNotifyThread(Object lock){
        super();
        this.lock=lock;
}

    @Override
    public void run() {
        Service service=new Service();
        service.synNotifyMethod(lock);
    }
}
public class Test {
    public static void main(String[] args) {
        Object lock=new Object();
        ThreadA a= new ThreadA(lock);
        a.start();
        NotifyThread notifyThread=new NotifyThread(lock);
        notifyThread.start();
        SynNotifyThread synNotifyThread=new SynNotifyThread(lock);
        synNotifyThread.start();
    }
/*
begin wait() ThreadName=Thread-0
begin notify ThreadName=Thread-1 time=1515776621424
end notify ThreadName=Thread-1 time=1515776626424
end wait() ThreadName=Thread-0
begin notify ThreadName=Thread-2 time=1515776626424
end notify ThreadName=Thread-2 time=1515776631425
*/
}

总结:线程a wait()以后被线程notifyThread 和synNotifyThread notify(),可是线程Thread-1没有立刻释放锁,运行完synchronized代码块才释放锁.线程

2.当interrupt()遇到wait()对象

当线程呈wait()状态时,调用线程对象的interrupt()方法会出现interruptedException()异常.ip

3.notify()一次只随机通知一个线程进行唤醒内存

public class Service {
    public void testMethod(Object lock){
        try {
            synchronized (lock) {
                System.out.println("begin wait() ThreadName=" + Thread.currentThread().getName());
                lock.wait();
                System.out.println("end wait() ThreadName=" + Thread.currentThread().getName());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class ThreadA extends Thread{
    private Object lock;
    public ThreadA(Object lock){
        this.lock=lock;
    }

    @Override
    public void run() {
        Service service=new Service();
        service.testMethod(lock);
    }
}
public class ThreadB extends Thread{
    private Object lock;
    public ThreadB(Object lock){
        this.lock=lock;
    }

    @Override
    public void run() {
        Service service=new Service();
        service.testMethod(lock);
    }
}
public class ThreadC extends Thread{
    private Object lock;
    public ThreadC(Object lock){
        this.lock=lock;
    }

    @Override
    public void run() {
        Service service=new Service();
        service.testMethod(lock);
    }
}
public class NotifyThread extends Thread{
    private Object lock;
    public NotifyThread(Object lock){
        super();
        this.lock=lock;
    }

    @Override
    public void run() {
        synchronized (lock){
            lock.notifyAll();
        }
    }
}
public class Test {
    public static void main(String[] args) throws InterruptedException {
        Object lock=new Object();
        ThreadA a= new ThreadA(lock);
        a.start();
        ThreadB b= new ThreadB(lock);
        b.start();
        ThreadC c= new ThreadC(lock);
        c.start();
        Thread.sleep(1000);
        NotifyThread notifyThread=new NotifyThread(lock);
        notifyThread.start();
    }
/*output:
begin wait() ThreadName=Thread-0
begin wait() ThreadName=Thread-2
begin wait() ThreadName=Thread-1
end wait() ThreadName=Thread-0
end wait() ThreadName=Thread-2
*/
}

总结:notify()方法随机通知一个线程进行唤醒rem

notifyAll()是唤醒全部线程.

4.方法wait(long)
等待某一时间内是否有线程对锁进行唤醒,若是超过这个时间则自动唤醒

public class MyRunnable {
    static private Object lock = new Object();
    static private Runnable runnable1 = new Runnable() {
        @Override
        public void run() {
            try {
                synchronized (lock) {
                    System.out.println("wait begin timer=" + System.currentTimeMillis());
                    lock.wait(5000);
                    System.out.println("wait end timer=" + System.currentTimeMillis());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };
    static private Runnable runnable= new Runnable() {
        @Override
        public void run() {
            synchronized (lock){
                System.out.println("notify begin timer=" + System.currentTimeMillis());
                lock.notify();
                System.out.println("notify end timer=" + System.currentTimeMillis());
            }
        }
    };

    public static void main(String[] args) throws InterruptedException {
        Thread t1=new Thread(runnable1);
        t1.start();
        Thread.sleep(3000);
        Thread t2=new Thread(runnable);
        t2.start();
    }
/*
wait begin timer=1516010021128
notify begin timer=1516010024127//睡了3秒
notify end timer=1516010024128
wait end timer=1516010024128//立刻激活,输出打印
*/
}

5.生产者/消费者模式实现

//生产者
public class P {
    private String lock;
    public P(String lock){
        this.lock=lock;
    }
    public void setValue(){
        try {
            synchronized (lock){
                while(!ValueObject.value.equals("")){
                    System.out.println("生产者"+Thread.currentThread().getName()+" waiting了");
                    lock.wait();
                }
                System.out.println("生产者"+Thread.currentThread().getName()+" running了");
                String value=System.currentTimeMillis()+"_"+System.nanoTime();
                System.out.println("set的值是"+value);
                ValueObject.value=value;
                lock.notify();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
//消费者
public class C {
    private String lock;
    public C(String lock){
        this.lock=lock;
    }
    public void getValue(){
        try {
            synchronized (lock){
                while(ValueObject.value.equals("")){
                    System.out.println("消费者"+Thread.currentThread().getName()+" waiting了");
                    lock.wait();
                }
                System.out.println("消费者"+Thread.currentThread().getName()+" running了");
                System.out.println("get的值是"+ValueObject.value);
                ValueObject.value="";
                lock.notify();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
//生产线程
public class ThreadP extends Thread{
    private P p;
    public ThreadP(P p){
        super();
        this.p=p;
    }

    @Override
    public void run() {
        while(true){
            p.setValue();
        }
    }
}
//消费线程
public class ThreadC extends Thread{
    private C c;
    public ThreadC(C c){
        super();
        this.c=c;
    }

    @Override
    public void run() {
        while(true){
            c.getValue();
        }
    }
}
public class Run {
    public static void main(String[] args) throws InterruptedException {
        String lock=new String("");
        P p=new P(lock);
        C c=new C(lock);
        ThreadP[] tp=new ThreadP[2];
        ThreadC[] tc=new ThreadC[2];
        for (int i = 0; i <2 ; i++) {
            tp[i]=new ThreadP(p);
            tp[i].setName("生产者"+(i+1));
            tc[i]=new ThreadC(c);
            tc[i].setName("消费者"+(i+1));
            tp[i].start();
            tc[i].start();
        }
        Thread.sleep(5000);
        Thread[] threadArray=new Thread[Thread.currentThread().getThreadGroup().activeCount()];
        Thread.currentThread().getThreadGroup().enumerate(threadArray);//将list转成数组
        for (int i = 0; i <threadArray.length ; i++) {
            System.out.println(threadArray[i].getName()+" "+threadArray[i].getState());
        }
    }
/*//
生产者生产者2 running了
set的值是1516022907331_19063484002988
生产者生产者2 waiting了
生产者生产者1 waiting了
消费者消费者1 running了
get的值是1516022907331_19063484002988
消费者消费者1 waiting了
消费者消费者2 waiting了
main RUNNABLE
Monitor Ctrl-Break RUNNABLE
生产者1 WAITING
消费者1 WAITING
生产者2 WAITING
消费者2 WAITING
*/
}

总结:产生缘由是由于notify()是随机通知,致使生产类没有被通知,陷入假死状态

解决方法:使用notifyAll()

一个生产者对应多个消费者:

public class MyStack {
    private List list=new ArrayList();
    synchronized public void push(){
        try {
            if(list.size()!=0){
                this.wait();
            }
            list.add("anything "+ Math.random());
            this.notify();
            System.out.println("push="+list.size());

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    synchronized public String pop(){
        String returnValue="";
        try {
            if(list.size()==0){
                System.out.println("pop操做中的:"+Thread.currentThread().getName()+" 线程呈wait" +
                        "状态");
                this.wait();
            }
            returnValue=""+list.get(0);
            list.remove(0);
            this.notify();
            System.out.println("pop="+list.size());

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return returnValue;
    }
}
public class P_Thread extends Thread{
    private P p;
    public P_Thread(P p){
        super();
        this.p=p;
    }

    @Override
    public void run() {
        while(true){
            p.pushService();
        }
    }
}
public class C_Thread extends Thread{
    private C c;
    public C_Thread(C c){
        super();
        this.c=c;
    }

    @Override
    public void run() {
        while(true){
            c.popService();
        }
    }
}
public class P {
    private MyStack stack;
    public P(MyStack stack){
        this.stack=stack;
    }
    public void pushService(){
        stack.push();
    }
}
public class C {
    private MyStack stack;
    public C(MyStack stack){
        this.stack=stack;
    }
    public void popService(){
        stack.pop();
    }
}
public class Run {
    public static void main(String[] args) {
        MyStack stack=new MyStack();
        P p=new P(stack);
        C c1=new C(stack);
        C c2=new C(stack);
        C c3=new C(stack);
        C c4=new C(stack);
        C c5=new C(stack);
        P_Thread pt=new P_Thread(p);
        C_Thread ct=new C_Thread(c1);
        C_Thread ct2=new C_Thread(c2);
        C_Thread ct3=new C_Thread(c3);
        C_Thread ct4=new C_Thread(c4);
        C_Thread ct5=new C_Thread(c5);
        pt.start();
        ct.start();
        ct2.start();
        ct3.start();
        ct4.start();
        ct5.start();

    }
/*
pop操做中的:Thread-2 线程呈wait状态
pop操做中的:Thread-3 线程呈wait状态
push=1
pop=0
pop操做中的:Thread-2 线程呈wait状态
pop操做中的:Thread-4 线程呈wait状态
pop操做中的:Thread-1 线程呈wait状态
pop操做中的:Thread-5 线程呈wait状态
Exception in thread "Thread-3" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:604)
    at java.util.ArrayList.get(ArrayList.java:382)
    at mutithreading.MyStack.pop(MyStack.java:33)
    at mutithreading.C.popService(C.java:13)
    at mutithreading.C_Thread.run(C_Thread.java:16)
*/
}

总结: 产生问题的缘由,条件改变时(list.size())并无获得及时的响应,因此多个呈wait状态的线程被唤醒,继而执行list.remove(0)代码而出现异常.

这边我试了一下用volidate关键字修饰list,思路是,将list的修改强行操做到内存,让全部的线程共享list,可是实际操做结果仍是同样.

解决方法:

public class MyStack {
    private List list=new ArrayList();
    synchronized public void push(){
        try {
            while(list.size()!=0){
                this.wait();
            }
            list.add("anything "+ Math.random());
            System.out.println("list的size="+list.size());
            this.notify();
            System.out.println("push="+list.size());

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    synchronized public String pop(){
        String returnValue="";
        try {
            while(list.size()==0){
                System.out.println("pop操做中的:"+Thread.currentThread().getName()+" 线程呈wait" +
                        "状态");
                this.wait();
            }
            returnValue=""+list.get(0);
            list.remove(0);
            this.notify();
            System.out.println("pop="+list.size());

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return returnValue;
    }
/*
pop操做中的:Thread-5 线程呈wait状态
pop操做中的:Thread-3 线程呈wait状态
pop操做中的:Thread-2 线程呈wait状态
pop操做中的:Thread-4 线程呈wait状态
list的size=1
push=1
pop=0
pop操做中的:Thread-1 线程呈wait状态
pop操做中的:Thread-5 线程呈wait状态
*/
}

陷入假死状态,老办法,改成notifyAll(),顺利解决.

6.经过管道进行线程间通讯:字节流

public class WriteData {
    public void writeMethod(PipedOutputStream out){
        try {
            System.out.println("write:");
            for (int i = 0; i <300 ; i++) {
                String outData=""+(i+1);
                out.write(outData.getBytes());
                System.out.println(outData);
            }
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class ReadData {
    public void readMethod(PipedInputStream in) throws IOException {
        System.out.println("read: ");
        byte[] byteArray=new byte[20];
        int readLength=in.read(byteArray);
        while(readLength!=-1){
            String newData=new String(byteArray,0,readLength);
            System.out.println(newData);
            readLength=in.read(byteArray);
        }
        in.close();
    }
}
public class ThreadWrite extends Thread{
    private WriteData write;
    private PipedOutputStream out;
    public ThreadWrite(WriteData write,PipedOutputStream out){
        super();
        this.write=write;
        this.out=out;
    }

    @Override
    public void run() {
        write.writeMethod(out);
    }
}
public class ThreadRead extends Thread{
    private ReadData read;
    private PipedInputStream in;
    public ThreadRead(ReadData read, PipedInputStream in){
        super();
        this.read=read;
        this.in=in;
    }

    @Override
    public void run() {
        try {
            read.readMethod(in);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class Run {
    public static void main(String[] args) throws IOException {
        try {
            WriteData write=new WriteData();
            ReadData read=new ReadData();
            PipedOutputStream out=new PipedOutputStream();
            PipedInputStream in =new PipedInputStream();
            out.connect(in);//使两个Stream之间产生通讯
            ThreadRead threadRead=new ThreadRead(read,in);
            threadRead.start();
            Thread.sleep(2000);
            ThreadWrite threadWrite=new ThreadWrite(write,out);
            threadWrite.start();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

字符流差很少.

public class ReadData {
    public void readMethod(PipedReader in) throws IOException {
        System.out.println("read: ");
        char[] byteArray=new char[20];
        int readLength=in.read(byteArray);
        while(readLength!=-1){
            String newData=new String(byteArray,0,readLength);
            System.out.println(newData);
            readLength=in.read(byteArray);
        }
        in.close();
    }
}
public class WriteData {
    public void writeMethod(PipedWriter out){
        try {
            System.out.println("write:");
            for (int i = 0; i <300 ; i++) {
                String outData=""+(i+1);
                out.write(outData);
                System.out.println(outData);
            }
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}