【伍哥原创】php
1,前言python
RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,所以也是继承了这些优势。mysql
AMQP 里主要要说两个组件:Exchange 和 Queue (在 AMQP 1.0 里还会有变更),以下图所示,绿色的 X 就是 Exchange ,红色的是 Queue ,这二者都在 Server 端,又称做 Broker ,这部分是 RabbitMQ 实现的,而蓝色的则是客户端,一般有 Producer 和 Consumer 两种类型:
RabbitMQ做为一个很是成熟的消息队列技术方案,也应用到了豆荚商城项目里面。
2,邮件服务:将慢动做从请求中分离出来sql
做为一个商城,天然少不了给用户发送邮件。好比注册的时候要发送确认邮件,下单之后发送订单邮件,推广信息也须要发送邮件,相似的状况很是多。
实现邮件发送的一般作法比较简单,就是在HTTP请求中一并完成邮件发送这个动做。而发送邮件依赖于SMTP服务。在小并发的环境下,一切都工做的很正常。缓存
可是,当并发请求上到必定的程度,问题就来了。HTTP必须等待SMTP这个慢动做,若是你须要带附件的话,状况就更糟糕了。
另一个问题来自于SMTP,当请求过于频密的时候,SMTP就出现超负荷工做的状况,这样各类邮件发送的异常状况就在所不免了。架构
怎么才能很好的解决这个问题呢?
答案在前面就给出了,就是创建消息队列机制!
其实原理很是简单,就是在内存维护一个队列(queue),若是要发送一封邮件,就往队列里面写一条消息,也就是所谓的信息生产者。
再创建一个进程,处理队列里面的邮件发送,就是所谓的信息消费者。并发
因为商城是用PHP开发的,因此就须要支持amqp的PHP客户端代码。这里用的是php amqplib (http://code.google.com/p/php-amqplib/)。
首先是链接rabbitmq,获取一个通道,而后是发送消息,最后断开通道和链接。下面是代码示例:eclipse
1
2
3
4
5
6
7
8
9
10
11
|
$queue
=
'mail_queue'
$conn
=
new
AMQPConnection(
$config
[
'host'
],
$config
[
'port'
],
$config
[
'user'
],
$config
[
'pass'
]);
$channel
=
$conn
->channel();
$channel
->queue_declare(
$queue
, false, true, false, false);
$send_data
= serialize(
$send_msg
);
//数据先序列化一下,也可使用JSON格式化
$msg
=
new
AMQPMessage(
$send_data
,
array
(
'delivery_mode'
=> 2)
//让消息持久化
);
$channel
->basic_publish(
$msg
,
''
,
$queue
);
$channel
->close();
$conn
->close();
|
在项目里固然不能这样写,应该封装成一个分布式服务接口,融入到整个系统代码架构里面,方便其余地方,好比controller,model的使用。分布式
接下来是实现消息的消费程序。这里用的是python的pika。
首先是链接rabbitmq,获取通道,开始消费队列里面的信息。如下的代码写在类里面:memcached
1
2
3
4
5
6
7
|
self
.connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
self
.rmq_host))
self
.channel
=
self
.connection.channel()
self
.channel.queue_declare(queue
=
self
.rmq_queue, durable
=
True
)
self
.channel.basic_qos(prefetch_count
=
1
)
# callback里面就是具体处理消息的地方
self
.channel.basic_consume(
self
.callback, queue
=
self
.rmq_queue)
self
.channel.start_consuming()
|
callback回调
1
2
3
4
5
6
7
8
9
10
11
|
def
callback(
self
, ch, method, properties, body):
time.sleep(
1
)
#休息一秒才发送邮件
msg
=
phpserialize.loads(body)
#按PHP的格式作反序列化
validateutil
=
ValidateUtil()
if
validateutil.isEmail(msg[
'mail_to'
]):
mail
=
Mailer()
mail.setMailTo(msg[
'mail_to'
])
mail.setMailSubject(msg[
'mail_subject'
])
mail.setMailHtmlBody(msg[
'mail_body'
])
mail.sendEmail()
ch.basic_ack(delivery_tag
=
method.delivery_tag)
|
这里只是骨干代码。应该创建一个python的project,在eclipse(加PyDev)里面管理起来。
你还须要用到:配置文件以及配置文件解析库,系统日志,Mailer,phpserialize,ValidateUtil等等辅助类库。
关于PyDev请参考:http://www.ibm.com/developerworks/cn/opensource/os-cn-ecl-pydev/
熟悉了邮件的应用,后面扩展到手机短信通知服务、站内通讯消息等等就很是方便了。固然,面对这样的需求,咱们就须要在实现时考虑使用可扩展的消息队列的模型了。
3,页面访问统计:经过写缓存减轻DB的负载
对于商城来讲,都有商品推荐的功能,好比人气商品推荐。怎么定义人气呢?通常看商品页面的访问量。这里就出现了页面统计的需求了。统计的数据通常须要持久化到DB。
通常来讲,某商品页面被访问一次,就应该插入或者更新一次DB记录。这彻底没有什么技术难度。
然而当并发链接上到必定水平,DB的性能问题就出来了。由于DB,好比MYSQL,都有必定的锁机制。当出现频繁的insert或者update时,select的速度天然就受到很大制约了。并且打开一次页面就触发一次统计,也就要操做一次DB,那DB不哭才怪!
有见及此,咱们就经过消息队列实现了页面访问统计的写缓存。
何谓写缓存?对于某些不须要高实时的数据,好比咱们这里的页面访问统计,能够把更新操做先缓存起来,当累积到必定程度时,才进行一次实际的更新。这样的好处是显而易见的,DB操做少了不少,并且也避免的DB锁机制引起的性能问题。
实现写缓存的方式有不少,好比经过memcached来实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
$page
=
'goods_100052'
;
$memcache
= memcache_connect(
'192.168.1.100'
, 11711);
// 经过memcached提供的原子加操做,避免并发访问带来的统计出错.
$count
=
$memcache
->increment(
$page
, 1);
if
(!
$count
) {
$memcache
->add(
$page
, 1, false, 0);
exit
;
}
if
(
$count
>= 1000) {
$sql
=
"update `goods_viewlog` set `count` = `count`+{$count} where `page` = $page"
;
$result
=
$mysql
->query(
$sql
);
if
(
$result
) {
// 更新成功后,把缓存统计清零
$memcache
->set(
$page
, 0, false, 0);
}
}
|
咱们这里采用了消息队列的实现方式。
消息生产者代码和消息消费者代码和上面介绍的邮件是几乎同样的。惟一不一样在于回调函数那里。这里就再也不重复说明了。
4,总结
咱们在开发消息队列应用特别要注意的是要先搞清楚消息队列的主要概念和机制:好比交换,队列,绑定,持久化等等。
搞清楚了之后,再根据具体的应用类型,定义好消息队列模型。
具体能够参考伍哥前面的文章。