socket,又称套接字,是在不一样的进程间进行网络通信的一种协议、约定或者说是规范。java
对于socket编程,它更多的时候像是基于TCP/UDP等协议作的一层封装或者说抽象,是一套系统所提供的用于进行网络通讯相关编程的接口。linux
咱们以linux操做系统提供的基本api为例,了解创建一个socket通讯的基本流程:编程
能够看到本质上,socket是对tcp链接(固然也有多是udp等其余链接)协议,在编程层面上的简化和抽象。api
首先,咱们从只发送和接收一次消息的socket基础代码开始:数组
服务端:bash
package com.marklux.socket.base;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
/**
* The very basic socket server that only listen one single message.
*/
public class BaseSocketServer {
private ServerSocket server;
private Socket socket;
private int port;
private InputStream inputStream;
private static final int MAX_BUFFER_SIZE = 1024;
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public BaseSocketServer(int port) {
this.port = port;
}
public void runServerSingle() throws IOException {
this.server = new ServerSocket(this.port);
System.out.println("base socket server started.");
// the code will block here till the request come.
this.socket = server.accept();
this.inputStream = this.socket.getInputStream();
byte[] readBytes = new byte[MAX_BUFFER_SIZE];
int msgLen;
StringBuilder stringBuilder = new StringBuilder();
while ((msgLen = inputStream.read(readBytes)) != -1) {
stringBuilder.append(new String(readBytes,0,msgLen,"UTF-8"));
}
System.out.println("get message from client: " + stringBuilder);
inputStream.close();
socket.close();
server.close();
}
public static void main(String[] args) {
BaseSocketServer bs = new BaseSocketServer(9799);
try {
bs.runServerSingle();
}catch (IOException e) {
e.printStackTrace();
}
}
}
复制代码
客户端:服务器
package com.marklux.socket.base;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.Socket;
/**
* The very basic socket client that only send one single message.
*/
public class BaseSocketClient {
private String serverHost;
private int serverPort;
private Socket socket;
private OutputStream outputStream;
public BaseSocketClient(String host, int port) {
this.serverHost = host;
this.serverPort = port;
}
public void connetServer() throws IOException {
this.socket = new Socket(this.serverHost, this.serverPort);
this.outputStream = socket.getOutputStream();
// why the output stream?
}
public void sendSingle(String message) throws IOException {
try {
this.outputStream.write(message.getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
System.out.println(e.getMessage());
}
this.outputStream.close();
this.socket.close();
}
public static void main(String[] args) {
BaseSocketClient bc = new BaseSocketClient("127.0.0.1",9799);
try {
bc.connetServer();
bc.sendSingle("Hi from mark.");
}catch (IOException e) {
e.printStackTrace();
}
}
}
复制代码
先运行服务端,再运行客户端,就能够看到效果。网络
MAX_BUFFER_SIZE
的byte数组做为缓冲区,而后从输入流中取出字节放置到缓冲区,再从缓冲区中取出字节构建到字符串中去,这在输入流文件很大时很是有用,事实上,后面要讲到的NIO也是基于这种思路实现的。上面的例子只实现了一次单向的通讯,这显然有点浪费通道。socket链接支持全双工的双向通讯(底层是tcp),下面的例子中,服务端在收到客户端的消息后,将返回给客户端一个回执。多线程
而且咱们使用了一些java.io包装好的方法,来简化整个通讯的流程(由于消息长度不大,再也不使用缓冲区)。并发
服务端:
public void runServer() throws IOException {
this.serverSocket = new ServerSocket(port);
this.socket = serverSocket.accept();
this.inputStream = socket.getInputStream();
String message = new String(inputStream.readAllBytes(), "UTF-8");
System.out.println("received message: " + message);
this.socket.shutdownInput(); // 告诉客户端接收已经完毕,以后只能发送
// write the receipt.
this.outputStream = this.socket.getOutputStream();
String receipt = "We received your message: " + message;
outputStream.write(receipt.getBytes("UTF-8"));
this.outputStream.close();
this.socket.close();
}
复制代码
客户端:
public void sendMessage(String message) throws IOException {
this.socket = new Socket(host,port);
this.outputStream = socket.getOutputStream();
this.outputStream.write(message.getBytes("UTF-8"));
this.socket.shutdownOutput(); // 告诉服务器,全部的发送动做已经结束,以后只能接收
this.inputStream = socket.getInputStream();
String receipt = new String(inputStream.readAllBytes(), "UTF-8");
System.out.println("got receipt: " + receipt);
this.inputStream.close();
this.socket.close();
}
复制代码
注意这里咱们在服务端接受到消息以及客户端发送消息后,分别调用了shutdownInput()
和shutdownOutput()
而不是直接close对应的stream,这是由于在关闭任何一个stream,都会直接致使socket的关闭,也就没法进行后面回执的发送了。
可是注意,调用shutdownInput()
和shutdownOutput()
以后,对应的流也会被关闭,不能再次向socket发送/写入了。
刚才的两个例子中,每次打开流,都只能进行一次写入/读取操做,结束后对应流被关闭,就没法再次写入/读取了。
在这种状况下,若是要发送两次消息,就不得不创建两个socket,既耗资源又麻烦。其实咱们彻底能够不关闭对应的流,只要分次写入消息就能够了。
可是这样的话,咱们就必须面对另外一个问题:如何判断一次消息发送的结束呢?
最简单的办法是使用一些特殊的符号来标记一次发送完成,服务端只要读到对应的符号就能够完成一次读取,而后进行相关的处理操做。
下面的例子中咱们使用换行符\n
来标记一次发送的结束,服务端每接收到一个消息,就打印一次,而且使用了Scanner来简化操做:
服务端:
public void runServer() throws IOException {
this.server = new ServerSocket(this.port);
System.out.println("base socket server started.");
this.socket = server.accept();
// the code will block here till the request come.
this.inputStream = this.socket.getInputStream();
Scanner sc = new Scanner(this.inputStream);
while (sc.hasNextLine()) {
System.out.println("get info from client: " + sc.nextLine());
} // 循环接收并输出消息内容
this.inputStream.close();
socket.close();
}
复制代码
客户端:
public void connetServer() throws IOException {
this.socket = new Socket(this.serverHost, this.serverPort);
this.outputStream = socket.getOutputStream();
}
public void send(String message) throws IOException {
String sendMsg = message + "\n"; // we mark \n as a end of line.
try {
this.outputStream.write(sendMsg.getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
System.out.println(e.getMessage());
}
// this.outputStream.close();
// this.socket.shutdownOutput();
}
public static void main(String[] args) {
CycleSocketClient cc = new CycleSocketClient("127.0.0.1", 9799);
try {
cc.connetServer();
Scanner sc = new Scanner(System.in);
while (sc.hasNext()) {
String line = sc.nextLine();
cc.send(line);
}
}catch (IOException e) {
e.printStackTrace();
}
}
复制代码
运行后效果是,客户端每输入一行文字按下回车后,服务端就会打印出对应的消息读取记录。
回到原点,咱们之因此很差定位消息何时结束,是由于咱们不可以肯定每次消息的长度。
那么其实能够先将消息的长度发送出去,当服务端知道消息的长度后,就可以完成一次消息的接收了。
总的来讲,发送一次消息变成了两个步骤
最后的问题就是,“发送消息的长度”这一步骤所发送的字节量必须是固定的,不然咱们仍然会陷入僵局。
通常来讲,咱们可使用固定的字节数来保存消息的长度,好比规定前2个字节就是消息的长度,不过这样咱们可以传送的消息最大长度也就被固定死了,以2个字节为例,咱们发送的消息最大长度不超过2^16个字节即64K。
若是你了解一些字符的编码,就会知道,其实咱们可使用变长的空间来储存消息的长度,好比:
第一个字节首位为0:即0XXXXXXX,表示长度就一个字节,最大128,表示128B
第一个字节首位为110,那么附带后面一个字节表示长度:即110XXXXX 10XXXXXX,最大2048,表示2K
第一个字节首位为1110,那么附带后面二个字节表示长度:即110XXXXX 10XXXXXX 10XXXXXX,最大131072,表示128K
依次类推
复制代码
固然这样实现起来会麻烦一些,所以下面的例子里咱们仍然使用固定的两个字节来记录消息的长度。
服务端:
public void runServer() throws IOException {
this.serverSocket = new ServerSocket(this.port);
this.socket = serverSocket.accept();
this.inputStream = socket.getInputStream();
byte[] bytes;
while (true) {
// 先读第一个字节
int first = inputStream.read();
if (first == -1) {
// 若是是-1,说明输入流已经被关闭了,也就不须要继续监听了
this.socket.close();
break;
}
// 读取第二个字节
int second = inputStream.read();
int length = (first << 8) + second; // 用位运算将两个字节拼起来成为真正的长度
bytes = new byte[length]; // 构建指定长度的字节大小来储存消息便可
inputStream.read(bytes);
System.out.println("receive message: " + new String(bytes,"UTF-8"));
}
}
复制代码
客户端:
public void connetServer() throws IOException {
this.socket = new Socket(host,port);
this.outputStream = socket.getOutputStream();
}
public void sendMessage(String message) throws IOException {
// 首先要把message转换成bytes以便处理
byte[] bytes = message.getBytes("UTF-8");
// 接下来传输两个字节的长度,依然使用移位实现
int length = bytes.length;
this.outputStream.write(length >> 8); // write默认一次只传输一个字节
this.outputStream.write(length);
// 传输完长度后,再正式传送消息
this.outputStream.write(bytes);
}
public static void main(String[] args) {
LengthSocketClient lc = new LengthSocketClient("127.0.0.1",9799);
try {
lc.connetServer();
Scanner sc = new Scanner(System.in);
while (sc.hasNextLine()) {
lc.sendMessage(sc.nextLine());
}
} catch (IOException e) {
e.printStackTrace();
}
}
复制代码
在考虑服务端处理多链接以前,咱们先考虑使用多线程改造一下原有的一对一对话实例。
在原有的例子中,消息的接收方并不能主动地向对方发送消息,换句话说咱们并无实现真正的互相对话,这主要是由于消息的发送和接收这两个动做并不能同时进行,所以咱们须要使用两个线程,其中一个用于监听键盘输入并将其写入socket,另外一个则负责监听socket并将接受到的消息显示。
出于简单考虑,咱们直接让主线程负责键盘监听和消息发送,同时另外开启一个线程用于拉取消息并显示。
消息拉取线程 ListenThread.java
public class ListenThread implements Runnable {
private Socket socket;
private InputStream inputStream;
public ListenThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() throws RuntimeException{
try {
this.inputStream = socket.getInputStream();
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
while (true) {
try {
int first = this.inputStream.read();
if (first == -1) {
// 输入流已经被关闭,无需继续读取
throw new RuntimeException("disconnected.");
}
int second = this.inputStream.read();
int msgLength = (first<<8) + second;
byte[] readBuffer = new byte[msgLength];
this.inputStream.read(readBuffer);
System.out.println("message from [" + socket.getInetAddress() + "]: " + new String(readBuffer,"UTF-8"));
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
}
}
复制代码
主线程,启动时由用户选择是做为server仍是client:
public class ChatSocket {
private String host;
private int port;
private Socket socket;
private ServerSocket serverSocket;
private OutputStream outputStream;
// 以服务端形式启动,建立会话
public void runAsServer(int port) throws IOException {
this.serverSocket = new ServerSocket(port);
System.out.println("[log] server started at port " + port);
// 等待客户端的加入
this.socket = serverSocket.accept();
System.out.println("[log] successful connected with " + socket.getInetAddress());
// 启动监听线程
Thread listenThread = new Thread(new ListenThread(this.socket));
listenThread.start();
waitAndSend();
}
// 以客户端形式启动,加入会话
public void runAsClient(String host, int port) throws IOException {
this.socket = new Socket(host, port);
System.out.println("[log] successful connected to server " + socket.getInetAddress());
Thread listenThread = new Thread(new ListenThread(this.socket));
listenThread.start();
waitAndSend();
}
public void waitAndSend() throws IOException {
this.outputStream = this.socket.getOutputStream();
Scanner sc = new Scanner(System.in);
while (sc.hasNextLine()) {
this.sendMessage(sc.nextLine());
}
}
public void sendMessage(String message) throws IOException {
byte[] msgBytes = message.getBytes("UTF-8");
int length = msgBytes.length;
outputStream.write(length>>8);
outputStream.write(length);
outputStream.write(msgBytes);
}
public static void main(String[] args) {
Scanner scanner = new Scanner(System.in);
ChatSocket chatSocket = new ChatSocket();
System.out.println("select connect type: 1 for server and 2 for client");
int type = Integer.parseInt(scanner.nextLine().toString());
if (type == 1) {
System.out.print("input server port: ");
int port = scanner.nextInt();
try {
chatSocket.runAsServer(port);
} catch (IOException e) {
e.printStackTrace();
}
}else if (type == 2) {
System.out.print("input server host: ");
String host = scanner.nextLine();
System.out.print("input server port: ");
int port = scanner.nextInt();
try {
chatSocket.runAsClient(host, port);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
复制代码
做为服务端,若是一次只跟一个客户端创建socket链接,未免显得太过浪费资源,所以咱们彻底可让服务端和多个客户端创建多个socket。
那么既然要处理多个链接,就不得不面对并发问题了(固然,你也能够写循环轮流处理)。咱们可使用多线程来处理并发,不过线程的建立和销毁都会消耗大量的资源和时间,因此最好一步到位,用一个线程池来实现。
下面给出一个示范性质的服务端代码:
public class SocketServer {
public static void main(String args[]) throws Exception {
// 监听指定的端口
int port = 55533;
ServerSocket server = new ServerSocket(port);
// server将一直等待链接的到来
System.out.println("server将一直等待链接的到来");
//若是使用多线程,那就须要线程池,防止并发太高时建立过多线程耗尽资源
ExecutorService threadPool = Executors.newFixedThreadPool(100);
while (true) {
Socket socket = server.accept();
Runnable runnable=()->{
try {
// 创建好链接后,从socket中获取输入流,并创建缓冲区进行读取
InputStream inputStream = socket.getInputStream();
byte[] bytes = new byte[1024];
int len;
StringBuilder sb = new StringBuilder();
while ((len = inputStream.read(bytes)) != -1) {
// 注意指定编码格式,发送方和接收方必定要统一,建议使用UTF-8
sb.append(new String(bytes, 0, len, "UTF-8"));
}
System.out.println("get message from client: " + sb);
inputStream.close();
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
};
threadPool.submit(runnable);
}
}
}
复制代码
我想你不难发现一个问题,那就是当socket链接成功创建后,若是中途发生异常致使其中一方断开链接,此时另外一方是没法发现的,只有在再次尝试发送/接收消息才会由于抛出异常而退出。
简单的说,就是咱们维持的socket链接,是一个长链接,但咱们没有保证它的时效性,上一秒它可能仍是能够用的,可是下一秒就不必定了。
保证链接随时可用的最多见方法就是定时发送心跳包,来检测链接是否正常。这对于实时性要求很高的服务而言,仍是很是重要的(好比消息推送)。
大致的方案以下:
使用心跳包必然会增长带宽和性能的负担,对于普通的应用咱们其实并无必要使用这种方案,若是消息发送时抛出了链接异常,直接尝试从新链接就行了。
跟上面的方案对比,其实这个抛出异常的消息就充当了心跳包的角色。
总的来讲,链接是否要保活,如何保活,须要根据具体的业务场景灵活地思考和定制。