Java编程思想 第21章 并发

这是在2013年的笔记整理。如今从新拿出来,放在网上,从新总结下。html

两种基本的线程实现方式 以及中断

 

package thread; java

 

 

/** 编程

*小程序

* @author zjf windows

* @create_time 2013-12-18 缓存

* @use测试基本的两种线程的实现方式安全

*         测试中断多线程

*/ 并发

public class BasicThreadTest { app

 

    public static void main(String[] args) {

        Counter c1 = new Counter();

        c1.start();

        Thread c2 = new Thread(new Countable());

        c2.start();

        try {

            Thread.sleep(1000);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        //中断

        c1.interrupt();

        c2.interrupt();

          

        

        try {

            Thread.sleep(1000);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        //此时c1线程已经终止不能再次start 屡次启动一个线程是非法的。java.lang.IllegalThreadStateException

        c1.start();

    }

      

    

    /**

     *

     * @author zjf

     * @create_time 2013-12-18

     * @use Runnable接口方式的实现

     */

    static class Countable implements Runnable{

        public void run() {

            int i = 0;

            while(!Thread.currentThread().isInterrupted())

            {

                System.out.println(this.toString() + "-------------" + i);

                i ++;

            }

        }

    }

    /**

     *

     * @author zjf

     * @create_time 2013-12-18

     * @use Thread继承方式的实现

     */

    static class Counter extends Thread{

        public void run() {

            int i = 0;

            while(!Thread.currentThread().isInterrupted())

            {

                System.out.println(this.toString() + "-------------" + i);

                i ++;

            }

        }

    }

}

中断
  • public void interrupt()

中断线程。

若是线程在调用 Object 类的 wait()wait(long)wait(long, int) 方法,或者该类的 join()join(long)join(long, int)sleep(long)sleep(long, int) 方法过程当中受阻,则其中断状态将被清除,它还将收到一个 InterruptedException

  • public static boolean interrupted()

测试当前线程是否已经中断。线程的中断状态 由该方法清除。

  • public boolean isInterrupted()

测试线程是否已经中断。线程的中断状态 不受该方法的影响。

测试睡眠被中断

sleep是静态方法。

 

package thread;

 

 

/**

*

* @author zjf

* @create_time 2013-12-18

* @use测试Sleep方法被中断

*/

public class SleepTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-18

     * @use测试目的:睡眠时是否能够被中断

     * @param args

     */

    public static void main(String[] args) {

        Thread t = new Thread(new Sleepable());

        t.start();

        try {

            Thread.sleep(1000);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        t.interrupt();

    }

 

    

    static class Sleepable implements Runnable{

        public void run() {

            try {

                //睡眠10可是线程开始1秒后被中断当前线程在睡眠时可以接收到中断而后退出

                Thread.sleep(10000);

            } catch (InterruptedException e) {

                //若是被中断就退出

                System.out.println("exit");//一秒后退出

            }

        }

    }

}

 

测试使用yield让步

yield是静态方法。

 

package thread;

 

public class YieldTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-18

     * @use测试yield方法

     * @param args

     */

    public static void main(String[] args) {

        new Thread() {

            @Override

            public void run() {

                for(int i = 1; i < 100; i++)

                {

                    System.out.println("yield-----" + i);

                    //测试结果显示使用yield让步与不使用差异不大

                    Thread.yield();

                }

            }

        }.start();

        

        new Thread() {

            @Override

            public void run() {

                for(int i = 1; i < 100; i++)

                {

                    System.out.println("notyield-----" + i);

                }

            }

        }.start();

    }

 

}

 

测试cached线程池

 

newCachedThreadPool:建立一个可根据须要建立新线程的线程池,可是在之前构造的线程可用时将重用它们。对于执行不少短时间异步任务的程序而言,这些线程池一般可提升程序性能。调用 execute 将重用之前构造的线程(若是线程可用)。若是现有线程没有可用的,则建立一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。所以,长时间保持空闲的线程池不会使用任何资源。

 

CachedThreadPool通常会建立所需数量的线程,而且会复用,这是选择的首选。

 

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

public class CachedThreadPoolTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-18

     * @use测试Cached线程池

     * @param args

     */

    public static void main(String[] args) {

 

        /*

         * cached线程池不能设置拥有线程的数量

         */

        ExecutorService es = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {

            es.execute(new Countable(i));

        }

        

        /*

         * 由于要复用线程因此线程执行完任务以后不会马上关闭而是等待一分钟(可配置)

         * 的时间若是在这一分钟期间没有新的任务要执行会自动关闭

         * shutdown标明不会再有新的任务加入因此加入shutdown代码以后任务执行以后就会关闭线程

         * 不会等待一分钟

         */

        es.shutdown();

 

    }

 

    static class Countable implements Runnable {

 

        private int i;

 

        public Countable(int i) {

            this.i = i;

        }

 

        public void run() {

 

            System.out.println("" + i + "启动的线程的ID"

                    + Thread.currentThread().getId());

            

            /**

             *输出为

                 0启动的线程的ID7

                2启动的线程的ID9

                1启动的线程的ID8

                3启动的线程的ID10

                4启动的线程的ID11

                5启动的线程的ID12

                6启动的线程的ID13

                8启动的线程的ID8

                7启动的线程的ID9

                9启动的线程的ID10

                

                可见在地8 9 10个线程的时候复用了第1 2 3个线程。

                这创建在第1 2 3个线程已经运行完的基础上。

             */

 

        }

    }

 

}

shutdown和shutdownnow

shutdown:

  • 阻止加入新的任务。
  • 结束已经完成任务的空闲线程,直到全部任务执行完毕,关闭全部线程为止。

shutdownnow:

  1. 完成shutdown的功能。
  2. 向每一个未完成的线程发布中断命令。
  3. 返回未执行的任务列表
shutdownnow

package thread;

 

import java.util.List;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

 

public class ShutdownNowTest {

 

    /**

     * @author zjf

     * @create_time 2014-2-18

     * @use

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

 

        ExecutorService es = Executors.newFixedThreadPool(3);

        for(int i = 0; i < 10; i ++)

        {

            es.execute(new Countable(i));

        }

        TimeUnit.SECONDS.sleep(1);

        //返回等待的任务列表

        List<Runnable> countList = es.shutdownNow();

        for(Runnable r : countList)

        {

            System.out.println(r.toString() + " is not done...");

        }

    }

 

}

 

class Countable implements Runnable{

    private int i;

    public Countable(int i) {

        this.i = i;

    }

    

    public int getI() {

        return i;

    }

 

    @Override

    public String toString() {

        

        return "thread, id : " + i;

    }

    

    public void run() {

        

        try {

            System.out.println(this.toString() + " is start to run...");

            TimeUnit.MILLISECONDS.sleep(500);

            System.out.println(this.toString() + " is done...");

        } catch (InterruptedException e) {

            System.out.println(this.toString() + " is interrupted...");

        }

    }

    

}

 

 

/**输出

 

thread, id : 0 is start to run...

thread, id : 1 is start to run...

thread, id : 2 is start to run...

thread, id : 0 is done...

thread, id : 1 is done...

thread, id : 2 is done...

thread, id : 3 is start to run...

thread, id : 4 is start to run...

thread, id : 5 is start to run...

thread, id : 5 is done...

thread, id : 3 is done...

thread, id : 4 is done...

thread, id : 6 is start to run...

thread, id : 6 is interrupted...

thread, id : 7 is not done...

thread, id : 8 is not done...

thread, id : 9 is not done...

 

*/

测试ThreadFactory

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.ThreadFactory;

 

public class ThreadFactoryTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-18

     * @use测试Cached线程池

     * @param args

     */

    public static void main(String[] args) {

 

        ThreadFactory threadFactory = new MyThreadFactory();

        ExecutorService es = Executors.newCachedThreadPool(threadFactory);

        for (int i = 0; i < 10; i++) {

            es.execute(new Countable(i));

        }

        es.shutdown();

    }

 

    static class Countable implements Runnable {

 

        private int i;

        public Countable(int i) {

            this.i = i;

        }

        public void run() {

            System.out.println("" + i + "个任务正在运行!");

        }

    }

 

    static class MyThreadFactory implements ThreadFactory {

        private static int count = 0;

        public Thread newThread(Runnable r) {

            return new MyThread(r,count++);

        }

    };

    

    static class MyThread extends Thread

    {

        private Runnable target;

        private int count;

        public MyThread(Runnable target, int count) {

            super();

            this.target = target;

            this.count = count;

        }

        @Override

        public void run() {

            System.out.println("" + count + "个线程启动!" );

            if(target != null)

            {

                target.run();

            }

            System.out.println("" + count + "个线程结束!" );

        }

    }

}

/*

* 输出结果

        0个线程启动!

        1个线程启动!

        2个线程启动!

        3个线程启动!

        0个任务正在运行!

        1个任务正在运行!

        2个任务正在运行!

        4个线程启动!

        3个任务正在运行!

        5个线程启动!

        4个任务正在运行!

        5个任务正在运行!

        8个任务正在运行!

        6个线程启动!

        7个任务正在运行!

        7个线程启动!

        6个任务正在运行!

        9个任务正在运行!

        7个线程结束!

        0个线程结束!

        3个线程结束!

        6个线程结束!

        5个线程结束!

        1个线程结束!

        4个线程结束!

        2个线程结束!

        

    证实:    Countable中的run方法被执行了10

            MyThread中的run方法只被执行了9

            缘由:CachedThreadPool在须要的时候会调用ThreadFactorynewThread方法可是也会用到缓存

            */

测试FixedThreadPool

 

newFixedThreadPool:建立一个可重用固定线程集合的线程池,以共享的无界队列方式来运行这些线程。若是在关闭前的执行期间因为失败而致使任何线程终止,那么一个新线程将代替它执行后续的任务(若是须要)。在某个线程被显式地关闭以前,池中的线程将一直存在。(这与cacheThreadPool不同)

 

 

 

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

public class FixedThreadPoolTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-18

     * @use测试Fixed线程池

     * @param args

     */

    public static void main(String[] args) {

 

        

        ExecutorService es = Executors.newFixedThreadPool(3);

        for (int i = 0; i < 5; i++) {

            es.execute(new Countable(i));

        }

        es.shutdown();

 

    }

 

    static class Countable implements Runnable {

 

        private int i;

 

        public Countable(int i) {

            this.i = i;

        }

 

        public void run() {

 

            System.out.println("" + i + "启动的线程的ID"

                    + Thread.currentThread().getId());

 

        }

    }

 

}

/*

0启动的线程的ID7

2启动的线程的ID9

1启动的线程的ID8

3启动的线程的ID7

4启动的线程的ID9

*/

 

SingleThreadExecutor

newSingleThreadExecutor():

建立一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,若是由于在关闭前的执行期间出现失败而终止了此单个线程,那么若是须要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,而且在任意给定的时间不会有多个线程是活动的。与其余等效的 newFixedThreadPool(1) 不一样,可保证无需从新配置此方法所返回的执行程序便可使用其余的线程(备注:应该是内部实现的差别 外部的使用没什么差别)。

 

由于一个任务执行完毕以后,线程才会空闲下来去执行另一个任务,因此能够保证顺序执行任务。

测试ScheduledExecutorService

scheduled

adj. 预约的;已排程的

v. 把…列表;把…列入计划;安排(schedule的过去分词)

 

上面演示的线程执行器或者线程池都是ExecutorService,下面看看ScheduledExecutorService。ScheduledExecutorService集成而且扩展了ExecutorService,可安排在给定的延迟后运行或按期执行的命令。

 

package thread;

 

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.TimeUnit;

 

public class SingleThreadScheduledTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-23

     * @use测试SingleThreadScheduled线程池

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

 

        ScheduledExecutorService es = Executors

                .newSingleThreadScheduledExecutor();

        //ScheduledThreadPool须要传参控制池中所保存的线程数(即便线程是空闲的也包括在内)

        //ScheduledExecutorService es = Executors.newScheduledThreadPool(1);

        // 给定时间延迟后执行

        // es.schedule(new Countable(), 1, TimeUnit.SECONDS);

        // 传入一个任务而后按照给定频率循环执行在每次任务开始执行的时间点之间存在固定间隔

        //es.scheduleAtFixedRate(new Countable(), 2, 1, TimeUnit.SECONDS);

        // 传入一个任务而后按照给定频率循环执行每一次执行终止和下一次执行开始之间都存在给定的间隔

        es.scheduleWithFixedDelay(new Countable(), 2, 1, TimeUnit.SECONDS);

        // 若是没有这句代码将没有任何反应,由于----|

        // 下面的shutdown代码将会阻止执行新加入任务包含延迟执行而未执行的任务

        TimeUnit.SECONDS.sleep(10);

        es.shutdown();

    }

 

    static class Countable implements Runnable {

 

        public void run() {

            System.out.println("一个任务运行开始!");

            try {

                TimeUnit.SECONDS.sleep(1);

            } catch (InterruptedException e) {

            }

            System.out.println("一个任务运行结束!");

        }

    }

 

}

 

package thread;

 

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.TimeUnit;

 

public class ScheduledThreadPoolTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-23

     * @use测试SingleThreadScheduled线程池

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

 

        //ScheduledThreadPool须要传参控制池中所保存的线程数(即便线程是空闲的也包括在内)

        ScheduledExecutorService es = Executors.newScheduledThreadPool(1);

        es.scheduleAtFixedRate(new Countable(), 0, 1, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(10);

        es.shutdown();

    }

 

    static class Countable implements Runnable {

 

        public void run() {

            System.out.println("一个任务运行开始!");

            try {

                TimeUnit.SECONDS.sleep(3);

            } catch (InterruptedException e) {

            }

            System.out.println("一个任务运行结束!");

        }

    }

}

/*

* 线程池中只有一个线程 + 每隔1秒要执行一个任务 + 一个任务要运行3秒才结束

* 结果是每隔3秒才能执行一次

*/

 

优先级

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

 

public class PriorTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-23

     * @use测试优先级

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

 

        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(new Runnable() {

            public void run() {

                Thread.currentThread().setPriority(Thread.MAX_PRIORITY);

                int i = 0;

                while (!Thread.currentThread().isInterrupted()) {

                    System.out.println("MAX_PRIORITY" + i);

                    i++;

                }

            }

        });

        

        es.execute(new Runnable() {

            public void run() {

                Thread.currentThread().setPriority(Thread.MIN_PRIORITY);

                int i = 0;

                while (!Thread.currentThread().isInterrupted()) {

                    System.out.println("MIN_PRIORITY" + i);

                    i++;

                }

            }

        });

        TimeUnit.SECONDS.sleep(1);

        es.shutdownNow();

    }

}

/*

* 最后一次输出结果是

*     MAX_PRIORITY32525

*    MIN_PRIORITY31289

* 差异并不大调整优先级适用于作适当的强弱调整不能用来控制流程走势

* windows7个优先级 java能够设置10个优先级

*/

测试Callable

package thread;

 

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

import java.util.concurrent.TimeUnit;

 

public class CallableTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-23

     * @use

     * @param args

     */

    public static void main(String[] args) {

        ExecutorService es = Executors.newCachedThreadPool();

        //CallableFuture都是泛型设计的 T表明返回值的类型

        Future<String> future = es.submit(new Callable<String>() {

            //call方法返回T 而且能够抛出异常到主线程

            public String call() throws Exception {

                System.out.println("running.");

                TimeUnit.SECONDS.sleep(1);

                return "hello world!";

            }

        });

 

        es.shutdown();

        

        //若是被调用线程尚未完成 get方法将阻塞也可使用isDone()方法来判断是否完成

        try {

            System.out.println(future.get());

        } catch (InterruptedException e) {

            e.printStackTrace();//异常处理

        } catch (ExecutionException e) {

            e.printStackTrace();//异常处理

        }

    }

 

}

submit解析

package thread;

 

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

 

public class SubmitTest {

 

    /**

     * @author zjf

     * @create_time 2014-2-18

     * @use

     * @param args

     */

 

    /*

     * submit方法提交了一个任务给es去执行 es将分配一个线程来执行若是遇到ruturn或者抛出了异常信息都将记录到Future对象中

     * 注意异常不会马上抛出只是记录到future 在调用futureget方法时候才抛出

     */

 

    public static void main(String[] args) {

        ExecutorService es = Executors.newCachedThreadPool();

 

        // 方式1

        Future<String> future1 = es.submit(new Callable<String>() {

            public String call() throws Exception {

                return "done";

            }

        });

 

        try {

            future1.get();

        } catch (InterruptedException e) {

            e.printStackTrace();

        } catch (ExecutionException e) {

            e.printStackTrace();

        }

 

        // 方式2 这种方式的get永远为null 可是能够抛出异常

        Future<?> future2 = es.submit(new Runnable() {

 

            public void run() {

 

            }

        });

 

        try {

            future2.get();

        } catch (InterruptedException e) {

 

            e.printStackTrace();

        } catch (ExecutionException e) {

 

            e.printStackTrace();

        }

 

        // 方式3 由于run方法是void即便加上String.class 只能得到一个 Future<String>对象而已 get的结果仍然是String型的null

        es.submit(new Runnable() {

            public void run() {

                

            }

        }, String.class);

    }

 

}

join方法

若是在一个线程的run方法中调用t.join,那么将会在t执行结束以后才会继续当前线程。

 void

join()
          等待该线程终止。

 void

join(long millis)
          等待该线程终止的时间最长为 millis 毫秒。

Join抛出InterruptedException。能够被中断。

package thread;

 

public class JoinTest {

 

    public static void main(String[] args) {

        Thrd thrd = new Thrd();

        thrd.start();

        try {

            thrd.join();

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        System.out.println("after join");

        System.out.println("exit");

    }

 

    static class Thrd extends Thread {

        @Override

        public void run() {

            for (int i = 0; i < 10; i++) {

                System.out.println("running " + i);

                try {

                    Thread.sleep(1000);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            }

        }

    }

}

 

未捕捉异常处理器

经过futureget方法也能够获取异常不知道这两种方式有何差异?

 

package thread;

 

import java.lang.Thread.UncaughtExceptionHandler;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.ThreadFactory;

 

public class UncaughtExceptionHandlerTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-23

     * @use

     * @param args

     */

    public static void main(String[] args) {

 

        ExecutorService es = Executors.newCachedThreadPool(new ThreadFactory() {

            public Thread newThread(Runnable r) {

                Thread t = new Thread(r);

                t.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {

                    public void uncaughtException(Thread t, Throwable e) {

                        System.out.println("线程" + t.getId() + "发生了异常:"

                                + e.getMessage());

                    }

                });

                return t;

            }

        });

 

        es.execute(new Runnable() {

            public void run() {

                throw new RuntimeException("自定义异常");

            }

        });

 

        es.shutdown();

    }

}

经过Funture来捕捉异常:

package thread;

 

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

public class ExceptionTest {

 

    /**

     * @author zjf

     * @create_time 2014-2-18

     * @use

     * @param args

     */

    public static void main(String[] args) {

 

        ExecutorService es = Executors.newCachedThreadPool();

        try {

            es.submit(new Runnable() {

                public void run() {

                    throw new RuntimeException("error");

                }

            }).get();

        } catch (InterruptedException e) {

            

            e.printStackTrace();

        } catch (ExecutionException e) {

            e.printStackTrace();

            System.out.println("-------------------------------------------------------------------------------------------------------------");

            e.getCause().printStackTrace();

        }

        es.shutdown();

    }

}

java.util.concurrent.ExecutionException: java.lang.RuntimeException: error

    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:205)

    at java.util.concurrent.FutureTask.get(FutureTask.java:80)

    at thread.ExceptionTest.main(ExceptionTest.java:23)

Caused by: java.lang.RuntimeException: error

    at thread.ExceptionTest$1.run(ExceptionTest.java:21)

    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:417)

    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:269)

    at java.util.concurrent.FutureTask.run(FutureTask.java:123)

    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)

    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)

    at java.lang.Thread.run(Thread.java:595)

-------------------------------------------------------------------------------------------------------------

java.lang.RuntimeException: error

    at thread.ExceptionTest$1.run(ExceptionTest.java:21)

    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:417)

    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:269)

    at java.util.concurrent.FutureTask.run(FutureTask.java:123)

    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)

    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)

    at java.lang.Thread.run(Thread.java:595)

 

20131224 周二

java编程思想 第21章 并发

监视器

内部的默认锁:对象内部的锁 用于对象synchronized方法。class内部的锁用于类的static synchronized方法。

何时使用同步

对于共享资源的访问:

若是一个线程正在写入一个数据,而这个数据有可能会被另一个线程读取,那么当前写入操做应该同步。

若是一个线程正在读取一个数据,而这个数据有可能以前被另一个线程写入,那么当前读取操做应该同步。

ReentrantLock

可重入锁,累计进入锁定数值累计,须要屡次解锁才能完全解锁。

保持计数:当前线程重入锁的次数。

默认为非公平锁:有一个构造函数能够接受公平参数,若是设置为true,将会尽力保证将锁资源分配给等待时间最长的线程以保证公平。若是使用无参构造函数,那么将采用非公平锁。

lock():尝试获取锁,成功后将保持计数+1,若是锁被另一个线程持有,将等待。

lockInterruptibly():和lock用法意同样,区别是它在获取锁的等待过程当中能够被中断。

tryLock():和lock的差异:1.它将忽略公平设置。永远不公平。2. 若是锁被另一个线程持有,当即返回或者按照传递的等待时间超时后返回。

unlock():若是当前线程是此锁定全部者,则将保持计数减 1。若是保持计数如今为 0,则释放该锁定。

 

何时须要使用锁

使用默认的synchronized方法,或者synchronized块,是基于默认的ReentrantLock实现的。可是ReentrantLock的功能要更多。并且不带参数的synchronized是针对当前对象或者当前类的默认锁的,若是一个类有多个方法要同步,可是不是每一个方法都相互牵制,那么应该使用lock来区别对待。

原子操做

对非long和double以外的基本类型的读取和赋值操做时原子操做

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

 

public class AtomiTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-25

     * @use

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

        ExecutorService es = Executors.newCachedThreadPool();

        for(int i =0; i <runCount; i++)

        {

            //新建100个线程来执行countAdd的操做

            es.execute(new Runnable(){

                public void run() {

                    for(int i =0; i <100; i++)

                    {

                        countAdd();

                    }

                    //标示当前线程已经运行完毕

                    iAmDone();

                }});

        }

        es.shutdown();

        //全部线程运行完毕后打印出结果

        while(!isAllDone())

        {

            TimeUnit.MILLISECONDS.sleep(500);

        }

        System.out.println(getCount());

    }

 

    private static int count = 0;

    

    private static int done = 0;

    

    private static int runCount = 100;

    

    private static Lock countLock = new ReentrantLock();

    

    //此处不加上synchronized 结果将不是10000(比10000小)

    public synchronized static void iAmDone()

    {

        done ++;

    }

    

    public synchronized static boolean isAllDone()

    {

        return done == 100;

    }

    

    public static void countAdd()

    {

        //使用lock 不在方法中加synchronized 这样不会与上面的done方法公用thislock 能够提高性能

        countLock.lock();

        count ++;

        countLock.unlock();

    }

    

    public static int getCount()

    {

        return count;

    }

}

 

原子性和可视性

在多核处理器中,多任务被分配到多核上去处理,一个核上的更改不会便可刷新到其余核。加上volatile关键字能够保证可视性。

java能够保证对除了long和double以外的基本类型的简单的赋值和读取操做的原子性。可是不能保证可视性。

若是加上了volatile关键字,不只能够保证可视性,同时也能够保证long和double的原子性。

synchronized能够保证可视性。

若是对于基本类型(不包含long和double)执行简单的(一句代码的)读写操做(比较抽象,++就不是),能够保证原子性和可视性。可是既然是一句话那么不如使用sychronized,也不会耗费多少资源,并且稳定。

对原子性的误用

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

public class UseAtomiWrong {

 

    /**

     * @author zjf

     * @create_time 2013-12-25

     * @use

     * @param args

     */

    public static void main(String[] args) {

        ExecutorService es = Executors.newCachedThreadPool();

        for (int i = 0; i < 100; i++) {

            es.execute(new Runnable() {

                public void run() {

                    for (int i = 0; i < 100; i++) {

                        add();

                    }

                }

            });

        }

        es.execute(new Runnable(){

            public void run() {

                int i = getEvent();

                while(i%2 == 0)

                {

                    i = getEvent();

                }

                System.out.println(i);//打印出了奇数

            }});

 

        es.shutdown();

    }

 

    public volatile static int even = 0;

 

    //虽然对even加上了volatile 可是可能会读到只执行了一次even++以后的不稳定状态仍是要加synchronized

    public static int getEvent()

    {

        return even;

    }

    public synchronized static void add() {

        even++;

        even++;

    }

}

原子类

AtomicBooleanAtomicIntegerAtomicLongAtomicReference 的实例各自提供对相应类型单个变量的访问和更新。每一个类也为该类型提供适当的实用工具方法。例如,类 AtomicLong 和 AtomicInteger 提供了原子增量方法。一个应用程序将按如下方式生成序列号:

class Sequencer {

private AtomicLong sequenceNumber = new AtomicLong(0);

public long next() { return sequenceNumber.getAndIncrement(); }

}

原子访问和更新的内存效果通常遵循如下可变规则:

get 具备读取 volatile 变量的内存效果。

set 具备写入(分配) volatile 变量的内存效果。

weakCompareAndSet 以原子方式读取和有条件地写入变量,并对于该变量上的其余内存操做进行排序,不然将充当普通的非可变内存操做。

compareAndSet 和全部其余的读取和更新操做(如 getAndIncrement)都有读取和写入 volatile 变量的内存效果。

临界区

Object obj = new Object();

        Lock lock = new ReentrantLock();

        //使用当前对象的默认所

        synchronized (this) {

            //代码

        }

        //使用其余对象的默认锁

        synchronized (obj) {

            //代码

        }

        //使用指定锁

        synchronized (lock) {

            //代码

        }

判断ExecutorService中的全部任务是否完成

ExecutorService.isTerminated():

Returns true if all tasks have completed following shut down. Note that isTerminated is never true unless either shutdown or shutdownNow was called first.

 

ExecutorService.awaitTermination(long timeout, TimeUnit unit) throws InterruptedException:

Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

 

线程状态

新建:最初创建须要经历的一个极其短暂的临时状态。

就绪:只要给了时间片就能够运行。

阻塞:sleep wait或者等待锁资源的时候处于的状态。

死亡。运行完成的线程进入死亡状态。不能够再运行其余任务。

关闭线程的方法:

经过interrupt来结束线程。es.shutdownnow能够给全部线程发送中断指令。若是须要对某一个线程中断,可使用Future。如:

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

 

public class FutureInterruptTest {

 

    public static Lock lock = new ReentrantLock();

    

    /**

     * @author zjf

     * @create_time 2013-12-25

     * @use

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

 

        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(new Runnable() {

            public void run() {

                lock.lock();

                System.out.println("locking...");

                try {

                    //锁定5秒钟

                    TimeUnit.SECONDS.sleep(5);

                } catch (InterruptedException e) {

                    

                }

                lock.unlock();

            }

        });

        

        Future<?> future = es.submit(new Runnable() {

            public void run() {

                try {

                    //陷入等待锁的状态

                    lock.lockInterruptibly();

                    lock.unlock();

                } catch (InterruptedException e) {

                    

                }

                System.out.println("done...");

            }

        });

        

        TimeUnit.SECONDS.sleep(1);

        //cancel方法若是线程时新建未运行状态那么就结束它若是已经运行那么中断它

        //若是把上面的lock.lockInterruptibly();改成lock那么将接收不到中断响应,直到得到锁。

        future.cancel(true);

    }

}

 

使用volatile的boolean变量:

 

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

 

public class TerminateThreadUserBoolean {

 

    public static volatile boolean stop = false;

 

    /**

     * @author zjf

     * @create_time 2013-12-25

     * @use

     * @param args

     * @throws InterruptedException

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

        ExecutorService es = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {

            es.execute(new Runnable() {

                public void run() {

                    

                    

                    while(!stop)

                    {

                        System.out.println("running...");

                        try {

                            TimeUnit.SECONDS.sleep(1);

                        } catch (InterruptedException e) {

                            System.out.println("InterruptedException");

                        }

                        System.out.println("runed");

                    }

                    

                }

            });

        }

        TimeUnit.SECONDS.sleep(3);

        stop = true;

        System.out.println("try to shutdown");

        // shutdown以后才能测试awaitTermination

        es.shutdown();

        boolean success = es.awaitTermination(1, TimeUnit.SECONDS);

        if(success)

        {

            System.out.println("全部线程已经关闭..");

        }

        else {

            System.out.println("部分线程没有关闭..");

        }

        

    }

 

}

 

中断循环线程:

 

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

 

public class ThreadCloseTest {

 

    /**

     * @author zjf

     * @create_time 2014-2-21

     * @use

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

 

        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(new Runnable() {

            public void run() {

                //try放在while的外层

                try {

                    //while中检测中断

                    while(!Thread.interrupted())

                    {

                        System.out.println("i am running");

                        TimeUnit.MILLISECONDS.sleep(500);

                    }

                } catch (InterruptedException e) {

                    

                }

                finally

                {

                    System.out.println("i am interrupted");

                }

            }

        });

        TimeUnit.SECONDS.sleep(3);

        es.shutdownNow();

    }

 

}

 

synchronized使用默认lock.lock;在等待默认锁的过程当中不能被中断。
interrupt

public void interrupt()

中断线程。

public static boolean interrupted()

测试当前线程是否已经中断。线程的中断状态 由该方法清除。换句话说,若是连续两次调用该方法,则第二次调用将返回 false(在第一次调用已清除了其中断状态以后,且第二次调用检验完中断状态前,当前线程再次中断的状况除外)。

public boolean isInterrupted()

测试线程是否已经中断。线程的中断状态 不受该方法的影响。

备注:若是使用执行器,在每一个任务结束以后 ,将会自动将线程的中断状态清除,而后再去执行下一个任务。示例代码:

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

import java.util.concurrent.TimeUnit;

 

public class InterruptTest {

 

    public static void main(String[] args) throws InterruptedException {

        ExecutorService es = Executors.newSingleThreadExecutor();

        Future<?> future = es.submit(new Runnable() {

 

            public void run() {

                //只判断不清除中断

                while(!Thread.currentThread().isInterrupted())

                {

                    System.out.println("1st running");

                }

            }

        });

        TimeUnit.SECONDS.sleep(1);

        //发送中断

        future.cancel(true);

        es.execute(new Runnable() {

 

            public void run() {

                while(!Thread.currentThread().isInterrupted())

                {

                    //若是中断标志没有清楚将不会打印出下面代码

                    System.out.println("2st running");

                }

            }

        });

        es.shutdown();

    }

 

}

Wait和notify

锁和同步块是用来解决线程互斥的问题。wait和notify是用来解决线程协做的问题。

wait和notify是针对一个锁的,wait和notify的对象是这个锁上的其余等待对象。

wait方法能够被中断。notify不会等待,因此不须要被中断。

sleep和yield不会释放锁(它们跟锁没有关系,只跟线程有关),而wait和notify必须放在sychronized块内,由于跟锁是关联在一块儿的。wait将释放锁,而后等待。

notify会唤醒在这个锁上等待的其余程序。可是不会释放锁。直到当前的线程放弃此对象上的锁定,才能继续执行被唤醒的线程。被唤醒的线程将以常规方式与在该对象上主动同步的其余全部线程进行竞争;例如,唤醒的线程在获取notify线程释放的锁方面没有特权。

 

wait(long timeout):此方法致使当前线程(称之为 T将其自身放置在对象的等待集中,而后放弃此对象上的全部同步要求(包含放弃锁)。出于线程调度目的,线程 T 被禁用,且处于休眠状态,直到发生如下四种状况之一:
  • 其余某个线程调用此对象的 notify 方法,而且线程 T 碰巧被任选为被唤醒的线程。
  • 其余某个线程调用此对象的 notifyAll 方法。
  • 其余某个线程中断线程 T。
  • 已经到达指定的实际时间。可是,若是 timeout 为零,则不考虑实际时间,该线程将一直等待,直到得到通知。

而后,从对象的等待集中删除线程 T,并从新进行线程调度。而后,该线程以常规方式与其余线程竞争,以得到在该对象上同步的权利;一旦得到对该对象的控制权,该对象上的全部其同步声明都将被还原到之前的状态 - 这就是调用 wait 方法时的状况。而后,线程 T 从 wait 方法的调用中返回。因此,从 wait 方法返回时,该对象和线程 T 的同步状态与调用 wait 方法时的状况彻底相同。

如:

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

 

public class NotifyTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-30

     * @use

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

        final NotifyTest t = new NotifyTest();

        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(new Runnable() {

 

            public void run() {

                try {

                    t.testWait();

                } catch (InterruptedException e) {

                    

                    e.printStackTrace();

                }

            }

        });

          

        

        es.execute(new Runnable() {

 

            public void run() {

                t.testNotify();

            }

        });

        

    }

 

    public synchronized void testWait() throws InterruptedException {

        System.out.println("pre wait");

        wait();

        System.out.println("after wait");

    }

 

    public synchronized void testNotify() {

        System.out.println("pre notify");

        //虽然唤醒了testWait 可是没有释放所资源 testWait仍然没法运行

        notify();

        try {

            //20秒以后程序执行完毕而后释放了锁这时才会输出 "after wait"

            TimeUnit.SECONDS.sleep(20);

        } catch (InterruptedException e) {

            

            e.printStackTrace();

        }

        System.out.println("after notify");

    }

}

Wait的简单例子

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

 

public class WaxTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-30

     * @use

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

        Car car = new Car();

        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(new Waxing(car));

        es.execute(new Buffering(car));

        TimeUnit.SECONDS.sleep(6);

        es.shutdownNow();

    }

 

    static class Car{

        private boolean isWaxOn = false;

        

        //通常将InterruptedException抛出到任务的run方法中去控制

        //这里不加synchronized也能够应为它是在synchronizedwaxing方法中被调用的

        public synchronized void waitForWaxing() throws InterruptedException

        {

            //使用while

            while(isWaxOn == true)

            {

                wait();

            }

        }

        

        public synchronized void waitForBuffing() throws InterruptedException

        {

            //使用while

            while(isWaxOn == false)

            {

                wait();

            }

        }

          

        

        public synchronized void waxing() throws InterruptedException

        {

            waitForWaxing();

            TimeUnit.MILLISECONDS.sleep(200);

            System.out.println("waxing on");

            isWaxOn = true;

            notifyAll();

        }

        

        public synchronized void buffing() throws InterruptedException

        {

            waitForBuffing();

            TimeUnit.MILLISECONDS.sleep(200);

            System.out.println("buffing over");

            isWaxOn = false;

            notifyAll();

        }

    }

    

    //涂蜡任务

    static class Waxing implements Runnable{

 

        private Car car;

        

        public Waxing(Car car) {

            super();

            this.car = car;

        }

 

        public void run() {

            try {

                while(!Thread.interrupted())

                {

                    car.waxing();

                }

            } catch (InterruptedException e) {

                

            }

        }

        

    }

    

    //抛光任务

    static class Buffering implements Runnable{

 

        private Car car;

        

        public Buffering(Car car) {

            super();

            this.car = car;

        }

 

        public void run() {

            try {

                while(!Thread.interrupted())

                {

                    car.buffing();

                }

            } catch (InterruptedException e) {

                

            }

        }

    }

    

}

 

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

 

public class RestanurantTest {

 

    /**

     * @author zjf

     * @create_time 2013-12-31

     * @use

     * @param args

     */

    public static void main(String[] args) {

        new Restanurant();

    }

 

    static class Meal {

        private int orderNumber;

 

        public Meal(int orderNumber) {

            super();

            this.orderNumber = orderNumber;

        }

 

        @Override

        public String toString() {

 

            return "Meal:" + orderNumber;

        }

    }

 

    static class Restanurant {

        Meal meal;

        WaitPerson waiter = new WaitPerson(this);

        Chef chef = new Chef(this);

        ExecutorService es = Executors.newCachedThreadPool();

 

        public Restanurant() {

            es.execute(waiter);

            es.execute(chef);

            es.shutdown();

        }

    }

 

    static class WaitPerson implements Runnable {

 

        private Restanurant restanurant;

 

        public WaitPerson(Restanurant restanurant) {

            super();

            this.restanurant = restanurant;

        }

 

        public void run() {

            try {

                while (!Thread.interrupted()) {

                    //使用共有的restanurant来控制同步

                    synchronized (restanurant) {

                        while (restanurant.meal == null) {

                            //由于是synchronizedrestanurant 因此wait方法也是调用restanurant

                            restanurant.wait();

                        }

                        TimeUnit.MILLISECONDS.sleep(200);

                        restanurant.meal = null;

                        System.out.println("服务员上餐结束!");

                        restanurant.notifyAll();

                    }

 

                }

            } catch (InterruptedException e) {

 

            }

 

        }

 

    }

 

    static class Chef implements Runnable {

 

        private Restanurant restanurant;

 

        public Chef(Restanurant restanurant) {

            super();

            this.restanurant = restanurant;

        }

 

        private int orderNumber = 0;

 

        public void run() {

            try {

                while (!Thread.interrupted()) {

                    synchronized (restanurant) {

                        while (restanurant.meal != null) {

                            restanurant.wait();

                        }

                        TimeUnit.MILLISECONDS.sleep(200);

                        restanurant.meal = new Meal(++orderNumber);

                        System.out.println("厨师作饭完毕!");

                        restanurant.notifyAll();

                        if(orderNumber >= 10)

                        {

                            restanurant.es.shutdownNow();

                        }

                    }

                }

            } catch (InterruptedException e) {

 

            }

        }

 

    }

}

 

使用显式的condition

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

 

public class RestanurantTest1 {

 

    /**

     * @author zjf

     * @create_time 2013-12-31

     * @use

     * @param args

     */

    public static void main(String[] args) {

        new Restanurant();

    }

 

    static class Meal {

        private int orderNumber;

 

        public Meal(int orderNumber) {

            super();

            this.orderNumber = orderNumber;

        }

 

        @Override

        public String toString() {

 

            return "Meal:" + orderNumber;

        }

    }

 

    static class Restanurant {

        Lock mealLock = new ReentrantLock();

        Condition condition = mealLock.newCondition();

        Meal meal;

        WaitPerson waiter = new WaitPerson(this);

        Chef chef = new Chef(this);

        ExecutorService es = Executors.newCachedThreadPool();

 

        public Restanurant() {

            es.execute(waiter);

            es.execute(chef);

            es.shutdown();

        }

    }

 

    static class WaitPerson implements Runnable {

 

        private Restanurant restanurant;

 

        public WaitPerson(Restanurant restanurant) {

            super();

            this.restanurant = restanurant;

        }

 

        public void run() {

            try {

                while (!Thread.interrupted()) {

                    restanurant.mealLock.lockInterruptibly();

                        while (restanurant.meal == null) {

                            restanurant.condition.await();

                        }

                        TimeUnit.MILLISECONDS.sleep(200);

                        restanurant.meal = null;

                        System.out.println("服务员上餐结束!");

                        restanurant.condition.signalAll();

                        restanurant.mealLock.unlock();

 

                }

            } catch (InterruptedException e) {

 

            }

 

        }

 

    }

 

    static class Chef implements Runnable {

 

        private Restanurant restanurant;

 

        public Chef(Restanurant restanurant) {

            super();

            this.restanurant = restanurant;

        }

 

        private int orderNumber = 0;

 

        public void run() {

            try {

                while (!Thread.interrupted()) {

                    restanurant.mealLock.lockInterruptibly();

                        while (restanurant.meal != null) {

                            restanurant.condition.await();

                        }

                        TimeUnit.MILLISECONDS.sleep(200);

                        restanurant.meal = new Meal(++orderNumber);

                        System.out.println("厨师作饭完毕!");

                        restanurant.condition.signalAll();

                        if(orderNumber >= 10)

                        {

                            restanurant.es.shutdownNow();

                        }

                        restanurant.mealLock.unlock();

                }

            } catch (InterruptedException e) {

 

            }

        }

 

    }

}

BlockingQueue

接口 BlockingQueue<E>

E take()

throws InterruptedException

检索并移除此队列的头部,若是此队列不存在任何元素,则一直等待。

void put(E o)

throws InterruptedException

将指定元素添加到此队列尾部,若是没有可用空间,将一直等待(若是有必要)。

实现类:ArrayBlockingQueue<E> 固定数目, LinkedBlockingQueue<E>不固定数目。

20140224 周一

java编程思想 第21章 并发

 

使用管道在线程间传递数据

package thread;

 

import java.io.IOException;

import java.io.PipedReader;

import java.io.PipedWriter;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

 

public class PipedReaderTest {

 

    /**

     * @author zjf

     * @create_time 2014-2-24

     * @use

     * @param args

     * @throws IOException

     * @throws InterruptedException

     */

    public static void main(String[] args) throws IOException,

            InterruptedException {

 

        ExecutorService es = Executors.newCachedThreadPool();

        final PipedWriter writer = new PipedWriter();

        final PipedReader reader = new PipedReader();

        //两个关联上

        writer.connect(reader);

        es.execute(new Runnable() {

            public void run() {

                char[] strs = "my name is zjf".toCharArray();

                try {

                    for (char c : strs) {

                        writer.write(c);

                        TimeUnit.MILLISECONDS.sleep(300);

                    }

 

                } catch (IOException e) {

 

                    e.printStackTrace();

                } catch (InterruptedException e) {

 

                    e.printStackTrace();

                } finally {

                    try {

                        //输出完成后关闭writer

                        writer.close();

                    } catch (IOException e) {

                        e.printStackTrace();

                    }

                }

            }

        });

 

        es.execute(new Runnable() {

 

            public void run() {

                try {

                    while (!Thread.interrupted()) {

                        int c;

                        //writer关闭以后将会获取-1 循环被终止陷入外层interruptedwhile循环中

                        while ((c = reader.read()) != -1) {

                            System.out.println((char) c);

                        }

                    }

                } catch (IOException e) {

                    e.printStackTrace();

                }

            }

        });

 

        //5秒后发送中断指令

        TimeUnit.SECONDS.sleep(5);

        es.shutdownNow();

    }

}

 

20140225 周二

java编程思想 第21章 并发

CountDownLatch

用给定的计数 初始化 CountDownLatch。因为调用了 countDown() 方法,因此在当前计数到达零以前,await 方法会一直受阻塞。以后,会释放全部等待的线程,await 的全部后续调用都将当即返回。这种现象只出现一次——计数没法被重置。若是须要重置计数,请考虑使用 CyclicBarrier

CountDownLatch 是一个通用同步工具,它有不少用途。将计数 1 初始化的 CountDownLatch 用做一个简单的开/关锁存器,或入口:在经过调用 countDown() 的线程打开入口前,全部调用 await 的线程都一直在入口处等待。用 N 初始化的 CountDownLatch 可使一个线程在 N 个线程完成某项操做以前一直等待,或者使其在某项操做完成 N 次以前一直等待。

 

示例用法: 下面给出了两个类,其中一组 worker 线程使用了两个倒计数锁存器:

第一个类是一个启动信号,在 driver 为继续执行 worker 作好准备以前,它会阻止全部的 worker 继续执行。

第二个类是一个完成信号,它容许 driver 在完成全部 worker 以前一直等待。

class Driver { // ...

void main() throws InterruptedException {

CountDownLatch startSignal = new CountDownLatch(1);

CountDownLatch doneSignal = new CountDownLatch(N);

 

for (int i = 0; i < N; ++i) // create and start threads

new Thread(new Worker(startSignal, doneSignal)).start();

 

doSomethingElse(); // don't let run yet

startSignal.countDown(); // let all threads proceed

doSomethingElse();

doneSignal.await(); // wait for all to finish

}

}

 

class Worker implements Runnable {

private final CountDownLatch startSignal;

private final CountDownLatch doneSignal;

Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {

this.startSignal = startSignal;

this.doneSignal = doneSignal;

}

public void run() {

try {

startSignal.await();

doWork();

doneSignal.countDown();

} catch (InterruptedException ex) {} // return;

}

 

void doWork() { ... }

}

 

 

另外一种典型用法是,将一个问题分红 N 个部分,用执行每一个部分并让锁存器倒计数的 Runnable 来描述每一个部分,而后将全部 Runnable 加入到 Executor 队列。当全部的子部分完成后,协调线程就可以经过 await。(当线程必须用这种方法反复倒计数时,可改成使用 CyclicBarrier。)

class Driver2 { // ...

void main() throws InterruptedException {

CountDownLatch doneSignal = new CountDownLatch(N);

Executor e = ...

 

for (int i = 0; i < N; ++i) // create and start threads

e.execute(new WorkerRunnable(doneSignal, i));

 

doneSignal.await(); // wait for all to finish

}

}

 

class WorkerRunnable implements Runnable {

private final CountDownLatch doneSignal;

private final int i;

WorkerRunnable(CountDownLatch doneSignal, int i) {

this.doneSignal = doneSignal;

this.i = i;

}

public void run() {

try {

doWork(i);

doneSignal.countDown();

} catch (InterruptedException ex) {} // return;

}

 

void doWork() { ... }

}

 

 

个人例子:

 

package thread;

 

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

public class CountDownLatchTest {

 

    /**

     * @author zjf

     * @create_time 2014-2-25

     * @use计算1 + 2 + 3 + ... + 1000000000

     * @param args

     */

    public static void main(String[] args) {

        long startTime = System.currentTimeMillis();

        long sum = sum(1, 1000000000, 10);

        long endTime = System.currentTimeMillis();

        System.out.println("计算结果是" + sum + ",耗时" + (endTime - startTime) + "毫秒!");

    }

 

    public static long sum(int start, int end, int concurrentSize) {

        ConcurrentSumer sumer = new ConcurrentSumer(start, end, concurrentSize);

        return sumer.sum();

    }

}

 

class ConcurrentSumer {

    private long sum = 0;

    private int start;

    private int end;

    private int concurrentSize;

    CountDownLatch countDownLatch;

    

    public ConcurrentSumer(int start, int end, int concurrentSize) {

        super();

        this.start = start;

        this.end = end;

        this.concurrentSize = concurrentSize;

        countDownLatch = new CountDownLatch(concurrentSize);

    }

    

    private synchronized void addSum(long add)

    {

        sum += add;

    }

    

    public long sum() {

        ExecutorService es = Executors.newCachedThreadPool();

        int extend = (end - start)/concurrentSize +1;

        while(start <= end)

        {

            es.execute(new SumTask(start,(start + extend) > end ? end : (start + extend)));

            start = start + extend + 1;

        }

        es.shutdown();

        try {

            //等待全部任务完成

            countDownLatch.await();

            System.out.println("全部任务已经完成...");

        } catch (InterruptedException e) {

            //若是没有等到全部任务都完成就被中断那么返回0

            sum = 0;

        }

        return sum;

    }

 

    class SumTask implements Runnable {

        private int st;

        private int ed;

        

        public SumTask(int st, int ed) {

            super();

            this.st = st;

            this.ed = ed;

        }

 

        public void run() {

            long s = 0;

            for(int i = st; i <= ed; i++ )

            {

                s += i;

            }

            addSum(s);

            System.out.println("一个线程已经完成...");

            countDownLatch.countDown();

        }

    }

 

}

 

CyclicBarrier

cyclic ['saiklik]

adj.

1. 周期的,构成周期的

2. 循环的,轮转的;往复运动的

 

barrier ['bæriə]

n.

1. (阻碍通道的)障碍物,屏障(如栅栏、挡板、挡墙、壁垒、障壁、十字转门等)

 

 

一个同步辅助类,它容许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。

public CyclicBarrier(int parties,
Runnable barrierAction)

建立一个新的 CyclicBarrier

parties:并行运行的任务数。

barrierAction:每当并行任务的任务调用的barrier的await方法的次数到达parties此的时候,那么barrierAction方法将会被执行一次,此时全部的并行任务处于等待状态,等待barrierAction执行完毕,全部卡在await方法的并行任务得以继续执行。

public int await()
throws InterruptedException,
BrokenBarrierException

在全部参与者都已经在此 barrier 上调用 await 方法以前,将一直等待。

 

赛马例子:

package thread;

 

import java.util.ArrayList;

import java.util.List;

import java.util.Random;

import java.util.concurrent.BrokenBarrierException;

import java.util.concurrent.CyclicBarrier;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

/**

*

* @author zjf

* @create_time 2014-2-26

* @use赛马模拟小程序

*/

public class CyclicBarrierTest {

    public static void main(String[] args) {

        //赛马数量

        int horseCount = 10;

        //目标距离

        final int targetLine = 20;

        final List<Horse> horses = new ArrayList<Horse>();

        final ExecutorService es = Executors.newCachedThreadPool();

        //每一组马匹走完一步以后统计是否已经到达终点

        CyclicBarrier barrier = new CyclicBarrier(horseCount, new Runnable() {

            public void run() {

                System.out.println("");

                for (Horse horse : horses) {

                    if (horse.getComplete() >= targetLine) {

                        System.out.println(horse + " wone!");

                        //若是有到达终点的发送中断

                        es.shutdownNow();

                        break;

                    }

 

                }

            }

        });

 

        for (int i = 0; i < horseCount; i++) {

            Horse horse = new Horse(i, barrier);

            horses.add(horse);

            es.execute(horse);

        }

    }

}

 

class Horse implements Runnable {

    private int id;

    private CyclicBarrier barrier;

    private int complete = 0;

    private Random random = new Random();

 

    public synchronized int getComplete() {

        return complete;

    }

 

    public synchronized void oneStep() {

        //模拟一步的距离

        complete += random.nextInt(3);

        System.out.print(this + " : " + complete + "--");

    }

 

    public Horse(int i, CyclicBarrier barrier) {

        this.id = i;

        this.barrier = barrier;

    }

 

    public void run() {

        try {

            while (!Thread.interrupted()) {

                oneStep();

                //每执行一步以后等待并行的都执行完await以后才能继续执行

                barrier.await();

                TimeUnit.MILLISECONDS.sleep(300);

            }

        } catch (InterruptedException e) {

 

        } catch (BrokenBarrierException e) {

 

        }

    }

 

    @Override

    public String toString() {

        return "horse" + id;

    }

}

 

20140226 周三

java编程思想 第21章 并发

PriorityBlockingQueue

一个无界的阻塞队列,它使用与类 PriorityQueue 相同的顺序规则,而且提供了阻塞检索的操做。虽然此队列逻辑上是无界的,可是因为资源被耗尽,因此试图执行添加操做可能会失败(致使 OutOfMemoryError)。此类不容许使用 null 元素。

 

例子:

package thread;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.PriorityBlockingQueue;

import java.util.concurrent.TimeUnit;

 

public class PriorityBlockingQueueTest {

 

    /**

     * @author zjf

     * @create_time 2014-2-26

     * @use

     * @param args

     * @throws InterruptedException

     */

    public static void main(String[] args) throws InterruptedException {

        //Task类实现了Comparable接口,按照排序来决定优先级。

        final PriorityBlockingQueue<Task> taskQueue = new PriorityBlockingQueue<Task>();

        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(new Runnable() {

            public void run() {

                try {

                    while(!Thread.interrupted())

                    {

                        Task task = taskQueue.take();

                        System.out.println(task);

                        //每隔200秒取出一个

                        TimeUnit.MILLISECONDS.sleep(200);

                    }

                } catch (InterruptedException e) {

                    

                }

            }

        });

        for (final TASKLEVLE taskLevel : TASKLEVLE.values()) {

            es.execute(new Runnable() {

                public void run() {

                    try {

                        for (int i = 0; i < 5; i++) {

                            //5个任务每隔300秒放入一个

                            taskQueue.put(new Task(taskLevel, i));

                            TimeUnit.MILLISECONDS.sleep(300);

                        }

                    } catch (Exception e) {

 

                    }

                }

            });

        }

        

        TimeUnit.SECONDS.sleep(10);

        es.shutdownNow();

    }

 

}

/*

* Task要实现Comparable接口

*/

class Task implements Comparable<Task> {

    private TASKLEVLE taskLevel = TASKLEVLE.MIDDLE;

 

    private final long id;

 

    public Task(TASKLEVLE taskLevel, long id) {

        super();

        this.taskLevel = taskLevel;

        this.id = id;

    }

 

    public int compareTo(Task o) {

        return o.taskLevel.compareTo(taskLevel);

    }

 

    @Override

    public String toString() {

        return "task-" + id + "level-" + taskLevel;

    }

 

}

 

enum TASKLEVLE {

    LOW, MIDDLE, HIGH, SUPER

};

并发

优先权
  • 线程的"优先权"priority)能告诉调度程序其重要性如何。尽管处理器处理现有线程集的顺序是不肯定的,可是若是有许多线程被阻塞并在等待运行,那么调度程序将倾向于让优先权最高的线程先执行。然而,这并非意味着优先权较低的线程将得不到执行(也就是说,优先权不会致使死锁)。优先级较低的线程仅仅是执行的频率较低。
  • 对于已存在的线程,你能够用getPriority( )方法获得其优先权,也能够在任什么时候候使用setPriority( )方法更改其优先权
  • 尽管JDK10个优先级别,但它与多数操做系统都不能映射得很好。好比,Windows 20007个优先级且不是固定的,因此这种映射关系也是不肯定的(尽管SunSolaris231个优先级)。惟一可移植的策略是当你调整优先级的时候,只使用MAX_PRIORITYNORM_PRIORITY,和MIN_PRIORITY三种级别。
来自JDK API

线程 是程序中的执行线程。Java 虚拟机容许应用程序并发地运行多个执行线程。

每一个线程都有一个优先级,高优先级线程的执行优先于低优先级线程。每一个线程均可以或不能够标记为一个守护程序。当某个线程中运行的代码建立一个新 Thread 对象时,该新线程的初始优先级被设定为建立线程的优先级,而且当且仅当建立线程是守护线程时,新线程才是守护程序。

当 Java 虚拟机启动时,一般都会有单个非守护线程(它一般会调用某个指定类的 main 方法)。Java 虚拟机会继续执行线程,直到下列任一状况出现时为止: 调用了 Runtime 类的 exit 方法,而且安全管理器容许退出操做发生。

非守护线程的全部线程都已中止运行,不管是经过从对 run 方法的调用中返回,仍是经过抛出一个传播到 run 方法以外的异常。

守护线程

public final void setDaemon(boolean on)

将该线程标记为守护线程或用户线程。当正在运行的线程都是守护线程时,Java 虚拟机退出。

该方法必须在启动线程前调用。

Thread和Runnable
  • 你的类也许已经继承了其它的类,在这种状况下,就不可能同时继承ThreadJava并不支持多重继承)。这时,你可使用"实现Runnable接口"的方法做为替代。
  • Thread也是从Runnable接口实现而来的。
  • Runnable类型的类只需一个run( )方法,可是若是你想要对这个Thread对象作点别的事情(好比在toString( )里调用getName( )),那么你就必须经过调用Thread.currentThread( )方法明确获得对此线程的引用。
相关文章
相关标签/搜索