本文介绍操做系统I/O工做原理,Java I/O设计,基本使用,开源项目中实现高性能I/O常见方法和实现,完全搞懂高性能I/O之道html
在介绍I/O原理以前,先重温几个基础概念:java
操做系统:管理计算机硬件与软件资源的系统软件
内核:操做系统的核心软件,负责管理系统的进程、内存、设备驱动程序、文件和网络系统等等,为应用程序提供对计算机硬件的安全访问服务git
为了不用户进程直接操做内核,保证内核安全,操做系统将内存寻址空间划分为两部分:
内核空间(Kernel-space),供内核程序使用
用户空间(User-space),供用户进程使用
为了安全,内核空间和用户空间是隔离的,即便用户的程序崩溃了,内核也不受影响github
计算机中的数据是基于随着时间变换高低电压信号传输的,这些数据信号接二连三,有着固定的传输方向,相似水管中水的流动,所以抽象数据流(I/O流)的概念:指一组有顺序的、有起点和终点的字节集合,apache
抽象出数据流的做用:实现程序逻辑与底层硬件解耦,经过引入数据流做为程序与硬件设备之间的抽象层,面向通用的数据流输入输出接口编程,而不是具体硬件特性,程序和底层硬件能够独立灵活替换和扩展编程
典型I/O读写磁盘工做原理以下:数组
tips: DMA:全称叫直接内存存取(Direct Memory Access),是一种容许外围设备(硬件子系统)直接访问系统主内存的机制。基于 DMA 访问方式,系统主内存与硬件设备的数据传输能够省去CPU 的全程调度缓存
值得注意的是:安全
这里先以最经典的阻塞式I/O模型介绍:性能优化
tips:recvfrom,经socket接收数据的函数
值得注意的是:
Java中对数据流进行具体化和实现,关于Java数据流通常关注如下几个点:
(1) 流的方向
从外部到程序,称为输入流;从程序到外部,称为输出流
(2) 流的数据单位
程序以字节做为最小读写数据单元,称为字节流,以字符做为最小读写数据单元,称为字符流
(3) 流的功能角色
从/向一个特定的IO设备(如磁盘,网络)或者存储对象(如内存数组)读/写数据的流,称为节点流;
对一个已有流进行链接和封装,经过封装后的流来实现数据的读/写功能,称为处理流(或称为过滤流);
java.io包下有一堆I/O操做类,初学时看了容易搞不懂,其实仔细观察其中仍是有规律:
这些I/O操做类都是在继承4个基本抽象流的基础上,要么是节点流,要么是处理流
java.io包中包含了流式I/O所须要的全部类,java.io包中有四个基本抽象流,分别处理字节流和字符流:
节点流I/O类名由节点流类型 + 抽象流类型组成,常见节点类型有:
节点流的建立一般是在构造函数传入数据源,例如:
FileReader reader = new FileReader(new File("file.txt")); FileWriter writer = new FileWriter(new File("file.txt"));
处理流I/O类名由对已有流封装的功能 + 抽象流类型组成,常见功能有:
处理流的应用了适配器/装饰模式,转换/扩展已有流,处理流的建立一般是在构造函数传入已有的节点流或处理流:
FileOutputStream fileOutputStream = new FileOutputStream("file.txt"); // 扩展提供缓冲写 BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream); // 扩展提供提供基本数据类型写 DataOutputStream out = new DataOutputStream(bufferedOutputStream);
Java NIO(New I/O)是一个能够替代标准Java I/O API的IO API(从Java 1.4开始),Java NIO提供了与标准I/O不一样的I/O工做方式,目的是为了解决标准 I/O存在的如下问题:
标准I/O处理,完成一次完整的数据读写,至少须要从底层硬件读到内核空间,再读到用户文件,又从用户空间写入内核空间,再写入底层硬件
此外,底层经过write、read等函数进行I/O系统调用时,须要传入数据所在缓冲区起始地址和长度
因为JVM GC的存在,致使对象在堆中的位置每每会发生移动,移动后传入系统函数的地址参数就不是真正的缓冲区地址了
可能致使读写出错,为了解决上面的问题,使用标准I/O进行系统调用时,还会额外致使一次数据拷贝:把数据从JVM的堆内拷贝到堆外的连续空间内存(堆外内存)
因此总共经历6次数据拷贝,执行效率较低
传统的网络I/O处理中,因为请求创建链接(connect),读取网络I/O数据(read),发送数据(send)等操做是线程阻塞的
// 等待链接 Socket socket = serverSocket.accept(); // 链接已创建,读取请求消息 StringBuilder req = new StringBuilder(); byte[] recvByteBuf = new byte[1024]; int len; while ((len = socket.getInputStream().read(recvByteBuf)) != -1) { req.append(new String(recvByteBuf, 0, len, StandardCharsets.UTF_8)); } // 写入返回消息 socket.getOutputStream().write(("server response msg".getBytes())); socket.shutdownOutput();
以上面服务端程序为例,当请求链接已创建,读取请求消息,服务端调用read方法时,客户端数据可能还没就绪(例如客户端数据还在写入中或者传输中),线程须要在read方法阻塞等待直到数据就绪
为了实现服务端并发响应,每一个链接须要独立的线程单独处理,当并发请求量大时为了维护链接,内存、线程切换开销过大
Java NIO核心三大核心组件是Buffer(缓冲区)、Channel(通道)、Selector
Buffer提供了经常使用于I/O操做的字节缓冲区,常见的缓存区有ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer,分别对应基本数据类型: byte, char, double, float, int, long, short,下面介绍主要以最经常使用的ByteBuffer为例,Buffer底层基于Java堆外内存
堆外内存是指与堆内存相对应的,把内存对象分配在JVM堆之外的内存,这些内存直接受操做系统管理(而不是虚拟机,相比堆内内存,I/O操做中使用堆外内存的优点在于:
ByteBuffer底层的分配和释放基于malloc和free函数,对外allocateDirect方法能够申请分配堆外内存,并返回继承ByteBuffer类的DirectByteBuffer对象:
public static ByteBuffer allocateDirect(int capacity) { return new DirectByteBuffer(capacity); }
堆外内存的回收基于DirectByteBuffer的成员变量Cleaner类,提供clean方法能够用于主动回收,Netty中大部分堆外内存经过记录定位Cleaner的存在,主动调用clean方法来回收;
另外,当DirectByteBuffer对象被GC时,关联的堆外内存也会被回收
tips: JVM参数不建议设置-XX:+DisableExplicitGC,由于部分依赖Java NIO的框架(例如Netty)在内存异常耗尽时,会主动调用System.gc(),触发Full GC,回收DirectByteBuffer对象,做为回收堆外内存的最后保障机制,设置该参数以后会致使在该状况下堆外内存得不到清理
堆外内存基于基础ByteBuffer类的DirectByteBuffer类成员变量:Cleaner对象,这个Cleaner对象会在合适的时候执行unsafe.freeMemory(address),从而回收这块堆外内存
Buffer能够见到理解为一组基本数据类型,存储地址连续的的数组,支持读写操做,对应读模式和写模式,经过几个变量来保存这个数据的当前位置状态:capacity、 position、 limit:
Channel(通道)的概念能够类比I/O流对象,NIO中I/O操做主要基于Channel:
从Channel进行数据读取 :建立一个缓冲区,而后请求Channel读取数据
从Channel进行数据写入 :建立一个缓冲区,填充数据,请求Channel写入数据
Channel和流很是类似,主要有如下几点区别:
Java NIO中最重要的几个Channel的实现:
基于标准I/O中,咱们第一步可能要像下面这样获取输入流,按字节把磁盘上的数据读取到程序中,再进行下一步操做,而在NIO编程中,须要先获取Channel,再进行读写
FileInputStream fileInputStream = new FileInputStream("test.txt"); FileChannel channel = fileInputStream.channel();
tips: FileChannel仅能运行在阻塞模式下,文件异步处理的 I/O 是在JDK 1.7 才被加入的 java.nio.channels.AsynchronousFileChannel
// server socket channel: ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(InetAddress.getLocalHost(), 9091)); while (true) { SocketChannel socketChannel = serverSocketChannel.accept(); ByteBuffer buffer = ByteBuffer.allocateDirect(1024); int readBytes = socketChannel.read(buffer); if (readBytes > 0) { // 从写数据到buffer翻转为从buffer读数据 buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body = new String(bytes, StandardCharsets.UTF_8); System.out.println("server 收到:" + body); } }
Selector(选择器) ,它是Java NIO核心组件中的一个,用于检查一个或多个NIO Channel(通道)的状态是否处于可读、可写。实现单线程管理多个Channel,也就是能够管理多个网络链接
Selector核心在于基于操做系统提供的I/O复用功能,单个线程能够同时监视多个链接描述符,一旦某个链接就绪(通常是读就绪或者写就绪),可以通知程序进行相应的读写操做,常见有select、poll、epoll等不一样实现
Java NIO Selector基本工做原理以下:
示例以下,完整可运行代码已经上传github(https://github.com/caison/caison-blog-demo):
Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(9091)); // 配置通道为非阻塞模式 serverSocketChannel.configureBlocking(false); // 注册服务端的socket-accept事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { // selector.select()会一直阻塞,直到有channel相关操做就绪 selector.select(); // SelectionKey关联的channel都有就绪事件 Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); // 服务端socket-accept if (key.isAcceptable()) { // 获取客户端链接的channel SocketChannel clientSocketChannel = serverSocketChannel.accept(); // 设置为非阻塞模式 clientSocketChannel.configureBlocking(false); // 注册监听该客户端channel可读事件,并为channel关联新分配的buffer clientSocketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocateDirect(1024)); } // channel可读 if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); int bytesRead; StringBuilder reqMsg = new StringBuilder(); while ((bytesRead = socketChannel.read(buf)) > 0) { // 从buf写模式切换为读模式 buf.flip(); int bufRemain = buf.remaining(); byte[] bytes = new byte[bufRemain]; buf.get(bytes, 0, bytesRead); // 这里当数据包大于byteBuffer长度,有可能有粘包/拆包问题 reqMsg.append(new String(bytes, StandardCharsets.UTF_8)); buf.clear(); } System.out.println("服务端收到报文:" + reqMsg.toString()); if (bytesRead == -1) { byte[] bytes = "[这是服务回的报文的报文]".getBytes(StandardCharsets.UTF_8); int length; for (int offset = 0; offset < bytes.length; offset += length) { length = Math.min(buf.capacity(), bytes.length - offset); buf.clear(); buf.put(bytes, offset, length); buf.flip(); socketChannel.write(buf); } socketChannel.close(); } } // Selector不会本身从已selectedKeys中移除SelectionKey实例 // 必须在处理完通道时本身移除 下次该channel变成就绪时,Selector会再次将其放入selectedKeys中 keyIterator.remove(); } }
tips: Java NIO基于Selector实现高性能网络I/O这块使用起来比较繁琐,使用不友好,通常业界使用基于Java NIO进行封装优化,扩展丰富功能的Netty框架来优雅实现
下面结合业界热门开源项目介绍高性能I/O的优化
零拷贝(zero copy)技术,用于在数据读写中减小甚至彻底避免没必要要的CPU拷贝,减小内存带宽的占用,提升执行效率,零拷贝有几种不一样的实现原理,下面介绍常见开源项目中零拷贝实现
Kafka基于Linux 2.1内核提供,并在2.4 内核改进的的sendfile函数 + 硬件提供的DMA Gather Copy实现零拷贝,将文件经过socket传送
函数经过一次系统调用完成了文件的传送,减小了原来read/write方式的模式切换。同时减小了数据的copy, sendfile的详细过程以下:
基本流程以下:
相比传统的I/O方式,sendfile + DMA Gather Copy方式实现的零拷贝,数据拷贝次数从4次降为2次,系统调用从2次降为1次,用户进程上下文切换次数从4次变成2次DMA Copy,大大提升处理效率
Kafka底层基于java.nio包下的FileChannel的transferTo:
public abstract long transferTo(long position, long count, WritableByteChannel target)
transferTo将FileChannel关联的文件发送到指定channel,当Comsumer消费数据,Kafka Server基于FileChannel将文件中的消息数据发送到SocketChannel
RocketMQ基于mmap + write的方式实现零拷贝:
mmap() 能够将内核中缓冲区的地址与用户空间的缓冲区进行映射,实现数据共享,省去了将数据从内核缓冲区拷贝到用户缓冲区
tmp_buf = mmap(file, len); write(socket, tmp_buf, len);
mmap + write 实现零拷贝的基本流程以下:
RocketMQ中消息基于mmap实现存储和加载的逻辑写在org.apache.rocketmq.store.MappedFile中,内部实现基于nio提供的java.nio.MappedByteBuffer,基于FileChannel的map方法获得mmap的缓冲区:
// 初始化 this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
查询CommitLog的消息时,基于mappedByteBuffer偏移量pos,数据大小size查询:
public SelectMappedBufferResult selectMappedBuffer(int pos, int size) { int readPosition = getReadPosition(); // ...各类安全校验 // 返回mappedByteBuffer视图 ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); byteBuffer.position(pos); ByteBuffer byteBufferNew = byteBuffer.slice(); byteBufferNew.limit(size); return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this); }
tips: transientStorePoolEnable机制
Java NIO mmap的部份内存并非常驻内存,能够被置换到交换内存(虚拟内存),RocketMQ为了提升消息发送的性能,引入了内存锁定机制,即将最近须要操做的CommitLog文件映射到内存,并提供内存锁定功能,确保这些文件始终存在内存中,该机制的控制参数就是transientStorePoolEnable
所以,MappedFile数据保存CommitLog刷盘有2种方式:
RocketMQ 基于 mmap+write 实现零拷贝,适用于业务级消息这种小块文件的数据持久化和传输
Kafka 基于 sendfile 这种零拷贝方式,适用于系统日志消息这种高吞吐量的大块文件的数据持久化和传输
tips: Kafka 的索引文件使用的是 mmap+write 方式,数据文件发送网络使用的是 sendfile 方式
Netty 的零拷贝分为两种:
Netty中对Java NIO功能封装优化以后,实现I/O多路复用代码优雅了不少:
// 建立mainReactor NioEventLoopGroup boosGroup = new NioEventLoopGroup(); // 建立工做线程组 NioEventLoopGroup workerGroup = new NioEventLoopGroup(); final ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap // 组装NioEventLoopGroup .group(boosGroup, workerGroup) // 设置channel类型为NIO类型 .channel(NioServerSocketChannel.class) // 设置链接配置参数 .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) // 配置入站、出站事件handler .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) { // 配置入站、出站事件channel ch.pipeline().addLast(...); ch.pipeline().addLast(...); } }); // 绑定端口 int port = 8080; serverBootstrap.bind(port).addListener(future -> { if (future.isSuccess()) { System.out.println(new Date() + ": 端口[" + port + "]绑定成功!"); } else { System.err.println("端口[" + port + "]绑定失败!"); } });
页缓存(PageCache)是操做系统对文件的缓存,用来减小对磁盘的 I/O 操做,以页为单位的,内容就是磁盘上的物理块,页缓存能帮助程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要缘由就是因为OS使用PageCache机制对读写访问操做进行了性能优化:
页缓存读取策略:当进程发起一个读操做 (好比,进程发起一个 read() 系统调用),它首先会检查须要的数据是否在页缓存中:
页缓存写策略:当进程发起write系统调用写数据到文件中,先写到页缓存,而后方法返回。此时数据尚未真正的保存到文件中去,Linux 仅仅将页缓存中的这一页数据标记为“脏”,而且被加入到脏页链表中
而后,由flusher 回写线程周期性将脏页链表中的页写到磁盘,让磁盘中的数据和内存中保持一致,最后清理“脏”标识。在如下三种状况下,脏页会被写回磁盘:
RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,而且是顺序读取,在page cache机制的预读取做用下,Consume Queue文件的读性能几乎接近读内存,即便在有消息堆积状况下也不会影响性能,提供了2种消息刷盘策略:
Kafka实现消息高性能读写也利用了页缓存,这里再也不展开
《深刻理解Linux内核 —— Daniel P.Bovet》
RocketMQ 消息存储流程 —— Zhao Kun(赵坤)
更多精彩,欢迎关注公众号 分布式系统架构