近来在学习Java NIO网络开发知识,写了一个基于Java NIO的多人在线聊天工具MyChat练练手。源码公开在Coding上:html
https://coding.net/u/hust_wsh/p/MyChat/git ,开发环境是Ubuntu14.04+Eclipse Mars+JDK1.8。git
编写一个基于Java NIO的多人在线聊天工具,须要如下几方面的知识:客户端服务器模型,Java NIO中的Selector,Channel,ByteBuffer,Collections以及序列化和反序列化的知识。下面来对照源码逐一剖析MyChat的源码构成:服务器
一.服务器网络
为了便于实时分析服务器在线人数和聊天室列表,须要在服务器端提供一个交互接口,也就是获取System.in的输入,执行相应的操做,以下所示: 多线程
System.out.println("===输入选择项==="); System.out.println("1.获取用户列表;2.获取聊天室列表;3.获取指定聊天室成员;4.关闭服务器"); boolean isExit=false; Scanner scanner=new Scanner(System.in);
因为主线程须要运行交互界面,这样一来执行与客户端交互任务的代码就要放在另一个线程中了:函数
ChatServer server=new ChatServer(); Thread serverThread=new Thread(server,"聊天服务器"); serverThread.setDaemon(true);//后台进程 serverThread.start();
接下来将分别介绍服务器端的实现类ChatServer的关键成员:工具
private Selector mSelector=null;//用于注册全部链接到服务器的SocketChannel对象
//保存全部用户的Map private Map<String,UserEntity> mUsers=Collections.synchronizedMap(new HashMap<String,UserEntity>());
//保存全部聊天室的Map
private Map<String,ChatRoom> mRooms=Collections.synchronizedMap(new HashMap<String,ChatRoom>());//聊天室
第一个成员变量mSelector是一个Selector对象,用于管理全部链接到服务器的Channel,为了管理多个通道的读写,要将不一样的通道注册到一个Selector对象上。每一个通道分配有一个SelectionKey。而后程序能够询问这个Selector对象,哪些通道已经准备就绪能够无阻塞的完成你但愿完成的操做,能够请求Selector对象返回相应的键集合。经过调用Selector类的惟一构造函数:静态工厂方法open()来建立新的选择器,并经过register()方法注册通道。性能
mSelector=Selector.open(); ServerSocketChannel server=ServerSocketChannel.open(); InetSocketAddress isa=new InetSocketAddress(mHost, mPort); server.bind(isa);//绑定指定端口 server.configureBlocking(false); server.register(mSelector, SelectionKey.OP_ACCEPT); System.out.println("服务器在"+mPort+"端口启动成功");
注册成功后,就能够经过Selector的select()方法查询已经就绪的通道。学习
while(mSelector.select()>0) { Iterator<SelectionKey> iterator=mSelector.selectedKeys().iterator(); while(iterator.hasNext()) { SelectionKey sk=iterator.next(); iterator.remove();
select()方法用于查询注册到Selector上的待处理的就绪Channel,是一个阻塞方法,直到至少有一个注册的Channel准备好以后就能够进行处理。SelectionKey对象至关于通道的指针,能够保存通道的链接状态。Selector对象的selectedKeys()方法能够返回全部注册Channel的SelectionKey。接下来能够经过isAccetable(),isReadable(),isWritable()等方法测试该键能进行的操做。测试
ServerSocketChannel类只有一个目的:接受入站链接。经过注册到Selector对象来获取入站链接通知,以下所示:
if(sk.isAcceptable()) { SocketChannel sc=server.accept();//开始接收客户端链接 sc.configureBlocking(false); sc.register(mSelector, SelectionKey.OP_READ); sk.interestOps(SelectionKey.OP_ACCEPT); }
接下来能够经过sk.isReadable()进入处理客户端数据的代码块:
if(sk.isReadable())//有数据 { SocketChannel sc=(SocketChannel)sk.channel(); ByteBuffer buffer=ByteBuffer.allocate(1024); ByteArrayOutputStream boStream=new ByteArrayOutputStream(); try { while(sc.read(buffer)>0)//TODO:性能问题 { buffer.flip(); boStream.write(Arrays.copyOfRange(buffer.array(), 0, buffer.limit())); } byte[] frame=boStream.toByteArray(); boStream.close();
为了能进一步讲明白为何须要上面这种方式读取客户端信息,这里先插入讲解一下服务器和客户端交互的信使类Message。为了提高扩展性,我定义了一个Serializable类Message,用于服务器和客户端之间进行交互(如登陆,返回结果,建立聊天室等)。Message类的定义以下:
1 class Message implements Serializable 2 { 3 private static final long serialVersionUID = 1L; 4 private Map<FieldType,String> fields=new HashMap<>();//TODO:泛型支持,任意消息类型,包括文本,图片,语音,视频,文件等 5 private Commands command; 6 public Message(Commands command) 7 { 8 this.command=command; 9 } 10 public Commands getCommand() 11 { 12 return this.command; 13 } 14 public Message set(FieldType key,String value) 15 { 16 if(key!=null&&value!=null) 17 { 18 fields.put(key,value); 19 } 20 return this; 21 } 22 public String get(FieldType key) 23 { 24 return fields.get(key); 25 } 26 27 public byte[] toBytes() 28 { 29 return SerializeHelper.serialize(this); 30 } 31 32 public ByteBuffer wrap() 33 { 34 byte[] frame=toBytes(); 35 return ByteBuffer.wrap(frame); 36 } 37 }
其中有两个关键的成员:一个Map型的用于保存数据的field成员和一个枚举类型的用于代表命令类型的command成员。其中Command枚举定义以下:
enum Commands{ LOG_IN, LOG_OUT, QUERY_USERS, QUERY_ALL_CHAT_ROOMS, QUERY_MY_CHAT_ROOMS, QUERY_ROOM_MEMBERS, HEART_BEAT, MSG_P2P,//我的对我的的消息 MSG_P2R,//聊天室消息 CREATE_CHAT_ROOM, JOIN_CHAT_ROOM, LEAVE_CHAT_ROOM, SET_USER_NAME; };
另外,为了指名携带数据的类型,定义了一个FieldType枚举,以下:
enum FieldType{ USER_ID, USER_NAME, PASS_WD, PEER_ID,//单聊对象的ID ROOM_ID,//聊天室ID USER_LIST,//用户列表 ROOM_LIST_ALL,//全部房间列表 ROOM_LIST_ME,//个人聊天室列表 ROOM_MEMBERS,//用户列表 MSG_TXT, RESPONSE_STATUS, ENCODING; };
这样一来,服务器和客户端就能够经过这种可序列化的Message相互通讯了。具体就是客户端将要发送给服务器的数据封装在Message对象中后,经过SocketChanne发送到服务器,服务器收到数据后经过反序列化获取原始的Message对象,并根据command成员来判断接收到的是什么类型的Message,如登陆,点对点消息等。
Message msg=(Message)SerializeHelper.deSerialize(frame); if(msg!=null) { String userId=msg.get(FieldType.USER_ID); switch (msg.getCommand()) { case LOG_IN: { System.out.println("用户"+userId+"请求登陆..."); Message message=new Message(Commands.LOG_IN); //TODO:检查用户名密码,暂时没有注册功能,就只检测用户名是否重复 if(!mUsers.containsKey(userId)) { message.set(FieldType.RESPONSE_STATUS,"成功"); System.out.println("用户"+userId+"登陆成功"); UserEntity user=new UserEntity(userId,sc); mUsers.put(userId,user); } else { message.set(FieldType.RESPONSE_STATUS,"该账号已经登陆"); } //发送登陆结果 sendRawMessage(sc, message); break; }
这里出现的mUsers对象,就是我要介绍的服务器端第二个重要的成员变量,mUsers是一个用Collections.synchronizedSet封装的支持多线程访问的HashSet,用于保存[用户ID->用户对象]的映射。所谓用户对象就是另外定义的一个用于保存用户基本信息的类,其中包含了用户的id,passwd,对应的SocketChannel和所加入的聊天室集合。以下所示:
class UserEntity{ private String mUserId; private String mPassWd; private SocketChannel mSocketChannel; private Set<String> mJoinedRooms=Collections.synchronizedSet(new HashSet<String>());
服务器端还有一个重要的成员变量,用于保存服务器端全部聊天室的集合,也是一个用Collections.synchronizedSet封装的HashSet,用于保存[聊天室ID->聊天室对象]的映射。聊天室对象是专门定义的一个保存聊天室基本信息的类,其中包含了聊天室id,聊天室成员集合。以下所示:
final class ChatRoom { private String mRoomId=null; private Set<String> mUsers=Collections.synchronizedSet(new HashSet<String>());
到此,服务器端的代码基本剖析完毕,接下来咱们看看客户端的代码。
二.客户端
客户端的代码相对服务器来讲要简单许多,一个典型的NIO客户端程序链接服务器流程以下所示:
mSelector=Selector.open(); InetSocketAddress remote=new InetSocketAddress(host, port); mSocketChannel=SocketChannel.open(remote); mSocketChannel.configureBlocking(false); mSocketChannel.register(mSelector, SelectionKey.OP_READ);
其中注册Selector的接口几乎与服务器一致,除了传递给register方法的第二个参数不一样。注册完通道后就能够向服务器发送登陆请求了:
Message message=new Message(Commands.LOG_IN); message.set(FieldType.USER_ID, userid); message.set(FieldType.PASS_WD, passwd); sendRawMessage(message);
其中sendRawMessage是一个私有方法,用于将Message序列化后使用ByteBuffer经过SocketChannel发送到服务器端,具体代码以下:
private void sendRawMessage(Message message) { if(mSocketChannel!=null&&message!=null) { try { mSocketChannel.write(message.wrap()); } catch (Exception e) { e.printStackTrace(); } } }
我为Message类设计了一个wrap()方法能够将Message序列化后的byte[]包装成ByteBuffer返回,从而能够直接做为SocketChannel.write()方法的参数。具体代码能够参考文章开头的Git仓库。
与服务器同样,客户端须要接收用户输入,从而也将与服务器交互的部分放在单独的线程运行。我将这个线程类放在ChatClient类的内部做为嵌套类,这样能够直接访问外部类的成员变量,为线程之间通讯提供便利。
三.实例分析
介绍完服务器和客户端的设计以后,下面以建立聊天室为例详细介绍客户端和服务器端的通讯流程。
当客户端登陆到服务器中后,服务器会保存客户端的用户ID以及对应的SocketChannel信息,客户端经过一条建立聊天室的Message向服务器申请建立聊天室:
Message message=new Message(Commands.CREATE_CHAT_ROOM); message.set(FieldType.USER_ID,mUserId ); message.set(FieldType.ROOM_ID, roomId); sendRawMessage(message);
如上所示,该Message的命令字是Commands.CREATE_CHAT_ROOM,包含了两个域,分别是建立者的ID和待建立的房间ID(这里为了设计简便,将ID和名称等同为一个概念,实际中ID应该是一个惟一的整型量,名称是聊天室的名字,能够重复)。服务器端经过反序列化Message,并提取对应的命令字进入对应的处理逻辑:
case CREATE_CHAT_ROOM: { System.out.println("用户"+userId+"请求建立聊天室"); String roomId=msg.get(FieldType.ROOM_ID); Message message=new Message(Commands.CREATE_CHAT_ROOM); if(!StringHelper.isNullOrTrimEmpty(roomId)) { if(!mRooms.containsKey(roomId)) { ChatRoom room=new ChatRoom(roomId); room.addUser(userId); mRooms.put(roomId, room); UserEntity user=mUsers.get(userId); if(user!=null) user.joinRoom(roomId); message.set(FieldType.RESPONSE_STATUS, "成功"); } else { message.set(FieldType.RESPONSE_STATUS, "建立失败,已存在同名聊天室"); } } else//返回错误消息 { message.set(FieldType.RESPONSE_STATUS, "建立失败,聊天室名称不能为空"); } sendRawMessage(sc, message); break; }
咱们来仔细分析下上面的代码。首先从Message中提取到了userId和roomId,而后判断服务器端mRooms集合是否已经存在同名聊天室,若是不存在,则建立一个新的聊天室:ChatRoom room=new Chat(roomId)。并将建立者本人加入到聊天室用户列表中:room.addUser(userId)。同时,为了方便查找用户加入的全部聊天室,还将该聊天室的ID经过UserEntity的joinRoom()方法保存到了UserEntity的聊天室集合中,最后将表示正确结果的Message发送给请求客户端;反之若是已经存在同名聊天室,则将包含错误信息的Message发送给客户端。而客户端负责与服务器端交互的线程则经过反序列化Message获取操做结果,并显示给用户。
为了更加直观地展现MyChat的工做流程,将终端运行的结果整了几张截图附在下面:
客户端1:
客户端2:
服务器端:
本文为原创,转载请声明:转载自hust_wsh的技术博客:http://www.cnblogs.com/hust_wsh/p/5166001.html