day 4 Socket 和 NIO Netty

 Scoket通讯--------这是一个例子,能够在这个例子的基础上进行相应的拓展,核心也是在多线程任务上进行修改javascript

package cn.itcast.bigdata.socket;

import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; public class ServiceServer { public static void main(String[] args) throws Exception { // 建立一个serversocket,绑定到本机的8899端口上 ServerSocket server = new ServerSocket(); server.bind(new InetSocketAddress("localhost", 8899)); // 接受客户端的链接请求;accept是一个阻塞方法,会一直等待,到有客户端请求链接才返回 while (true) { Socket socket = server.accept(); new Thread(new ServiceServerTask(socket)).start(); } } }
package cn.itcast.bigdata.socket;

import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PrintWriter; import java.net.Socket; public class ServiceClient { public static void main(String[] args) throws Exception { /*ServiceIterface service = ProxyUtils.getProxy(ServiceIterface.class,"methodA",hostname,port); Result = service.methodA(parameters);*/ // 向服务器发出请求创建链接 Socket socket = new Socket("localhost", 8899); // 从socket中获取输入输出流 InputStream inputStream = socket.getInputStream(); OutputStream outputStream = socket.getOutputStream(); PrintWriter pw = new PrintWriter(outputStream); pw.println("hello"); pw.flush(); BufferedReader br = new BufferedReader(new InputStreamReader(inputStream)); String result = br.readLine(); System.out.println(result); inputStream.close(); outputStream.close(); socket.close(); } }
package cn.itcast.bigdata.socket;

import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PrintWriter; import java.net.Socket; public class ServiceServerTask implements Runnable{ Socket socket ; InputStream in=null; OutputStream out = null; public ServiceServerTask(Socket socket) { this.socket = socket; } //业务逻辑:跟客户端进行数据交互  @Override public void run() { try { //从socket链接中获取到与client之间的网络通讯输入输出流 in = socket.getInputStream(); out = socket.getOutputStream(); BufferedReader br = new BufferedReader(new InputStreamReader(in)); //从网络通讯输入流中读取客户端发送过来的数据 //注意:socketinputstream的读数据的方法都是阻塞的 String param = br.readLine();// 按理说这里应该改成多行的就是while(true) /** * 做业: * 将如下业务调用逻辑写成更加通用的:能够根据客户端发过来的调用类名、调用方法名、调用该参数来灵活调用 * * 《反射》 * */ GetDataServiceImpl getDataServiceImpl = new GetDataServiceImpl(); String result = getDataServiceImpl.getData(param); //将调用结果写到sokect的输出流中,以发送给客户端 PrintWriter pw = new PrintWriter(out); pw.println(result); pw.flush(); } catch (IOException e) { e.printStackTrace(); }finally{ try { in.close(); out.close(); socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
package cn.itcast.bigdata.socket;

public class GetDataServiceImpl { public String getData(String param){ return "ok-"+param; } }

PS:这是传统的调用方式,当线程多了之后就不同了。 NIO 非阻塞IO,是新的 可是不知道怎么使用,如今知道了 NIO就是使用Netty框架,性能高不少

PS : RPC是一种远程调用的协议,有不少的实现,WebService就是一种实现。
-------------------------------------自定义RPC的框架-

 
 
PS:1.正常一个应用,蓝线上面是用户须要配置的。配置好怎样交换呢?首先spring启动好后,会扫描注解,而后把javabean放在hashmap结构中。   
  2.当有客户端有请求过来时,传统是用socket通讯,可是这种方式性能会出现问题。如今一般使用Netty来解决这个问题(而不是使用传统的socket通讯)。
  3.客户端也是注解,他是经过动态代理包装整合向外暴漏接口。而后二者经过socket通讯。
   ---------------
  4.若是不该用zookeeper的话,这就是一个完成的sp ring的使用。使用zookeeper之后就能够 远程调用了。(由于注册了服务器的地址和端口)
PS:在实现上,服务器端经过spring自定义自定义注解 获取相应的 service, 而后经过 netty (NIO)进行异步传输,使用zookeeper进行 使用RPC服务器配置。
PS :netty是为了解决传统的阻塞的问题,netty是nio的一种实现。
 
 

 ----------------------------------------------------------------------------html

 

  

PS:传统的要在内核之间交互java

PS :Netty是一个很庞大的体系, 若是精通netty的话,薪水是普通屌丝的好几倍。hadoop、spark都是有用netty
PS:
0.首先从操做系统的底层是支持异步IO的,可是以前的异步IO是C++的多,Java的异步IO少,源生的socket是同步的性能很差,急需异步框架出现
1.传统BIO模式是这样的,来一个请求,建立一个线程;

       后来出现了线程池或者使用消息队列来实现一个线程或多个线程处理N个请求的模型,其实底层仍是同步IO,称之为“伪异步IO ,这样其实仍是会出现问题,若是队列满的话,仍是会出现链接超时。只能用NIO出手了。web

 
NIO

 PS :  低负载、低并发应该使用同步阻塞;  高并发进行   NIOspring

1.类库的简介: buffer、channel、selectorbootstrap

2.看了源码之后,发现NIO的源码比较复杂浏览器

总结: 由于原声NIO java代码编写起来比较费劲,因此不太鼓励用java原声代码(须要对多线程比较熟悉),服务器

而是推荐使用netty,由于netty就是高并发的解决方案websocket

PS : Netty打包部署也很简单就是一个jar文件,具体代码看下面网络

PS:

PS : 

PS:日常Java网络传输数据的时候比较慢,后来出现了几种框架

 

 

 
 


PS : 上面是netty的位置




PS :WebSocket(二)-WebSocket、Socket、TCP、HTTP区别

PS:WebSocket他是基于Tcp的新的,比传统的更好

PS  : 案例-----看慕课网的培训教程代码

package com.imooc.netty;

import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * 存储整个工程的全局配置
 * @author liuyazhuang
 *
 */
public class NettyConfig {
    
    /**
     * 存储每个客户端接入进来时的channel对象
     */
    public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
}
package com.imooc.netty;

import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;

/**
 * 接收/处理/响应客户端websocket请求的核心业务处理类
 * @author liuyazhuang
 *
 */
public class MyWebSocketHandler extends SimpleChannelInboundHandler<Object> {
    
    private WebSocketServerHandshaker handshaker;
    private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket";
    //客户端与服务端建立链接的时候调用
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        NettyConfig.group.add(ctx.channel());//group像是集合类
        System.out.println("客户端与服务端链接开启...");
    }

    //客户端与服务端断开链接的时候调用
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        NettyConfig.group.remove(ctx.channel());
        System.out.println("客户端与服务端链接关闭...");
    }

    //服务端接收客户端发送过来的数据结束以后调用
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
    }

    //工程出现异常的时候调用
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    //服务端处理客户端websocket请求的核心方法
    @Override
    protected void messageReceived(ChannelHandlerContext context, Object msg) throws Exception {
        //处理客户端向服务端发起http握手请求的业务
        if (msg instanceof FullHttpRequest) {
            handHttpRequest(context,  (FullHttpRequest)msg);
        }else if (msg instanceof WebSocketFrame) { //处理websocket链接业务
            handWebsocketFrame(context, (WebSocketFrame)msg);
        }
    }
    
    /**
     * 处理客户端与服务端以前的websocket业务
     * @param ctx
     * @param frame
     */
    private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){
        //判断是不是关闭websocket的指令
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());
        }
        //判断是不是ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        
        //判断是不是二进制消息,若是是二进制消息,抛出异常
        if( ! (frame instanceof TextWebSocketFrame) ){
            System.out.println("目前咱们不支持二进制消息");
            throw new RuntimeException("【"+this.getClass().getName()+"】不支持消息");
        }
        //返回应答消息
        //获取客户端向服务端发送的消息
        String request = ((TextWebSocketFrame) frame).text();
        System.out.println("服务端收到客户端的消息====>>>" + request);
        if(request.equals("CXLL")){
            request = "正在办理查询流量";
        }
        TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() 
                                                                                                + ctx.channel().id() 
                                                                                                + " ===>>> " 
                                                                                                + request);
        //群发,服务端向每一个链接上来的客户端                群发          消息
        NettyConfig.group.writeAndFlush(tws);//响应到浏览器中的内容!!!!
    }
    /**
     * 处理客户端向服务端发起http握手请求的业务
     * @param ctx
     * @param req
     */
    private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){
        if (!req.getDecoderResult().isSuccess() 
                || ! ("websocket".equals(req.headers().get("Upgrade")))) {//不是websocket的请求
            sendHttpResponse(ctx, req, 
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                WEB_SOCKET_URL, null, false);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        }else{
            handshaker.handshake(ctx.channel(), req);
        }
    }
    
    /**
     * 服务端向客户端响应消息
     * @param ctx
     * @param req
     * @param res
     */
    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req,
            DefaultFullHttpResponse res){
        if (res.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
        //服务端向客户端发送数据
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (res.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }
}
package com.imooc.netty;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * 初始化链接时候的各个组件
 * @author liuyazhuang
 *
 */
public class MyWebSocketChannelHandler extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel e) throws Exception {
        e.pipeline().addLast("http-codec", new HttpServerCodec());
        e.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
        e.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
        e.pipeline().addLast("handler", new MyWebSocketHandler());
    }

}
package com.imooc.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 程序的入口,负责启动应用
 * @author liuyazhuang
 *
 */
public class Main {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workGroup);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(new MyWebSocketChannelHandler());
            System.out.println("服务端开启等待客户端链接....");
            Channel ch = b.bind(8888).sync().channel();
            ch.closeFuture().sync();
            
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            //优雅的退出程序
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
<html>
    <head>
        <meta http-equiv="Content-Type" content="text/html; charset = utf-8"/>
        <title>WebSocket客户端</title>
    <script type="text/javascript">
        var socket;
        if(!window.WebSocket){
            window.WebSocket = window.MozWebSocket;
        }

        if(window.WebSocket){
            socket = new WebSocket("ws://localhost:8888/websocket");
            socket.onmessage = function(event){
                var ta = document.getElementById('responseContent');
                ta.value += event.data + "\r\n";
            };

            socket.onopen = function(event){
                var ta = document.getElementById('responseContent');
                ta.value = "你当前的浏览器支持WebSocket,请进行后续操做\r\n";
            };

            socket.onclose = function(event){
                var ta = document.getElementById('responseContent');
                ta.value = "";
                ta.value = "WebSocket链接已经关闭\r\n";
            };
        }else{
            alert("您的浏览器不支持WebSocket");
        }


        function send(message){
            if(!window.WebSocket){
                return;
            }
            if(socket.readyState == WebSocket.OPEN){
                socket.send(message);
            }else{
                alert("WebSocket链接没有创建成功!!");
            }
        }
    </script>
    </head>
    <body>
        <form onSubmit="return false;">
            <input type = "text" name = "message" value = ""/>
            <br/><br/>
            <input type = "button" value = "发送WebSocket请求消息" onClick = "send(this.form.message.value)"/>
            <hr color="red"/>
            <h2>客户端接收到服务端返回的应答消息</h2>
            <textarea id = "responseContent" style = "width:1024px; height:300px"></textarea>
        </form>
    </body>
</html>

------------------------------------------------------------------------------------------------------

Dubbo底层用的就是Netty
3.2. netty的helloworld 3.2.1. 下载netty包 • 下载netty包,下载地址http://netty.io/ 3.2.2. 服务端启动类 package com.netty.demo.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * • 配置服务器功能,如线程、端口 • 实现服务器处理程序,它包含业务逻辑,决定当有一个请求链接或接收数据时该作什么 * * @author wilson * */ public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public void start() throws Exception { EventLoopGroup eventLoopGroup = null; try { //建立ServerBootstrap实例来引导绑定和启动服务器 ServerBootstrap serverBootstrap = new ServerBootstrap(); //建立NioEventLoopGroup对象来处理事件,如接受新链接、接收数据、写数据等等 eventLoopGroup = new NioEventLoopGroup(); //指定通道类型为NioServerSocketChannel,设置InetSocketAddress让服务器监听某个端口已等待客户端链接。 serverBootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class).localAddress("localhost",port).childHandler(new ChannelInitializer<Channel>() { //设置childHandler执行全部的链接请求 @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new EchoServerHandler()); } }); // 最后绑定服务器等待直到绑定完成,调用sync()方法会阻塞直到服务器完成绑定,而后服务器等待通道关闭,由于使用sync(),因此关闭操做也会被阻塞。 ChannelFuture channelFuture = serverBootstrap.bind().sync();//主要用于异步操做通知回调 System.out.println("开始监听,端口为:" + channelFuture.channel().localAddress()); channelFuture.channel().closeFuture().sync();//方法柱塞,等待服务端链路关闭后,Main函数才退出 } finally { eventLoopGroup.shutdownGracefully().sync(); //优雅的退出,释放全部的资源 } } public static void main(String[] args) throws Exception { new EchoServer(20000).start(); } } 3.2.3. 服务端回调方法 package com.netty.demo.server; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; public class EchoServerHandler extends ChannelInboundHandlerAdapter {// channelInboundHandlerAdapter 对于网络事件进行读写操做 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server 读取数据……"); //读取数据 ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("接收客户端数据:" + body); //向客户端写数据 System.out.println("server向client发送数据"); String currentTime = new Date(System.currentTimeMillis()).toString(); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.write(resp); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("server 读取数据完毕.."); ctx.flush();//刷新后才将数据发出到SocketChannel } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } 3.2.4. 客户端启动类 package com.netty.demo.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; /** * • 链接服务器 • 写数据到服务器 • 等待接受服务器返回相同的数据 • 关闭链接 * * @author wilson * */ public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception { EventLoopGroup nioEventLoopGroup = null; try { //建立Bootstrap对象用来引导启动客户端 Bootstrap bootstrap = new Bootstrap(); //建立EventLoopGroup对象并设置到Bootstrap中,EventLoopGroup能够理解为是一个线程池,这个线程池用来处理链接、接受数据、发送数据;专门用来网络数据的处理 nioEventLoopGroup = new NioEventLoopGroup(); //建立InetSocketAddress并设置到Bootstrap中,InetSocketAddress是指定链接的服务器地址 bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host, port)) .handler(new ChannelInitializer<SocketChannel>() { //添加一个ChannelHandler,客户端成功链接服务器后就会被执行 @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler()); } }); // • 调用Bootstrap.connect()来链接服务器 ChannelFuture f = bootstrap.connect().sync(); // • 最后关闭EventLoopGroup来释放资源 f.channel().closeFuture().sync(); } finally { nioEventLoopGroup.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new EchoClient("localhost", 20000).start(); } } 3.2.5. 客户端回调方法 package com.netty.demo.client; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { //客户端链接服务器后被调用 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端链接服务器,开始发送数据……"); byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuf firstMessage = Unpooled.buffer(req.length); firstMessage.writeBytes(req); ctx.writeAndFlush(firstMessage); } //• 从服务器接收到数据后调用 @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("client 读取server数据.."); //服务端返回消息后 ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("服务端数据为 :" + body); } //• 发生异常时被调用 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("client exceptionCaught.."); // 释放资源 ctx.close(); } }

 

 

 
  

 

 
   
  

 

 

 
  

 

 

 

PS:谈谈我的的理解

图上面是传统的IO交互方式,会出现io阻塞。而后又下图NIO,我也看不太明白具体是怎样的交互方式,可是在netty引入的地方我知道和传统IO流差很少,感受更方便一些。

速度应该也更快速一些。  后面的框架都是根据这个完成的。   还有就是不少hadoop的框架并不难,都是业务逻辑。

---------------------------------

PS : 这些实现是为了 讲解其实就是dubbo的简单实现,

相关文章
相关标签/搜索