Netty是一个很是优秀的java nio框架,这已无需多言。国庆时逛StackOverFlow,发现有人问如何用netty来支持Spring MVC,逛了一圈github并无找到有价值的分享。
正好,我一直都想本身实现一个web容器,因而本着重复造轮子的精神,写了一个Xcafe。html
⑴ 使用SpringMVC处理http请求
⑵ 静态资源缓存和直接返回数据
⑶ 本机session生成和缓存
⑷ 支持直接返回对象
⑸ 支持文件上传下载(MultipartFile)
⑹ 支持ServletOutPutStream写返回数据java
⑴ 使用Spring MVC处理WebSocket
⑵ 实现Xcafe MVC 框架(不使用java servlet api,而是彻底根据Netty http编解码的实现)
⑶ 支持根据配置选择使用Spring MVC 或 Xcafe MVC
⑷ 负载均衡
⑸ 分布式缓存、分布式Session
⑹ 异步消息发送git
考虑到大部分Web请求的业务逻辑都须要请求数据库、读写文件等操做,为了尽量多地接受链接,尽量多地处理请求,当前的实现是将全部的业务逻辑处理交由其它线程池去处理。github
这将会致使线程间通讯和频繁的线程切换。当将Xcafe用做负载均衡、缓存服务器或者处理其它简单的不耗时的请求时,这并非一个合理的选择。所以,将来将在初始化容器时能够根据配置选择在workerGroup处理仍是使用额外的ThreadPool。web
将来的异步消息处理和WebSocket协议实现后,若是通过测试有必要,将再在上面的线程模型基础上增长消息发送队列线程池,线程池中的每个线程负责维护一个消息发送队列,初步考虑使用java的fork/join框架最大限度地处理消息发送请求。线程模型将会演变成:算法
bossGroup负责链接
workerGroup负责编解码
BusinessThreadPool负责业务逻辑
MessageThreadPool负责消息发送数据库
![]() |
1. core 容器核心,链接管理和协议处理 2. cache 静态资源缓存 3. example 示例 输入如下网址 http://localhost:18898/test/index.do http://localhost:18898/3.html 可进行简单的演示 4. mvc 将来须要实现的mvc框架 5. util 一些经常使用的工具类 6. web 容器须要使用到的根据Java Servlet api的具体实现 7. views 静态资源和模板文件的目录,能够选择将其包含到Jar文件中 8. 配置文件 |
全类名:com.igeeksky.xcafe.core.HttpServer
初始化服务(线程组、端口、缓冲区大小等……),将来须要完善CoreContext类,根据配置文件(或BootInit.class)初始化服务器上下文,并根据配置启动不一样类型的服务。api
public class HttpServer { private static final Logger logger = LoggerFactory.getLogger(HttpServer.class); private static final boolean SSL = System.getProperty("ssl") != null; private static int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "18898")); private final int BACKLOG = 1024; private final int TIMEOUT = 300; private static boolean running = false; public static void main(String[] args) { //…………………… } private void doStart() throws CertificateException, SSLException { final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); final ExecutorService executorService = Executors.newFixedThreadPool(10); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, BACKLOG) //设定最大链接队列 .option(ChannelOption.SO_RCVBUF, 1024 * 256) //设定数据接收缓冲区大小 .option(ChannelOption.SO_SNDBUF, 1024 * 256) //设定数据发送缓冲区大小 .childOption(ChannelOption.SO_KEEPALIVE, true) //是否保持链接 //传入附带异步线程池的channelHandler .childHandler(new HttpPipelineInitializer(executorService, sslCtx, TIMEOUT)); Channel channel = b.bind(PORT).sync().channel(); //绑定端口直到绑定完成 channel.closeFuture().sync(); //阻塞关闭操做 } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
全类名:com.igeeksky.xcafe.core.HttpPipelineInitializer缓存
public class HttpPipelineInitializer extends ChannelInitializer<SocketChannel> { private final SslContext sslCtx; private final int timeOut; private final ExecutorService executorService; public HttpPipelineInitializer(ExecutorService executorService, SslContext sslCtx, int timeOut){ this.executorService = executorService; this.sslCtx = sslCtx; this.timeOut = timeOut; } @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc())); } pipeline.addLast("timeout", new ReadTimeoutHandler(timeOut)); pipeline.addLast("codec", new HttpServerCodec()); pipeline.addLast(new HttpContentCompressor(9)); pipeline.addLast("aggegator", new HttpObjectAggregator(1024 * 1024 * 1024)); pipeline.addLast("ServerInbound", new HttpServerInboundHandler(executorService)); } }
全类名:com.igeeksky.xcafe.core.handler.HttpServerInboundHandler
待完善:根据服务器上下文环境,可选择是否交由线程池异步处理业务逻辑服务器
public class HttpServerInboundHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(HttpServerInboundHandler.class); private ExecutorService executorService; public HttpServerInboundHandler(ExecutorService executorService){ this.executorService = executorService; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { FullHttpRequest req = (FullHttpRequest)msg; boolean isKeepAlive = HttpUtil.isKeepAlive(req); if(!prepare(ctx, req, isKeepAlive)){ return; } executorService.execute(new Runnable(){ @Override public void run() { HttpResponse response = HttpDispacher.INSTANCE.dispach(ctx, msg); doWriteAndFlush(ctx, isKeepAlive, (FullHttpResponse)response); req.content().release(); } }); } //为避免篇幅过长,省略其它代码,请登陆github或下载压缩文件查看………………… }
全类名:com.igeeksky.xcafe.core.dispacher.HttpDispacher
根据URL构建请求参数,根据方法类型将请求交给不一样的方法处理
public enum HttpDispacher { INSTANCE; private static final HttpActionAdapter action = HttpActionAdapter4Spring.INSTANCE; public HttpResponse dispach(ChannelHandlerContext ctx, Object msg){ //long start = System.currentTimeMillis(); //非HTTP请求处理 if (!(msg instanceof FullHttpRequest)) { return action.doNotHttpRequest(ctx, msg); } FullHttpRequest request = (FullHttpRequest) msg; HttpMethod method = request.method(); //Http请求方法为空处理 if(null == method){ return action.doNullHttpMethod(ctx, request); } //构建URI String uri = request.uri(); String[] temp = uri.split("\\?"); String shortUri = temp[0]; //根据URL构建请求参数 Map<String, String[]> parameters = getParameters(temp); if(method.equals(HttpMethod.GET)){ return action.doGet(ctx, request, shortUri, parameters); } else if(method.equals(HttpMethod.POST)){ return action.doPost(ctx, request, shortUri, parameters); } //为避免篇幅过长,省略其它代码,请登陆github或下载压缩文件查看………………… else{ return action.doUnContainMethod(ctx, request, shortUri, parameters); } } private Map<String, String[]> getParameters(String[] temp) { //为避免篇幅过长,省略其它代码,请登陆github或下载压缩文件查看………………… } }
全类名:com.igeeksky.xcafe.core.action.HttpActionAdapter4Spring
初始化SpringMVC环境,封装http请求/响应为Java Servlet API的具体实现,将请求交给具体的MVC框架去处理。今后类开始,进入真正的业务逻辑处理,后面的处理逻辑不该再与Netty代码发生关联。
拦截处理普通静态资源请求并实现缓存机制,也能够选择将所有请求交由MVC框架处理。
可扩展:能够根据不一样的MVC框架实现不一样的适配器。
待完善:静态资源请求处理类
public enum HttpActionAdapter4Spring implements HttpActionAdapter { INSTANCE; private static final Logger logger = LoggerFactory.getLogger(HttpActionAdapter4Spring.class); //Spring 应用上下文环境 private static final XmlWebApplicationContext wac = new XmlWebApplicationContext(); //Spring MVC的请求处理分发器(经过此类实现Spring MVC支持) private static final DispatcherServlet dispatcherServlet; //容器核心上下文环境配置 private static final XcafeCoreContext coreContext = new XcafeCoreContext(); //兼容Java Servlet API的servletContext private static final XcafeServletContext servletContext = new XcafeServletContext(); //静态资源缓存类 private static final ResourceCache CACHE = ResourceCacheDefault.INSTANCE; //Netty提供的数据处理工厂类(主要用于Multipart File解码的参数配置) private static final HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MAXSIZE); //将静态资源请求交由容器处理 或 MVC框架处理 private static boolean isStaticSupport = true; static{ //初始化Spring上下文环境 //为避免篇幅过长,省略其它代码,请登陆github或下载压缩文件查看………………… } @Override public HttpResponse doGet(ChannelHandlerContext ctx, FullHttpRequest request, String requestURI, Map<String, String[]> parameters) { FullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); //请求静态资源(可选择是否进入SpringMVC处理) if(isStaticSupport && !requestURI.endsWith(".do") && !requestURI.endsWith(".mo") && !requestURI.endsWith(".ws")){ return getStaticResource(request, resp, requestURI, parameters); } /* 请求动态资源 */ //Request包装类:将Netty的request包装成兼容Java Servlet API的request FullHttpRequestWrapper requestWrapper = new FullHttpRequestWrapper(servletContext, request, requestURI, parameters); //Response包装类:将Netty的response包装成兼容Java Servlet API的Response FullHttpResponseWrapper responseWrapper = new FullHttpResponseWrapper(ctx, resp); requestWrapper.setResponse(responseWrapper); //interceptors.doFilter(requestWrapper, responseWrapper); try { dispatcherServlet.service(requestWrapper, responseWrapper); } catch (ServletException | IOException e) { logger.error("", e); } return responseWrapper.getResponse(); } @Override public HttpResponse doPost(ChannelHandlerContext ctx, FullHttpRequest request, String requestURI, Map<String, String[]> parameters){ FullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); //MultiPartRequest包装类:将Netty的request包装成兼容Java Servlet API的MultiPartRequest FullHttpRequestWrapper requestWrapper = new MultipartFullHttpRequestWrapper(servletContext,factory, request, requestURI, parameters); FullHttpResponseWrapper responseWrapper = new FullHttpResponseWrapper(ctx, resp); try { dispatcherServlet.service(requestWrapper, responseWrapper); } catch (ServletException | IOException e) { logger.error("", e); } return responseWrapper.getResponse(); } /** * <b>返回静态资源</b></br></br> * 待修改:新建http协议管理类,根据http协议完善请求和响应</br> */ private HttpResponse getStaticResource(FullHttpRequest request, FullHttpResponse resp, String requestURI, Map<String, String[]> parameters) { //省略代码,请登陆github或下载压缩文件查看…………………… } }
全类名:com.igeeksky.xcafe.cache.ResourceCacheDefault
采用单线程异步循环监听执行的无锁设计,实现了LRU,LFU,FIFO算法,与及综合性的LRFU算法。
待完善:根据配置文件设置参数
具体实现超过600行代码,为避免篇幅过长,请登陆github或下载压缩文件查看。
![]() |
1. FullHttpRequestWrapper 封装Netty 的Http请求,转换成兼容Java Servlet API的Request。 FullHttpResponseWrapper如上。 2. MultipartFullHttpRequestWrapper 封装Netty 的Http请求,转换成兼容Java Servlet API的MultipartRequest 3. XcafeMultipartFile 封装Netty的FileUpload,转换成兼容Java Servlet API的MultipartFile 4. XcafeServletOutputStream 封装Netty的Bytebuf,将其转换成兼容Java Servlet API的ServletOutputStream实现流方式写数据。 XcafeServletInputStream如上。 5. LocalSessionManager 工具类:实现Session的本地缓存和读取。分布式缓存待后续再实现 6. websocket支持待后续再处理 |
Xcafe当前只是一个尝试性的Web容器,若是要成长为一个稍稍成熟的项目,仍须要完善不少细节并通过大量的测试。若是你有任何建议和问题,请联系我。
Github:
https://github.com/tonylau08/xcafe