Java线程间通讯与信号量

1. 信号量Semaphore

先说说Semaphore,Semaphore能够控制某个资源可被同时访问的个数,经过 acquire() 获取一个许可,若是没有就等待,而 release() 释放一个许可。通常用于控制并发线程数,及线程间互斥。另外重入锁 ReentrantLock 也能够实现该功能,但实现上要复杂些。
功能就相似厕全部5个坑,假若有10我的要上厕所,那么同时只能有多少我的去上厕所呢?同时只能有5我的可以占用,当5我的中 的任何一我的让开后,其中等待的另外5我的中又有一我的能够占用了。另外等待的5我的中能够是随机得到优先机会,也能够是按照先来后到的顺序得到机会。
单个信号量的Semaphore对象能够实现互斥锁的功能,而且能够是由一个线程得到了“锁”,再由另外一个线程释放“锁”,这可应用于死锁恢复的一些场合。java

例子:

/**
 * @Description:
 * @param @param args
 * @return void 返回类型
 */
public static void main(String[] args) {
    // 线程池
    ExecutorService exec = Executors.newCachedThreadPool();
    // 只能5个线程同时访问
    final Semaphore semp = new Semaphore(5);
    // 模拟20个客户端访问
    for (int index = 0; index < 20; index++) {
        final int NO = index;
        Runnable run = new Runnable() {
            public void run() {
                try {
                    // 获取许可
                    semp.acquire();
                    System.out.println("得到Accessing: " + NO);
                    Thread.sleep((long) (Math.random() * 10000));
                    // 访问完后,释放
                    semp.release();
                    System.out.println("剩余可用信号-----------------"
                            + semp.availablePermits());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        exec.execute(run);
    }
    // 退出线程池
    exec.shutdown();
}

输出结果(能够想一想为何会这样输出):

得到Accessing: 1
得到Accessing: 5
得到Accessing: 2
得到Accessing: 3
得到Accessing: 0
剩余可用信号-----------------1
得到Accessing: 4
剩余可用信号-----------------1
得到Accessing: 9
剩余可用信号-----------------1
得到Accessing: 8
剩余可用信号-----------------1
得到Accessing: 6
剩余可用信号-----------------1
得到Accessing: 10
剩余可用信号-----------------1
得到Accessing: 11
剩余可用信号-----------------1
得到Accessing: 12
剩余可用信号-----------------1
得到Accessing: 13
剩余可用信号-----------------1
得到Accessing: 7
剩余可用信号-----------------1
得到Accessing: 15
剩余可用信号-----------------1
得到Accessing: 16
剩余可用信号-----------------1
得到Accessing: 17
剩余可用信号-----------------1
得到Accessing: 14
剩余可用信号-----------------1
得到Accessing: 18
剩余可用信号-----------------1
得到Accessing: 19
剩余可用信号-----------------1
剩余可用信号-----------------2
剩余可用信号-----------------3
剩余可用信号-----------------4
剩余可用信号-----------------5

2. 使用PIPE做为线程间通讯桥梁

Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取。一进一出。先做为初步了解怎么使用。
值得注意的是该类在java.nio.channels下,说明该类属于nio方式的数据通讯方式,那就使用Buffer来缓冲数据。并发

Pipe原理的图示:
Pipe原理图app

  • Pipe就是个空管子,这个空管子一头能够从管子里往外读,一头能够往管子里写
  • 操做流程:dom

    • 1.首先要有一个对象往这个空管子里面写。写到哪里呢?这个空管子是有一点空间的,就在这个管子里。

写的时候就是写到管子自己包含的这段空间里的。这段空间大小是1024个字节。函数

  • 2.而后另外一个对象才能将这个装满了的管子里的内容读出来。

上代码

package com.jx.test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

public class testPipe {

    /**
     * @Description:
     * @param @param args
     * @return void 返回类型
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        // 建立一个管道
        Pipe pipe = Pipe.open();
        final Pipe.SinkChannel psic = pipe.sink();// 要向管道写数据,须要访问sink通道
        final Pipe.SourceChannel psoc = pipe.source();// 从读取管道的数据,须要访问source通道

        Thread tPwriter = new Thread() {

            public void run() {
                try {
                    System.out.println("send.....");
                    // 建立一个线程,利用管道的写入口Pipe.SinkChannel类型的psic往管道里写入指定ByteBuffer的内容
                    int res = psic.write(ByteBuffer
                            .wrap("Hello,Pipe!测试通信.....".getBytes("utf-16BE")));
                    System.out.println("send size:" + res);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        Thread tPreader = new Thread() {
            public void run() {
                int bbufferSize = 1024 * 2;
                ByteBuffer bbuffer = ByteBuffer.allocate(bbufferSize);
                try {
                    System.out.println("recive.....");
                    // 建立一个线程,利用管道的读入口Pipe.SourceChannel类型的psoc将管道里内容读到指定的ByteBuffer中                    
                    int res = psoc.read(bbuffer);//数据未
                     System.out.println("recive size:"+res+" Content:" + ByteBufferToString(bbuffer));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        tPwriter.start();
        tPreader.start();
    }

    /**
     *ByteBuffer--> String的转换函数
     */
    public static String ByteBufferToString(ByteBuffer content) {
        if (content == null || content.limit() <= 0
                || (content.limit() == content.remaining())) {
            System.out.println("不存在或内容为空!");
            return null;
        }
        int contentSize = content.limit() - content.remaining();
        StringBuffer resultStr = new StringBuffer();
        for (int i = 0; i < contentSize; i += 2) {
            resultStr.append(content.getChar(i));
        }
        return resultStr.toString();
    }

}
相关文章
相关标签/搜索