php 事务处理,ActiveMQ的发送消息,与处理消息 php ActiveMQ的发送消息,与处理消息

能够经过链式发送->处理->发送。。。的方式处理相似事务型业务逻辑

 

好比 发送一个注册消息,消息队列处理完注册之后,紧接着发送一个新手优惠券赠送,赠送完再发一个其它后续逻辑处理的消息等待后续队列处理php

 

 

php ActiveMQ的发送消息,与处理消息

咱们以一个简单的用户注册为例,当用户点击注册按钮后,咱们发送一个消息,后台php接收到该消息而后处理。html

1.php代码以下:前端

1
2
3
4
5
6
7
8
9
<?php
$stomp  new  Stomp( 'tcp://192.168.1.222:61613' );
 
$obj  new  Stdclass();
//下面这些数据,实际中是用户经过前端页面post来的,这里只作演示
$obj ->username =  'test' ;
$obj ->password =  '123456' ;
//发送一个注册消息到队列,咱们这里模拟用户注册
$stomp ->send( '/queue/userReg' , json_encode( $obj ));

2.php代码以下:数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php
$stomp  new  Stomp( 'tcp://192.168.1.222:61613' );
//订阅只对一个有效,若是启动多个脚本,只有一个会接收到消息
$stomp ->subscribe( '/queue/userReg' );
 
while (true) {
     //判断是否有读取的信息
     if ( $stomp ->hasFrame()) {
         $frame  $stomp ->readFrame();
 
         $data  = json_decode( $frame ->body, true);
         var_dump( $data );
 
         //咱们经过获取的数据
         //处理相应的逻辑,好比存入数据库,发送验证码等一系列操做。
         //$db->query("insert into user values('{$username}','{$password}')");
         //sendVerify();
 
         //表示消息被处理掉了,ack()函数很重要
         $stomp ->ack( $frame );
     }
     sleep(1);
}

分别运行上面两个脚本文件json

1
2
> /data/php56/bin/php 1.php
> /data/php56/bin/php 2.php

咱们还能够把上面的2.php代码分红多步执行。tcp

2.php代码以下:函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?php
$stomp  new  Stomp( 'tcp://192.168.1.222:61613' );
$stomp ->subscribe( '/queue/userReg' );
 
while (true) {
     //判断是否有读取的信息
     if ( $stomp ->hasFrame()) {
         $frame  $stomp ->readFrame();
 
         $data  = json_decode( $frame ->body, true);
 
         //注册信息入库
         //$ret = db->query("insert into user values('{$data['username']}', '{$data['password']}')");
         //这里演示直接设成true了
         $ret  = true;
         if ( $ret ) {
             echo  $data [ 'username' ],  '入库成功' , PHP_EOL;
             //若是入库成功,再次把数据发送到另外一个消息队列中,进行下一步处理
             $stomp ->send( '/queue/sendVerify' $frame ->body);
 
             $stomp ->ack( $frame );
         }
     }
     sleep(1);
}

3.php代码以下:post

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<?php
$stomp  new  Stomp( 'tcp://192.168.1.222:61613' );
$stomp ->subscribe( '/queue/sendVerify' );
 
while (true) {
     //判断是否有读取的信息
     if ( $stomp ->hasFrame()) {
         $frame  $stomp ->readFrame();
 
         $data  = json_decode( $frame ->body, true);
 
         //$ret = sendVerify()发送验证码,实际中应该是请求某接口
         $ret  = true;
         if ( $ret ) {
             echo  $data [ 'username' ],  '发送验证码成功' , PHP_EOL;
 
             $stomp ->ack( $frame );
         }
     }
     sleep(1);
}

再次分别运行上面的三个脚本url

1
2
3
> /data/php56/bin/php 1.php
> /data/php56/bin/php 2.php
> /data/php56/bin/php 3.php