避免java虚拟机指令重排序,保证共享数据修改同步,数据可见性。volatile相较于synchronized是一种比较轻量级地同步策略,但不具有互斥性,不能成为synchronized的替代,不能保证原子性。java
package com.wang.test.juc.test_volatile; public class TestVolatile { public static void main(String[] args) { TestThread testThread = new TestThread(); new Thread(testThread).start(); while (true)//在线程运行后,读取线程中的flag值 { if(testThread.isFlag()){ System.out.println(" get Flag success! "); break; } } } } class TestThread implements Runnable{ private boolean flag = false; public boolean isFlag() { return flag; } public void setFlag(boolean flag) { this.flag = flag; } public void run(){ try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } flag = true; System.out.println("flag:" + flag); } }
主线程中的读取操做看似在线程执行后,但并发地执行,在线程更改数据以前,主线程已经读取了数据,共享地数据flag不一样步。算法
public class TestVolatile { public static void main(String[] args) { TestThread testThread = new TestThread(); new Thread(testThread).start(); while (true)//在线程运行后,读取线程中的flag值 { if(testThread.isFlag()){ System.out.println(" get Flag success! "); break; } } } } class TestThread implements Runnable{ private volatile boolean flag = false; public boolean isFlag() { return flag; } public void setFlag(boolean flag) { this.flag = flag; } public void run(){ try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } flag = true; System.out.println("flag:" + flag); } }
在Flag属性上添加volatile主方法便可读取正确值。数据库
package com.wang.test.juc.test_volatile; public class TestAtomic { public static void main(String[] args) { Atomic atomic = new Atomic(); Thread t1 = new Thread(atomic); Thread t2 = new Thread(atomic); t1.start(); t2.start(); } } class Atomic implements Runnable{ private int i =0; public void run() { while(true){ try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+": "+getCountI()); } } public int getCountI() { return i++; } }
因为线程中操做i++不具有原子性,线程执行中,数据i的递增会出现重复问题,即i的值不会正常递增,会出现线程作出一样操做的状况。此时由于这个操做不是原子的,使用volitale修饰不能解决同步问题。
安全
Compare-And-Swap算法时硬件对于并发操做共享数据的支持,包含三个操做数,内存值V、预估值A、更新值B,只有 V == A 时,V = B执行,不然不进行任何操做。并发
import java.util.concurrent.atomic.AtomicInteger; public class TestAtomic { public static void main(String[] args) { Atomic atomic = new Atomic(); Thread t1 = new Thread(atomic); Thread t2 = new Thread(atomic); t1.start(); t2.start(); } } class Atomic implements Runnable{ private AtomicInteger i = new AtomicInteger(0); public void run() { while(true){ try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+": "+getCountI()); } } public int getCountI() { return i.addAndGet(1); } }
此时线程中i已是一个原子类型,那么在对数据进行操做的时候是具有原子性的,全部线程在执行i自增时具备源自性,解决了并发问题。框架
HashTable是线程安全的,在访问HashTable时会加上表锁,将操做转为串行,不容许有空值,效率比较低。dom
CuncurrentHashMap是线程安全的HashMap,采用锁分段机制,每一个数据段都是独立的锁,在访问时,能够并行执行,提升效率工具
ConcurrentSkipListMap:同步的TreeMapthis
CopyOnWriteArrayList:同步的ArrayList (读取和遍历大于更新)atom
Collections.synchronizedList(new ArrayList(String))
CountDownLatch为同步辅助类,在完成一组正在其余线程中执行的操做以前,容许一个或多个线程一直等待。
import java.util.concurrent.CountDownLatch; public class TestCountDownLatch { public static void main(String[] args) { final CountDownLatch countDownLatch = new CountDownLatch(2); //锁值为2 LatchLock latchLock = new LatchLock(countDownLatch); long start = System.currentTimeMillis(); Thread t1 = new Thread(latchLock); Thread t2 = new Thread(latchLock); t1.start(); t2.start(); try { countDownLatch.await();//锁不为0 主线程等待 } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println("Time = [" + (end - start) + "mms"+"]"); } } class LatchLock implements Runnable{ private CountDownLatch countDownLatch; public LatchLock(CountDownLatch countDownLatch){ this.countDownLatch = countDownLatch; } public void run() { synchronized (this){ try { for(int i=0;i<1000;i++) { System.out.println(Thread.currentThread().getName()+": "+i); } }finally { countDownLatch.countDown();//线程执行一次锁减一 } } } }
## Callable接口
此接口相较于Runnable接口,能够返回值和抛异常。
public class TestCallable { public static void main(String[] args) { ThreadCallable testCallable = new ThreadCallable(); //执行Callable,须要使用FutureTask 实现类用于接收结果 FutureTask<Integer> futureTask = new FutureTask(testCallable); Thread thread = new Thread(futureTask); thread.start(); Integer result = 0; try { result = futureTask.get(); //此方法将在线程执行结束后才会执行 } catch (Exception e) { e.printStackTrace(); } System.out.println("result = [" + result + "]"); } } class ThreadCallable implements Callable<Integer>{ public Integer call() throws Exception { int sum = 0; for (int i=0;i<100;i++) { sum+=i; System.out.println("i: "+i); } return sum; } }
在解决同步问题时,采用synchronized关键字给代码块加锁或者给方法加锁,关键字加锁方式时隐式的,所的获取和释放由执行过程当中的线程自行完成,须要显式地完成加锁和锁释放时,可使用lock加锁方式。
package com.wang.test.juc.cchm; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class TestLock { public static void main(String[] args) { TestThread testThread = new TestThread(); Thread t1 = new Thread(testThread); Thread t2 = new Thread(testThread); Thread t3 = new Thread(testThread); t1.start(); t2.start(); t3.start(); } } class TestThread implements Runnable{ private Lock lock = new ReentrantLock();//锁 private int count = 1000; public void run() { while (true){ // 自旋等待! lock.lock(); try { Thread.sleep(1); if (count > 0) System.out.println(Thread.currentThread().getName()+" count :"+ --count); } catch (InterruptedException e) { e.printStackTrace(); }finally { // finally释放锁! lock.unlock(); } } } }
public class TestProductorAndConsumer { public static void main(String[] args) { Clerk clerk = new Clerk(); Productor productor = new Productor(clerk); Consumer consumer = new Consumer(clerk); new Thread(productor,"Producter").start(); new Thread(consumer,"Customer").start(); } } class Clerk{ private int pruduct = 0; public synchronized void income(){ if (pruduct >= 10){ System.out.println("Can not add more"); }else{ System.out.println(Thread.currentThread().getName()+": "+ ++pruduct); } } public synchronized void sale(){ if (pruduct <= 0) {System.out.println("Can not sale anything!");} else { System.out.println(Thread.currentThread().getName()+" :"+ --pruduct); } } } class Productor implements Runnable{ private Clerk clerk; public Productor(Clerk clerk){ this.clerk = clerk; } public void run(){ for (int i=0;i<20;i++){ clerk.income(); } } } class Consumer implements Runnable{ private Clerk clerk; public Consumer(Clerk clerk) { this.clerk = clerk; } public void run() { for (int i=0;i<20;i++){ clerk.sale(); } } }
生产者消费者都会一直进行,会出现没有产品继续消费和库存已满继续生产,即没有货物依旧被屡次消费,没法库存仍旧屡次生产。
public class TestProductorAndConsumer { public static void main(String[] args) { Clerk clerk = new Clerk(); Productor productor = new Productor(clerk); Consumer consumer = new Consumer(clerk); new Thread(productor,"Producter").start(); new Thread(consumer,"Customer").start(); } } class Clerk{ private int pruduct = 0; public synchronized void income(){ if (pruduct >= 10){ System.out.println("Can not add more"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } }else{ System.out.println(Thread.currentThread().getName()+": "+ ++pruduct); this.notifyAll(); } } public synchronized void sale(){ if (pruduct <= 0) {System.out.println("Can not sale anything!"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } else { System.out.println(Thread.currentThread().getName()+" :"+ --pruduct); this.notifyAll(); } } } class Productor implements Runnable{ private Clerk clerk; public Productor(Clerk clerk){ this.clerk = clerk; } public void run(){ for (int i=0;i<20;i++){ clerk.income(); } } } class Consumer implements Runnable{ private Clerk clerk; public Consumer(Clerk clerk) { this.clerk = clerk; } public void run() { for (int i=0;i<20;i++){ clerk.sale(); } } }
等待唤醒,当发生满货或是销空时,进行等待。以上代码没法结束,最后一次地等待,没法被唤醒,由else引起,继续增长生产者和消费者,将会出现虚假唤醒,必须让它自旋等待。
package com.wang.test.juc.cchm; public class TestProductorAndConsumer { public static void main(String[] args) { Clerk clerk = new Clerk(); Productor productor = new Productor(clerk); Consumer consumer = new Consumer(clerk); new Thread(productor,"Producter").start(); new Thread(consumer,"Customer").start(); new Thread(productor,"Producter2").start(); new Thread(consumer,"Customer2").start(); } } class Clerk{ private int pruduct = 0; public synchronized void income(){ while (pruduct >= 10){//自旋 System.out.println("Can not add more"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName()+": "+ ++pruduct); this.notifyAll(); } public synchronized void sale(){ while (pruduct <= 0) {System.out.println("Can not sale anything!"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName()+" :"+ --pruduct); this.notifyAll(); } } class Productor implements Runnable{ private Clerk clerk; public Productor(Clerk clerk){ this.clerk = clerk; } public void run(){ for (int i=0;i<20;i++){ clerk.income(); } } } class Consumer implements Runnable{ private Clerk clerk; public Consumer(Clerk clerk) { this.clerk = clerk; } public void run() { for (int i=0;i<20;i++){ clerk.sale(); } } }
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class TestProductorAndConsumer { public static void main(String[] args) { Clerk clerk = new Clerk(); Productor productor = new Productor(clerk); Consumer consumer = new Consumer(clerk); new Thread(productor,"Producter").start(); new Thread(consumer,"Customer").start(); new Thread(productor,"Producter2").start(); new Thread(consumer,"Customer2").start(); } } class Clerk{ private int pruduct = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); //!!!! public void income(){ lock.lock(); try { while (pruduct >= 10){ System.out.println("Can not add more"); try { condition.await(); //!!!!! } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName()+": "+ ++pruduct); condition.signalAll(); //!!!! }finally { lock.unlock(); } } public void sale(){ lock.lock(); try { while (pruduct <= 0) {System.out.println("Can not sale anything!"); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName()+" :"+ --pruduct); condition.signalAll(); }finally { lock.unlock(); } } } class Productor implements Runnable{ private Clerk clerk; public Productor(Clerk clerk){ this.clerk = clerk; } public void run(){ for (int i=0;i<20;i++){ clerk.income(); } } } class Consumer implements Runnable{ private Clerk clerk; public Consumer(Clerk clerk) { this.clerk = clerk; } public void run() { for (int i=0;i<20;i++){ clerk.sale(); } } }
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class TestPrintInOrder { public static void main(String[] args) { final Alternate alternate = new Alternate(); new Thread(new Runnable() { public void run() { for (int i=0;i<20;i++){ alternate.PrintA(); } } }).start(); new Thread(new Runnable() { public void run() { for (int i=0;i<20;i++){ alternate.PrintB(); } } }).start(); new Thread(new Runnable() { public void run() { for (int i=0;i<20;i++){ alternate.PrintC(); } } }).start(); } } class Alternate{ private int mark = 1; private Lock lock = new ReentrantLock(); private Condition c1 =lock.newCondition(); private Condition c2 =lock.newCondition(); private Condition c3 =lock.newCondition(); public void PrintA(){ lock.lock(); try{ while (mark != 1){ c1.await(); } System.out.println(Thread.currentThread().getName()+": "+"A"); mark = 2; c2.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void PrintB(){ lock.lock(); try{ while (mark != 2){ c2.await(); } System.out.println(Thread.currentThread().getName()+": "+"B"); mark = 3; c3.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void PrintC(){ lock.lock(); try{ while (mark != 3){ c3.await(); } System.out.println(Thread.currentThread().getName()+": "+"C"); mark = 1; c1.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
当数据在进行写入时,读取操做须要保持同步,即读写应当时互斥的,读取锁能够共享,写入锁独占。
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class TestReadWriteLock { public static void main(String[] args) { final TestLockRW testLockRW = new TestLockRW(); new Thread(new Runnable() { public void run() { testLockRW.write((int)(Math.random()*1000)); } }).start(); for (int i=0;i<20;i++){ new Thread(new Runnable() { public void run() { testLockRW.read(); } }).start(); } } } class TestLockRW{ private int i = 0; private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); public void read(){ readWriteLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName()+": "+i); }finally { readWriteLock.readLock().unlock(); } } public void write(int random){ readWriteLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName()+": write "+random); i = random; }finally { readWriteLock.writeLock().unlock(); } } }
public class TestThread8Monitor { public static void main(String[] args) { Number number = new Number(); Number number2 = new Number(); new Thread(new Runnable() { public void run() { number.getOne(); } }).start(); new Thread(new Runnable() { public void run() { number2.getTwo(); } }).start(); // new Thread(new Runnable() { // public void run() { // number.getThree(); // } // }).start(); } } class Number{ public static synchronized void getOne(){ try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("One"); } public static synchronized void getTwo(){ System.out.println("Two"); } public void getThree(){ System.out.println("Three"); } }
类比数据库链接池,建立线程和销毁线程比较浪费资源,创建一个线程池,线程池提供一个线程队列,队列中保存着全部等待状态的线程,在须要使用线程时直接在线程池中获取,使用完毕后,归还给线程池,提升相应速度。
java.util.concurrent.Executor |--ExecutorService 线程池主要接口 |--ThreadPoolExecutor 线程池实现类 |--ScheduleExecutorService 线程调度接口 |--ScheduledThreadPoolExecutor 继承线程池实现调度接口 使用方法: 工具类:Executors Executors.ewCachedThreadPool() 数量不固定,动态更改数量 Executors.newFixedThreadPool(int) 固定容量 Executors.newSingleThreadExecutor() 单个线程线程池 返回值类型为ExecurotService ScheduledThreadPoolExecutor 线程调度 静态方法。
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestThreadPool { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); ThreadPoolimp threadPoolimp = new ThreadPoolimp(); for ( int i = 0;i<1000;i++){ executorService.submit(threadPoolimp); //支持多种线程初始化参数 Runnable、Callable... } executorService.shutdown(); //new Thread(new ThreadPoolimp()).start(); } } class ThreadPoolimp implements Runnable{ private int i = 0; public void run() { while(true){ System.out.println(Thread.currentThread().getName()+" :"+ ++i); } } }
public class TestScheduledThreadPool { public static void main(String[] args) throws ExecutionException, InterruptedException { ScheduledExecutorService pool = Executors.newScheduledThreadPool(5); Future<Integer> future = pool.schedule(new Callable<Integer>() { public Integer call() throws Exception { int i =1; System.out.println(Thread.currentThread().getName()); return i; } },5, TimeUnit.SECONDS);//延迟时间 时间单位 System.out.println(future.get()); pool.shutdown(); } }
在必要的状况下,将一个大人物,进行拆分,拆分红若干的小人物,再将一个个小任务的运算结果进行汇总。
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class TestForkJoinPool { public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); ForkSun forkSun = new ForkSun(0L, 1000000000L); Long sum = pool.invoke(forkSun); System.out.println("sum = [" + sum + "]"); } } class ForkSun extends RecursiveTask<Long>{ private static final long serialVersionUID = 7430797084800536110L; private long start; private long end; private static final long max = 10000l; public ForkSun(long start, long end) { this.start = start; this.end = end; } protected Long compute() { long len = end - start; if(len <= max){ long sum = 0L; for(long i = start;i<=end;i++) { sum+=i; } return sum; }else { long middle = (start + end)/2; ForkSun left = new ForkSun(start,middle); left.fork(); ForkSun right = new ForkSun(middle+1,end); right.fork(); return left.join() + right.join(); } } }