一、Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 能够确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty至关简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。java
二、目前netty有3个版本netty三、netty四、netty5。3个版本的内容有所不一样。neety3是核心的代码介绍。相对于netty四、和netty5的复杂性来讲。netty3的源码是值得学习的。我这里解析了netty3的一些源码,仅供你们理解,也是为了方便你们理解作了不少简化。不表明做者的开发思路。编程
三、咱们先来看一张图(这张图是我在学习源码的时候扣的,哈哈)数组
1、传统NIO流安全
1)一个线程里面,存在一个selector,固然这个selector也承担起看大门和服务客人的工做。服务器
2)这里无论多少客户端进来,都是这个selector来处理。这样就就加大了这个服务员的工做量网络
3)为了加入线程池,让多个selector同时工做,当时目的性都是同样的。并发
4)虽然看大门的和服务客人的都是服务员,可是仍是存在差异的。为了更好的处理多个线程的问题。因此这里netty就诞生了。app
2、netty框架框架
理解:异步
1)netty3的框架也是基于nio流作出来的。因此这里会详细介绍netty3框架的思路
2)将看门的服务员和服务客人的服务员分开。造成两块(也就是2个线程池,也就是后面的boss和worker)
3)当一个客人来的时候,首先boss,进行接待。而后boss分配工做给worker,这个,在两个线程池的工做下,有条不乱。
4)原理:就是将看大门的selector和服务客人的selector分开。而后经过boss线程池,下发任务给对应的worker
四、netty3源码分析
1)加入对应的jar包。我这里为了了解源码用的是netty3的包。
<dependency> <groupId>io.netty</groupId> <artifactId>netty</artifactId> <version>3.10.6.Final</version> </dependency>
2)目录结构
说明:
a、NettyBoss、NettyWork是针对于selector作区分。虽然他们不少共性,我这里为了好理解,并无作抽象类(忽略开发思路)。
b、ThreadHandle是用来初始化线程池和对应的接口。
c、Start为启动类
3)NettyBoss(看大门的服务员,第一种线程selector)
package com.troy.application.netty; import java.io.IOException; import java.nio.channels.*; import java.util.Iterator; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; public class NettyBoss { //线程池 public final Executor executor; //boss选择器 protected Selector selector; //原子变量,主要是用来保护线程安全。当本线程执行的时候,排除其余线程的执行 protected final AtomicBoolean wakenUp = new AtomicBoolean(); //队列,线程安全队列。 public final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>(); //线程处理,这里主要是拿到work的线程池 protected ThreadHandle threadHandle; //初始化 public NettyBoss(Executor executor,ThreadHandle threadHandle) { //赋值 this.executor = executor; this.threadHandle = threadHandle; try { //每个线程选择器 this.selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); } //从线程中获取一个线程执行如下内容 executor.execute(() -> { while (true) { try { //这里的目前就是排除其余线程同事执行,false由于这里处于阻塞状态,不用开启 wakenUp.set(false); //选择器阻塞 selector.select(); //运行队列中的任务 while (true) { final Runnable task = taskQueue.poll(); if (task == null) { break; } //若是任务存在开始运行 task.run(); } //对进来的进行处理 this.process(selector); } catch (Exception e) { e.printStackTrace(); } } }); } public void process(Selector selector) throws IOException { Set<SelectionKey> selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) { SelectionKey key = i.next(); i.remove(); ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 新客户端 SocketChannel channel = server.accept(); // 设置为非阻塞 channel.configureBlocking(false); // 获取一个worker NettyWork nextworker = threadHandle.workeres[Math.abs(threadHandle.workerIndex.getAndIncrement() % threadHandle.workeres.length)]; // 注册新客户端接入任务 Runnable runnable = () -> { try { //将客户端注册到selector中 channel.register(nextworker.selector, SelectionKey.OP_READ); } catch (ClosedChannelException e) { e.printStackTrace(); } }; //添加到work的队列中 nextworker.taskQueue.add(runnable); if (nextworker.selector != null) { //这里的目前就是开启执行过程 if (nextworker.wakenUp.compareAndSet(false, true)) { //放开本次阻塞,进行下一步执行 nextworker.selector.wakeup(); } } else { //任务完成移除线程 taskQueue.remove(runnable); } System.out.println("新客户端连接"); } } }
解释:
a、初始化的时候,赋值线程池,和线程处理类(线程处理类目的是获取worker的工做线程)
b、executor为线程池的执行过程。
c、selector.select()为造成阻塞,wakenUp为了线程安全考核。在接入客户端的时候用selector.wakeup()来放开本次阻塞(很重要)。
d、而后在worker安全队列中执行对应工做。(taskQueue的目前在boss和worker中的做用都是为了考虑线程安全,这里采用线程安全队列的目的是为了避免直接操做其余线程)
e、wakenUp.compareAndSet(false, true),这里是考虑并发问题。在本线程运行的时候,其余线程处于等待状态。这里也是为了线程安全考虑。
4)NettyWork(服务客人的服务员,第二种selector)
package com.troy.application.netty; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; public class NettyWork { //线程池 public final Executor executor; //boss选择器 protected Selector selector; //原子变量,主要是用来保护线程安全。当本线程执行的时候,排除其余线程的执行 protected final AtomicBoolean wakenUp = new AtomicBoolean(); //队列,线程安全队列。 public final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>(); //初始化 public NettyWork(Executor executor) { this.executor = executor; try { //每个work也须要一个选择器用来管理通道 this.selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); } //从线程池中获取一个线程开始执行 executor.execute(() -> { while (true) { try { //阻塞状态排除问题 wakenUp.set(false); //阻塞 selector.select(); //处理work任务 while (true) { final Runnable task = taskQueue.poll(); if (task == null) { break; } //存在work任务开始执行 task.run(); } //处理任务 this.process(selector); } catch (Exception e) { e.printStackTrace(); } } }); } public void process(Selector selector) throws IOException { Set<SelectionKey> selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator(); while (ite.hasNext()) { SelectionKey key = (SelectionKey) ite.next(); // 移除,防止重复处理 ite.remove(); // 获得事件发生的Socket通道 SocketChannel channel = (SocketChannel) key.channel(); // 数据总长度 int ret = 0; boolean failure = true; ByteBuffer buffer = ByteBuffer.allocate(1024); //读取数据 try { ret = channel.read(buffer); failure = false; } catch (Exception e) { // ignore } //判断是否链接已断开 if (ret <= 0 || failure) { key.cancel(); System.out.println("客户端断开链接"); }else{ System.out.println("收到数据:" + new String(buffer.array())); //回写数据 ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes()); channel.write(outBuffer);// 将消息回送给客户端 } } } }
解释:
a、worker的执行方式基本上面和boss的方式是同样的,只不够是处理方式不同
b、这里须要注意的是,都是考虑线程队列执行。
3)ThreadHandle(线程处理,这里主要是启动须要的东西)
package com.troy.application.netty; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; public class ThreadHandle { public final AtomicInteger bossIndex = new AtomicInteger(); public static NettyBoss[] bosses; public final AtomicInteger workerIndex = new AtomicInteger(); public static NettyWork[] workeres; public ThreadHandle(ExecutorService boss,ExecutorService work) { this.bosses = new NettyBoss[1]; //初始化boss线程池 for (int i = 0; i < bosses.length; i++) { bosses[i] = new NettyBoss(boss,this); } this.workeres = new NettyWork[Runtime.getRuntime().availableProcessors() * 2]; //初始化work线程池 for (int i = 0; i < workeres.length; i++) { workeres[i] = new NettyWork(work); } } public void bind(InetSocketAddress inetSocketAddress) { try { // 得到一个ServerSocket通道 ServerSocketChannel serverChannel = ServerSocketChannel.open(); // 设置通道为非阻塞 serverChannel.configureBlocking(false); // 将该通道对应的ServerSocket绑定到port端口 serverChannel.socket().bind(inetSocketAddress); //获取一个boss线程 NettyBoss nextBoss = bosses[Math.abs(bossIndex.getAndIncrement() % workeres.length)]; //向boss注册一个ServerSocket通道 Runnable runnable = () -> { try { //注册serverChannel到selector serverChannel.register(nextBoss.selector, SelectionKey.OP_ACCEPT); } catch (ClosedChannelException e) { e.printStackTrace(); } }; //加入任务队列 nextBoss.taskQueue.add(runnable); if (nextBoss.selector != null) { //排除其余任务处理 if (nextBoss.wakenUp.compareAndSet(false, true)) { //放开阻塞 nextBoss.selector.wakeup(); } } else { //移除任务 nextBoss.taskQueue.remove(runnable); } } catch (Exception e) { e.printStackTrace(); } } }
解释:
a、这里采用数组的形式,主要目的是考虑多个看门的,和多个服务客人的线程。为了好控制,好选择,哪个来执行。
b、端口的注册,在NettyBoss里面进行初始化的的原理都是同样的。
4)start
package com.troy.application.netty; import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Start { public static void main(String[] args) { //声明线程池 ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService work = Executors.newCachedThreadPool(); //初始化线程池 ThreadHandle threadHandle = new ThreadHandle(boss,work); //声明端口 threadHandle.bind(new InetSocketAddress(9000)); System.out.println("start"); } }
说明一下流程
a、初始化boss和work。让boss线程池加入设定第一种boss的selector,而且处于阻塞状态。work的初始化也基本上是同样的,只不过换成了第二种selector线程池,处于阻塞状态。
b、当线程处理类初始化监听端口的时候。就是选择boss中其中一个selector。声明一个线程先监听,加入boss的线程安全队列中。而后放开boss阻塞,向下执行。线程执行会监听对应端口并阻塞。
c、当一个客户端接入的时候,boss中的selector会监听到对应端口。而后选择work线程中的一个selector给work分派任务。
d、最后work中的selector来处理事务。
四、源码下载:https://pan.baidu.com/s/1pKIxuMf
五、本代码只是用于理解netty的实现过程,不表明开发思路。其中我为了简化代码,作了不少调整。目的就是压缩代码,方便理解。