实例分析
前面咱们看了AMQP说明文档, 对AMQP有了大致的了解, 本文从实例出发再过一遍AMQP的基本操做.css
准备
环境
RabbitMQ server 3.7.16
RabbitMQ client 5.7.3java
客户端代码使用的是RabbitMQ官网教程, 以下:网络
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel();) { boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null); String message = String.join(" ", "dMessage......."); channel.exchangeDeclare("mind", "direct"); channel.basicPublish("mind", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } }
下面是用wireshark抓包结果并发
imageapp
咱们后面以编码No来具体分析less
抓包分析
imagesocket
1-6是tcp建立链接的三次握手步骤, 这里不作过多分析tcp
imageoop
7-24是amqp建立链接的过程, 咱们能够对照前面的博客中的说明文档来分析这里. 每次一端向另外一端发送信息, 另外一端在接收到后都会发送一个ack表示接收到了.学习
image
1 在tcp链接建立后, 客户端会向服务端发送协议版本信息, 这里是amqp的0.9.1版本, 服务端会校验版本是否接受, 若是不符合要求会返回错误信息, 这里只有正确信息, 后面咱们本身实现客户端的时候能够实现一个错误例子.
image
2 服务端校验协议经过后, 会向客户端发送建立链接请求Connection.Start, 客户端在准备好后会返回一个Connection.Start-Ok. 接着服务端发送Connection.Tune与客户端进行参数调试, 调试的内容有Channel最大数量, Frame最大长度等. 客户端在调试后发送Connection.Tune-OK. 这个阶段就是对链接的参数调试.
image
3 参数调试以后, 客户端请求服务端打开链接Connection.Open, 服务端打开以后会返回Connection.Open-Ok. Connection打开成功后, 客户端请求打开通道Channel.Open, 服务端打开以后返回Channel.Open-Ok. 至此链接建立成功.
image
4 链接建立成功以后, 客户端进行队列和exchange的声明, Queue.Declare -> Queue.Declare-Ok, Exchange.Declare -> Exchange.Declare-Ok.
image
5 有了Exchange后, 客户端向Exchange发送信息, 咱们能够看到发送的Exchange, 和发送的内容
image
image
6 发送内容结束后, 客户端关闭, 先关闭通道Channel, 而后关闭Connection.
image
7 最后是tcp关闭链接
代码分析
下面咱们从代码层面分析这个过程, 下面是一个整体的时序图, 你们能够参考
image
下面咱们仍是按照抓包中看到的顺序来分析代码
建立tcp链接
代码很简单
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection();
咱们重点看factory.newConnection(); 顺着方法咱们很快找到了AutorecoveringConnection的init()方法
public void init() throws IOException, TimeoutException { this.delegate = this.cf.newConnection(); this.addAutomaticRecoveryListener(delegate); }
在this.cf.newConnection()中重点看下
FrameHandler frameHandler = factory.create(addr, connectionName()); RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector); conn.start(); metricsCollector.newConnection(conn); return conn;
你们debug代码能够看到factory是SocketFrameHandlerFactory的实例, 因此create中的代码以下:
public FrameHandler create(Address addr, String connectionName) throws IOException { String hostName = addr.getHost(); int portNumber = ConnectionFactory.portOrDefault(addr.getPort(), ssl); Socket socket = null; try { socket = createSocket(connectionName); configurator.configure(socket); socket.connect(new InetSocketAddress(hostName, portNumber), connectionTimeout); return create(socket); } catch (IOException ioe) { quietTrySocketClose(socket); throw ioe; } }
这里咱们能够看到java网络的底层代码Socket,
socket.connect(new InetSocketAddress(hostName, portNumber), connectionTimeout);
这句代码完成了tcp的链接的建立工做.
(准备看这里源码的时候就想着确定有个地方在作这步操做, 但就是找不到, 最后一点一点debug找到的...)
建立Connection
在上一步的最后, 把socket对象封装到一个FrameHandler实例中, 从这里咱们能够猜测, 后面全部消息的通讯都跟这个FrameHandler分不开.
咱们继续看, 返回以后
FrameHandler frameHandler = factory.create(addr, connectionName()); RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector); conn.start();
使用FrameHandler实例构造了一个Connection对象, 而后调用了start()方法, 实际调用的是父类AMQConnection方法, 这个也是整个链接过程的重点
这里代码比较长, 咱们选择一些重要的一点一点看
initializeConsumerWorkService(); // 初始化工做线程 initializeHeartbeatSender(); // 初始化心跳线程
// Make sure that the first thing we do is to send the header, // which should cause any socket errors to show up for us, rather // than risking them pop out in the MainLoop // 确保咱们在最开始发送的信息头在发生错误的时候不会出如今MainLoop中 // 这个实体就是为了接收在发送给服务端版本后接收服务端的Connection.Start方法的 AMQChannel.SimpleBlockingRpcContinuation connStartBlocker = new AMQChannel.SimpleBlockingRpcContinuation(); // We enqueue an RPC continuation here without sending an RPC // request, since the protocol specifies that after sending // the version negotiation header, the client (connection // initiator) is to wait for a connection.start method to // arrive. // 咱们这里没有经过发送请求获取响应, 是由于服务端在接收到版本信息后会主动发送信息 _channel0.enqueueRpc(connStartBlocker);
enqueueRpc里面以下, 就是在循环中等待服务端信息接收成功的通知
private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) { synchronized (_channelMutex) { boolean waitClearedInterruptStatus = false; while (_activeRpc != null) { try { _channelMutex.wait(); // 后面在接收到Connection.Start方法后会通知 } catch (InterruptedException e) { //NOSONAR waitClearedInterruptStatus = true; // No Sonar: we re-interrupt the thread later } } if (waitClearedInterruptStatus) { Thread.currentThread().interrupt(); } // 获取到通知后更新实体信息 _activeRpc = rpcWrapperSupplier.get(); } }
_frameHandler.sendHeader(); // 发送版本信息, 对应抓包7
this._frameHandler.initialize(this); // 初始化, 主要是启动了一个MainLoop线程用于获取服务端信息
MainLoop线程的核心代码代码
Frame frame = _frameHandler.readFrame(); readFrame(frame);
_frameHandler.readFrame() 内部代码以下, 这里你们能够查看译文中的2.3.5 Frame Details 帧的细节部分, 对照客户端是如何构造的, Frame结构以下
image
public static Frame readFrom(DataInputStream is) throws IOException { int type; int channel; try { type = is.readUnsignedByte(); // 一个字节的类型信息 } catch (SocketTimeoutException ste) { // System.err.println("Timed out waiting for a frame."); return null; // failed } if (type == 'A') { // 这里是处理, 若是服务端不支持客户端的版本, 会发送支持的版本信息, 开头是'A' /* * Probably an AMQP.... header indicating a version * mismatch. */ /* * Otherwise meaningless, so try to read the version, * and throw an exception, whether we read the version * okay or not. */ protocolVersionMismatch(is); // 这里面会抛出异常 } channel = is.readUnsignedShort(); // 两个个字节的channel编号 int payloadSize = is.readInt(); // 4个字节的payload大小 byte[] payload = new byte[payloadSize]; is.readFully(payload); // 读取payloadSize大小的字节 int frameEndMarker = is.readUnsignedByte(); // 一个字节的尾部 if (frameEndMarker != AMQP.FRAME_END) { throw new MalformedFrameException("Bad frame end marker: " + frameEndMarker); } // 构造对象并返回 return new Frame(type, channel, payload); }
上一步主要是对信息的封装, 下面是客户端对封装对象的处理
private void readFrame(Frame frame) throws IOException { if (frame != null) { _missedHeartbeats = 0; if (frame.type == AMQP.FRAME_HEARTBEAT) { // Ignore it: we've already just reset the heartbeat counter. } else { if (frame.channel == 0) { // the special channel 0通道是在建立链接过程当中使用的 _channel0.handleFrame(frame); // 这一步就是将Connection.Start内容放到了channel提早设置的实体中 } else { if (isOpen()) { // If we're still _running, but not isOpen(), then we // must be quiescing, which means any inbound frames // for non-zero channels (and any inbound commands on // channel zero that aren't Connection.CloseOk) must // be discarded. ChannelManager cm = _channelManager; if (cm != null) { ChannelN channel; try { channel = cm.getChannel(frame.channel); } catch(UnknownChannelException e) { // this can happen if channel has been closed, // but there was e.g. an in-flight delivery. // just ignoring the frame to avoid closing the whole connection LOGGER.info("Received a frame on an unknown channel, ignoring it"); return; } channel.handleFrame(frame); } } } } } else { // Socket timeout waiting for a frame. // Maybe missed heartbeat. handleSocketTimeout(); } }
咱们回到start()方法中, 获取Connection.Start方法, 而后设置一些服务单发过来的参数
connStart = (AMQP.Connection.Start) connStartBlocker.getReply(handshakeTimeout/2).getMethod();
而后按照是响应Start.Ok, Tune方法, 对应抓包9-16
do { Method method = (challenge == null) ? new AMQP.Connection.StartOk.Builder() .clientProperties(_clientProperties) .mechanism(sm.getName()) .response(response) .build() : new AMQP.Connection.SecureOk.Builder().response(response).build(); try { Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod(); if (serverResponse instanceof AMQP.Connection.Tune) { connTune = (AMQP.Connection.Tune) serverResponse; } else { challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge(); response = sm.handleChallenge(challenge, username, password); } } catch (ShutdownSignalException e) { Method shutdownMethod = e.getReason(); if (shutdownMethod instanceof AMQP.Connection.Close) { AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod; if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) { throw new AuthenticationFailureException(shutdownClose.getReplyText()); } } throw new PossibleAuthenticationFailureException(e); } } while (connTune == null);
获取到调试信息, 设置本地参数
int channelMax = negotiateChannelMax(this.requestedChannelMax, connTune.getChannelMax()); _channelManager = instantiateChannelManager(channelMax, threadFactory); int frameMax = negotiatedMaxValue(this.requestedFrameMax, connTune.getFrameMax()); this._frameMax = frameMax; int heartbeat = negotiatedMaxValue(this.requestedHeartbeat, connTune.getHeartbeat());
setHeartbeat(heartbeat); 启动心跳线程
发送调整完毕方法TuneOk, 并请求打开链接Open
_channel0.transmit(new AMQP.Connection.TuneOk.Builder() .channelMax(channelMax) .frameMax(frameMax) .heartbeat(heartbeat) .build()); _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder() .virtualHost(_virtualHost) .build());
至此Connection的链接已经建立并打开
建立Channel
接下来是Channel的建立, 咱们前面代码中使用的Channel是特殊的, 专门用于建立Connection, 下面建立的是为了后面发送队列消息使用的Channel.
Channel channel = connection.createChannel() // 入口
根据AMQP文档, 建立Channel须要客户端发送Channel.Open方法而后接收服务端的Channel.OpenOk, 咱们从抓包中也能够观察到. 咱们一步一步跟踪代码, 代码层级比较深, 这里给出调用逻辑, 从下到上(对, 没错, 这就是建立Channel报错日志截取了部分)
com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:295) com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141) com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:133) com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:182) com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:555) com.rabbitmq.client.impl.recovery.AutorecoveringConnection.createChannel(AutorecoveringConnection.java:165)
privateRpc的代码咱们看下
private AMQCommand privateRpc(Method m) throws IOException, ShutdownSignalException{ SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m); rpc(m, k); // 发送Channel.Open 方法 // At this point, the request method has been sent, and we // should wait for the reply to arrive. // 这里咱们已经发送了请求, 咱们应该等待响应 // Calling getReply() on the continuation puts us to sleep // until the connection's reader-thread throws the reply over // the fence or the RPC times out (if enabled) // 调用getReply()方法会阻塞直到获取到结果或者超时 if(_rpcTimeout == NO_RPC_TIMEOUT) { return k.getReply(); } else { try { return k.getReply(_rpcTimeout); } catch (TimeoutException e) { throw wrapTimeoutException(m, e); } } }
接收Channel.OpenOk方法是由MainLoop线程完成的, 方式相似以前获取Connection.Start方法.
消息发送
至此AMQP链接算是彻底建立完毕, 下面就是消息队列相关. 首先是队列和Exchange的声明, 这里队列的声明其实没有什么用, 代码这么写就是为了看下声明过程
channel.queueDeclare(QUEUE_NAME, durable, false, false, null); channel.exchangeDeclare("mind", "direct"); channel.basicPublish("mind", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
这里声明的方式很是简单你们跟着代码很容易明白, queue和Exchange的声明过程基本同样, 不一样的是queue在声明前会校验下队列的合法性(长度). 它们获取响应结果的方式和Channel.OpenOk的获取方式一毛同样.
消息的发送过程也是发送一个AMQPCommand, 可是细节不少, 准备在后面实现客户端的部分再详细看.
关闭链接
程序执行结束, 执行try-with-resources部分, 自动执行close()方法, 执行顺序从下到上, 也就是先执行Channel的close(), 而后Connection的close(); 从抓包中也能够看到先发送的Channel.close方法, 再发送Connection.close方法. 代码细节的部分这里就不展开了, 会放到后面代码实现上.
总结
总体过了一遍主要流程, 后面咱们会本身实现一个简单客户端加深下理解; 这个过程当中除了了解了客户端的操做流程外, 对java的部分知识也学习了一下
try-with-resources 在关闭时, 执行关闭的顺序和声明顺序相反;
try-with-resources 也能够有catch和finally块, 它们是在try-with-resources声明关闭以后执行的.
java线程状态流转
image
客户端实现(待完成~)
今天咱们的目标是实现rabbitmq客户端, 并使用该客户端发送消息到指定Exchange中.
tcp链接建立
超级简单
socket = new Socket(); socket.connect(new InetSocketAddress(host, port)); // 保存链接输入输出流 inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream())); outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
抓包
image
发送信息头
咱们经过抓包和源码知道, 发送头就是发送了"AMQP0091"
private int major = 0; private int minor = 9; private int revision = 1;
outputStream.write("AMQP".getBytes()); outputStream.write(0); outputStream.write(major); outputStream.write(minor); outputStream.write(revision); outputStream.flush();
抓包结果
image
能够看到服务端已经承认协议, 并发送了Connection.Start方法过来.
若是咱们发送的协议服务端不认识会怎么样, 咱们把major改成2试试
抓包结果
image
本身看下咱们发的内容以下
image
咱们是发送了0291, 抓包是支持AMQP协议的, 因此这里应该是不认识了, 因此显示为unknown version, 可是让我不理解的是服务端返回的结果也是unknown version, 根据AMQP文档中的说明, 服务端这时应该返回支持的协议, 咱们点开看下
image
的确是0091正常的协议, 可是抓包软件没有显示出来, 很奇怪~