Java Condition

根据工做经验,总结Java并发的使用 java

public class SingletonData {
    private static final int QUEUE_MAX_SIZE = 500;
    private static Integer[] blockingQueue = new Integer[QUEUE_MAX_SIZE];
    private static int count = 0;
    private static ReentrantLock lock = new ReentrantLock();
    private static int putptr = 0;  //写索引
    private static int takeptr = 0; //读索引
    final static Condition notFull = lock.newCondition(); //写线程锁
    final static Condition notEmpty = lock.newCondition(); //读线程锁

    private SingletonData() {
    }

    ;

    private static class SingletonHolder {
        /**
        * 静态初始化器,由JVM来保证线程安全
        */
        private static SingletonData queue = new SingletonData();
    }

    public static SingletonData getSingletonData() {
        return SingletonHolder.queue;
    }

    public void set(Integer data) {
        lock.lock();// 取到写锁
        try {
            System.out.println(Thread.currentThread().getName() + "准备写入数据");
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            blockingQueue[putptr] = data;
            if (++putptr == blockingQueue.length) putptr = 0;
            ++count;
            notEmpty.signal();

        } finally {
            lock.unlock();//解除锁定
        }
    }

    public Integer[] getBlockingQueue() {
        return blockingQueue;
    }

    public int get() {
        lock.lock(); //锁定
        try {
            // 若是队列空,则阻塞<读线程>
            while (count == 0) {
                notEmpty.await();
            }

            //读取队列,并更新读索引
            int x = blockingQueue[takeptr];
            if (++takeptr == blockingQueue.length) takeptr = 0;
            --count;

            // 唤醒<写线程>
            notFull.signal();
            return x;
        }catch (Exception e){
            return 0;
        } finally {
            lock.unlock();//解除锁定
        }
    }

}

 

public class CallableDemo {
    public static void main(String[] args){
        ExecutorService executorService = Executors.newFixedThreadPool(40);
        List<Future<String>> resultList = new ArrayList<Future<String>>();

        //建立10个任务并执行
        for (int i = 0; i < 500; i++){
            //使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中
            Future<String> future = executorService.submit(new TaskWithResult(i));
            //将任务执行结果存储到List中
            resultList.add(future);
        }

        //遍历任务的结果
        for (Future<String> fs : resultList){
            try{
                while(! fs.isDone());//Future返回若是没有完成,则一直循环等待,直到Future返回完成
                System.out.println(fs.get());     //打印各个线程(任务)执行的结果
            }catch(InterruptedException e){
                e.printStackTrace();
            }catch(ExecutionException e){
                e.printStackTrace();
            }finally{
                //启动一次顺序关闭,执行之前提交的任务,但不接受新任务
                executorService.shutdown();
            }
        }
        List<Integer> eles =  Arrays.asList( SingletonData.getSingletonData().getBlockingQueue());
        eles.sort((a,b)->{return( a > b? a : b); });

        for(Integer ele : eles){
            System.out.println(ele);
        }

        System.out.println(eles.size());
    }
}


class TaskWithResult implements Callable<String> {
    private int id;

    public TaskWithResult(int id){
        this.id = id;
    }

    /**
     * 任务的具体过程,一旦任务传给ExecutorService的submit方法,
     * 则该方法自动在一个线程上执行
     */
    @Override
    public String call() throws Exception {
        System.out.println("call()方法被自动调用!!!    " + Thread.currentThread().getName());
        //该返回结果将被Future的get方法获得
        SingletonData.getSingletonData().set(id);
        return "call()方法被自动调用,任务返回的结果是:" + id + "    " + Thread.currentThread().getName();
    }
}
相关文章
相关标签/搜索