在php中,进程通讯的方法有FIFO,System V消息队列,SystemV共享内存,System V信号量
这些System V的相关方法默认是不开启的,若是须要,则要再编译安装的时候打开
--enable-sysvsem --enable --sysvshm --enable-sysvmsg
管道和System V消息队列以及System V信号量是内核级的共享信息的方式。两个或多个进程共享驻留在内核中的某些信息。访问共享信息的每次操做涉及对内核的一次系统调用。
消息队列中的每一个消息相关联的类型字段提供了两个特性php
类型字段能够用于标识消息,从而容许多个进程在单个队列上复用(multiplex)消息。 例如,类型字段的某个值用于标识从各个客户到服务器的消息,对于每个客户均为惟一的另外某个值用于标识从服务器到各个客户的消息。每一个客户的进程ID天然能够用做对于每一个客户均为惟一的类型字段。服务器
类型字段能够用做优先级字段。这容许接受者以不一样于先进先出FIFO的某个顺序读出各个消息。这点不一样于管道或FIFO,使用管道时,数据必须是以写入的顺序读出。使用System V消息队列时,消息可以以任意顺序读出,之哟啊跟消息类型关联的值一致就行。并且咱们能够指定MSG_IPC_NOWAIT标志调用msg_receive从某个队列中读出某个给定类型的任意消息,可是没有给定类型的消息存在,那就当即返回。ide
本文主要记录一下php利用消息队列进行通讯的方法和心得。ui
// 生成一个消息队列的key // 生成一个消息队列的key $msgKey = ftok(__FILE__,'w'); /** msg_get_queue() returns an id that can be used to access the System V message queue with the given {key}. The first call creates the message queue with the optional {perms}. A second call to msg_get_queue() for the same {key} will return a different message queue identifier, but both identifiers access the same underlying message queue. */ // 产生一个消息队列 $msgQueue = msg_get_queue($msgKey,0666); // 检查一个队列是否存在 $status = msg_queue_exists($msgKey); var_dump($status); // 查看当前消息的一些详细信息 /** * msg_perm.uid The uid of the owner of the queue. * msg_perm.gid The gid of the owner of the queue. * msg_perm.mode The file access mode of the queue. * msg_stime The time that the last message was sent to the queue. * msg_rtime The time that the last message was received from the queue. * msg_ctime The time that the queue was last changed. * msg_qnum The number of messages waiting to be read from the queue. * msg_qbytes The maximum number of bytes allowed in one message queue. On Linux, this value may be read and modified via /proc/sys/kernel/msgmnb. * msg_lspid The pid of the process that sent the last message to the queue. * msg_lrpid The pid of the process that received the last message from the queue. * */ $msgStat = msg_stat_queue($msgQueue); print_r($msgStat); // 把数据加入消息队列,默认数据会被序列化 msg_send($msgQueue,1,'hahha,1'); msg_send($msgQueue,2,'ooooo,2'); msg_send($msgQueue,1,'xxxxx,3'); // 从消息队列中读取一条消息 msg_receive($msgQueue,1, $message_type, 1024, $message1); msg_receive($msgQueue,1, $message_type, 1024, $message2); //msg_receive($msgQueue,1, $message_type, 1024, $message3,true,MSG_IPC_NOWAIT); msg_receive($msgQueue,2, $message_type, 1024, $message3); $msgStat = msg_stat_queue($msgQueue); print_r($msgStat); msg_remove_queue($msgQueue); echo $message1.PHP_EOL; echo $message2.PHP_EOL; echo $message3.PHP_EOL;
结果以下:this
Array ( [msg_perm.uid] => 0 [msg_perm.gid] => 0 [msg_perm.mode] => 438 [msg_stime] => 0 [msg_rtime] => 0 [msg_ctime] => 1492759388 [msg_qnum] => 0 [msg_qbytes] => 16384 [msg_lspid] => 0 [msg_lrpid] => 0 ) Array ( [msg_perm.uid] => 0 [msg_perm.gid] => 0 [msg_perm.mode] => 438 [msg_stime] => 1492759388 [msg_rtime] => 1492759388 [msg_ctime] => 1492759388 [msg_qnum] => 0 [msg_qbytes] => 16384 [msg_lspid] => 23336 [msg_lrpid] => 23336 ) hahha,1 xxxxx,3 ooooo,2
手册中对msg_send和msg_receive这两个定义以下。code
bool msg_send ( resource $queue , int $msgtype , mixed $message [, bool $serialize = true [, bool $blocking = true [, int &$errorcode ]]] ) msg_send() sends a message of type msgtype (which MUST be greater than 0) to the message queue specified by queue. bool msg_receive ( resource $queue , int $desiredmsgtype , int &$msgtype , int $maxsize , mixed &$message [, bool $unserialize = true [, int $flags = 0 [, int &$errorcode ]]] ) msg_receive() will receive the first message from the specified queue of the type specified by desiredmsgtype.
msg_receive的第二个参数desiredmsgtype ,指定从队列中获取什么样的消息。队列
若是desiredmsgtype等于0,那么就返回改队列中的第一个消息。每个消息队列都是做为一个先进先出的链表维护,所以type为0,返回该队列中最先的消息。(可是这个状况下,会出现阻塞,$flags要设置为MSG_IPC_NOWAIT)进程
若是desiredmsgtype 大于0,那就返回其类型值为desiredmsgtype的第一个消息。ip
若是desiredmsgtype 小于0,那就返回其类型值小于或等于desiredmsgtype参数的绝对值的消息中类型最小的第一个消息。(可是这个状况下,会出现阻塞,$flags要设置为MSG_IPC_NOWAIT)内存
若是你要读取的那类消息不存在,程序就会阻塞,直到有对应类型的消息写入到队列里。固然,能够经过设置$flags = MSG_IPC_NOWAIT来设置为非阻塞。
而后就是多进程间通讯了。代码以下:
// 获取消息队列key $key = ftok(__FILE__,'w'); // 建立一个消息队列 $queue = msg_get_queue($key); $child = []; $num = 5; $result = []; for($i=0;$i<$num;$i++){ $pid = pcntl_fork(); if($pid == -1) { die('fork failed'); } else if ($pid > 0) { $child[] = $pid; } else if ($pid == 0) { $sleep = rand(1,4); msg_send($queue,2,array('name' => $i.'~'.$sleep)); sleep($sleep); exit(0); } } while(count($child)){ foreach($child as $k => $pid) { $res = pcntl_waitpid($pid,$status,WNOHANG); if ($res == -1 || $res > 0 ) { unset($child[$k]); msg_receive($queue,2,$message_type,1024,$data); $result[] = $data; } } } msg_remove_queue($queue); print_r($result);
结果以下:
Array ( [0] => Array ( [name] => 1~3 ) [1] => Array ( [name] => 0~3 ) [2] => Array ( [name] => 3~2 ) [3] => Array ( [name] => 4~4 ) [4] => Array ( [name] => 2~4 ) )