分布式系统之间通讯能够分为两种:java
分布式子系统之间须要通讯时,就发送消息。通常通讯的两个要点是:消息处理和消息传输。服务器
在Java中可基于Socket、ServerSocket来实现TCP/IP+BIO的系统通讯。网络
为了知足服务端能够同时接受多个请求,最简单的方法是生成多个Socket。但这样会产生两个问题:异步
为了解决上面的问题,一般采用链接池的方式来维护Socket。一方面能限制Socket的个数;另外一方面避免重复建立Socket带来的性能降低问题。这里有一个问题就是设置合适的相应超时时间。由于链接池中Socket个数是有限的,确定会形成激烈的竞争和等待。socket
//建立链接 Socket socket = new Socket(目标IP或域名, 目标端口); //BufferedReader用于读取服务端返回的数据 BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); //PrintWriter向服务器写入流 PrintWriter out = new PrintWriter(socket.getOutputStream(),true); //像服务端发送流 out.println("hello"); //阻塞读取服务端的返回信息 in.readLine();
//建立对本地端口的监听 PrintWriter out = new PrintWriter(socket.getOutputStream(),true); //向服务器发送字符串信息 out.println("hello"); //阻塞读取服务端的返回信息 in.readLine();
Java能够基于Clannel和Selector的相关类来实现TCP/IP+NIO方式的系统间通讯。Channel有SocketClannel和ServerSocketChannel两种。分布式
SocketChannel channel = SocketChannel.open(); //设置为非阻塞模式 channel.configureBlocking(false); //对于非阻塞模式,当即返回false,表示链接正在创建中 channel.connect(SocketAdress); Selector selector = Selector.open(); //向channel注册selector以及感兴趣的链接事件 channel.regester(selector,SelectionKey.OP_CONNECT); //阻塞至有感兴趣的IO事件发生,或到达超时时间 int nKeys = selector.select(超时时间【毫秒计】); //若是但愿一直等待知道有感兴趣的事件发生 //int nKeys = selector.select(); //若是但愿不阻塞直接返回当前是否有感兴趣的事件发生 //int nKeys = selector.selectNow(); //若是有感兴趣的事件 SelectionKey sKey = null; if(nKeys>0){ Set<SelectionKey> keys = selector.selectedKeys(); for(SelectionKey key:keys){ //对于发生链接的事件 if(key.isConnectable()){ SocketChannel sc = (SocketChannel)key.channel(); sc.configureBlocking(false); //注册感兴趣的IO读事件 sKey = sc.register(selector,SelectionKey.OP_READ); //完成链接的创建 sc.finishConnect(); } //有流可读取 else if(key.isReadable()){ ByteBuffer buffer = ByteBuffer.allocate(1024); SocketChannel sc = (SocketChannel) key.channel(); int readBytes = 0; try{ int ret = 0; try{ //读取目前可读取的值,此步为阻塞操做 while((ret=sc.read(buffer))>0){ readBytes += ret; } } fanally{ buffer.flip(); } } finally{ if(buffer!=null){ buffer.clear(); } } } //可写入流 else if(key.isWritable()){ //取消对OP_WRITE事件的注册 key.interestOps(key.interestOps() & (!SelectionKey.OP_WRITE)); SocketChannel sc = (SocketChannel) key.channel(); //此步为阻塞操做 int writtenedSize = sc.write(ByteBuffer); //如未写入,则继续注册感兴趣的OP_WRITE事件 if(writtenedSize==0){ key.interestOps(key.interestOps()|SelectionKey.OP_WRITE); } } } Selector.selectedKeys().clear(); } //对于要写入的流,可直接调用channel.write来完成。只有在未写入成功时才要注册OP_WRITE事件 int wSize = channel.write(ByteBuffer); if(wSize == 0){ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); }
ServerSocketChannel ssc = ServerSocketChannel.open(); ServerSocket serverSocket = ssc.socket(); //绑定要监听的接口 serverSocket.bind(new InetSocketAdress(port)); ssc.configureBlocking(false); //注册感兴趣的链接创建事件 ssc.register(selector,SelectionKey.OP_ACCEPT);
Java对UDP/IP方式的网络数据传输一样采用Socket机制,只是UDP/IP下的Socket没有创建链接,所以没法双向通讯。若是须要双向通讯,必须两端都生成UDP Server。性能
Java中经过DatagramSocket和DatagramPacket来实现UDP/IP+BIO方式和系统间通讯。spa
因为UDP双端不创建链接,因此也就不存在竞争问题,只是最终读写流的动做是同步的。操作系统
//若是但愿双向通讯,必须启动一个监听端口承担服务器的职责 //若是不能绑定到指定端口,则抛出SocketException DatagramSocket serverSocket = new DatagramSocket(监听的端口); byte[] buffer = new byte[65507]; DatagramPacket receivePacket = new DatagramPacket(buffer,buffer.length); DatagramSocket socket = new DatagramSocket(); DatagramPacket packet = new DatagramPacket(datas,datas.length,server.length); //阻塞方式发送packet到指定的服务器和端口 socket.send(packet); //阻塞并同步读取流消息,若是读取的流消息比packet长,则删除更长的消息 //当链接不上目标地址和端口时,抛出PortUnreachableException DatagramSocket.setSoTimeout(超时时间--毫秒级); serverSocket.receive(receivePacket);
Java中能够经过DatagramClannel和ByteBuffer来实现UDP/IP方式的系统间通讯。rest
//读取流信息 DatagramChannel receiveChannel = DatagramChannel.open(); receiveChannel.configureBlocking(false); DatagramSocket socket = receiveChannel.socket(); socket.bind(new InetSocketAddress(rport)); Selector selector = Selector.open(); receiveChannel.register(selector, SelectionKey.OP_REEAD); //以后便可像TCP/IP+NIO中对selector遍历同样的方式进行流信息的读取 //... //写入流信息 DatagramChannel sendChannel = DatagramChannel.open(); sendChannel.configureBlocking(false); SocketAdress target = new InetSocketAdress("127.0.0.1",sport); sendChannel.connect(target); //阻塞写入流 sendChannel.write(ByteBuffer);