最近老板又来新需求了,要作一个物联网相关的app
,其中有个需求是客户端须要收发服务器不按期发出的消息。
心里OS:
🤔 这咋整呢?经过接口轮询?定时访问接口,有数据就更新?
🤔 不行不行,这样浪费资源了,还耗电,会致使不少请求都是无效的网络操做。
🤔 那就长链接呗?WebSocket协议
好像不错,经过握手创建长链接后,能够随时收发服务器的消息。那就它了!
🤔 怎么集成呢?正好前段时间复习OkHttp
源码的时候发现了它是支持Websocket
协议的,那就用它试试吧!(戏好多,演不下去了🤮)java
开淦!git
先简单介绍下WebSocket
。
咱们都知道Http是处于应用层的一个通讯协议
,可是只支持单向主动通讯,作不到服务器主动向客户端推送消息。并且Http是无状态
的,即每次通讯都没有关联性,致使跟服务器关系不紧密。github
为了解决和服务器长时间通讯的痛点呢,HTML5
规范引出了WebSocket
协议(知道这名字咋来的吧,人家HTML5
规范引出的,随爸姓),是一种创建在TCP
协议基础上的全双工通讯的协议。他跟Http
同属于应用层协议,下层仍是须要经过TCP创建链接。web
可是,WebSocket
在TCP
链接创建后,还要经过Http
进行一次握手,也就是经过Http
发送一条GET请求
消息给服务器,告诉服务器我要创建WebSocket链接
了,你准备好哦,具体作法就是在头部信息中添加相关参数。而后服务器响应我知道了,而且将链接协议改为WebSocket
,开始创建长链接。面试
这里贴上请求头和响应头信息,从网上找了一张图:服务器
简单说明下参数:websocket
ws
或者wss
开头,ws
对应Websocket
协议,wss
对应在TLS
之上的WebSocket
。相似于Http
和Https
的关系。Connection:Upgrade
,表示客户端要链接升级,不用Http协议。Upgrade:websocket
, 表示客户端要升级创建Websocket
链接。Sec-Websocket-Key:key
, 这个key是随机生成的,服务器会经过这个参数验证该请求是否有效。Sec-WebSocket-Version:13
, websocket使用的协议,通常就是13。Sec-webSocket-Extension:permessage-deflate
,客户端指定的一些扩展协议,好比这里permessage-deflate
就是WebSocket
的一种压缩协议。响应码101,
表示响应协议升级,后续的数据交互都按照Upgradet指定的WebSocket
协议来。implementation("com.squareup.okhttp3:okhttp:4.7.2")
首先是初始化OkHttpClient
和WebSocket
实例:网络
/** * 初始化WebSocket */ public void init() { mWbSocketUrl = "ws://echo.websocket.org"; mClient = new OkHttpClient.Builder() .pingInterval(10, TimeUnit.SECONDS) .build(); Request request = new Request.Builder() .url(mWbSocketUrl) .build(); mWebSocket = mClient.newWebSocket(request, new WsListener()); }
这里主要是配置了OkHttp
的一些参数,以及WebSocket
的链接地址。其中newWebSocket
方法就是进行WebSocket
的初始化和链接。app
这里要注意的点是pingInterval
方法的配置,这个方法主要是用来设置WebSocket
链接的保活。
相信作过长链接的同窗都知道,一个长链接通常要隔几秒发送一条消息告诉服务器我在线,而服务器也会回复一个消息表示收到了,这样就确认了链接正常,客户端和服务器端都在线。less
若是服务器没有按时收到
这个消息那么服务器可能就会主动关闭
这个链接,节约资源。
客户端没有正常收到
这个返回的消息,也会作一些相似重连的操做
,因此这个保活消息很是重要。
咱们称这个消息叫做心跳包
,通常用PING,PONG
表示,像乒乓球同样,一来一回。
因此这里的pingInterval
就是设置心跳包发送的间隔时间,设置了这个方法以后,OkHttp
就会自动帮咱们发送心跳包事件,也就是ping
包。当间隔时间到了,没有收到pong
包的话,监听事件中的onFailure
方法就会被调用,此时咱们就能够进行重连。
可是因为实际业务需求不同,以及okhttp
中心跳包事件给予咱们权限较少,因此咱们也能够本身完成心跳包事件,即在WebSocket
链接成功以后,开始定时发送ping
包,在下一次发送ping
包以前检查上一个pong
包是否收到,若是没收到,就视为异常,开始重连。感兴趣的同窗能够看看文末的相关源码。
创建链接后,咱们就能够正常发送和读取消息了,也就是在上文WsListener
监听事件中表现:
//监听事件,用于收消息,监听链接的状态 class WsListener extends WebSocketListener { @Override public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) { super.onClosed(webSocket, code, reason); } @Override public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) { super.onClosing(webSocket, code, reason); } @Override public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) { super.onFailure(webSocket, t, response); } @Override public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) { super.onMessage(webSocket, text); Log.e(TAG, "客户端收到消息:" + text); onWSDataChanged(DATE_NORMAL, text); //测试发消息 webSocket.send("我是客户端,你好啊"); } @Override public void onMessage(@NotNull WebSocket webSocket, @NotNull ByteString bytes) { super.onMessage(webSocket, bytes); } @Override public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) { super.onOpen(webSocket, response); Log.e(TAG,"链接成功!"); } } //发送String消息 public void send(final String message) { if (mWebSocket != null) { mWebSocket.send(message); } } /** * 发送byte消息 * @param message */ public void send(final ByteString message) { if (mWebSocket != null) { mWebSocket.send(message); } } //主动断开链接 public void disconnect(int code, String reason) { if (mWebSocket != null) mWebSocket.close(code, reason); }
这里要注意,回调的方法都是在子线程回调的,若是须要更新UI
,须要切换到主线程。
基本操做就这么多,仍是很简单的吧,初始化Websocket
——链接——链接成功——收发消息。
其中WebSocket
类是一个操做接口,主要提供了如下几个方法
send(text: String)
发送一个String类型的消息send(bytes: ByteString)
发送一个二进制类型的消息close(code: Int, reason: String?)
关闭WebSocket链接若是有同窗想测试下WebSocket
的功能可是又没有实际的服务器,怎么办呢?
其实OkHttp
官方有一个MockWebSocket
服务,能够用来模拟服务端,下面咱们一块儿试一下:
首先集成MockWebSocket
服务库:
implementation 'com.squareup.okhttp3:mockwebserver:4.7.2'
而后就能够新建MockWebServer
,并加入MockResponse
做为接收消息的响应。
MockWebServer mMockWebServer = new MockWebServer(); MockResponse response = new MockResponse() .withWebSocketUpgrade(new WebSocketListener() { @Override public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) { super.onOpen(webSocket, response); //有客户端链接时回调 Log.e(TAG, "服务器收到客户端链接成功回调:"); mWebSocket = webSocket; mWebSocket.send("我是服务器,你好呀"); } @Override public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) { super.onMessage(webSocket, text); Log.e(TAG, "服务器收到消息:" + text); } @Override public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) { super.onClosed(webSocket, code, reason); Log.e(TAG, "onClosed:"); } }); mMockWebServer.enqueue(response);
这里服务器端在收到客户端链接成功消息后,给客户端发送了一条消息。
要注意的是这段代码要在子线程执行,由于主线程不能进行网络操做。
而后就能够去初始化Websocket
客户端了:
//获取链接url,初始化websocket客户端 String websocketUrl = "ws://" + mMockWebServer.getHostName() + ":" + mMockWebServer.getPort() + "/"; WSManager.getInstance().init(websocketUrl);
ok,运行项目
//运行结果 E/jimu: mWbSocketUrl=ws://localhost:38355/ E/jimu: 服务器收到客户端链接成功回调: E/jimu: 链接成功! E/jimu: 客户端收到消息:我是服务器,你好呀 E/jimu: 服务器收到消息:我是客户端,你好啊
相关的WebSocket
管理类和模拟服务器类我也上传到github
了,有须要的同窗能够文末自取。
WebSocket
整个流程无非三个功能:链接,接收消息,发送消息。下面咱们就从这三个方面
分析下具体是怎么实现的。
经过上面的代码咱们得知,WebSocket
链接是经过newWebSocket
方法。直接点进去看这个方法:
override fun newWebSocket(request: Request, listener: WebSocketListener): WebSocket { val webSocket = RealWebSocket( taskRunner = TaskRunner.INSTANCE, originalRequest = request, listener = listener, random = Random(), pingIntervalMillis = pingIntervalMillis.toLong(), extensions = null, // Always null for clients. minimumDeflateSize = minWebSocketMessageToCompress ) webSocket.connect(this) return webSocket }
这里作了两件事:
RealWebSocket
,主要是设置了一些参数(好比pingIntervalMillis
心跳包时间间隔,还有监听事件之类的)connect
方法进行WebSocket
链接继续查看connect方法:
fun connect(client: OkHttpClient) { //*** val webSocketClient = client.newBuilder() .eventListener(EventListener.NONE) .protocols(ONLY_HTTP1) .build() val request = originalRequest.newBuilder() .header("Upgrade", "websocket") .header("Connection", "Upgrade") .header("Sec-WebSocket-Key", key) .header("Sec-WebSocket-Version", "13") .header("Sec-WebSocket-Extensions", "permessage-deflate") .build() call = RealCall(webSocketClient, request, forWebSocket = true) call!!.enqueue(object : Callback { override fun onResponse(call: Call, response: Response) { //获得数据流 val streams: Streams try { checkUpgradeSuccess(response, exchange) streams = exchange!!.newWebSocketStreams() } //*** // Process all web socket messages. try { val name = "$okHttpName WebSocket ${request.url.redact()}" initReaderAndWriter(name, streams) listener.onOpen(this@RealWebSocket, response) loopReader() } catch (e: Exception) { failWebSocket(e, null) } } }) }
上一篇使用篇文章中说过,Websocket
链接须要一次Http
协议的握手,而后才能把协议升级成WebSocket
。因此这段代码就体现出这个功能了。
首先就new
了一个用来进行Http
链接的request
,其中Header
的参数就表示我要进行WebSocket
链接了,参数解析以下:
Connection:Upgrade
,表示客户端要链接升级Upgrade:websocket
, 表示客户端要升级创建Websocket链接Sec-Websocket-Key:key
, 这个key是随机生成的,服务器会经过这个参数验证该请求是否有效Sec-WebSocket-Version:13
, websocket使用的版本,通常就是13Sec-webSocket-Extension:permessage-deflate
,客户端指定的一些扩展协议,好比这里permessage-deflate
就是WebSocket
的一种压缩协议。Header
设置好以后,就调用了call
的enqueue
方法,这个方法你们应该都很熟悉吧,OkHttp
里面对于Http
请求的异步请求就是这个方法。
至此,握手结束,服务器返回响应码101
,表示协议升级。
而后咱们继续看看获取服务器响应以后又作了什么?
在发送Http
请求成功以后,onResponse
响应方法里面主要表现为四个处理逻辑:
Http
流转换成WebSocket
流,获得Streams
对象,这个流后面会转化成输入流和输出流,也就是进行发送和读取的操做流listener.onOpen(this@RealWebSocket, response)
,回调了接口WebSocketListener
的onOpen
方法,告诉用户WebSocket
已经链接initReaderAndWriter(name, streams)
loopReader()
前两个逻辑仍是比较好理解,主要是后两个方法,咱们分别解析下。
首先看initReaderAndWriter
方法。
//RealWebSocket.kt @Throws(IOException::class) fun initReaderAndWriter(name: String, streams: Streams) { val extensions = this.extensions!! synchronized(this) { //*** //写数据,发送数据的工具类 this.writer = WebSocketWriter() //设置心跳包事件 if (pingIntervalMillis != 0L) { val pingIntervalNanos = MILLISECONDS.toNanos(pingIntervalMillis) taskQueue.schedule("$name ping", pingIntervalNanos) { writePingFrame() return@schedule pingIntervalNanos } } //*** } //*** //读取数据的工具类 reader = WebSocketReader( *** frameCallback = this, *** ) } internal fun writePingFrame() { //*** try { writer.writePing(ByteString.EMPTY) } catch (e: IOException) { failWebSocket(e, null) } }
这个方法主要干了两件事:
WebSocketWriter
和WebSocketReader
,用来处理数据的收发。pingIntervalMillis
参数不为0,就经过计时器,每隔pingIntervalNanos
发送一个ping
消息。其中writePingFrame
方法就是发送了ping
帧数据。接着看看这个loopReader
方法是干什么的,看这个名字咱们大胆猜想下,难道这个方法就是用来循环读取数据的?去代码里找找答案:
fun loopReader() { while (receivedCloseCode == -1) { // This method call results in one or more onRead* methods being called on this thread. reader!!.processNextFrame() } }
代码很简单,一个while
循环,循环条件是receivedCloseCode == -1
的时候,作的事情是reader!!.processNextFrame()
方法。继续:
//WebSocketWriter.kt fun processNextFrame() { //读取头部信息 readHeader() if (isControlFrame) { //若是是控制帧,读取控制帧内容 readControlFrame() } else { //读取普通消息内容 readMessageFrame() } } //读取头部信息 @Throws(IOException::class, ProtocolException::class) private fun readHeader() { if (closed) throw IOException("closed") try { //读取数据,获取数据帧的前8位 b0 = source.readByte() and 0xff } finally { source.timeout().timeout(timeoutBefore, TimeUnit.NANOSECONDS) } //*** //获取数据帧的opcode(数据格式) opcode = b0 and B0_MASK_OPCODE //是否为最终帧 isFinalFrame = b0 and B0_FLAG_FIN != 0 //是否为控制帧(指令) isControlFrame = b0 and OPCODE_FLAG_CONTROL != 0 //判断最终帧,获取帧长度等等 } //读取控制帧(指令) @Throws(IOException::class) private fun readControlFrame() { if (frameLength > 0L) { source.readFully(controlFrameBuffer, frameLength) } when (opcode) { OPCODE_CONTROL_PING -> { //ping 帧 frameCallback.onReadPing(controlFrameBuffer.readByteString()) } OPCODE_CONTROL_PONG -> { //pong 帧 frameCallback.onReadPong(controlFrameBuffer.readByteString()) } OPCODE_CONTROL_CLOSE -> { //关闭 帧 var code = CLOSE_NO_STATUS_CODE var reason = "" val bufferSize = controlFrameBuffer.size if (bufferSize == 1L) { throw ProtocolException("Malformed close payload length of 1.") } else if (bufferSize != 0L) { code = controlFrameBuffer.readShort().toInt() reason = controlFrameBuffer.readUtf8() val codeExceptionMessage = WebSocketProtocol.closeCodeExceptionMessage(code) if (codeExceptionMessage != null) throw ProtocolException(codeExceptionMessage) } //回调onReadClose方法 frameCallback.onReadClose(code, reason) closed = true } } } //读取普通消息 @Throws(IOException::class) private fun readMessageFrame() { readMessage() if (readingCompressedMessage) { val messageInflater = this.messageInflater ?: MessageInflater(noContextTakeover).also { this.messageInflater = it } messageInflater.inflate(messageFrameBuffer) } if (opcode == OPCODE_TEXT) { frameCallback.onReadMessage(messageFrameBuffer.readUtf8()) } else { frameCallback.onReadMessage(messageFrameBuffer.readByteString()) } }
代码仍是比较直观,这个processNextFrame
其实就是读取数据用的,首先读取头部信息,获取数据帧的类型,判断是否为控制帧,再分别去读取控制帧数据或者普通消息帧数据。
问题来了,什么是数据头部信息,什么是控制帧?
这里就要说下WebSocket
的数据帧了,先附上一个数据帧格式:
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 +-+-+-+-+-------+ +-+-------------+ +-----------------------------+ |F|R|R|R| OP | |M| LENGTH | Extended payload length |I|S|S|S| CODE | |A| | (if LENGTH=126) |N|V|V|V| | |S| | | |1|2|3| | |K| | +-+-+-+-+-------+ +-+-------------+ | Extended payload length(if LENGTH=127) + +------------------------------- | Extended payload length | Masking-key,if Mask set to 1 +----------------------------------+------------------------------- | Masking-key | Data +----------------------------------+------------------------------- | Data +----------------------------------+-------------------------------
我认可,我懵逼了。
冷静冷静,一步一步分析下吧。
首先每一行表明4个字节,一共也就是32位数,哦,那也就是几个字节而已嘛,每一个字节有他本身的表明意义呗,这样想是否是就很简单了,下面来具体看看每一个字节。
第1个字节:
FIN码
,其实就是一个标示位,由于数据可能多帧操做嘛,因此多帧状况下,只有最后一帧的FIN
设置成1,标示结束帧,前面全部帧设置为0。RSV码
,通常通讯两端没有设置自定义协议,就默认为0。opcode
,咱们叫它操做码。这个就是判断这个数据帧的类型了,通常有如下几个被定义好的类型:1) 0x0
表示附加数据帧
2) 0x1
表示文本数据帧
3) 0x2
表示二进制数据帧
4) 0x3-7
保留用于将来的非控制帧
5) 0x8
表示链接关闭
6) 0x9
表示ping
7) 0xA
表示pong
8) 0xB-F
保留用于将来的非控制帧
是否是发现了些什么,这不就对应了咱们应用中的几种格式吗?2和3
对应的是普通消息帧,包括了文本和二进制数据。567
对应的就是控制帧格式,包括了close,ping,pong
。
第2个字节:
Mask
掩码,其实就是标识数据是否加密混淆,1表明数据通过掩码的,0是没有通过掩码的,若是是1的话,后续就会有4个字节表明掩码key
,也就是数据帧中Masking-key
所处的位置。LENGTH
,用来标示数据长度。由于只有7位,因此最大只能储存1111111对应的十进制数127长度
的数据,若是须要更大的数据,这个储存长度确定就不够了。小于126长度
则数据用这七位表示实际长度。2) 若是长度设置为126
,也就是二进制1111110,就表明取额外2个字节
表示数据长度,共是16位表示数据长度。3) 若是长度设置为127
,也就是二进制1111111,就表明取额外8个字节
,共是64位表示数据长度。须要注意的是LENGHT的三种状况在一个数据帧里面只会出现一种状况,不共存,因此在图中是用if表示。一样的,Masking-key也是当Mask为1的时候才存在。
因此也就有了数据帧里面的Extended payload length(LENGTH=126)
所处的2个字节,以及Extended payload length(LENGTH=127)
所处的8个字节。
最后的字节部分天然就是掩码key
(Mask为1的时候才存在)和具体的传输数据
了。
仍是有点晕吧😷,来张图总结下:
好了,了解了数据帧格式后,咱们再来读源码就清晰多了。
先看看怎么读的头部信息
并解析的:
//取数据帧前8位数据 b0 = source.readByte() and 0xff //获取数据帧的opcode(数据格式) opcode = b0 and B0_MASK_OPCODE(15) //是否为最终帧 isFinalFrame = b0 and B0_FLAG_FIN(128) != 0 //是否为控制帧(指令) isControlFrame = b0 and OPCODE_FLAG_CONTROL(8) != 0
and
是按位与计算,and 0xff
意思就是按位与11111111,因此头部信息其实就是取了数据帧的前8位数据
,一个字节。opcode
,and 15
也就是按位与00001111,其实也就是取了后四位数据,恰好对应上opcode
的位置,第一个字节的后四位。最终帧
,刚才数据帧格式中说过,第一位FIN
标识了是否为最后一帧数据,1表明结束帧,因此这里and 128
也就是按位与10000000,也就是取的第一位数。and 8
也就是按位与00001000,取得是第五位,也就是opcode
的第一位,这是什么意思呢?咱们看看刚才的数据帧格式,发现从0x8
开始就是所谓的控制帧了。0x8
对应的二进制是1000,0x7
对应的二进制是0111。发现了吧,若是为控制帧的时候,opcode
第一位确定是为1的,因此这里就判断的第五位。后面还有读取第二个字节的代码,你们能够本身沿着这个思路本身看看,包括了读取MASK
,读取数据长度的三种长度等。
因此这个processNextFrame
方法主要作了三件事:
readHeader
方法中,判断了是否为控制帧,是否为结束帧
,而后获取了Mask
标识,帧长度等参数readControlFrame
方法中,主要处理了该帧数据为ping,pong,close
三种状况,而且在收到close关闭帧
的状况下,回调了onReadClose
方法,这个待会要细看下。readMessageFrame
方法中,主要是读取了消息后,回调了onReadMessage方法。至此能够发现,其实WebSocket
传输数据并非一个简单的事,只是OkHttp
都帮咱们封装好了,咱们只须要直接传输数据便可,感谢这些三方库为咱们开发做出的贡献,不知道何时我也能作出点贡献呢🤔。
对了,刚才说回调也很重要,接着看看。onReadClose
和onReadMessage
回调到哪了呢?还记得上文初始化WebSocketWriter
的时候设置了回调接口吗。因此就是回调给RealWebSocket
了:
//RealWebSocket.kt override fun onReadClose(code: Int, reason: String) { require(code != -1) var toClose: Streams? = null var readerToClose: WebSocketReader? = null var writerToClose: WebSocketWriter? = null synchronized(this) { check(receivedCloseCode == -1) { "already closed" } receivedCloseCode = code receivedCloseReason = reason //... } try { listener.onClosing(this, code, reason) if (toClose != null) { listener.onClosed(this, code, reason) } } finally { toClose?.closeQuietly() readerToClose?.closeQuietly() writerToClose?.closeQuietly() } } @Throws(IOException::class) override fun onReadMessage(text: String) { listener.onMessage(this, text) } @Throws(IOException::class) override fun onReadMessage(bytes: ByteString) { listener.onMessage(this, bytes) }
onReadClose
回调方法里面有个关键的参数,receivedCloseCode
。还记得这个参数吗?上文中解析消息的循环条件就是receivedCloseCode == -1
,因此当收到关闭帧的时候,receivedCloseCode
就再也不等于-1(规定大于1000),也就再也不去读取解析消息了。这样整个流程就结束了。
其中还有一些WebSocketListener
的回调,好比onClosing,onClosed,onMessage
等,就直接回调给用户使用了。至此,接收消息处理消息说完了。
好了。接着说发送,看看send
方法:
@Synchronized private fun send(data: ByteString, formatOpcode: Int): Boolean { // *** // Enqueue the message frame. queueSize += data.size.toLong() messageAndCloseQueue.add(Message(formatOpcode, data)) runWriter() return true }
首先,把要发送的data
封装成Message
对象,而后入队列messageAndCloseQueue
。最后执行runWriter
方法。这都不用猜了,runWriter
确定就要开始发送消息了,继续看:
//RealWebSocket.kt private fun runWriter() { this.assertThreadHoldsLock() val writerTask = writerTask if (writerTask != null) { taskQueue.schedule(writerTask) } } private inner class WriterTask : Task("$name writer") { override fun runOnce(): Long { try { if (writeOneFrame()) return 0L } catch (e: IOException) { failWebSocket(e, null) } return -1L } } //如下是schedule方法转到WriterTask的runOnce方法过程 //TaskQueue.kt fun schedule(task: Task, delayNanos: Long = 0L) { synchronized(taskRunner) { if (scheduleAndDecide(task, delayNanos, recurrence = false)) { taskRunner.kickCoordinator(this) } } } internal fun scheduleAndDecide(task: Task, delayNanos: Long, recurrence: Boolean): Boolean { //*** if (insertAt == -1) insertAt = futureTasks.size futureTasks.add(insertAt, task) // Impact the coordinator if we inserted at the front. return insertAt == 0 } //TaskRunner.kt internal fun kickCoordinator(taskQueue: TaskQueue) { this.assertThreadHoldsLock() if (taskQueue.activeTask == null) { if (taskQueue.futureTasks.isNotEmpty()) { readyQueues.addIfAbsent(taskQueue) } else { readyQueues.remove(taskQueue) } } if (coordinatorWaiting) { backend.coordinatorNotify(this@TaskRunner) } else { backend.execute(runnable) } } private val runnable: Runnable = object : Runnable { override fun run() { while (true) { val task = synchronized(this@TaskRunner) { awaitTaskToRun() } ?: return logElapsed(task, task.queue!!) { var completedNormally = false try { runTask(task) completedNormally = true } finally { // If the task is crashing start another thread to service the queues. if (!completedNormally) { backend.execute(this) } } } } } } private fun runTask(task: Task) { try { delayNanos = task.runOnce() } }
代码有点长,这里是从runWriter
开始跟的几个方法,拿到writerTask
实例后,存到TaskQueue
的futureTasks列表
里,而后到runnable
这里能够看到是一个while
死循环,不断的从futureTasks
中取出Task
并执行runTask
方法,直到Task
为空,循环中止。
其中涉及到两个新的类:
TaskQueue类
主要就是管理消息任务列表,保证按顺序执行TaskRunner类
主要就是作一些任务的具体操做,好比线程池里执行任务,记录消息任务的状态(准备发送的任务队列readyQueues
,正在执行的任务队列busyQueues
等等)而每个Task最后都是执行到了WriterTask
的runOnce
方法,也就是writeOneFrame
方法:
internal fun writeOneFrame(): Boolean { synchronized(this@RealWebSocket) { if (failed) { return false // Failed web socket. } writer = this.writer pong = pongQueue.poll() if (pong == null) { messageOrClose = messageAndCloseQueue.poll() if (messageOrClose is Close) { } else if (messageOrClose == null) { return false // The queue is exhausted. } } } //发送消息逻辑,包括`pong`消息,普通消息,关闭消息 try { if (pong != null) { writer!!.writePong(pong) } else if (messageOrClose is Message) { val message = messageOrClose as Message writer!!.writeMessageFrame(message.formatOpcode, message.data) synchronized(this) { queueSize -= message.data.size.toLong() } } else if (messageOrClose is Close) { val close = messageOrClose as Close writer!!.writeClose(close.code, close.reason) // We closed the writer: now both reader and writer are closed. if (streamsToClose != null) { listener.onClosed(this, receivedCloseCode, receivedCloseReason!!) } } return true } finally { streamsToClose?.closeQuietly() readerToClose?.closeQuietly() writerToClose?.closeQuietly() } }
这里就会执行发送消息的逻辑了,主要有三种消息状况处理:
pong消息
,这个主要是为服务器端准备的,发送给客户端回应心跳包。普通消息
,就会把数据类型Opcode
和具体数据发送过去关闭消息
,其实当用户执行close
方法关闭WebSocket
的时候,也是发送了一条Close控制帧
消息给服务器告知这个关闭需求,并带上code状态码
和reason关闭缘由
,而后服务器端就会关闭当前链接。好了。最后一步了,就是把这些数据组装成WebSocket
数据帧并写入流,分红控制帧
数据和普通消息数据帧
:
//写入(发送)控制帧 private fun writeControlFrame(opcode: Int, payload: ByteString) { if (writerClosed) throw IOException("closed") val length = payload.size require(length <= PAYLOAD_BYTE_MAX) { "Payload size must be less than or equal to $PAYLOAD_BYTE_MAX" } val b0 = B0_FLAG_FIN or opcode sinkBuffer.writeByte(b0) var b1 = length if (isClient) { b1 = b1 or B1_FLAG_MASK sinkBuffer.writeByte(b1) random.nextBytes(maskKey!!) sinkBuffer.write(maskKey) if (length > 0) { val payloadStart = sinkBuffer.size sinkBuffer.write(payload) sinkBuffer.readAndWriteUnsafe(maskCursor!!) maskCursor.seek(payloadStart) toggleMask(maskCursor, maskKey) maskCursor.close() } } else { sinkBuffer.writeByte(b1) sinkBuffer.write(payload) } sink.flush() } //写入(发送)普通消息数据帧 @Throws(IOException::class) fun writeMessageFrame(formatOpcode: Int, data: ByteString) { if (writerClosed) throw IOException("closed") messageBuffer.write(data) var b0 = formatOpcode or B0_FLAG_FIN val dataSize = messageBuffer.size sinkBuffer.writeByte(b0) var b1 = 0 if (isClient) { b1 = b1 or B1_FLAG_MASK } when { dataSize <= PAYLOAD_BYTE_MAX -> { b1 = b1 or dataSize.toInt() sinkBuffer.writeByte(b1) } dataSize <= PAYLOAD_SHORT_MAX -> { b1 = b1 or PAYLOAD_SHORT sinkBuffer.writeByte(b1) sinkBuffer.writeShort(dataSize.toInt()) } else -> { b1 = b1 or PAYLOAD_LONG sinkBuffer.writeByte(b1) sinkBuffer.writeLong(dataSize) } } if (isClient) { random.nextBytes(maskKey!!) sinkBuffer.write(maskKey) if (dataSize > 0L) { messageBuffer.readAndWriteUnsafe(maskCursor!!) maskCursor.seek(0L) toggleMask(maskCursor, maskKey) maskCursor.close() } } sinkBuffer.write(messageBuffer, dataSize) sink.emit() }
你们应该都能看懂了吧,其实就是组装数据帧,包括Opcode,mask,数据长度
等等。两个方法的不一样就在于普通数据须要判断数据长度的三种状况,再组装数据帧。最后都会经过sinkBuffer
写入到输出数据流。
终于,基本的流程说的差很少了。其中还有不少细节,同窗们能够本身花时间看看琢磨琢磨,好比Okio
部分。仍是那句话,但愿你们有空本身也读一读相关源码,这样理解才能深入,并且你确定会发现不少我没说到的细节,欢迎你们讨论。我也会继续努力,最后你们给我加个油点个赞吧,感谢感谢。
再来个图总结下吧!🎉
个人公众号:码上积木,天天三问面试题,详细剖析,助你成为offer收割机。
谢谢你的阅读,若是你以为写的还行,就点个赞支持下吧!感谢!
你的一个👍,就是我分享的动力❤️。