雨露均沾的OkHttp—WebSocket长链接的使用&源码解析

前言

最近老板又来新需求了,要作一个物联网相关的app,其中有个需求是客户端须要收发服务器不按期发出的消息。
心里OS:
🤔 这咋整呢?经过接口轮询?定时访问接口,有数据就更新?
🤔 不行不行,这样浪费资源了,还耗电,会致使不少请求都是无效的网络操做。
🤔 那就长链接呗?WebSocket协议好像不错,经过握手创建长链接后,能够随时收发服务器的消息。那就它了!
🤔 怎么集成呢?正好前段时间复习OkHttp源码的时候发现了它是支持Websocket协议的,那就用它试试吧!(戏好多,演不下去了🤮)java

开淦!git

WebSocket介绍

先简单介绍下WebSocket
咱们都知道Http是处于应用层的一个通讯协议,可是只支持单向主动通讯,作不到服务器主动向客户端推送消息。并且Http是无状态的,即每次通讯都没有关联性,致使跟服务器关系不紧密。github

为了解决和服务器长时间通讯的痛点呢,HTML5规范引出了WebSocket协议(知道这名字咋来的吧,人家HTML5规范引出的,随爸姓),是一种创建在TCP协议基础上的全双工通讯的协议。他跟Http同属于应用层协议,下层仍是须要经过TCP创建链接。web

可是,WebSocketTCP链接创建后,还要经过Http进行一次握手,也就是经过Http发送一条GET请求消息给服务器,告诉服务器我要创建WebSocket链接了,你准备好哦,具体作法就是在头部信息中添加相关参数。而后服务器响应我知道了,而且将链接协议改为WebSocket,开始创建长链接。面试

这里贴上请求头和响应头信息,从网上找了一张图:服务器

3851594110877_.pic.jpg

简单说明下参数:websocket

  • URL通常是以ws或者wss开头,ws对应Websocket协议,wss对应在TLS之上的WebSocket。相似于HttpHttps的关系。
  • 请求方法为GET方法。
  • Connection:Upgrade,表示客户端要链接升级,不用Http协议。
  • Upgrade:websocket, 表示客户端要升级创建Websocket链接。
  • Sec-Websocket-Key:key, 这个key是随机生成的,服务器会经过这个参数验证该请求是否有效。
  • Sec-WebSocket-Version:13, websocket使用的协议,通常就是13。
  • Sec-webSocket-Extension:permessage-deflate,客户端指定的一些扩展协议,好比这里permessage-deflate就是WebSocket的一种压缩协议。
  • 响应码101,表示响应协议升级,后续的数据交互都按照Upgradet指定的WebSocket协议来。

OkHttp实现

添加OkHttp依赖

implementation("com.squareup.okhttp3:okhttp:4.7.2")

实现代码

首先是初始化OkHttpClientWebSocket实例:网络

/**
     * 初始化WebSocket
     */
    public void init() {
        mWbSocketUrl = "ws://echo.websocket.org";
        mClient = new OkHttpClient.Builder()
                .pingInterval(10, TimeUnit.SECONDS)
                .build();
        Request request = new Request.Builder()
                .url(mWbSocketUrl)
                .build();
        mWebSocket = mClient.newWebSocket(request, new WsListener());
    }

这里主要是配置了OkHttp的一些参数,以及WebSocket的链接地址。其中newWebSocket方法就是进行WebSocket的初始化和链接。app

这里要注意的点是pingInterval方法的配置,这个方法主要是用来设置WebSocket链接的保活。
相信作过长链接的同窗都知道,一个长链接通常要隔几秒发送一条消息告诉服务器我在线,而服务器也会回复一个消息表示收到了,这样就确认了链接正常,客户端和服务器端都在线。less

若是服务器没有按时收到这个消息那么服务器可能就会主动关闭这个链接,节约资源。
客户端没有正常收到这个返回的消息,也会作一些相似重连的操做,因此这个保活消息很是重要。

咱们称这个消息叫做心跳包,通常用PING,PONG表示,像乒乓球同样,一来一回。
因此这里的pingInterval就是设置心跳包发送的间隔时间,设置了这个方法以后,OkHttp就会自动帮咱们发送心跳包事件,也就是ping包。当间隔时间到了,没有收到pong包的话,监听事件中的onFailure方法就会被调用,此时咱们就能够进行重连。

可是因为实际业务需求不同,以及okhttp中心跳包事件给予咱们权限较少,因此咱们也能够本身完成心跳包事件,即在WebSocket链接成功以后,开始定时发送ping包,在下一次发送ping包以前检查上一个pong包是否收到,若是没收到,就视为异常,开始重连。感兴趣的同窗能够看看文末的相关源码。

创建链接后,咱们就能够正常发送和读取消息了,也就是在上文WsListener监听事件中表现:

//监听事件,用于收消息,监听链接的状态
    class WsListener extends WebSocketListener {
        @Override
        public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            super.onClosed(webSocket, code, reason);
        }

        @Override
        public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            super.onClosing(webSocket, code, reason);
        }

        @Override
        public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) {
            super.onFailure(webSocket, t, response);
        }

        @Override
        public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
            super.onMessage(webSocket, text);
            Log.e(TAG, "客户端收到消息:" + text);
            onWSDataChanged(DATE_NORMAL, text);
           //测试发消息
            webSocket.send("我是客户端,你好啊");
        }

        @Override
        public void onMessage(@NotNull WebSocket webSocket, @NotNull ByteString bytes) {
            super.onMessage(webSocket, bytes);
        }

        @Override
        public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
            super.onOpen(webSocket, response);
            Log.e(TAG,"链接成功!");
        }
    }
    
    
    //发送String消息
    public void send(final String message) {
        if (mWebSocket != null) {
            mWebSocket.send(message);
        }
    }
    
    /**
     * 发送byte消息
     * @param message
     */
    public void send(final ByteString message) {
        if (mWebSocket != null) {
            mWebSocket.send(message);
        }
    }    

    //主动断开链接
    public void disconnect(int code, String reason) {
        if (mWebSocket != null)
            mWebSocket.close(code, reason);
    }

这里要注意,回调的方法都是在子线程回调的,若是须要更新UI,须要切换到主线程。

基本操做就这么多,仍是很简单的吧,初始化Websocket——链接——链接成功——收发消息。

其中WebSocket类是一个操做接口,主要提供了如下几个方法

  • send(text: String) 发送一个String类型的消息
  • send(bytes: ByteString) 发送一个二进制类型的消息
  • close(code: Int, reason: String?) 关闭WebSocket链接

若是有同窗想测试下WebSocket的功能可是又没有实际的服务器,怎么办呢?
其实OkHttp官方有一个MockWebSocket服务,能够用来模拟服务端,下面咱们一块儿试一下:

模拟服务器

首先集成MockWebSocket服务库:

implementation 'com.squareup.okhttp3:mockwebserver:4.7.2'

而后就能够新建MockWebServer,并加入MockResponse做为接收消息的响应。

MockWebServer mMockWebServer = new MockWebServer();
        MockResponse response = new MockResponse()
                .withWebSocketUpgrade(new WebSocketListener() {
                    @Override
                    public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
                        super.onOpen(webSocket, response);
                        //有客户端链接时回调
                        Log.e(TAG, "服务器收到客户端链接成功回调:");
                        mWebSocket = webSocket;
                        mWebSocket.send("我是服务器,你好呀");
                    }

                    @Override
                    public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
                        super.onMessage(webSocket, text);

                        Log.e(TAG, "服务器收到消息:" + text);
                    }

                    @Override
                    public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
                        super.onClosed(webSocket, code, reason);
                        Log.e(TAG, "onClosed:");
                    }
                });

        mMockWebServer.enqueue(response);

这里服务器端在收到客户端链接成功消息后,给客户端发送了一条消息。
要注意的是这段代码要在子线程执行,由于主线程不能进行网络操做。

而后就能够去初始化Websocket客户端了:

//获取链接url,初始化websocket客户端
        String websocketUrl = "ws://" + mMockWebServer.getHostName() + ":" + mMockWebServer.getPort() + "/";
        WSManager.getInstance().init(websocketUrl);

ok,运行项目

//运行结果
    E/jimu: mWbSocketUrl=ws://localhost:38355/
    E/jimu: 服务器收到客户端链接成功回调:
    E/jimu: 链接成功!
    E/jimu: 客户端收到消息:我是服务器,你好呀
    E/jimu: 服务器收到消息:我是客户端,你好啊

相关的WebSocket管理类和模拟服务器类我也上传到github了,有须要的同窗能够文末自取。

源码解析

WebSocket整个流程无非三个功能:链接,接收消息,发送消息。下面咱们就从这三个方面分析下具体是怎么实现的。

链接

经过上面的代码咱们得知,WebSocket链接是经过newWebSocket方法。直接点进去看这个方法:

override fun newWebSocket(request: Request, listener: WebSocketListener): WebSocket {
    val webSocket = RealWebSocket(
        taskRunner = TaskRunner.INSTANCE,
        originalRequest = request,
        listener = listener,
        random = Random(),
        pingIntervalMillis = pingIntervalMillis.toLong(),
        extensions = null, // Always null for clients.
        minimumDeflateSize = minWebSocketMessageToCompress
    )
    webSocket.connect(this)
    return webSocket
  }

这里作了两件事:

  • 初始化RealWebSocket,主要是设置了一些参数(好比pingIntervalMillis心跳包时间间隔,还有监听事件之类的)
  • connect 方法进行WebSocket链接

继续查看connect方法:

connect(WebSocket链接握手)

fun connect(client: OkHttpClient) {
    //***
    val webSocketClient = client.newBuilder()
        .eventListener(EventListener.NONE)
        .protocols(ONLY_HTTP1)
        .build()
    val request = originalRequest.newBuilder()
        .header("Upgrade", "websocket")
        .header("Connection", "Upgrade")
        .header("Sec-WebSocket-Key", key)
        .header("Sec-WebSocket-Version", "13")
        .header("Sec-WebSocket-Extensions", "permessage-deflate")
        .build()
    call = RealCall(webSocketClient, request, forWebSocket = true)
    call!!.enqueue(object : Callback {
      override fun onResponse(call: Call, response: Response) {
        
        //获得数据流
        val streams: Streams
        try {
          checkUpgradeSuccess(response, exchange)
          streams = exchange!!.newWebSocketStreams()
        } 
        
        //***
        // Process all web socket messages.
        try {
          val name = "$okHttpName WebSocket ${request.url.redact()}"
          initReaderAndWriter(name, streams)
          listener.onOpen(this@RealWebSocket, response)
          loopReader()
        } catch (e: Exception) {
          failWebSocket(e, null)
        }
      }
    })
  }

上一篇使用篇文章中说过,Websocket链接须要一次Http协议的握手,而后才能把协议升级成WebSocket。因此这段代码就体现出这个功能了。

首先就new了一个用来进行Http链接的request,其中Header的参数就表示我要进行WebSocket链接了,参数解析以下:

  • Connection:Upgrade,表示客户端要链接升级
  • Upgrade:websocket, 表示客户端要升级创建Websocket链接
  • Sec-Websocket-Key:key, 这个key是随机生成的,服务器会经过这个参数验证该请求是否有效
  • Sec-WebSocket-Version:13, websocket使用的版本,通常就是13
  • Sec-webSocket-Extension:permessage-deflate,客户端指定的一些扩展协议,好比这里permessage-deflate就是WebSocket的一种压缩协议。

Header设置好以后,就调用了callenqueue方法,这个方法你们应该都很熟悉吧,OkHttp里面对于Http请求的异步请求就是这个方法。
至此,握手结束,服务器返回响应码101,表示协议升级。

而后咱们继续看看获取服务器响应以后又作了什么?
在发送Http请求成功以后,onResponse响应方法里面主要表现为四个处理逻辑:

  • Http流转换成WebSocket流,获得Streams对象,这个流后面会转化成输入流和输出流,也就是进行发送和读取的操做流
  • listener.onOpen(this@RealWebSocket, response),回调了接口WebSocketListeneronOpen方法,告诉用户WebSocket已经链接
  • initReaderAndWriter(name, streams)
  • loopReader()

前两个逻辑仍是比较好理解,主要是后两个方法,咱们分别解析下。
首先看initReaderAndWriter方法。

initReaderAndWriter(初始化输入流输出流)

//RealWebSocket.kt

  @Throws(IOException::class)
  fun initReaderAndWriter(name: String, streams: Streams) {
    val extensions = this.extensions!!
    synchronized(this) {
      //***
      
      //写数据,发送数据的工具类
      this.writer = WebSocketWriter()
      
      //设置心跳包事件
      if (pingIntervalMillis != 0L) {
        val pingIntervalNanos = MILLISECONDS.toNanos(pingIntervalMillis)
        taskQueue.schedule("$name ping", pingIntervalNanos) {
          writePingFrame()
          return@schedule pingIntervalNanos
        }
      }
      //***
    }

		//***
		
		//读取数据的工具类
    reader = WebSocketReader(     
      ***
      frameCallback = this,
      ***
    )
  }
  
  internal fun writePingFrame() {
   //***
    try {
      writer.writePing(ByteString.EMPTY)
    } catch (e: IOException) {
      failWebSocket(e, null)
    }
  }

这个方法主要干了两件事:

  • 实例化输出流输入流工具类,也就是WebSocketWriterWebSocketReader,用来处理数据的收发。
  • 设置心跳包事件。若是pingIntervalMillis参数不为0,就经过计时器,每隔pingIntervalNanos发送一个ping消息。其中writePingFrame方法就是发送了ping帧数据。

接收消息处理消息

loopReader

接着看看这个loopReader方法是干什么的,看这个名字咱们大胆猜想下,难道这个方法就是用来循环读取数据的?去代码里找找答案:

fun loopReader() {
    while (receivedCloseCode == -1) {
      // This method call results in one or more onRead* methods being called on this thread.
      reader!!.processNextFrame()
    }
  }

代码很简单,一个while循环,循环条件是receivedCloseCode == -1的时候,作的事情是reader!!.processNextFrame()方法。继续:

//WebSocketWriter.kt
  fun processNextFrame() {
    //读取头部信息
    readHeader()
    if (isControlFrame) {
      //若是是控制帧,读取控制帧内容
      readControlFrame()
    } else {
      //读取普通消息内容
      readMessageFrame()
    }
  }
  
  //读取头部信息
  @Throws(IOException::class, ProtocolException::class)
  private fun readHeader() {
    if (closed) throw IOException("closed")
    
    try {
     //读取数据,获取数据帧的前8位
      b0 = source.readByte() and 0xff
    } finally {
      source.timeout().timeout(timeoutBefore, TimeUnit.NANOSECONDS)
    }    
    //***
    //获取数据帧的opcode(数据格式)
    opcode = b0 and B0_MASK_OPCODE
    //是否为最终帧
    isFinalFrame = b0 and B0_FLAG_FIN != 0
    //是否为控制帧(指令)
    isControlFrame = b0 and OPCODE_FLAG_CONTROL != 0

    //判断最终帧,获取帧长度等等
  }  
  
  
  //读取控制帧(指令)
    @Throws(IOException::class)
  private fun readControlFrame() {
    if (frameLength > 0L) {
      source.readFully(controlFrameBuffer, frameLength)
    }

    when (opcode) {
      OPCODE_CONTROL_PING -> {
      //ping 帧
        frameCallback.onReadPing(controlFrameBuffer.readByteString())
      }
      OPCODE_CONTROL_PONG -> {
        //pong 帧
        frameCallback.onReadPong(controlFrameBuffer.readByteString())
      }
      OPCODE_CONTROL_CLOSE -> {
        //关闭 帧
        var code = CLOSE_NO_STATUS_CODE
        var reason = ""
        val bufferSize = controlFrameBuffer.size
        if (bufferSize == 1L) {
          throw ProtocolException("Malformed close payload length of 1.")
        } else if (bufferSize != 0L) {
          code = controlFrameBuffer.readShort().toInt()
          reason = controlFrameBuffer.readUtf8()
          val codeExceptionMessage = WebSocketProtocol.closeCodeExceptionMessage(code)
          if (codeExceptionMessage != null) throw ProtocolException(codeExceptionMessage)
        }
        //回调onReadClose方法
        frameCallback.onReadClose(code, reason)
        closed = true
      }
    }
  }
  
  //读取普通消息
  @Throws(IOException::class)
  private fun readMessageFrame() {
    
    readMessage()

    if (readingCompressedMessage) {
      val messageInflater = this.messageInflater
          ?: MessageInflater(noContextTakeover).also { this.messageInflater = it }
      messageInflater.inflate(messageFrameBuffer)
    }

    if (opcode == OPCODE_TEXT) {
      frameCallback.onReadMessage(messageFrameBuffer.readUtf8())
    } else {
      frameCallback.onReadMessage(messageFrameBuffer.readByteString())
    }
  }

代码仍是比较直观,这个processNextFrame其实就是读取数据用的,首先读取头部信息,获取数据帧的类型,判断是否为控制帧,再分别去读取控制帧数据或者普通消息帧数据。

数据帧格式

问题来了,什么是数据头部信息,什么是控制帧
这里就要说下WebSocket的数据帧了,先附上一个数据帧格式:

0 1 2 3 4 5 6 7    0 1 2 3 4 5 6 7  0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
  +-+-+-+-+-------+  +-+-------------+ +-----------------------------+
  |F|R|R|R| OP    |  |M| LENGTH      |   Extended payload length
  |I|S|S|S| CODE  |  |A|             |  (if LENGTH=126)
  |N|V|V|V|       |  |S|             |
  | |1|2|3|       |  |K|             |
  +-+-+-+-+-------+  +-+-------------+
  |                      Extended payload length(if LENGTH=127)
  +                                  +-------------------------------
  |      Extended payload length     | Masking-key,if Mask set to 1
  +----------------------------------+-------------------------------
  |   Masking-key                    |       Data
  +----------------------------------+-------------------------------
  |                                Data
  +----------------------------------+-------------------------------

我认可,我懵逼了。
冷静冷静,一步一步分析下吧。

首先每一行表明4个字节,一共也就是32位数,哦,那也就是几个字节而已嘛,每一个字节有他本身的表明意义呗,这样想是否是就很简单了,下面来具体看看每一个字节。

第1个字节:

  • 第一位是FIN码,其实就是一个标示位,由于数据可能多帧操做嘛,因此多帧状况下,只有最后一帧的FIN设置成1,标示结束帧,前面全部帧设置为0。
  • 第二位到第四位是RSV码,通常通讯两端没有设置自定义协议,就默认为0。
  • 后四位是opcode,咱们叫它操做码。这个就是判断这个数据帧的类型了,通常有如下几个被定义好的类型:

1) 0x0 表示附加数据帧
2) 0x1 表示文本数据帧
3) 0x2 表示二进制数据帧
4) 0x3-7 保留用于将来的非控制帧
5) 0x8 表示链接关闭
6) 0x9 表示ping
7) 0xA 表示pong
8) 0xB-F 保留用于将来的非控制帧

是否是发现了些什么,这不就对应了咱们应用中的几种格式吗?2和3对应的是普通消息帧,包括了文本和二进制数据。567对应的就是控制帧格式,包括了close,ping,pong

第2个字节:

  • 第一位是Mask掩码,其实就是标识数据是否加密混淆,1表明数据通过掩码的,0是没有通过掩码的,若是是1的话,后续就会有4个字节表明掩码key,也就是数据帧中Masking-key所处的位置。
  • 后7位是LENGTH,用来标示数据长度。由于只有7位,因此最大只能储存1111111对应的十进制数127长度的数据,若是须要更大的数据,这个储存长度确定就不够了。
    因此规定来了,1) 小于126长度则数据用这七位表示实际长度。2) 若是长度设置为126,也就是二进制1111110,就表明取额外2个字节表示数据长度,共是16位表示数据长度。3) 若是长度设置为127,也就是二进制1111111,就表明取额外8个字节,共是64位表示数据长度。

须要注意的是LENGHT的三种状况在一个数据帧里面只会出现一种状况,不共存,因此在图中是用if表示。一样的,Masking-key也是当Mask为1的时候才存在。

因此也就有了数据帧里面的Extended payload length(LENGTH=126)所处的2个字节,以及Extended payload length(LENGTH=127)所处的8个字节。

最后的字节部分天然就是掩码key(Mask为1的时候才存在)和具体的传输数据了。
仍是有点晕吧😷,来张图总结下:
数据帧格式.jpeg

好了,了解了数据帧格式后,咱们再来读源码就清晰多了。
先看看怎么读的头部信息并解析的:

//取数据帧前8位数据
  b0 = source.readByte() and 0xff
  //获取数据帧的opcode(数据格式)
  opcode = b0 and B0_MASK_OPCODE(15)
  //是否为最终帧
  isFinalFrame = b0 and B0_FLAG_FIN(128) != 0
  //是否为控制帧(指令)
  isControlFrame = b0 and OPCODE_FLAG_CONTROL(8) != 0
  • 第一句获取头信息,and是按位与计算,and 0xff 意思就是按位与11111111,因此头部信息其实就是取了数据帧的前8位数据,一个字节。
  • 第二句获取opcodeand 15也就是按位与00001111,其实也就是取了后四位数据,恰好对应上opcode的位置,第一个字节的后四位。
  • 第三句获取是否为最终帧,刚才数据帧格式中说过,第一位FIN标识了是否为最后一帧数据,1表明结束帧,因此这里and 128 也就是按位与10000000,也就是取的第一位数。
  • 第四句获取是否为控制帧,and 8也就是按位与00001000,取得是第五位,也就是opcode的第一位,这是什么意思呢?咱们看看刚才的数据帧格式,发现从0x8开始就是所谓的控制帧了。0x8对应的二进制是1000,0x7对应的二进制是0111。发现了吧,若是为控制帧的时候,opcode第一位确定是为1的,因此这里就判断的第五位。

后面还有读取第二个字节的代码,你们能够本身沿着这个思路本身看看,包括了读取MASK,读取数据长度的三种长度等。

因此这个processNextFrame方法主要作了三件事:

  • readHeader方法中,判断了是否为控制帧,是否为结束帧,而后获取了Mask标识,帧长度等参数
  • readControlFrame方法中,主要处理了该帧数据为ping,pong,close三种状况,而且在收到close关闭帧的状况下,回调了onReadClose方法,这个待会要细看下。
  • readMessageFrame方法中,主要是读取了消息后,回调了onReadMessage方法。

至此能够发现,其实WebSocket传输数据并非一个简单的事,只是OkHttp都帮咱们封装好了,咱们只须要直接传输数据便可,感谢这些三方库为咱们开发做出的贡献,不知道何时我也能作出点贡献呢🤔。

对了,刚才说回调也很重要,接着看看。onReadCloseonReadMessage回调到哪了呢?还记得上文初始化WebSocketWriter的时候设置了回调接口吗。因此就是回调给RealWebSocket了:

//RealWebSocket.kt
  override fun onReadClose(code: Int, reason: String) {
    require(code != -1)

    var toClose: Streams? = null
    var readerToClose: WebSocketReader? = null
    var writerToClose: WebSocketWriter? = null
    synchronized(this) {
      check(receivedCloseCode == -1) { "already closed" }
      receivedCloseCode = code
      receivedCloseReason = reason 
      //...
    }

    try {
      listener.onClosing(this, code, reason)

      if (toClose != null) {
        listener.onClosed(this, code, reason)
      }
    } finally {
      toClose?.closeQuietly()
      readerToClose?.closeQuietly()
      writerToClose?.closeQuietly()
    }
  }
  
  @Throws(IOException::class)
  override fun onReadMessage(text: String) {
    listener.onMessage(this, text)
  }

  @Throws(IOException::class)
  override fun onReadMessage(bytes: ByteString) {
    listener.onMessage(this, bytes)
  }

onReadClose回调方法里面有个关键的参数,receivedCloseCode。还记得这个参数吗?上文中解析消息的循环条件就是receivedCloseCode == -1,因此当收到关闭帧的时候,receivedCloseCode就再也不等于-1(规定大于1000),也就再也不去读取解析消息了。这样整个流程就结束了。

其中还有一些WebSocketListener的回调,好比onClosing,onClosed,onMessage等,就直接回调给用户使用了。至此,接收消息处理消息说完了。

发消息

好了。接着说发送,看看send方法:

@Synchronized private fun send(data: ByteString, formatOpcode: Int): Boolean {
    // ***
    // Enqueue the message frame.
    queueSize += data.size.toLong()
    messageAndCloseQueue.add(Message(formatOpcode, data))
    runWriter()
    return true
  }

首先,把要发送的data封装成Message对象,而后入队列messageAndCloseQueue。最后执行runWriter方法。这都不用猜了,runWriter确定就要开始发送消息了,继续看:

//RealWebSocket.kt
  private fun runWriter() {
    this.assertThreadHoldsLock()

    val writerTask = writerTask
    if (writerTask != null) {
      taskQueue.schedule(writerTask)
    }
  }
  
  private inner class WriterTask : Task("$name writer") {
    override fun runOnce(): Long {
      try {
        if (writeOneFrame()) return 0L
      } catch (e: IOException) {
        failWebSocket(e, null)
      }
      return -1L
    }
  }  
  
  //如下是schedule方法转到WriterTask的runOnce方法过程

  //TaskQueue.kt
  fun schedule(task: Task, delayNanos: Long = 0L) {
    synchronized(taskRunner) {
      if (scheduleAndDecide(task, delayNanos, recurrence = false)) {
        taskRunner.kickCoordinator(this)
      }
    }
  }
  
  internal fun scheduleAndDecide(task: Task, delayNanos: Long, recurrence: Boolean): Boolean {
    //***
    if (insertAt == -1) insertAt = futureTasks.size
    futureTasks.add(insertAt, task)

    // Impact the coordinator if we inserted at the front.
    return insertAt == 0
  }  

  //TaskRunner.kt
  internal fun kickCoordinator(taskQueue: TaskQueue) {
    this.assertThreadHoldsLock()
    
    if (taskQueue.activeTask == null) {
      if (taskQueue.futureTasks.isNotEmpty()) {
        readyQueues.addIfAbsent(taskQueue)
      } else {
        readyQueues.remove(taskQueue)
      }
    }    
    
    if (coordinatorWaiting) {
      backend.coordinatorNotify(this@TaskRunner)
    } else {
      backend.execute(runnable)
    }
  }  
  
  private val runnable: Runnable = object : Runnable {
    override fun run() {
      while (true) {
        val task = synchronized(this@TaskRunner) {
          awaitTaskToRun()
        } ?: return

        logElapsed(task, task.queue!!) {
          var completedNormally = false
          try {
            runTask(task)
            completedNormally = true
          } finally {
            // If the task is crashing start another thread to service the queues.
            if (!completedNormally) {
              backend.execute(this)
            }
          }
        }
      }
    }
  }
  
  private fun runTask(task: Task) {
    try {
      delayNanos = task.runOnce()
    } 
  }

代码有点长,这里是从runWriter开始跟的几个方法,拿到writerTask实例后,存到TaskQueuefutureTasks列表里,而后到runnable这里能够看到是一个while死循环,不断的从futureTasks中取出Task并执行runTask方法,直到Task为空,循环中止。

其中涉及到两个新的类:

  • TaskQueue类主要就是管理消息任务列表,保证按顺序执行
  • TaskRunner类主要就是作一些任务的具体操做,好比线程池里执行任务,记录消息任务的状态(准备发送的任务队列readyQueues,正在执行的任务队列busyQueues等等)

而每个Task最后都是执行到了WriterTaskrunOnce方法,也就是writeOneFrame方法:

internal fun writeOneFrame(): Boolean {
    synchronized(this@RealWebSocket) {
      if (failed) {
        return false // Failed web socket.
      }
      writer = this.writer
      pong = pongQueue.poll()
      if (pong == null) {
        messageOrClose = messageAndCloseQueue.poll()
        if (messageOrClose is Close) {
        } else if (messageOrClose == null) {
            return false // The queue is exhausted.
        }
      }
    }

   //发送消息逻辑,包括`pong`消息,普通消息,关闭消息
    try {
      if (pong != null) {
        writer!!.writePong(pong)
      } else if (messageOrClose is Message) {
        val message = messageOrClose as Message
        writer!!.writeMessageFrame(message.formatOpcode, message.data)
        synchronized(this) {
          queueSize -= message.data.size.toLong()
        }
      } else if (messageOrClose is Close) {
        val close = messageOrClose as Close
        writer!!.writeClose(close.code, close.reason)
        // We closed the writer: now both reader and writer are closed.
        if (streamsToClose != null) {
          listener.onClosed(this, receivedCloseCode, receivedCloseReason!!)
        }
      } 
      return true
    } finally {
      streamsToClose?.closeQuietly()
      readerToClose?.closeQuietly()
      writerToClose?.closeQuietly()
    }
  }

这里就会执行发送消息的逻辑了,主要有三种消息状况处理:

  • pong消息,这个主要是为服务器端准备的,发送给客户端回应心跳包。
  • 普通消息,就会把数据类型Opcode和具体数据发送过去
  • 关闭消息,其实当用户执行close方法关闭WebSocket的时候,也是发送了一条Close控制帧消息给服务器告知这个关闭需求,并带上code状态码reason关闭缘由,而后服务器端就会关闭当前链接。

好了。最后一步了,就是把这些数据组装成WebSocket数据帧并写入流,分红控制帧数据和普通消息数据帧

//写入(发送)控制帧
  private fun writeControlFrame(opcode: Int, payload: ByteString) {
    if (writerClosed) throw IOException("closed")
    
    val length = payload.size
    require(length <= PAYLOAD_BYTE_MAX) {
      "Payload size must be less than or equal to $PAYLOAD_BYTE_MAX"
    }
    val b0 = B0_FLAG_FIN or opcode
    sinkBuffer.writeByte(b0)

    var b1 = length
    if (isClient) {
      b1 = b1 or B1_FLAG_MASK
      sinkBuffer.writeByte(b1)
      random.nextBytes(maskKey!!)
      sinkBuffer.write(maskKey)

      if (length > 0) {
        val payloadStart = sinkBuffer.size
        sinkBuffer.write(payload)
        sinkBuffer.readAndWriteUnsafe(maskCursor!!)
        maskCursor.seek(payloadStart)
        toggleMask(maskCursor, maskKey)
        maskCursor.close()
      }
    } else {
      sinkBuffer.writeByte(b1)
      sinkBuffer.write(payload)
    }

    sink.flush()
  }


  //写入(发送)普通消息数据帧
  @Throws(IOException::class)
  fun writeMessageFrame(formatOpcode: Int, data: ByteString) {
    if (writerClosed) throw IOException("closed")

    messageBuffer.write(data)

    var b0 = formatOpcode or B0_FLAG_FIN
    val dataSize = messageBuffer.size
    sinkBuffer.writeByte(b0)

    var b1 = 0
    if (isClient) {
      b1 = b1 or B1_FLAG_MASK
    }
    when {
      dataSize <= PAYLOAD_BYTE_MAX -> {
        b1 = b1 or dataSize.toInt()
        sinkBuffer.writeByte(b1)
      }
      dataSize <= PAYLOAD_SHORT_MAX -> {
        b1 = b1 or PAYLOAD_SHORT
        sinkBuffer.writeByte(b1)
        sinkBuffer.writeShort(dataSize.toInt())
      }
      else -> {
        b1 = b1 or PAYLOAD_LONG
        sinkBuffer.writeByte(b1)
        sinkBuffer.writeLong(dataSize)
      }
    }

    if (isClient) {
      random.nextBytes(maskKey!!)
      sinkBuffer.write(maskKey)

      if (dataSize > 0L) {
        messageBuffer.readAndWriteUnsafe(maskCursor!!)
        maskCursor.seek(0L)
        toggleMask(maskCursor, maskKey)
        maskCursor.close()
      }
    }

    sinkBuffer.write(messageBuffer, dataSize)
    sink.emit()
  }

你们应该都能看懂了吧,其实就是组装数据帧,包括Opcode,mask,数据长度等等。两个方法的不一样就在于普通数据须要判断数据长度的三种状况,再组装数据帧。最后都会经过sinkBuffer写入到输出数据流。

终于,基本的流程说的差很少了。其中还有不少细节,同窗们能够本身花时间看看琢磨琢磨,好比Okio部分。仍是那句话,但愿你们有空本身也读一读相关源码,这样理解才能深入,并且你确定会发现不少我没说到的细节,欢迎你们讨论。我也会继续努力,最后你们给我加个油点个赞吧,感谢感谢。

总结

再来个图总结下吧!🎉
OkHttp-WebSocket源码.jpg

参考

OkHttp源码
《WebSocket协议翻译》

附件

OkHttp源码
WebSocket功能实现源码


个人公众号:码上积木,天天三问面试题,详细剖析,助你成为offer收割机。

谢谢你的阅读,若是你以为写的还行,就点个赞支持下吧!感谢!
你的一个👍,就是我分享的动力❤️。

相关文章
相关标签/搜索