RabbitMq与Redis的使用

<!DOCTYPE html>css

<html> <head> <title>RabbitMQRedisMysql</title> <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> <style type="text/css"> /* GitHub stylesheet for MarkdownPad (http://markdownpad.com) */ /* Author: Nicolas Hery - http://nicolashery.com */ /* Version: b13fe65ca28d2e568c6ed5d7f06581183df8f2ff */ /* Source: https://github.com/nicolahery/markdownpad-github */html

/* RESET =============================================================================*/python

html, body, div, span, applet, object, iframe, h1, h2, h3, h4, h5, h6, p, blockquote, pre, a, abbr, acronym, address, big, cite, code, del, dfn, em, img, ins, kbd, q, s, samp, small, strike, strong, sub, sup, tt, var, b, u, i, center, dl, dt, dd, ol, ul, li, fieldset, form, label, legend, table, caption, tbody, tfoot, thead, tr, th, td, article, aside, canvas, details, embed, figure, figcaption, footer, header, hgroup, menu, nav, output, ruby, section, summary, time, mark, audio, video { margin: 0; padding: 0; border: 0; }mysql

/* BODY =============================================================================*/git

body { font-family: Helvetica, arial, freesans, clean, sans-serif; font-size: 14px; line-height: 1.6; color: #333; background-color: #fff; padding: 20px; max-width: 960px; margin: 0 auto; }github

body>*:first-child { margin-top: 0 !important; }web

body>*:last-child { margin-bottom: 0 !important; }redis

/* BLOCKS =============================================================================*/sql

p, blockquote, ul, ol, dl, table, pre { margin: 15px 0; }mongodb

/* HEADERS =============================================================================*/

h1, h2, h3, h4, h5, h6 { margin: 20px 0 10px; padding: 0; font-weight: bold; -webkit-font-smoothing: antialiased; }

h1 tt, h1 code, h2 tt, h2 code, h3 tt, h3 code, h4 tt, h4 code, h5 tt, h5 code, h6 tt, h6 code { font-size: inherit; }

h1 { font-size: 28px; color: #000; }

h2 { font-size: 24px; border-bottom: 1px solid #ccc; color: #000; }

h3 { font-size: 18px; }

h4 { font-size: 16px; }

h5 { font-size: 14px; }

h6 { color: #777; font-size: 14px; }

body>h2:first-child, body>h1:first-child, body>h1:first-child+h2, body>h3:first-child, body>h4:first-child, body>h5:first-child, body>h6:first-child { margin-top: 0; padding-top: 0; }

a:first-child h1, a:first-child h2, a:first-child h3, a:first-child h4, a:first-child h5, a:first-child h6 { margin-top: 0; padding-top: 0; }

h1+p, h2+p, h3+p, h4+p, h5+p, h6+p { margin-top: 10px; }

/* LINKS =============================================================================*/

a { color: #4183C4; text-decoration: none; }

a:hover { text-decoration: underline; }

/* LISTS =============================================================================*/

ul, ol { padding-left: 30px; }

ul li > :first-child, ol li > :first-child, ul li ul:first-of-type, ol li ol:first-of-type, ul li ol:first-of-type, ol li ul:first-of-type { margin-top: 0px; }

ul ul, ul ol, ol ol, ol ul { margin-bottom: 0; }

dl { padding: 0; }

dl dt { font-size: 14px; font-weight: bold; font-style: italic; padding: 0; margin: 15px 0 5px; }

dl dt:first-child { padding: 0; }

dl dt>:first-child { margin-top: 0px; }

dl dt>:last-child { margin-bottom: 0px; }

dl dd { margin: 0 0 15px; padding: 0 15px; }

dl dd>:first-child { margin-top: 0px; }

dl dd>:last-child { margin-bottom: 0px; }

/* CODE =============================================================================*/

pre, code, tt { font-size: 12px; font-family: Consolas, "Liberation Mono", Courier, monospace; }

code, tt { margin: 0 0px; padding: 0px 0px; white-space: nowrap; border: 1px solid #eaeaea; background-color: #f8f8f8; border-radius: 3px; }

pre>code { margin: 0; padding: 0; white-space: pre; border: none; background: transparent; }

pre { background-color: #f8f8f8; border: 1px solid #ccc; font-size: 13px; line-height: 19px; overflow: auto; padding: 6px 10px; border-radius: 3px; }

pre code, pre tt { background-color: transparent; border: none; }

kbd { -moz-border-bottom-colors: none; -moz-border-left-colors: none; -moz-border-right-colors: none; -moz-border-top-colors: none; background-color: #DDDDDD; background-image: linear-gradient(#F1F1F1, #DDDDDD); background-repeat: repeat-x; border-color: #DDDDDD #CCCCCC #CCCCCC #DDDDDD; border-image: none; border-radius: 2px 2px 2px 2px; border-style: solid; border-width: 1px; font-family: "Helvetica Neue",Helvetica,Arial,sans-serif; line-height: 10px; padding: 1px 4px; }

/* QUOTES =============================================================================*/

blockquote { border-left: 4px solid #DDD; padding: 0 15px; color: #777; }

blockquote>:first-child { margin-top: 0px; }

blockquote>:last-child { margin-bottom: 0px; }

/* HORIZONTAL RULES =============================================================================*/

hr { clear: both; margin: 15px 0; height: 0px; overflow: hidden; border: none; background: transparent; border-bottom: 4px solid #ddd; padding: 0; }

/* TABLES =============================================================================*/

table th { font-weight: bold; }

table th, table td { border: 1px solid #ccc; padding: 6px 13px; }

table tr { border-top: 1px solid #ccc; background-color: #fff; }

table tr:nth-child(2n) { background-color: #f8f8f8; }

/* IMAGES =============================================================================*/

img { max-width: 100% } </style>

</head> <body> <p>RabbitMQ 消息队列<br /> python里有threading QUEUE 只用于线程间交互,进程QUEUE 用于父进程与子进程或者是兄弟进程<br /> RabbitMQ采用消息轮询的方式发送消息。一个一个的给每一个消费者<br /> 应用之间使用socket实现数据共享<br /> 连接几个应用的中间商著名的有:<br /> 1. RabbitMQ 2. ZeroMQ 3. ActiveMQ<br /> RabbitMQ使用<br /> 生产者:<br /> 1. 引用pika模块<br /> <code>import pika</code><br /> 2. 创建socket<br /> <code>connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))</code><br /> 3. 声明一个管道<br /> <code>channel=connection.channel()</code><br /> 4. 在管道中声明队列<br /> <code>channel.queue_declare(queue='队列名')</code><br /> 5. 经过管道发送消息 rounting<em>key就是队列名字 <br /> <code>channel.basic_publish(exchange='',routing_key='队列名',body='Hello World!')</code><br /> 6. 关闭队列,不用关闭管道<br /> <code>connection.close()</code> <br /> 消费者(多是其余机器,能够跨机器) 1. 引用pika模块<br /> <code>import pika</code><br /> 2. 创建socket<br /> <code>connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))</code><br /> 3. 声明一个管道<br /> <code>channel=connection.channel()</code><br /> 4. 在管道中声明队列<br /> <code>channel.queue_declare(queue='队列名')</code><br /> 5. 定义函数,标准的处理消息的函数都会带下面四个参数ch管道声明的对象地址,method包含你要发消息给谁的信息,properties包含发消息端的设置信息<br /> <code>def callback(ch,method,properties,body): print(ch,method,properties,body) print(&quot;[x] Received %r&quot;%body) ch.basic_ack(delivery_tag=method.delivery_tag)手动确认消息处理完,否则消息一直不销毁</code><br /> 6. 消费消息,定义函数的目的,若是收到消息,就用定义的函数处理消息no</em>ack参数消息确认,当为True时消息不等消费者确认消息队列就销毁消息,为False时须要等待消费者处理完消息的确认消息队消息队列才销毁消息(判断的是socket是否断开)<br /> <code>channel.basic_consume('定义的函数名',queue='队列名',no_ack=True)</code><br /> 7. 启动管道接收消息,启动后一直处于开启状态,没有消息就等待。<br /> <code>channel.start_consuming()</code> </p> <h3>RabbitMQ消息分发轮询</h3> <p>采用轮询的方式,依次的发给每一个消费者<br /> 生产者会等待消费者肯定处理完消息的回复才会销毁消息。<br /> 当消息执行到一半的时候,消费者断开,消息会保留发送给下一个消费者处理 </p> <h3>消息持久化</h3> <p>消息必须等消费者手动肯定后,才销毁ch.basic<em>ack(delivery</em>tag=method.delivery_tag)手动确认消息处理完,否则消息一直不销毁<br /> 当RabbitMQ服务中止,服务里的消息队列会销毁。<br /> 若是想保持消息队列的持久化,必须在声明队列的时候设置,durable=True。这样当RabbitMQ服务断开再重启,消息队列仍是存在,消息会销毁<br /> <code>channel.queue_declare(queue='队列名',durable=True)</code> 消息也持久化<br /> <code>channel.basic_publish(exchange='',routing_key='队列名',body='Hello World!',properties=pika.BasicProperties(delivery_mode=2))</code> </p> <h3>广播模式</h3> <p>消费者端加channel.basic<em>qos(prefetch</em>count=1),加过这句话实现了,不是按顺序分发,而是看哪个是空闲的,才分发给空闲的消费者消息。多大本事干多少活。<br /> 广播是生产者发消息,全部消费者都收到。<br /> 用exchange实现广播。 fanout:全部bind到此exchange的queue均可以接收消息<br /> direct:经过routingkey和exchange决定的那个惟一的queue能够接收消息<br /> topic:全部符合routingkey(此时能够是一个表达式)的routingkey所bind的queue能够接受消息。 </p> <h5>fanout纯广播</h5> <p>设置管道的时候设置<br /> <code>channel.exchange_declare(exchange='logs',type='fanout')</code><br /> 不声明queue<br /> 生产者:<br /> 1. 引用pika模块<br /> <code>import pika</code><br /> 2. 创建socket<br /> <code>connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))</code><br /> 3. 声明一个管道<br /> <code>channel=connection.channel()</code><br /> 4. 设置管道的时候设置<br /> <code>channel.exchange_declare(exchange='logs',exchange_type='fanout')</code> <br /> 5. 经过管道发送消息,广播不须要管道名<br /> <code>channel.basic_publish(exchange='logs',routing_key='',body='Hello World!')</code><br /> 6. 关闭队列,不用关闭管道<br /> <code>connection.close()</code> <br /> 消费者(多是其余机器,能够跨机器) 1. 引用pika模块<br /> <code>import pika</code><br /> 2. 创建socket<br /> <code>connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))</code><br /> 3. 声明一个管道<br /> <code>channel=connection.channel()</code><br /> 4. 设置管道的时候设置<br /> <code>channel.exchange_declare(exchange='logs',exchange_type='fanout')</code> 5. 生成随机queue与exchange转发器绑定。 ``` result=channel.queue<em>declare(exclusive=True)exclusive排他的,单独的,生成随街queue,绑定再exchange上 queue</em>name=result.method.queue<br /> channel.queue<em>bind(exchange='logs',queue=queue</em>name)绑定转发器,接受转发器里的消息,exchange与随机生成的queue绑定 </p> <p><code>6. 定义函数,标准的处理消息的函数都会带下面四个参数ch管道声明的对象地址,method包含你要发消息给谁的信息,properties</code> def callback(ch,method,properties,body): print(ch,method,properties,body) print(&quot;[x] Received %r&quot;%body)<br /> ch.basic<em>ack(delivery</em>tag=method.delivery<em>tag)手动确认消息处理完,否则消息一直不销毁 ``` 7. 消费消息,定义函数的目的,若是收到消息,就用定义的函数处理消息no</em>ack参数消息确认,当为True时消息不等消费者确认消息队列就销毁消息,为False时须要等待消费者处理完消息的确认消息队消息队列才销毁消息(判断的是socket是否断开)<br /> <code>channel.basic_consume('定义的函数名',queue='队列名',no_ack=True)</code><br /> 8. 启动管道接收消息,启动后一直处于开启状态,没有消息就等待。<br /> <code>channel.start_consuming()</code> </p> <h5>direct广播 info warning error 划分消息</h5> <p>生产者:<br /> 1. 引用pika模块<br /> <code>import pika</code><br /> 2. 创建socket<br /> <code>connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))</code><br /> 3. 声明一个管道<br /> <code>channel=connection.channel()</code><br /> 4. 设置管道的时候设置<br /> <code>channel.exchange_declare(exchange='direct_logs',exchange_type='direct')</code> 5. 接受分发消息的级别<br /> <code>severity=sys.argv[1] if len(sys.argv)&gt;1 else 'info' message=' '.join(sys.argv[2:]) or 'hello world!'</code> 5. 经过管道发送消息,广播不须要管道名<br /> <code>channel.basic_publish(exchange='direct_logs', routing_key=severity,#相似指定queue body=message)</code><br /> 6. 关闭队列,不用关闭管道<br /> <code>connection.close()</code><br /> 消费者<br /> 1. 引用pika模块<br /> <code>import pika</code><br /> 2. 创建socket<br /> <code>connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))</code><br /> 3. 声明一个管道<br /> <code>channel=connection.channel()</code><br /> 4. 设置管道的时候设置<br /> <code>channel.exchange_declare(exchange='direct_logs',exchange_type='direct')</code> 5. 生成随机queue与exchange转发器绑定。 ``` result=channel.queue<em>declare(exclusive=True)exclusive排他的,单独的,生成随街queue,绑定再exchange上 queue</em>name=result.method.queue<br /> severities = sys.argv[1:] if not severities: sys.stderr.write(&quot;Usage: %s [info] [warning] [error]\n&quot; % sys.argv[0]) sys.exit(1)</p> <p>for severity in severities: channel.queue<em>bind(exchange='direct</em>logs',queue=queue<em>name,routing</em>key=severity)</p> <p><code>6. 定义函数,标准的处理消息的函数都会带下面四个参数ch管道声明的对象地址,method包含你要发消息给谁的信息,properties</code> def callback(ch,method,properties,body): print(ch,method,properties,body) print(&quot;[x] Received %r&quot;%body)<br /> ch.basic<em>ack(delivery</em>tag=method.delivery<em>tag)手动确认消息处理完,否则消息一直不销毁 ``` 7. 消费消息,定义函数的目的,若是收到消息,就用定义的函数处理消息no</em>ack参数消息确认,当为True时消息不等消费者确认消息队列就销毁消息,为False时须要等待消费者处理完消息的确认消息队消息队列才销毁消息(判断的是socket是否断开)<br /> <code>channel.basic_consume('定义的函数名',queue='队列名',no_ack=True)</code><br /> 8. 启动管道接收消息,启动后一直处于开启状态,没有消息就等待。<br /> <code>channel.start_consuming()</code> </p> <h5>topic广播</h5> <p>更细致的消息过滤,包括应用程序,#收全部消息,<em>.info接受带有.info的消息,mysql.</em>接受带mysql的消息 <br /> 生产者<br /> ``` import pika import sys</p> <p>connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel()</p> <p>channel.exchange<em>declare(exchange='topic</em>logs', type='topic')</p> <p>routing<em>key = sys.argv[1] if len(sys.argv) &gt; 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic</em>publish(exchange='topic<em>logs', routing<em>key=routing</em>key, body=message) print(&quot; [x] Sent %r:%r&quot; % (routing</em>key, message)) connection.close() <code>消费者</code> import pika import sys</p> <p>connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel()</p> <p>channel.exchange<em>declare(exchange='topic</em>logs', type='topic')</p> <p>result = channel.queue<em>declare(exclusive=True) queue</em>name = result.method.queue</p> <p>binding<em>keys = sys.argv[1:] if not binding</em>keys: sys.stderr.write(&quot;Usage: %s [binding_key]...\n&quot; % sys.argv[0]) sys.exit(1)</p> <p>for binding<em>key in binding</em>keys: channel.queue<em>bind(exchange='topic<em>logs', queue=queue</em>name, routing</em>key=binding_key)</p> <p>print(' [*] Waiting for logs. To exit press CTRL+C') </p> <p>def callback(ch, method, properties, body): print(&quot; [x] %r:%r&quot; % (method.routing_key, body)) </p> <p>channel.basic<em>consume(callback, queue=queue</em>name, no_ack=True) </p> <p>channel.start_consuming() <br /> ```</p> <h3>RabbitMQ rpc(remote procedure call)远程调用一个方法</h3> <p>即便生产者又是消费者 <br /> start<em>cosuming为阻塞模式,rpc不用阻塞,rpc是执行一会这个再去执行另外一个。process</em>data_events()非阻塞方法,能收到消息就收,没有消息不阻塞继续往下执行<br /> 服务器端<br /> ```</p> <h1><em><em><em>coding:utf-8</em></em></em></h1> <p><strong>author</strong> = 'Alex Li' import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))</p> <p>channel = connection.channel()</p> <p>channel.queue<em>declare(queue='rpc</em>queue')</p> <p>def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2)</p> <p>def on_request(ch, method, props, body): n = int(body)</p> <pre><code>print(&quot; [.] fib(%s)&quot; % n)<br/> response = fib(n)<br/>

ch.basic_publish(exchange='',<br/> routing_key=props.reply_to,<br/> properties=pika.BasicProperties(correlation_id = <br/> props.correlation_id),<br/> body=str(response))<br/> ch.basic_ack(delivery_tag = method.delivery_tag)<br/> </code></pre><br/>

<p>channel.basic<em>qos(prefetch</em>count=1) channel.basic_consume(on<em>request, queue='rpc</em>queue')</p> <p>print(&quot; [x] Awaiting RPC requests&quot;) channel.start_consuming() <code>客户端</code> import pika import uuid</p> <p>class FibonacciRpcClient(object): def <strong>init</strong>(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))</p> <pre><code> self.channel = self.connection.channel()<br/> <br/> result = self.channel.queue_declare(exclusive=True)<br/> self.callback_queue = result.method.queue<br/>

self.channel.basic_consume(self.on_response, no_ack=True,<br/>
                           queue=self.callback_queue)<br/>

def on_response(self, ch, method, props, body):<br/> if self.corr_id == props.correlation_id:<br/> self.response = body<br/>

def call(self, n):<br/> self.response = None<br/> self.corr_id = str(uuid.uuid4())<br/> self.channel.basic_publish(exchange='',<br/> routing_key='rpc_queue',<br/> properties=pika.BasicProperties(<br/> </code></pre>

<p>reply<em>to = self.callback</em>queue,#客户端发送消息是,把接收返回消息的管道也告诉给服务器端 correlation<em>id = self.corr<em>id,#用于判断服务器端返回的结果和个人请求是不是同一条。目的就是为了客户端能够同时发两条消息,当服务器端返回结果时,须要有判断关于哪条请求的结果 ), body=str(n)) while self.response is None: self.connection.process</em>data</em>events() return int(self.response)</p> <p>fibonacci_rpc = FibonacciRpcClient()</p> <p>print(&quot; [x] Requesting fib(30)&quot;) response = fibonacci_rpc.call(30) print(&quot; [.] Got %r&quot; % response) ``` </p> <h3>Redis</h3> <p>缓存中间商,用socket,例如:mongodb,redis,memcache<br /> * 链接方式<br /> * 链接池<br /> * 操做<br /> - String 操做<br /> - Hash 操做<br /> - List 操做<br /> - Set 操做 <br /> - Sort Set 操做<br /> * 管道<br /> * 发布订阅 </p> <h5>String操做</h5> <ol> <li> set(name, value, ex=None, px=None, nx=False, xx=False)<br /> 在Redis中设置值,默认,不存在则建立,存在则修改<br /> 参数:<br /> ex,过时时间(秒) <br /> px,过时时间(毫秒)<br /> nx,若是设置为True,则只有name不存在时,当前set操做才执行<br /> xx,若是设置为True,则只有name存在时,岗前set操做才执行

</li> <li>setnx(name, value) 设置值,只有name不存在时,执行设置操做(添加)</li> <li>setex(name, value, time) 设置值 参数:time,过时时间(数字秒 或 timedelta对象)</li> <li>psetex(name, time<em>ms, value) 设置值 参数:time</em>ms,过时时间(数字毫秒 或 timedelta对象) </li> <li>mset(*args, **kwargs) 批量设置值 如:mset(k1='v1', k2='v2')或mget({'k1': 'v1', 'k2': 'v2'}) </li> <li>get(name) 获取值</li> <li>mget(keys, *args) 批量获取 如:mget('ylr', 'wupeiqi')或r.mget(['ylr', 'wupeiqi']) </li> <li>getset(name, value) 设置新值并获取原来的值</li> <li>getrange(key, start, end) 获取子序列(根据字节获取,非字符) 参数: name,Redis 的 name start,起始位置(字节) end,结束位置(字节) 如: &quot;武沛齐&quot; ,0-3表示 &quot;武&quot;</li> <li>setrange(name, offset, value) 修改字符串内容,从指定字符串索引开始向后替换(新值太长时,则向后添加) 参数: offset,字符串的索引,字节(一个汉字三个字节) value,要设置的值 </li> <li>setbit(name, offset, value) BITCOUNT 统计二进制有多少个1 对name对应值的二进制表示的位进行操做 参数: name,redis的name offset,位的索引(将值变换成二进制后再进行索引) value,值只能是 1 或 0 注:若是在Redis中有一个对应: n1 = &quot;foo&quot;, 那么字符串foo的二进制表示为:01100110 01101111 01101111 因此,若是执行 setbit('n1', 7, 1),则就会将第7位设置为1, 那么最终二进制则变成 01100111 01101111 01101111,即:&quot;goo&quot; 扩展,转换二进制表示: source = &quot;武沛齐&quot; source = &quot;foo&quot; for i in source: num = ord(i) print bin(num).replace('b','') 特别的,若是source是汉字 &quot;武沛齐&quot;怎么办? 答:对于utf-8,每个汉字占 3 个字节,那么 &quot;武沛齐&quot; 则有 9个字节 对于汉字,for循环时候会按照 字节 迭代,那么在迭代时,将每个字节转换 十进制数,而后再将十进制数转换成二进制 11100110 10101101 10100110 11100110 10110010 10011011 11101001 10111101 10010000 -------------------------- ----------------------------- ----------------------------- 武 沛 齐 </li> <li>getbit(name, offset) 获取name对应的值的二进制表示中的某位的值 (0或1)</li> <li>bitcount(key, start=None, end=None) 获取name对应的值的二进制表示中 1 的个数 参数: key,Redis的name start,位起始位置 end,位结束位置</li> <li> bitop(operation, dest, *keys) 获取多个值,并将值作位运算,将最后的结果保存至新的name对应的值 参数: operation,AND(并) 、 OR(或) 、 NOT(非) 、 XOR(异或) dest, 新的Redis的name *keys,要查找的Redis的name 如: bitop(&quot;AND&quot;, 'new<em>name', 'n1', 'n2', 'n3') 获取Redis中n1,n2,n3对应的值,而后讲全部的值作位运算(求并集),而后将结果保存 new</em>name 对应的值中 </li> <li>strlen(name) 返回name对应值的字节长度(一个汉字3个字节)</li> <li>incr(self, name, amount=1) 自增 name对应的值,当name不存在时,则建立name=amount,不然,则自增。 参数: name,Redis的name amount,自增数(必须是整数) 注:同incrby</li> <li>incrbyfloat(self, name, amount=1.0) 自增 name对应的值,当name不存在时,则建立name=amount,不然,则自增。 参数: name,Redis的name amount,自增数(浮点型)</li> <li>decr(self, name, amount=1) 自减 name对应的值,当name不存在时,则建立name=amount,不然,则自减。 参数: name,Redis的name amount,自减数(整数)</li> <li>append(key, value) 在redis name对应的值后面追加内容 参数: key, redis的name value, 要追加的字符串   </li> </ol>

</body> </html> <!-- This document was created with MarkdownPad, the Markdown editor for Windows (http://markdownpad.com) -->

相关文章
相关标签/搜索