官网地址:http://activemq.apache.org/php
解压后基本目录结构:html
进入 bin 目录:前端
./activemq star # 启动activeMQ服务 ./activemq stop # 关闭activeMQ服务
ActiveMQ 默认启动时,启动了内置的 jetty 服务器,提供一个用于监控 ActiveMQ 的 admin 应用(默认端口为8161,默认帐号密码都是admin):git
STOMP 是一个简单的可互操做的协议,被用于经过中间服务器在客户端之间进行异步消息传递。它定义了一种在客户端与服务端进行消息传递的文本格式。github
STOMP 是基于帧的协议。以 command 字符串开始,以 EOL 结束,command 下面是0个或多个 <key>:<value> 格式的 header 条目。每一个条目由 EOL 结束。一个空白行(即额外EOL)表示 header 结束和 body 开始。body 链接着 NULL 字节(ASCII 中用 ctrl+@表示,看起来是 ^@)。web
大概的协议格式:apache
CONNECT accept-version:1.0,1.1,2.0 host:www.jtthink.com ^@
STOMP 1.2规范:https://stomp.github.io/stomp-specification-1.2.htmljson
下载地址:http://pecl.php.net/package/stomp服务器
$ wget http://pecl.php.net/get/stomp-2.0.2.tgz $ tar zxf stomp-2.0.2.tgz $ cd stomp-2.0.2 $ phpize $ ./configure --enable-stomp --with-php-config=/usr/local/php/bin/php-config $ make && make install
完成后能够在结果中看见 extension 安装路径,在 php.ini 中添加节点:app
[stomp] extension=/usr/local/php/lib/php/extensions/no-debug-non-zts-20170718/stomp.so
验证安装结果:
php -m | grep Stomp
进入管理后台,建立一个新的 Queue:test
并在该 test 队列中发送几条消息
<?php //61613是STOMP链接默认的端口,在ActiveMQ目录conf/activemq.xml文件可修改 $broker = 'tcp://ActiveMQ服务IP:61613'; $queue = '/queue/test'; try { $stomp = new Stomp($broker); $stomp->subscribe($queue); while($stomp->hasFrame()) { //订阅一个消息队列 $frame = $stomp->readFrame(); //输出消息体内容 echo $frame->body.PHP_EOL; } } catch(StompException $e) { echo $e->getMessage(); }
运行上面的代码:
以最简单的用户注册为例,当用户提交注册时,分别向部署好的多个 ActiveMQ 中间站发送消息,来处理不一样的业务流程(如信息入库、验证短信发送等)。
前端注册页面:
<?php if(isset($_POST["username"])) { $broker = 'tcp://ActiveMQ服务IP:61613'; $queue1 = '/queue/userreg'; // 用户数据入库队列 $queue2 = '/queue/usersmsg'; // 用户短信发送队列 // 模拟数据 $userID = rand(50,500); $user = new stdClass(); $user->userID = $userID; $user->userName = $_POST["username"]; $user->userPass = $_POST["userpass"]; $user->regDate = date('Y-m-d h:i:s'); $msg = new stdClass(); $msg->userID = $userID; // 开启事务发送消息 $stomp = new Stomp($broker,"txl"); $stomp->begin('userReg'); if($stomp->send($queue1, json_encode($user), array('transaction'=>'userReg')) && $stomp->send($queue2, json_encode($msg), array('transaction'=>'userReg'))) { $stomp->commit('userReg'); // 提交事务 } unset($stomp); } ?> <html> <head> <style> .container{margin:50px auto;width:500px;} .container div{line-height:21pt;margin-top:30px} .container .text{width:150px;height:25px;} </style> </head> <body> <div class="container"> <form method="post"> <h3>用户注册演示界面</h3> <div> 用户名:<input type="text" class="text" name="username"/> </div> <div> 密 码:<input type="text" class="text" name="userpass"/> </div> <div> <input type="submit" value="提交注册"> </div> </form> </div> </body> </html>
用户信息入库中间站:
<?php $broker = 'tcp://ActiveMQ服务IP:61613'; $queue = '/queue/userreg'; try { $stomp = new Stomp($broker,'txl'); // 订阅 userreg 队列 $stomp->subscribe($queue); while(true) { if($stomp->hasFrame()) { $frame = $stomp->readFrame(); $userObj = json_decode($frame->body); // 这里调用用户数据入库接口 echo $frame->body.' user reg is done...'.PHP_EOL; $stomp->ack($frame); // 该条记录已处理完毕 } } } catch(StompException $e) { echo $e->getMessage(); }
用户短信发送中间站:
<?php $broker = 'tcp://ActiveMQ服务IP:61613'; $queue = '/queue/usersmsg'; try { $stomp = new Stomp($broker,'txl'); // 订阅 short message 队列 $stomp->subscribe($queue); while(true) { if($stomp->hasFrame()) { $frame = $stomp->readFrame(); $userObj = json_decode($frame->body); // 这里调用用户短信发送接口 echo $frame->body.' user short message is done...'.PHP_EOL; $stomp->ack($frame); // 该条记录已处理完毕 } } } catch(StompException $e) { echo $e->getMessage(); }
测试结果: