手把手教你写 Socket 长链接

本文由玉刚说写做平台[1]提供写做赞助java

原做者:水晶虾饺[2]git

版权声明:本文版权归微信公众号 玉刚说 全部,未经许可,不得以任何形式转载github

本篇咱们先简单了解一下 TCP/IP,而后经过实现一个 echo 服务器来学习 Java 的 Socket API。最后咱们聊聊偏高级一点点的 socket 长链接和协议设计。面试

TCP/IP 协议简介

IP

首先咱们看 IP(Internet Protocol)协议。IP 协议提供了主机和主机间的通讯。shell

为了完成不一样主机的通讯,咱们须要某种方式来惟一标识一台主机,这个标识,就是著名的IP地址。经过IP地址,IP 协议就可以帮咱们把一个数据包发送给对方。数据库

TCP

前面咱们说过,IP 协议提供了主机和主机间的通讯。TCP 协议在 IP 协议提供的主机间通讯功能的基础上,完成这两个主机上进程对进程的通讯。编程

有了 IP,不一样主机就可以交换数据。可是,计算机收到数据后,并不知道这个数据属于哪一个进程(简单讲,进程就是一个正在运行的应用程序)。TCP 的做用就在于,让咱们可以知道这个数据属于哪一个进程,从而完成进程间的通讯。bash

为了标识数据属于哪一个进程,咱们给须要进行 TCP 通讯的进程分配一个惟一的数字来标识它。这个数字,就是咱们常说的端口号服务器

TCP 的全称是 Transmission Control Protocol,你们对它说得最多的,大概就是面向链接的特性了。之因此说它是有链接的,是说在进行通讯前,通讯双方须要先通过一个三次握手的过程。三次握手完成后,链接便创建了。这时候咱们才能够开始发送/接收数据。(与之相对的是 UDP,不须要通过握手,就能够直接发送数据)。微信

下面咱们简单了解一下三次握手的过程。

tcp-three-way-handshake

  1. 首先,客户向服务端发送一个 SYN,假设此时 sequence number 为 x。这个 x 是由操做系统根据必定的规则生成的,不妨认为它是一个随机数。
  2. 服务端收到 SYN 后,会向客户端再发送一个 SYN,此时服务器的 seq number = y。与此同时,会 ACK x+1,告诉客户端“已经收到了 SYN,能够发送数据了”。
  3. 客户端收到服务器的 SYN 后,回复一个 ACK y+1,这个 ACK 则是告诉服务器,SYN 已经收到,服务器能够发送数据了。

通过这 3 步,TCP 链接就创建了。这里须要注意的有三点:

  1. 链接是由客户端主动发起的
  2. 在第 3 步客户端向服务器回复 ACK 的时候,TCP 协议是容许咱们携带数据的。之因此作不到,是 API 的限制致使的。
  3. TCP 协议还容许 “四次握手” 的发生,一样的,因为 API 的限制,这个极端的状况并不会发生。

TCP/IP 相关的理论知识咱们就先了解到这里。关于 TCP,还有诸如可靠性、流量控制、拥塞控制等很是有趣的特性,强烈推荐读者看一看 Richard 的名著《TCP/IP 详解 - 卷1》(注意,是第1版,不是第2版)。

下面咱们看一些偏实战的东西。

Socket 基本用法

Socket 是 TCP 层的封装,经过 socket,咱们就能进行 TCP 通讯。

在 Java 的 SDK 中,socket 的共有两个接口:用于监听客户链接的 ServerSocket 和用于通讯的 Socket。使用 socket 的步骤以下:

  1. 建立 ServerSocket 并监听客户链接
  2. 使用 Socket 链接服务端
  3. 经过 Socket 获取输入输出流进行通讯

下面,咱们经过实现一个简单的 echo 服务来学习 socket 的使用。所谓的 echo 服务,就是客户端向服务端写入任意数据,服务器都将数据原封不动地写回给客户端。

1. 建立 ServerSocket 并监听客户链接

public class EchoServer {

    private final ServerSocket mServerSocket;

    public EchoServer(int port) throws IOException {
        // 1. 建立一个 ServerSocket 并监听端口 port
        mServerSocket = new ServerSocket(port);
    }

    public void run() throws IOException {
        // 2. 开始接受客户链接
        Socket client = mServerSocket.accept();
        handleClient(client);
    }

    private void handleClient(Socket socket) {
        // 3. 使用 socket 进行通讯 ...
    }


    public static void main(String[] argv) {
        try {
            EchoServer server = new EchoServer(9877);
            server.run();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
复制代码

2. 使用 Socket 链接服务端

public class EchoClient {

    private final Socket mSocket;

    public EchoClient(String host, int port) throws IOException {
        // 建立 socket 并链接服务器
        mSocket = new Socket(host, port);
    }

    public void run() {
        // 和服务端进行通讯
    }


    public static void main(String[] argv) {
        try {
            // 因为服务端运行在同一主机,这里咱们使用 localhost
            EchoClient client = new EchoClient("localhost", 9877);
            client.run();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
复制代码

3. 经过 socket.getInputStream()/getOutputStream() 获取输入/输出流进行通讯

首先,咱们来实现服务端:

public class EchoServer {
    // ...

    private void handleClient(Socket socket) throws IOException {
        InputStream in = socket.getInputStream();
        OutputStream out = socket.getOutputStream();
        byte[] buffer = new byte[1024];
        int n;
        while ((n = in.read(buffer)) > 0) {
            out.write(buffer, 0, n);
        }
    }
}
复制代码

能够看到,服务端的实现其实很简单,咱们不停地读取输入数据,而后写回给客户端。

下面咱们看看客户端。

public class EchoClient {
    // ...

    public void run() throws IOException {
        Thread readerThread = new Thread(this::readResponse);
        readerThread.start();

        OutputStream out = mSocket.getOutputStream();
        byte[] buffer = new byte[1024];
        int n;
        while ((n = System.in.read(buffer)) > 0) {
            out.write(buffer, 0, n);
        }
    }

    private void readResponse() {
        try {
            InputStream in = mSocket.getInputStream();
            byte[] buffer = new byte[1024];
            int n;
            while ((n = in.read(buffer)) > 0) {
                System.out.write(buffer, 0, n);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
复制代码

客户端会稍微复杂一点点,在读取用户输入的同时,咱们又想读取服务器的响应。因此,这里建立了一个线程来读服务器的响应。

不熟悉 lambda 的读者,能够把 Thread readerThread = new Thread(this::readResponse) 换成下面这个代码:

Thread readerThread = new Thread(new Runnable() {
    @Override
    public void run() {
        readResponse();
    }
});
复制代码

打开两个 terminal 分别执行以下命令:

$ javac EchoServer.java
$ java EchoServer
复制代码
$ javac EchoClient.java
$ java EchoClient
hello Server
hello Server
foo
foo
复制代码

在客户端,咱们会看到,输入的全部字符都打印了出来。

最后须要注意的有几点:

  1. 在上面的代码中,咱们全部的异常都没有处理。实际应用中,在发生异常时,须要关闭 socket,并根据实际业务作一些错误处理工做
  2. 在客户端,咱们没有中止 readThread。实际应用中,咱们能够经过关闭 socket 来让线程从阻塞读中返回。推荐读者阅读《Java并发编程实战》
  3. 咱们的服务端只处理了一个客户链接。若是须要同时处理多个客户端,能够建立线程来处理请求。这个做为练习留给读者来彻底。

Socket、ServerSocket 傻傻分不清楚

在进入这一节的主题前,读者不妨先考虑一个问题:在上一节的实例中,咱们运行 echo 服务后,在客户端链接成功时,一个有多少个 socket 存在?

答案是 3 个 socket。客户端一个,服务端有两个。跟这个问题的答案直接关联的是本节的主题——SocketServerSocket 的区别是什么。

眼尖的读者,可能会注意到在上一节我是这样描述他们的:

在 Java 的 SDK 中,socket 的共有两个接口:用于监听客户链接的 ServerSocket 和用于通讯的 Socket

注意,我只说 ServerSocket 是用于监听客户链接,而没有说它也能够用来通讯。下面咱们来详细了解一下他们的区别。

注:如下描述使用的是 UNIX/Linux 系统的 API

首先,咱们建立 ServerSocket 后,内核会建立一个 socket。这个 socket 既能够拿来监听客户链接,也能够链接远端的服务。因为 ServerSocket 是用来监听客户链接的,紧接着它就会对内核建立的这个 socket 调用 listen 函数。这样一来,这个 socket 就成了所谓的 listening socket,它开始监听客户的链接。

接下来,咱们的客户端建立一个 Socket,一样的,内核也建立一个 socket 实例。内核建立的这个 socket 跟 ServerSocket 一开始建立的那个没有什么区别。不一样的是,接下来 Socket 会对它执行 connect,发起对服务端的链接。前面咱们说过,socket API 实际上是 TCP 层的封装,因此 connect 后,内核会发送一个 SYN 给服务端。

如今,咱们切换角色到服务端。服务端的主机在收到这个 SYN 后,会建立一个新的 socket,这个新建立的 socket 跟客户端继续执行三次握手过程。

三次握手完成后,咱们执行的 serverSocket.accept() 会返回一个 Socket 实例,这个 socket 就是上一步内核自动帮咱们建立的。

因此说,在一个客户端链接的状况下,其实有 3 个 socket。

关于内核自动建立的这个 socket,还有一个颇有意思的地方。它的端口号跟 ServerSocket 是一毛同样的。咦!!不是说,一个端口只能绑定一个 socket 吗?其实这个说法并不够准确。

前面我说的TCP 经过端口号来区分数据属于哪一个进程的说法,在 socket 的实现里须要改一改。Socket 并不只仅使用端口号来区别不一样的 socket 实例,而是使用 <peer addr:peer port, local addr:local port> 这个四元组。

在上面的例子中,咱们的 ServerSocket 长这样:<*:*, *:9877>。意思是,能够接受任何的客户端,和本地任何 IP。

accept 返回的 Socket 则是这样: <127.0.0.1:xxxx, 127.0.0.1:9877>,其中xxxx 是客户端的端口号。

若是数据是发送给一个已链接的 socket,内核会找到一个彻底匹配的实例,因此数据准确发送给了对端。

若是是客户端要发起链接,这时候只有 <*:*, *:9877> 会匹配成功,因此 SYN 也准确发送给了监听套接字。

Socket/ServerSocket 的区别咱们就讲到这里。若是读者以为不过瘾,能够参考《TCP/IP 详解》卷一、卷2。

Socket 长链接的实现

背景知识

Socket 长链接,指的是在客户和服务端之间保持一个 socket 链接长时间不断开。

比较熟悉 Socket 的读者,可能知道有这样一个 API:

socket.setKeepAlive(true);
复制代码

嗯……keep alive,“保持活着”,这个应该就是让 TCP 不断开的意思。那么,咱们要实现一个 socket 的长链接,只须要这一个调用便可。

遗憾的是,生活并不老是那么美好。对于 4.4BSD 的实现来讲,Socket 的这个 keep alive 选项若是打开而且两个小时内没有通讯,那么底层会发一个心跳,看看对方是否是还活着。

注意,两个小时才会发一次。也就是说,在没有实际数据通讯的时候,我把网线拔了,你的应用程序要通过两个小时才会知道。

在说明若是实现长链接前,咱们先来理一理咱们面临的问题。假定如今有一对已经链接的 socket,在如下状况发生时候,socket 将再也不可用:

  1. 某一端关闭是 socket(这不是废话吗)。主动关闭的一方会发送 FIN,通知对方要关闭 TCP 链接。在这种状况下,另外一端若是去读 socket,将会读到 EoF(End of File)。因而咱们知道对方关闭了 socket。
  2. 应用程序奔溃。此时 socket 会由内核关闭,结果跟状况1同样。
  3. 系统奔溃。这时候系统是来不及发送 FIN 的,由于它已经跪了。此时对方没法得知这一状况。对方在尝试读取数据时,最后会返回 read time out。若是写数据,则是 host unreachable 之类的错误。
  4. 电缆被挖断、网线被拔。跟状况3差很少,若是没有对 socket 进行读写,两边都不知道发生了事故。跟状况3不一样的是,若是咱们把网线接回去,socket 依旧能够正常使用。

在上面的几种情形中,有一个共同点就是,只要去读、写 socket,只要 socket 链接不正常,咱们就可以知道。基于这一点,要实现一个 socket 长链接,咱们须要作的就是不断地给对方写数据,而后读取对方的数据,也就是所谓的心跳。只要心还在跳,socket 就是活的。写数据的间隔,须要根据实际的应用需求来决定。

心跳包不是实际的业务数据,根据通讯协议的不一样,须要作不一样的处理。

比方说,咱们使用 JSON 进行通讯,那么,咱们能够加一个 type 字段,表面这个 JSON 是心跳仍是业务数据。

{
    "type": 0,  // 0 表示心跳

    // ...
}
复制代码

使用二进制协议的状况相似。要求就是,咱们可以区别一个数据包是心跳仍是真实数据。这样,咱们便实现了一个 socket 长链接。

实现示例

这一小节咱们一块儿来实现一个带长链接的 Android echo 客户端。

首先了接口部分:

public final class LongLiveSocket {

    /** * 错误回调 */
    public interface ErrorCallback {
        /** * 若是须要重连,返回 true */
        boolean onError();
    }

    /** * 读数据回调 */
    public interface DataCallback {
        void onData(byte[] data, int offset, int len);
    }

    /** * 写数据回调 */
    public interface WritingCallback {
        void onSuccess();
        void onFail(byte[] data, int offset, int len);
    }


    public LongLiveSocket(String host, int port, DataCallback dataCallback, ErrorCallback errorCallback) {
    }

    public void write(byte[] data, WritingCallback callback) {
    }

    public void write(byte[] data, int offset, int len, WritingCallback callback) {
    }

    public void close() {
    }
}
复制代码

咱们这个支持长链接的类就叫 LongLiveSocket 好了。若是在 socket 断开后须要重连,只须要在对应的接口里面返回 true 便可(在真实场景里,咱们还须要让客户设置重连的等待时间,还有读写、链接的 timeout等。为了简单,这里就直接不支持了。

另外须要注意的一点是,若是要作一个完整的库,须要同时提供阻塞式和回调式API。一样因为篇幅缘由,这里直接省掉了。

首先咱们看看 write() 方法:

public void write(byte[] data, int offset, int len, WritingCallback callback) {
    mWriterHandler.post(() -> {
        Socket socket = getSocket();
        if (socket == null) {
            // initSocket 失败而客户说不须要重连,但客户又叫咱们给他发送数据
            throw new IllegalStateException("Socket not initialized");
        }
        try {
            OutputStream outputStream = socket.getOutputStream();
            DataOutputStream out = new DataOutputStream(outputStream);
            out.writeInt(len);
            out.write(data, offset, len);
            callback.onSuccess();
        } catch (IOException e) {
            Log.e(TAG, "write: ", e);
            // 关闭 socket,避免资源泄露
            closeSocket();
            // 这里咱们把发生失败的数据返回给客户端,这样客户能够更方便地从新发送数据
            callback.onFail(data, offset, len);
            if (!closed() && mErrorCallback.onError()) {
                // 重连
                initSocket();
            }
        }
    });
}
复制代码

因为咱们须要定时写心跳,这里使用一个 HandlerThread 来处理写请求。通讯使用的协议,只是简单地在用户数据前加一个 len 字段,用于肯定消息的长度。

下面咱们看心跳的发送:

private final Runnable mHeartBeatTask = new Runnable() {
    private byte[] mHeartBeat = new byte[0];

    @Override
    public void run() {
        // 咱们使用长度为 0 的数据做为 heart beat
        write(mHeartBeat, new WritingCallback() {
            @Override
            public void onSuccess() {
                // 每隔 HEART_BEAT_INTERVAL_MILLIS 发送一次
                mWriterHandler.postDelayed(mHeartBeatTask, HEART_BEAT_INTERVAL_MILLIS);
                mUIHandler.postDelayed(mHeartBeatTimeoutTask, HEART_BEAT_TIMEOUT_MILLIS);
            }

            @Override
            public void onFail(byte[] data, int offset, int len) {
                // nop
                // write() 方法会处理失败
            }
        });
    }
};

private final Runnable mHeartBeatTimeoutTask = () -> {
    Log.e(TAG, "mHeartBeatTimeoutTask#run: heart beat timeout");
    closeSocket();
};
复制代码

发送心跳使用咱们上面实现的 write() 方法。在发送成功后,post delay 一个 timeout task,若是到期后还没收到服务器的响应,咱们将认为 socket 出现异常,这里直接关闭 socket。最后是对心跳的处理:

int nbyte = in.readInt();
if (nbyte == 0) {
    Log.i(TAG, "readResponse: heart beat received");
    mUIHandler.removeCallbacks(mHeartBeatTimeoutTask);
}
复制代码

因为用户数据的长度老是会大于 1,这里咱们就使用 len == 0 的数据做为心跳。收到心跳后,移除 mHeartBeatTimeoutTask

剩余代码跟咱们的主题没有太大关系,读者在这里[3]能够找到完整的代码或者本身完成这个例子。

最后须要说明的是,若是想节省资源,在有客户发送数据的时候能够省略 heart beat。

咱们对读出错时候的处理,可能也存在一些争议。读出错后,咱们只是关闭了 socket。socket 须要等到下一次写动做发生时,才会从新链接。实际应用中,若是这是一个问题,在读出错后能够直接开始重连。这种状况下,还须要一些额外的同步,避免重复建立 socket。heart beat timeout 的状况相似。

跟 TCP/IP 学协议设计

若是仅仅是为了使用是 socket,咱们大能够不去理会协议的细节。之因此推荐你们去看一看《TCP/IP 详解》,是由于它们有太多值得学习的地方。不少咱们工做中遇到的问题,均可以在这里找到答案。

如下每个小节的标题都是一个小问题,建议读者独立思考一下,再继续往下看。若是你发现你的答案比个人更好,请必定发送邮件到 ljtong64 AT gmail DOT com 告诉我。

协议版本如何升级?

有这么一句流行的话:这个世界惟一不变的,就是变化。当咱们对协议版本进行升级的时候,正确识别不一样版本的协议对软件的兼容很是重要。那么,咱们如何设计协议,才可以为未来的版本升级作准备呢?

答案能够在 IP 协议找到。

IP 协议的第一个字段叫 version,目前使用的是 4 或 6,分别表示 IPv4 和 IPv6。因为这个字段在协议的开头,接收端收到数据后,只要根据第一个字段的值就可以判断这个数据包是 IPv4 仍是 IPv6。

再强调一下,这个字段在两个版本的IP协议都位于第一个字段,为了作兼容处理,对应的这个字段必须位于同一位置。文本协议(如,JSON、HTML)的状况相似。

如何发送不定长数据的数据包

举个例子,咱们用微信发送一条消息。这条消息的长度是不肯定的,而且每条消息都有它的边界。咱们如何来处理这个边界呢?

仍是同样,看看 IP。IP 的头部有个 header length 和 data length 两个字段。经过添加一个 len 域,咱们就可以把数据根据应用逻辑分开。

跟这个相对的,还有另外一个方案,那就是在数据的末尾放置终止符。比方说,想 C 语言的字符串那样,咱们在每一个数据的末尾放一个 \0 做为终止符,用以标识一条消息的尾部。这个方法带来的问题是,用户的数据也可能存在 \0。此时,咱们就须要对用户的数据进行转义。比方说,把用户数据的全部 \0 都变成 \0\0。读消息的过程总,若是遇到 \0\0,那它就表明 \0,若是只有一个 \0,那就是消息尾部。

使用 len 字段的好处是,咱们不须要对数据进行转义。读取数据的时候,只要根据 len 字段,一次性把数据都读进来就好,效率会更高一些。

终止符的方案虽然要求咱们对数据进行扫描,可是若是咱们可能从任意地方开始读取数据,就须要这个终止符来肯定哪里才是消息的开头了。

固然,这两个方法不是互斥的,能够一块儿使用。

上传多个文件,只有全部文件都上传成功时才算成功

如今咱们有一个需求,须要一次上传多个文件到服务器,只有在全部文件都上传成功的状况下,才算成功。咱们该如何来实现呢?

IP 在数据报过大的时候,会把一个数据报拆分红多个,并设置一个 MF (more fragments)位,表示这个包只是被拆分后的数据的一部分。

好,咱们也学一学 IP。这里,咱们能够给每一个文件从 0 开始编号。上传文件的同时,也携带这个编号,并额外附带一个 MF 标志。除了编号最大的文件,全部文件的 MF 标志都置位。由于 MF 没有置位的是最后一个文件,服务器就能够根据这个得出总共有多少个文件。

另外一种不使用 MF 标志的方法是,咱们在上传文件前,就告诉服务器总共有多少个文件。

若是读者对数据库比较熟悉,学数据库用事务来处理,也是能够的。这里就不展开讨论了。

如何保证数据的有序性

这里讲一个我曾经遇到过的面试题。如今有一个任务队列,多个工做线程从中取出任务并执行,执行结果放到一个结果队列中。先要求,放入结果队列的时候,顺序顺序须要跟从工做队列取出时的同样(也就是说,先取出的任务,执行结果须要先放入结果队列)。

咱们看看 TCP/IP 是怎么处理的。IP 在发送数据的时候,不一样数据报到达对端的时间是不肯定的,后面发送的数据有可能较先到达。TCP 为了解决这个问题,给所发送数据的每一个字节都赋了一个序列号,经过这个序列号,TCP 就可以把数据按原顺序从新组装。

同样,咱们也给每一个任务赋一个值,根据进入工做队列的顺序依次递增。工做线程完成任务后,在将结果放入结果队列前,先检查要放入对象的写一个序列号是否是跟本身的任务相同,若是不一样,这个结果就不能放进去。此时,最简单的作法是等待,知道下一个能够放入队列的结果是本身所执行的那一个。可是,这个线程就没办法继续处理任务了。

更好的方法是,咱们维护多一个结果队列的缓冲,这个缓冲里面的数据按序列号从小到大排序。工做线程要将结果放入,有两种可能:

  1. 刚刚完成的任务恰好是下一个,将这个结果放入队列。而后从缓冲的头部开始,将全部能够放入结果队列的数据都放进去。
  2. 所完成的任务不能放入结果队列,这个时候就插入结果队列。而后,跟上一种状况同样,须要检查缓冲。

若是测试代表,这个结果缓冲的数据很少,那么使用普通的链表就能够。若是数据比较多,可使用一个最小堆。

如何保证对方收到了消息

咱们说,TCP 提供了可靠的传输。这样不就可以保证对方收到消息了吗?

很遗憾,其实不能。在咱们往 socket 写入的数据,只要对端的内核收到后,就会返回 ACK,此时,socket 就认为数据已经写入成功。然而要注意的是,这里只是对方所运行的系统的内核成功收到了数据,并不表示应用程序已经成功处理了数据。

解决办法仍是同样,咱们学 TCP,添加一个应用层的 APP ACK。应用接收到消息并处理成功后,发送一个 APP ACK 给对方。

有了 APP ACK,咱们须要处理的另外一个问题是,若是对方真的没有收到,须要怎么作?

TCP 发送数据的时候,消息同样可能丢失。TCP 发送数据后,若是长时间没有收到对方的 ACK,就假设数据已经丢失,并从新发送。

咱们也同样,若是长时间没有收到 APP ACK,就假设数据丢失,从新发送一个。

附:

[1] renyugang.io/post/75

[2] jekton.github.io

[3] github.com/Jekton/Echo

欢迎关注微信公众号,接收第一手技术干货
相关文章
相关标签/搜索