上一篇的前言我都忘了随便说两句了hhhjava
由于Kafka的源码阅读是须要对Java NIO知识有必定的了解的,因此怎么说,若是以为本身对于Java这块算是比较熟悉,一样做为插曲篇的这篇是能够直接忽略。由于这篇也不会涉及什么重难点,主要仍是过过基础,让后面的源码篇读起来更加通畅。api
Java New IO是从Java1.4版本开始引入的一个新的IO api,能够替代以往的标准IO,NIO相比原来的IO有一样的做用和目的,可是使用的方式彻底不同,NIO是面向缓冲区的,基于通道的IO操做,这也让它比传统IO有着更为高效的读写。数组
IO | NIO |
---|---|
面向流 | 面向缓冲区 |
阻塞IO | 非阻塞IO |
无 | 选择器 |
如下用图来简单理解一下,在传统IO中当App要对网络,磁盘中的文件进行读写的时候,它们必须创建一个链接,流究竟是一个什么样的概念呢,咱们能够先把它想象成自来水,家里要用自来水,须要有水管,让水从水管过来到家里,起到一个运输的做用。服务器
因此当咱们文件中的数据须要输入到App里面时,它们就会创建一个输入的管道。而当咱们的App有数据须要写入到文件系统的时候,就会创建一个输出的管道,这两条管道就是咱们的输入流和输出流。那水历来没有逆流而上的呀,因此它们都是单向管道。这么一讲,是否是就很好懂了呢😁?网络
也是一样的文件系统和App,不过此时把流换成了一个channel,如今咱们能够先认为它就是一条铁道,那咱们知道铁道自己是不能传递货物的呀,因此咱们须要一个载具---火车(也就是缓冲区),App须要的数据就由这个名叫缓冲区的载具运输过来。那火车是能够开过来,也能够开回去的,因此NIO是双向传输的。app
NIO的核心在于,通道(channel)和缓冲区(buffer)两个。通道是打开到IO设备的链接。使用时须要获取用于链接IO设备的通道以及用于容纳数据的缓冲区,而后经过操做缓冲区对数据进行处理。(其实就是上面那张图的事儿,或者一句话就是一个负责传输,一个负责存储)。dom
缓冲区是Java.nio包定义好的,全部缓冲区都是Buffer抽象类的子类。Buffer根据数据类型不一样,经常使用子类分别是基本数据类型除了Boolean外的xxxBuffer(IntBuffer,DoubleBuffer···等)。不一样的Buffer类它们的管理方式都是相同的,获取对象的方法都是性能
// 建立一个容量为capacity的xxx类型的Buffer对象
static xxxBuffer allocate(int capacity)
复制代码
并且缓冲区提供了两个核心方法:get()和put(),put方法是将数据存入到缓冲区,而get是获取缓冲区的数据。测试
此时咱们用代码看一下大数据
public class BufferTest {
@Test
public void testBuffer(){
// 建立缓冲区对象
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
}
}
复制代码
点进去ByteBuffer,会看到这个东西是继承了Buffer类的
public abstract class ByteBuffer extends Buffer implements Comparable<ByteBuffer>
复制代码
此时继续点进去Buffer类,第一眼看到的是有几个自带的属性
表示Buffer的最大数据容量,这个值不能为负。并且建立后是不能更改的。
第一个不能读取或写入的数据的索引,位于此索引后的数据不可读写。这个数值不能为负且不能超过capacity,如上图中第三个缓冲区,在下标为5以后的数据块均不能读写,那limit为5
下一个要读取或写入的数据的索引,这个数值不能为负且不能超过capacity,如图中第二个缓冲区,前面5块写完成,此时第6个数据块的下标为5,因此position为5
mark是一个索引,经过Buffer的mark()方法指定Buffer中一个特定的position后,能够经过reset()方法重置到这个position,这个经过代码来解释会比较好说明
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
System.out.println(byteBuffer.position());
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());
运行结果:0,10,10
复制代码
String str = "abcde";
byteBuffer.put(str.getBytes());
System.out.println(byteBuffer.position());
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());
运行结果:5,10,10
"abcde"长度为5,position已经变化,其它不变
复制代码
byteBuffer.flip();
System.out.println(byteBuffer.position());
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());
运行结果:0,10,5
复制代码
此时position变成为0了,由于一开始的5,是由于这时候要写的是下标为5的数据块,而转换成读模式后,第一个读的明显是下标为0的数据块呀。limit的数值也变成了5,由于当前能读到的数据从下标为5开始就木有了,因此limit为5
byte[] array = new byte[byteBuffer.limit()];
byteBuffer.get(array);
System.out.println(new String(array,0,array.length));
运行结果:abcde
复制代码
byte[] array = new byte[byteBuffer.limit()];
byteBuffer.get(array,0,2);
System.out.println(new String(array,0,2));
System.out.println(byteBuffer.position());
byteBuffer.mark();
byteBuffer.get(array,2,2);
System.out.println(new String(array,2,2));
System.out.println(byteBuffer.position());
byteBuffer.reset();
System.out.println(byteBuffer.position());
运行结果:ab,2,cd,4,2
复制代码
其实很简单,就是第一次读取的时候,只是读取了前面两个字符,而后此时position的结果为2,而后再读取后两个,position为4,但是由于我在读取前面2个的时候进行了一个mark操做,它就自动回到我mark以前的那个读取位置而已,就是这么简单
rewind()方法,可重复读,clear()清空缓冲区,不过这个方法的清空缓冲区,是一种被遗忘的状态,就是说,数据仍然还存于缓冲区中,但是自动忽略掉了。此时再次读取数据,是仍是能够get()到的。hasRemaining()方法就是表示剩余可操做的数据量还有多少,好比刚刚的mark的那个例子中,我reset回去以后,剩余的可操做数据就是3,由于我只读了ab,还有cde这三个。
非直接缓冲区:经过allocate()方法来分配缓冲区。将缓冲区创建在JVM的内存中。
直接缓冲区:经过allocateDirect()方法分配缓冲区,将缓冲区创建在物理内存中。效率更高。
应用程序想要在磁盘中读取数据时,首先它发起请求,让物理磁盘先把它的数据读到内核地址空间当中,以后这个内核空间再将这个数据copy一份到用户地址空间去。而后数据才能经过read()方法将数据返回个应用程序。而应用程序须要写数据进去,也是同理,先写到用户地址空间,而后copy到内核地址空间,再写入磁盘。此时不难发现,这个copy的操做显得十分的多余,因此非直接缓冲区的效率相对来讲会低一些。
直接缓冲区就真的顾名思义很是直接了,写入的时候,写到物理内存映射文件中,再由它写入物理磁盘,读取也是磁盘把数据读到这个文件而后再由它读取到应用程序中便可。没有了copy的中间过程。
由java.nio.channels包定义,表示IO源与目标打开的连接,它自己不存在直接访问数据的能力,只能和Buffer进行交互
传统的IO由cpu来全权负责,此时这个设计在有大量文件读取操做时,CPU的利用率会被拉的很是低,由于IO操做把CPU的资源都抢占了。
在这种背景下进行了一些优化,把对cpu的链接取消,转为DMA(直接内存存取)的方式。固然DMA这个操做自己也是须要CPU进行调度的。不过这个损耗天然就会比大量的IO要小的多。
此时,就出现了通道这个概念,它是一个彻底独立的处理器。专门用来负责文件的IO操做。
Java为Channel接口提供的主要实现类:
FileChannel:用于读取,写入,映射和操做文件的通道
DatagramChannel:经过UDP读写网络中的数据通道
SocketChannel:经过TCP读写网络中的数据通道
ServerSocketChannel:能够监听新进来的TCP链接,对每个新进来的链接
都会建立一个SocketChannel
复制代码
获取channel的一种方式是对支持通道的对象调用getChannel()方法,支持类以下
FileInputStream
FileOutputStream
RandomAccessFile
DatagramSocket
Socket
ServerSocket
复制代码
获取的其余方式是使用Files类的静态方法newByteChannel()获取字节通道。再或者是经过通道的静态方法open()打开并返回指定通道。
// 建立输入输出流对象
FileInputStream fileInputStream = new FileInputStream("testPic.jpg");
FileOutputStream fileOutputStream = new FileOutputStream("testPic2.jpg");
// 经过流对象获取通道channel
FileChannel inChannel = fileInputStream.getChannel();
FileChannel outChannel = fileOutputStream.getChannel();
// 建立指定大小的缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 将通道中的数据写入到缓冲区中
while (inChannel.read(byteBuffer) != -1){
// 切换成读取模式
byteBuffer.flip();
// 将缓冲区中的数据写到输出通道
outChannel.write(byteBuffer);
// 清空缓冲区
byteBuffer.clear();
}
//回收资源(这里为了省时间直接抛出去了,反正这段不过重要)
outChannel.close();
inChannel.close();
fileInputStream.close();
fileOutputStream.close();
运行结果:就天然是复制了一个testPic2出来啦
复制代码
由于代码自己不难,注释已经写得比较详细,就不展开了
FileChannel inChannel = FileChannel.open(Paths.get("testPic.jpg",StandardOpenOption.READ));
FileChannel outChannel = FileChannel.
open(Paths.get("testPic2.jpg"),StandardOpenOption.WRITE,StandardOpenOption.READ,StandardOpenOption.CREATE);
// 进行内存映射
MappedByteBuffer inMappedBuffer = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outMapBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
// 对缓冲区进行数据的读写操做
byte[] array = new byte[inMappedBuffer.limit()];
inMappedBuffer.get(array);
outMapBuffer.put(array);
// 回收资源
inChannel.close();
outChannel.close();
复制代码
若是须要看一下它们两个的时间差,本身用最常规的系统时间来瞧瞧就好,在这里就再也不加上了。
传统的IO流都是阻塞式的,当一个线程调用read或者write时,该线程被阻塞,直到数据被读取或者写入,该线程在此期间都是不能执行其余任务的,所以,在完成网络通讯进行IO操做时,线程被阻塞,因此服务器端必须为每一个客户端提供一个独立线程进行处理,当服务器端须要处理大量客户端时,性能将会急剧降低。
NIO是非阻塞的,当线程从某通道进行读写数据时,若没有数据可用,该线程能够进行其余任务。线程一般将非阻塞IO的空闲时间用于在其余通道上执行IO操做,因此单独的线程能够管理多个输入和输出通道。所以NIO可让服务器端使用一个或有限几个线程来同时处理链接到服务器端的全部客户端。
这个选择器其实就是在客户端和服务端之间引入一个通道的注册器,好比如今个人客户端要像服务端传输数据了,客户端会给选择器去发送一个channel的注册请求,注册完成后,Selector就会去监控这个channel的IO状态(读写,链接)。只有当通道中的数据彻底准备就绪,Selector才会将数据分配到服务端的某个线程去处理。
这种非阻塞性的流程就能够更好地去使用CPU的资源。提升CPU的工做效率。这个能够用收快递来讲明。若是你一开始就告诉我半小时后过来取快递,而我在这时候已经到目的地了,我有可能就原地不动站着等半个小时。这个期间啥地都去不了,但是你是到了以后,才打电话告诉我过来取,那我就有了更多的自由时间。
如今咱们来演示一下阻塞性IO的网络通讯
这个代码你们能够尝试这删除sChannel.shutdownOutput(),此时会发如今启动好server,运行client程序的时候,程序也会阻塞,这是由于这时服务端并没有法肯定你是否已经发送完成数据了,因此client端也产生了阻塞,双方就一直僵持。
还有一种方法是解阻塞,以后进行阐述。
// 1.获取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("你的IP地址",9898));
// 2.建立文件通道
FileChannel inChannel = FileChannel.open(Paths.get("C:/Users/Administrator/Desktop/testPic.jpg"),StandardOpenOption.READ);
// 3.分配指定大小的缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 4.发送数据,须要读取文件
while (inChannel.read(byteBuffer) != -1){
byteBuffer.flip();
// 将buffer的数据写入到通道中
sChannel.write(byteBuffer);
byteBuffer.clear();
}
// 主动告诉服务端,数据已经发送完毕
sChannel.shutdownOutput();
while (sChannel.read(byteBuffer) != -1){
byteBuffer.flip();
System.out.println("接收服务端数据成功···");
byteBuffer.clear();
}
// 5.关闭通道
inChannel.close();
sChannel.close();
复制代码
// 1.获取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
// 建立一个输出通道,将读取到的数据写入到输出通道中,保存为testPic2
FileChannel outChannel = FileChannel.open(Paths.get("testPic2.jpg"),StandardOpenOption.WRITE,StandardOpenOption.CREATE);
// 2.绑定端口
ssChannel.bind(new InetSocketAddress(9898));
// 3.等待客户端链接,链接成功时会获得一个通道
SocketChannel sChannel = ssChannel.accept();
// 4.建立缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 5.接收客户端的数据存储到本地
while (sChannel.read(byteBuffer) != -1){
byteBuffer.flip();
outChannel.write(byteBuffer);
byteBuffer.clear();
}
// 发送反馈给客户端
// 向缓冲区中写入应答信息
byteBuffer.put("服务端接收数据成功".getBytes());
byteBuffer.flip();
sChannel.write(byteBuffer);
// 关闭通道
sChannel.close();
outChannel.close();
byteBuffer.clear();
复制代码
而后再当咱们的客户端运行起来,就会进行copy操做
使用NIO完成网络通讯须要三个核心对象:
channel:java.nio.channels.Channel接口,SocketChannel,ServerSocketChannel,DatagramChannel
管道相关:Pipe.SinkChannel,Pine.SourceChannel
buffer:负责存储数据
Selector:其中Selector是SelectableChannel的多路复用器,主要是用于监控SelectableChannel的IO状态
// 1.获取通道,默认是阻塞的
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("192.168.80.1",9898));
// 1.1 将阻塞的套接字变成非阻塞
sChannel.configureBlocking(false);
// 2.建立指定大小的缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 3.发送数据给服务端,直接将数据存储到缓冲区
byteBuffer.put(new Date().toString().getBytes());
// 4.将缓冲区的数据写入到sChannel
byteBuffer.flip();
sChannel.write(byteBuffer);
byteBuffer.clear();
// 关闭
sChannel.close();
复制代码
代码的注释中已经解释了整个过程的作法,这里就不一一展开了。
// 1.获取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
// 2.将阻塞的套接字设置为非阻塞的
ssChannel.configureBlocking(false);
// 3.绑定端口号
ssChannel.bind(new InetSocketAddress(9898));
// 4.建立选择器对象
Selector selector = Selector.open();
// 5.将通道注册到选择器上(这里的第二个参数为selectionKey),下面有解释
// 此时选择器就开始监听这个通道的接收时间,此时接收工做准备就绪,才开始下一步的操做
ssChannel.register(selector,SelectionKey.OP_ACCEPT);
// 6.经过轮询的方式获取选择器上准备就绪的事件
// 若是大于0,至少有一个SelectionKey准备就绪
while (selector.select() > 0){
// 7.获取当前选择器中全部注册的selectionKey(已经准备就绪的监听事件)
Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
// 迭代获取已经准备就绪的选择键
while (selectionKeyIterator.hasNext()){
// 8.获取已经准备就绪的事件
SelectionKey selectionKey = selectionKeyIterator.next();
if (selectionKey.isAcceptable()){
// 9.调用accept方法
SocketChannel sChannel = ssChannel.accept();
// 将sChannel设置为非阻塞
// 再次强调,整个过程不能有任何一条阻塞通道
sChannel.configureBlocking(false);
// 进行数据接收工做,并且把sChannel也注册上选择器让选择器来监听
sChannel.register(selector,SelectionKey.OP_READ);
}else if (selectionKey.isReadable()){
// 若是读状态已经准备就绪,就开始读取数据
// 10.获取当前选择器上读状态准备就绪的通道
SocketChannel sChannel = (SocketChannel) selectionKey.channel();
// 11.读取客户端发送的数据,须要先建立缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 12.读取缓冲区的数据
while (sChannel.read(byteBuffer) > 0){
byteBuffer.flip();
// 这里sChannel.read(byteBuffer)就是这个字节数组的长度
System.out.println(new String(byteBuffer.array(),0,sChannel.read(byteBuffer)));
// 清空缓冲区
byteBuffer.clear();
}
}
// 当selectionKey使用完毕须要移除,不然会一直优先
selectionKeyIterator.remove();
}
}
复制代码
当调用register方法将通道注册到选择器时,选择器对通道的监听事件须要经过第二个参数ops决定
读:SelectionKey.OP_READ(1)
写:SelectionKey.OP_WRITE(4)
链接:SelectionKey.OP_CONNECT(8)
接收:SelectionKey.OP_ACCEPT(16)
复制代码
若注册时不只仅只有一个监听事件,则须要用位或操做符链接
int selectionKeySet = SelectionKey.OP_READ|SelectionKey.OP_WRITE
复制代码
而关于这个selectionKey,它表示着SelectableChannel和Selectr之间的注册关系。它也有一系列对应的方法
引入Scanner接收输入信息,不过请注意,在测试代码中输入IDEA须要进行一些设置,具体作法是在Help-Edit Custom VM Option中加入一行
-Deditable.java.test.console=true
复制代码
这样就能够输入了。
// 1.获取通道,默认是阻塞的
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("192.168.80.1",9898));
// 1.1 将阻塞的套接字变成非阻塞
sChannel.configureBlocking(false);
// 2.建立指定大小的缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String str = scanner.next();
// 3.发送数据给服务端,直接将数据存储到缓冲区
byteBuffer.put((new Date().toString()+str).getBytes());
// 4.将缓冲区的数据写入到sChannel
byteBuffer.flip();
sChannel.write(byteBuffer);
byteBuffer.clear();
}
// 关闭
sChannel.close();
复制代码
这样就完成了一个问答模式的网络通讯。
Java NIO中的管道是两个线程之间的单向数据链接,Pipe有一个source管道和一个sink管道,数据会被写到sink,从source中获取
// 1.获取管道
Pipe pipe = Pipe.open();
// 2.建立缓冲区对象
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 3.获取sink通道
Pipe.SinkChannel sinkChannel = pipe.sink();
byteBuffer.put("经过单向管道传输数据".getBytes());
// 4.将数据写入sinkChannel
byteBuffer.flip();
sinkChannel.write(byteBuffer);
// 5.读取缓冲区中的数据
Pipe.SourceChannel sourceChannel = pipe.source();
// 6.读取sourceChannel中的数据放入到缓冲区
byteBuffer.flip();
sourceChannel.read(byteBuffer);
System.out.println(new String(byteBuffer.array(),0,sourceChannel.read(byteBuffer)));
sourceChannel.close();
sinkChannel.close();
运行结果就是打印了咱们的那串字符"经过单向管道传输数据",没啥
复制代码
大体地把NIO的一些基础知识给列举了一下,内容看似不少其实并无涉及太难的知识点,都是循序渐进地执行而已。其实若是要深抠的话,仍是有不少其余的知识点的,好比NIO2的Path,Paths和Files。这里就再也不列举说明了。感兴趣的朋友能够自行去了解一下。