并发编程之:并发控制

1.内存模型

在Java中每一个线程有一块工做内存区,其中存放着被全部线程共享主内存中数据值的拷贝。当线程执行时,它在本身内存中操做这些变量。java

使用(use)、赋值(assign)、锁定(lock)和解锁(unlock)操做都是线程执行引擎和线程工做内存的原子操做。但主内存和线程工做内存的数据交换并不知足原子性。多线程

当数据从主内存复制到线程工做内存时有两个动做:第一由主内存执行的读(read)操做,第二由工做内存执行load操做。并发

当数据从工做内存拷贝到主内存时也有两个动做:第一由工做内存执行的存储(store)操做,第二由主内存执行的写操做。ide

各个操做含义函数

use:把一个变量在线程工做内存中的拷贝内容传送给线程执行引擎。高并发

assign:把一个变量从线程执行引擎传送到变量的线程工做内存。性能

read:把一个变量的主内存拷贝的内容传输到线程工做内存,以便load操做使用。测试

load:把read操做从主内存中获得的值放入到线程的工做内存中。优化

store:把一个变量的线程工做内存拷贝内容传送到主内存中,以便write操做使用。ui

write:把store操做从线程工做内存中获得的值放入到主内存的变量拷贝中。

lock:使线程得到一个独占锁。

unlock:释放一个线程的独占锁。

double和long类型变量的非原子处理:若是一个double或者long变量没有声明为volatile,则变量在进行read或write操做时,主内存把他当作两个32位的read或write操做进行处理,着两个操做在时间上是分开的,可能会有其余操做穿插其中。若是这种状况方法,则两个并发的线程对共享的非volatile类型的double或long变量赋不一样的值,那么随后对该变量的使用而获取的值可能不能等于任何一个线程所赋的值。所以在32位系统中,必须对double或long进行同步。

2.volatile变量使用

能够作以下保证:

(1)其余线程对变量的修改,能够及时反应到当前线程中。

(2)确保当前线程对volatile变量的修改,能及时写回共享主内存中,并被其余线程所见。

(3)使用volatile声明的变量,编译器会保证其有序性。

注意:使用volatile标识的变量,将迫使全部线程均读写主内存中对应的变量,从而使得volatile变量在多线程中可见。

public class VolatileTest extends Thread{
        private volatile boolean isStop = false;
        public void stopMe(){
            isStop = true;
        }

        public void run(){
            int i = 0;
            while (!isStop){
                i++;
            }
            System.out.println("stop thread !");
        }

    public static void main(String[] args) throws InterruptedException {
        VolatileTest m = new VolatileTest();
        m.start();
        System.out.println("启动线程");
        Thread.sleep(2000L);
        m.stopMe();
        System.out.println("关闭线程");
        Thread.sleep(2000L);
    }
}

单例模式使用volatile

public class Singleton {
    public static volatile Singleton singleton;

    /**
     * 构造函数私有,禁止外部实例化
     */
    private Singleton() {};

    public static Singleton getInstance() {
        if (singleton == null) {
            synchronized (singleton) {
                if (singleton == null) {
                    singleton = new Singleton();
                }
            }
        }
        return singleton;
    }
}

单例模式使用volatile的必要性,要理解这个问题首先要了解实例化一个对象步骤:

(1)分配内存空间

(2)初始化对象

(3)内存空间地址赋值给对应的引用。

可是因为操做系统能够对指令进行重排序,因此上面的过程也可能会变成以下过程:

(1)分配内存空间。

(2)将内存空间的地址赋值给对应的引用。

(3)初始化对象

若是是这个流程,多线程环境下就可能将一个未初始化的对象引用暴露出来,从而致使不可预料的结果。所以,为了防止这个过程的重排序,咱们须要将变量设置为volatile类型的变量。

3.同步关键字synchronized

在早期版本中synchronized性能并不算太好,可是随着JVM虚拟机不停的改进并优化synchronized,在JDK6中,synchronized和非公平锁的差距已经缩小,且从长远来看synchronized的性能还将会作进一步优化。而且synchronized的使用也比其余同步方式相比更为的简洁明了。

只是使用synchronized还不足以控制复杂逻辑的线程交换,还要配合Object对象的wait和notify方法。

wait方法可让当前线程进入等待状态,在wait过程当中线程会释放对象锁。当调用该对象的notify方法时,在该对象上面等待的线程会从新得到对象锁继续往下执行。当有多个线程在该对象上等待时,notify会随机选取其中一个。

下面就是一个阻塞队列的例子:

//阻塞队列来进行测试
class BlockQueue{

    private List list = new ArrayList();

    public synchronized Object pop() throws InterruptedException{
        while (list.size() == 0){ //若是队列为空,则等待
            System.out.println(Thread.currentThread().getName()+":进来等待!");
            wait();
            System.out.println(Thread.currentThread().getName()+":等待结束!");
        }
        if(list.size()>0){
            System.out.println(Thread.currentThread().getName()+":取值成功!");
            return list.remove(0);
        }else{
            return null;
        }

    }

    public synchronized void put(Object o){
        list.add(o);
        System.out.println("放入数据!");
        notify();//通知等待的线程能够取数据
    }

}
class PopWorker implements Callable<Object>{

    BlockQueue queue;

    PopWorker(BlockQueue queue){
        this.queue = queue;
    }

    @Override
    public Object call() throws Exception {
        System.out.println(Thread.currentThread().getName()+":开始取值!");
        return queue.pop();
    }
}

开多个线程去一个阻塞队列中取值

public static void main(String[] args) throws InterruptedException {
    BlockQueue queue = new BlockQueue();
    //开多个线程去取值
    PopWorker worker1 = new PopWorker(queue);
    PopWorker worker2 = new PopWorker(queue);
    PopWorker worker3 = new PopWorker(queue);
    ExecutorService service = Executors.newCachedThreadPool();
    service.submit(worker1);
    service.submit(worker2);
    service.submit(worker3);

    //保证三个线程都进入wait状态
    Thread.sleep(2000);
    queue.put("1");
}

执行结果:只有线程“pool-1-thread-1”被唤醒取到值,其余两个线程都继续等待状态。

pool-1-thread-1:开始取值!
pool-1-thread-2:开始取值!
pool-1-thread-3:开始取值!
pool-1-thread-1:进来等待!
pool-1-thread-3:进来等待!
pool-1-thread-2:进来等待!
放入数据!
pool-1-thread-1:等待结束!
pool-1-thread-1:取值成功!

4. 重入锁ReentrantLock

JDK5中在高并发状况下比synchronized有明显的性能优点,在JDK6中因为JVM的优化,性能相差不大。

ReentrantLock提供了公平锁和非公平锁两种方式。公平锁能够保证在锁的等待队列中各线程是公平的,所以不会出现插队状况,对锁的获取老是先进先出,而非公平锁不作这个保证。

公平锁的实现代价比非公平锁大,所以在性能上分析,非公平锁性能要好的多。能够经过ReentrantLock的构造方法指定生产公平锁仍是非公平锁:

public ReentrantLock(boolean fair)

主要方法:

lock.lock();//得到锁,若是已经被占用则等待(在等待中不能被中断)
lock.tryLock();//尝试得到锁,若是成功返回TRUE,不等待
lock.lockInterruptibly();//得到锁,但优先响应中断(在等待中能够被中断)
lock.unlock();//释放锁

例如:

class Worker implements Runnable{
    ReentrantLock lock;
    String name;
    public Worker(String name,ReentrantLock lock){
        this.name = name;
        this.lock = lock;
    }
    @Override
    public void run() {
       try {
           System.out.println(name+":准备获取锁");
           lock.lock();
           System.out.println(name+":获取锁");
           Thread.sleep(10000);
       }catch (Exception e){
           e.printStackTrace();
       }finally {
           System.out.println(name+":释放锁");
           lock.unlock();
       }
    }
}

main方法

ReentrantLock lock = new ReentrantLock();

Worker w1 = new Worker("w1",lock);
Worker w2 = new Worker("w2",lock);
new Thread(w1).start();
new Thread(w2).start();

5. ReadWriteLock读写锁

ReadWriteLock是jdk5里面提供的读写分离锁。读写锁容许多个线程同时对资源进行读操做,写写操做或读写操做则依然须要互相等待。(这里读锁存在的意义就是在资源进行写锁控制时不容许读,读时不容许写)。

下面的例子

先分别开两个写锁:证实两个写锁是互斥的。

再开三个读锁:证实在写锁没有释放时读锁不能得到,写锁释放时三个线程能够同时获取读锁。

1)定义读写资源

class Handler{
    private  Lock readLock ;
    private  Lock writeLock ;
    private Map<String,String> map = new HashMap<>();

    public Handler(ReentrantReadWriteLock readWriteLock){
        readLock = readWriteLock.readLock();
        writeLock = readWriteLock.writeLock();
    }

    public String read(){
        try{
            System.out.println(Thread.currentThread().getName()+":开始获取读锁");
            readLock.lock();
            System.out.println(Thread.currentThread().getName()+":获取读锁");
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return map.get("java");
        }finally {
            readLock.unlock();
            System.out.println(Thread.currentThread().getName()+":释放读锁");
        }
    }

    public void write(String value){
        try {
            System.out.println(Thread.currentThread().getName()+":开始获取写锁");
            writeLock.lock();
            System.out.println(Thread.currentThread().getName()+":获取写锁");
            map.put("java",value);
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            writeLock.unlock();
            System.out.println(Thread.currentThread().getName()+":释放写锁");
        }
    }
}

2)分别定义读和写操做

class Reader implements Runnable{

    private Handler handler;

    public Reader(Handler handler){
        this.handler = handler;
    }

    @Override
    public void run() {
        handler.read();
    }
}

class Writer implements Runnable
{
    private Handler handler;

    public Writer(Handler handler){
        this.handler = handler;
    }
    @Override
    public void run() {
        handler.write("hello world !");
    }
}

主方法调用

public static void main(String[] args) throws InterruptedException {
    ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    Handler handler = new Handler(readWriteLock);

    Writer writer1 = new Writer(handler);
    Writer writer2 = new Writer(handler);

    Reader reader1 = new Reader(handler);
    Reader reader2 = new Reader(handler);
    Reader reader3 = new Reader(handler);

    ExecutorService service = Executors.newCachedThreadPool();
    service.execute(writer1);
    service.execute(writer2);
    Thread.sleep(20000) ;//先开两个写进程分别占用写锁30秒,20秒以后开三个读锁
    service.execute(reader1);
    service.execute(reader2);
    service.execute(reader3);
    service.shutdown();
}

后面有先用读占用锁,写线程去获取写锁,发现读锁被占用时写锁也是不能得到的,因此读写锁是互斥的。读的时候不能获取写,写的时候不能获取读。

6. Condition对象

线程间的协做光有锁是不够的,Condition能够用于协做线程间复杂的操做。Condition对象是与lock绑定的因此就有Lock的公平性特性:若是是公平锁,线程为按照FIFO的顺序从Condition.await中释放,若是是非公平锁,那么后续的锁竞争就不保证FIFO顺序了。

Condition就至关是在使用synchronized时咱们须要使用Object的wait/notify/notifyAll这样的方法来控制更为复杂的业务逻辑。

Condition方法:

await*( ) 线程释放锁进入等待状态。

singal*( )线程被唤醒获取锁继续执行。

这里咱们用一个例子来讲明这个问题,用一个线程不停往里面放入数据,另外一个线程不停从里面取数据,当队列满时放数据线程释放锁进入等待状态同时唤醒等待中取数据线程进行取数据;同理当队列中数据为空时取数据线程释放锁进入等待状态,同时唤醒放数据线程进行放数据操做。

//构造一个阻塞队列
class MyBlockList{

    private String[] items ;
    private volatile int count;
    private Lock lock = new ReentrantLock();

    private Condition canPut = lock.newCondition();
    private Condition canTake = lock.newCondition();

    public MyBlockList(int len){
        items = new String[len];//初始化时肯定队列长度
        count = 0;//元素数量
    }

    public String take(){
        try {
            lock.lockInterruptibly();
            if(count == 0){//若是没有元素进行等待
                System.out.println("队列为空不能拿了");
                canTake.await();
                System.out.println("能够放被唤醒");
            }
            count--;
            String result = items[count];
            System.out.println("取走一个剩:"+count);
            Thread.sleep(1000);
            canPut.signal();//拿走了一个,发出能够放信号
            return result;
        } catch (InterruptedException e) {
            canTake.signal();
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return "";
    }

    public void put(String value){
        try {
            lock.lockInterruptibly();
            if(count == items.length){//若是已经满了则不能放只能等收到能够放信号
                System.out.println("队列满不能放了");
                canPut.await();
                System.out.println("能够取被唤醒");
            }
            count++;
            items[count-1] = value;
            System.out.println("放入一个:当前有"+count);
            Thread.sleep(1000);
            canTake.signal();//从新放了一个,发出取信号
        } catch (InterruptedException e) {
            e.printStackTrace();
            canPut.signal();
        } finally {
            lock.unlock();
        }
    }
}

在main方法中开两个线程进行存取数据操做

public static void main(String[] args) {
   final MyBlockList queue = new MyBlockList(5);

    new Thread(new Runnable() {
        @Override
        public void run() {
            while (true){
                queue.take();
            }

        }
    }).start();

    new Thread(new Runnable() {
        @Override
        public void run() {

            while (true){
                queue.put("hello world !");
            }
        }
    }).start();

}

7. Semaphore信号量

信号量对锁的概念进行了扩展,它能够限定对某一资源的访问最大线程数。

举个例子,就好比说有一个资源只能给5个客户端访问,这样咱们就能够构造一个准入数为5的信号量,当一个客户端进入访问资源就标记一个信号量,当信号量都被标记完了则说明资源访问达到最大值,当有客户端结束访问,信号量就释放一个,其余排队等待的客户端就能够准许进入访问资源。

public static void main(String[] args) {
   final Semaphore semaphore = new Semaphore(5);

    ExecutorService service = Executors.newCachedThreadPool();

    for (int i=0;i<=20;i++){//总共有20个线程要访问,只放入5个

        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName()+"线程进入");
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"线程获取信号量");
                    Thread.sleep(5000);
                    semaphore.release();
                    System.out.println(Thread.currentThread().getName()+"线程释放信号量");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        service.execute(run);
    }

    //执行完关闭线程池
    service.shutdown();
}

8. ThreadLock线程局部变量

ThreadLock为每一个线程提供变量的独立副本。

从起set方法能够看到,每一个线程都有一个单独的map用来存储该线程放入的数据

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}
public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}
public void remove() {
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        m.remove(this);
}

9. 锁性能和优化

1. 多线程会形成额外的开销因此多线程的使用并非线程越多越好。

2. 避免死锁

    形成死锁必需要知足如下四个条件:互斥条件、请求与保持条件、不剥夺条件、循环等待条件。

3. 减少锁持有时间

4. 减少锁粒度(ConcurrentHashMap)

5. 读写分离锁来替换独占锁(ReadWriteLock)

6. 锁分离(LinkedBlockQueue)

7. 重入锁(ReentrantLock)和内部锁(synchronized)

8. 锁粗化:过于频繁对锁的请求、同步、释放会使系统资源开销大大增长反而会是系统性能降低

    频繁对锁请求:

for(int i=0;i<100;i++){
    synchronized (lock){
        //do sth
    }
}

    锁粗化以后:

synchronized (lock){
    for (int i=0;i<100;i++){
        //do sth
    }
}

9. 自旋锁

    由于线程的挂起、恢复是须要较多系统资源的,若是这段时间开销比锁等待所须要的时间开销要大不少,所以在这种状况下咱们有可能情愿进行锁等待也不肯意去把线程挂起,而后在得到锁时再恢复。

    所以JVM引入了自选锁,自旋锁可使线程在没有取的锁时,不被挂起,而转而去执行空循环(即所谓的自旋)若干个空循环后线程得到锁,则继续执行,若是没有得到锁才会被挂起。这样使得线程被挂起的概率相对减小。(对锁竞争不激烈,锁占用时间比较短的并发操做有积极做用)。

    JVM虚拟机提供-XX:+UseSprinning参数来开启自选锁,使用-XX:PreBlockSpin参数设置自旋锁的等待次数。

10. 消除锁

    JVM在编译时进行逃逸分析(对上下文扫描)能够消除那些没必要要的锁。好比在方法内部使用StringBuffer进行字符串操做(该操做就不会产生线程间锁竞争)。

    能够经过JVM参数配置,是否进行逃逸分析和锁消除

    开启:-server -XX:-DoEscapeAnalysis -XX:-EliminateLocks

    关闭:-server -XX:+DoEscapeAnalysis -XX:+EliminateLocks

11. 锁偏向

     若是程序没有竞争,则取消以前已经取得锁同步操做。当某一锁被线程获取以后,便进入偏向模式,当线程再次请求这个锁时,无需再进行相关的同步操做。

     JVM中能够经过配置参数开启或关闭锁偏向

     开启:+XX:UseBiasedLocking

     关闭:-XX:UseBiasedLocking

相关文章
相关标签/搜索