操做系统中的经典问题——生产者消费者问题(两种方式实现)

操做系统中的经典问题——生产者消费者问题(两种方式实现)

一、问题引入:什么是生产者消费者问题?

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要做用是生成必定量的数据放到缓冲区中,而后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
.java

要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。一样,也可让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据以后,再唤醒消费者。一般采用进程间通讯的方法解决该问题。若是解决方法不够完善,则容易出现死锁的状况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒本身。该问题也能被推广到多个生产者和消费者的情形。
多线程

二、问题分析

该问题须要注意的几点:

  1. 在缓冲区为空时,消费者不能再进行消费
  2. 在缓冲区为满时,生产者不能再进行生产
  3. 在一个线程进行生产或消费时,其他线程不能再进行生产或消费等操做,即保持线程间的同步
  4. 注意条件变量与互斥锁的顺序

因为前两点缘由,所以须要保持线程间的同步,即一个线程消费(或生产)完,其余线程才能进行竞争CPU,得到消费(或生产)的机会。对于这一点,可使用条件变量进行线程间的同步:生产者线程在product以前,须要wait直至获取本身所需的信号量以后,才会进行product的操做;一样,对于消费者线程,在consume以前须要wait直到没有线程在访问共享区(缓冲区),再进行consume的操做,以后再解锁并唤醒其余可用阻塞线程。
性能

在访问共享区资源时,为避免多个线程同时访问资源形成混乱,须要对共享资源加锁,从而保证某一时刻只有一个线程在访问共享资源。测试

三、第一类:使用synchronized关键字来进行实现

3.一、生产者、消费者问题,归根到底也都是线程间的通讯问题。一个处于活动状态,一个处于等待唤醒状态。

3.二、这里设共享的资源为一个int数,调用方法进行加1、减一的操做模拟操做系统对共享资源的分配。

3.三、实现对操做的资源进行共享,主要有三步,判断等待、业务、通知,所以能够将资源类的代码实现以下:(代码若是看不懂,详情请看注释,注释详尽)

/**资源类**/
class Data {
    private int number = 0;

    /**
     * 判断等待、业务、通知
     */

    //+1
    public synchronized void increment() throws InterruptedException {
        if (number != 0) {
            // 等待
            this.wait();
        }
        number++;
        System.out.println(Thread.currentThread().getName() + "=>" + number);
        // 通知其余线程,我+1完毕了
        this.notifyAll();
    }
    //-1
    public synchronized void decrement() throws InterruptedException {
        if (number == 0) {
            //等待
            this.wait();
        }
        number--;
        System.out.println(Thread.currentThread().getName() + "=>" + number);
        // 通知其余线程,我-1完毕了
        this.notifyAll();
    }
}

3.四、编写两个线程,进行测试,看是否能有序的进行资源的共享

package com.xgp.pc;

/**
 * 线程之间的通讯问题:生产者和消费者问题! 等待唤醒 通知
 * 线程交替问题 A  B 操做同一个变量  num = 0
 * A num+1
 * B num-1
 * @author 薛国鹏
 */
@SuppressWarnings("all")
public class A {
    public static void main(String[] args) {
        Data data = new Data();

        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.increment();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"A").start();
        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.decrement();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"B").start();
    }
}

运行结果:

A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0

进程完成,退出码 0

3.五、有运行结果能够看出,当系统中只有两个线程时,可以有序的进行进行对临界资源的共享。此时,将两个线程改成4个线程(两个增长操做,两个减小操做)再进行测试

package com.xgp.pc;

/**
 * 线程之间的通讯问题:生产者和消费者问题! 等待唤醒 通知
 * 线程交替问题 A  B 操做同一个变量  num = 0
 * A num+1
 * B num-1
 * @author 薛国鹏
 */
@SuppressWarnings("all")
public class A {
    public static void main(String[] args) {
        Data data = new Data();

        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.increment();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"A").start();
        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.decrement();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"B").start();
        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.increment();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"C").start();
        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.decrement();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"D").start();
    }
}

运行结果为:

A=>1
B=>0
A=>1
B=>0
C=>1
A=>2
D=>1
D=>0
A=>1
C=>2
B=>1
B=>0
C=>1
A=>2
D=>1
D=>0
A=>1
C=>2
B=>1
B=>0
C=>1
A=>2
D=>1
D=>0
A=>1
C=>2
B=>1
B=>0
C=>1
A=>2
D=>1
D=>0
A=>1
C=>2
B=>1
B=>0
C=>1
D=>0
C=>1
D=>0

进程完成,退出码 0

由运行的结果能够看出,当有四个线程时,此时的资源分配共享出现了问题。

3.六、分析:

假如:当一个减法操做结束时,此时会通知唤醒其余三个线程我减一完毕了,此时共享资源的值为0,另外一个减一线程判断的number=0,天然会进行等待,改线程确定不是问题所在。两个加一线程,判断的结果是number=1,因而都被唤醒了,当第一个加一线程完成加一操做后,第二个加一线程随即跟随其后完成加一操做。问题的关键就是,当第一个加一线程完成加一操做后,第二个线程由于以前的判断被唤醒了,并且后面并无其余判断使他沉睡了,所以在值发生改变后,仍然进行了加一操做。因此,咱们得限制各个进行随时随刻得检测共享资源得变化,在发生变化时当即判断是否唤醒仍是等待,所以代码中不该该使用if判断,而应该使用while进行循环判断,虽然,这在必定程度上会占用必定得系统性能。改进后的代码以下:

/**资源类**/
class Data {
    private int number = 0;

    /**
     * 判断等待、业务、通知
     */

    //+1
    public synchronized void increment() throws InterruptedException {
        while (number != 0) {
            // 等待
            this.wait();
        }
        number++;
        System.out.println(Thread.currentThread().getName() + "=>" + number);
        // 通知其余线程,我+1完毕了
        this.notifyAll();
    }
    //-1
    public synchronized void decrement() throws InterruptedException {
        while (number == 0) {
            //等待
            this.wait();
        }
        number--;
        System.out.println(Thread.currentThread().getName() + "=>" + number);
        // 通知其余线程,我-1完毕了
        this.notifyAll();
    }
}

此时,开启四个线程进行测试的结果正常

A=>1
B=>0
A=>1
B=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
D=>0
C=>1
D=>0

进程完成,退出码 0

四、使用java的JUC来实现

package com.xgp.pc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@SuppressWarnings("all")
public class B {
    public static void main(String[] args) {
        Data2 data = new Data2();

        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.increment();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"A").start();
        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.decrement();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"B").start();
        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.increment();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"C").start();
        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.decrement();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"D").start();
    }
}

/**资源类**/
class Data2 {
    private int number = 0;

    /**
     * 判断等待、业务、通知
     */
    Lock lock = new ReentrantLock();
    //锁监视器(取代了对象监视器的使用)
    Condition condition = lock.newCondition();

    //+1
    public void increment() throws InterruptedException {
        lock.lock();
        try {
            while (number != 0) {
                // 等待
                condition.await();      //等待
            }
            number++;
            System.out.println(Thread.currentThread().getName() + "=>" + number);
            // 通知其余线程,我+1完毕了
            condition.signalAll();
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    //-1
    public void decrement() throws InterruptedException {
        lock.lock();

        try {
            while (number == 0) {
                //等待
                condition.await();
            }
            number--;
            System.out.println(Thread.currentThread().getName() + "=>" + number);
            // 通知其余线程,我-1完毕了
            condition.signalAll();
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }

    }
}

运行结果:

A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0

进程完成,退出码 0

经过该方式也一样能够实现资源的共享,而且此方式还有许多优势,能过将控制权更大程度的交给代码的编写者。至关于汽车中的手动挡,虽然缺乏自动,可是会的话开的更快。

五、使用java的JUC来实现点对点唤醒

package com.xgp.pc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@SuppressWarnings("all")
public class C {
    public static void main(String[] args) {

        Data3 data = new Data3();

        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.printA();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        },"A").start();
        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.printB();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        },"B").start();
        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.printC();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        },"C").start();
    }
}


class Data3 {

    private int number = 1;

    private Lock lock = new ReentrantLock();
    //锁监视器(取代了对象监视器的使用)
    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();

    public void printA() {
        lock.lock();

        try {
            while (number != 1) {
                condition1.await();
            }
            System.out.println(Thread.currentThread().getName());
            //唤醒指定的B
            number = 2;
            condition2.signal();
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    public void printB() {
        lock.lock();

        try {
            while (number != 2) {
                condition2.await();
            }
            System.out.println(Thread.currentThread().getName());
            //唤醒指定的B
            number = 3;
            condition3.signal();
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    public void printC() {
        lock.lock();

        try {
            while (number != 3) {
                condition3.await();
            }
            System.out.println(Thread.currentThread().getName());
            //唤醒指定的B
            number = 1;
            condition1.signal();
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

运行结果:

A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C

进程完成,退出码 0
相关文章
相关标签/搜索