概述html
1、线程池python
上下文管理mysql
终止线程redis
2、redissql
发布订阅数据库
链接池express
服务器列表操做编程
事物api
3、rebbitMQ安全
基础
4、MySQL
一、建立文件夹和文件
二、文件内部数据
表内部数据
inner join
left join
right join
5、python pymysql
6、sqlachemy
7、python ORM
8、Paramiko
线程池里的with
import contextlib @contextlib.contextmanager #加上这个装饰器 下面with就能够调用 worker-state函数 管理上下文 def worker_state(state_list,worker_thread): """ 用于记录线程中正在等待的线程数 :param state_list: 至关于free_list :param worker_thread: 当前线程 :return: """ state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread) free_list = [] current_thread = "alex" with worker_state(free_list,current_thread): print(123) print(456) #执行顺序:
#先执行with一行-->而后执行函数worker_state里的state_list.append-->yield-->出去执行with里的print-->再回到函数里执行finally下的state_list.remove
自定义用with管理socket,不用关闭
import contextlib import socket @contextlib.contextmanager def context_socket(host,port): sk = socket.socket() sk.bind((host,port)) sk.listen(5) try: yield sk #至关于把sk返回给as sock的sock创建了链接 finally: sk.close() with context_socket('127.0.0.1',8888) as sock: print(sock)
之后若是涉及到须要close就能够用with不用关闭
终止线程
设置空标识
上面把self.terminal 设置为False,若是想中止线程,就设置为True,让event为空。
import queue import threading import contextlib import time StopEvent = object() #能够设置为空,是为了终止进程 class ThreadPool(): def __init__(self, max_num, max_task_num = None): if max_task_num: #若是max_task_num有值就建立队列 self.q = queue.Queue(max_task_num) #建立队列,最多放max_task_num个任务 else: #若max_task_num无值,则不定义最大长度 self.q = queue.Queue() #建立先进先出队列 self.max_num = max_num #表示最多有多少个线程 self.cancel = False self.terminal = False self.generate_list = [] #表示当前已经建立多少个线程 self.free_list = [] #当前还空闲多少个线程 def run(self, func, args, callback=None): """ 线程池执行一个任务 :param func: 任务函数 :param args: 任务函数所需参数 :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数一、任务函数执行状态;二、任务函数返回值(默认为None,即:不执行回调函数) :return: 若是线程池已经终止,则返回True不然None """ if self.cancel: return # 判断 没有空闲线程而且已经建立的线程小于最大线程数 if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() #调用建立线程的方法 w = (func, args, callback,) #把传过来的三个参数放到一个元组里 self.q.put(w) #把任务放到队列中 def generate_thread(self): """ 建立一个线程 """ t = threading.Thread(target=self.call)#建立的线程执行call方法 t.start() #执行 def call(self): """ 循环去获取任务函数并执行任务函数 """ current_thread = threading.currentThread #返回当前线程对象 self.generate_list.append(current_thread) #把当前建立的线程放到列表里 event = self.q.get() #获取队列中的任务 while event != StopEvent: #任务不等于后面的值就会一直循环 func, arguments, callback = event try: #捕捉异常 result = func(*arguments) #执行action函数 success = True except Exception as e: success = False result = None if callback is not None: try: #捕捉异常 callback(success, result) except Exception as e: pass #使线程变为空闲,以便下次再取任务 #等待着,等待着...... #有任务来了 #当前线程状态,再也不空闲 with self.worker_state(self.free_list, current_thread): if self.terminal: ###########让线程终止############ event = StopEvent else: event = self.q.get() else: self.generate_list.remove(current_thread) #移除当前线程 终止线程 def close(self): """ 执行完全部的任务后,全部线程中止 """ self.cancel = True full_size = len(self.generate_list) #获取建立的线程个数 while full_size: self.q.put(StopEvent) #往队列里放入空值,空值的个数和建立线程的个数相等 full_size -= 1 def terminate(self): """ 不管是否还有任务,终止线程 """ self.terminal = True while self.generate_list: #若列表还有建立的线程 self.q.put(StopEvent) #把空值放入队列中 self.q.empty() #队列为空返回True,不然返回False @contextlib.contextmanager def worker_state(self, state_list, worker_thread): """ 用于记录线程中正在等待的线程数 """ state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread) pool = ThreadPool(5) #线程池里只能放5个 def callback(status, result): # status, execute action status # result, execute action return value pass def action(i): print(i) for i in range(300): ret = pool.run(action, (i,), callback) #执行run方法,参数(函数,i,函数名)
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通讯方法。应用程序经过读写出入队列的消息(针对应用程序的数据)来通讯,而无需专用链接来连接它们。消 息传递指的是程序之间经过在消息中发送数据进行通讯,而不是经过直接调用彼此来通讯,直接调用一般是用于诸如远程过程调用的技术。排队指的是应用程序经过 队列来通讯。队列的使用除去了接收和发送应用程序同时执行的要求。
RabbitMQ安装
安装配置epel源 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 安装erlang $ yum -y install erlang 安装RabbitMQ $ yum -y install rabbitmq-server
启动中止服务:service rabbitmq-server start/stop
安装API
pip install pika or easy_install pika or 源码 https://pypi.python.org/pypi/pika
使用API操做RabbitMQ
能够基于Queue实现生产者消费者模型
对于RabbitMQ来讲,生产和消费再也不针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) channel = connection.channel() channel.queue_declare(queue='hello2') #ch=频道,method=队列名字,properties=属性,body=取到的内容 def callback(ch,method,properties,body): print("[x]Received%r"%body) #到hello2中取数据,取完调用callback函数 channel.basic_consume(callback,queue='hello2',no_ack=True) print('[*]Waiting for messages. To exit press CTRL+C') channel.start_consuming()
import pika #建立链接 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) #建立频道 channel = connection.channel() #定义队列,若是存在忽略,不存在就建立 channel.queue_declare(queue='hello2') #往hello2队列中发内容hello world! channel.basic_publish(exchange='',routing_key='hello2',body='hello world!') print("[x]Sent 'hello world!'") connection.close()
acknowledgment 消息不丢失
no-ack = False,若是消费者遇到状况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会从新将该任务添加到队列中。
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) channel = connection.channel() channel.exchange_declare(exchange='logs_fanout',type='fanout') # 随机建立队列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # exchange和队列作绑定 channel.queue_bind(exchange='logs_fanout',queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()
状况一:no_ack=true (此时为自动应答)
在这种状况下,consumer 会在接收到 Basic.Deliver + Content-Header + Content-Body 以后,当即回复 Ack 。而这个 Ack 是 TCP 协议中的 Ack 。此 Ack 的回复不关心 consumer 是否对接收到的数据进行了处理,固然也不关心处理数据所须要的耗时。
状况二:no_ack=false (此时为手动应答)
在这种状况下,要求 consumer 在处理完接收到的 Basic.Deliver + Content-Header + Content-Body 以后才回复 Ack 。而这个 Ack 是 AMQP 协议中的 Basic.Ack 。此 Ack 的回复是和业务处理相关的,因此具体的回复时间应该要取决于业务处理的耗时。
随机建立队列
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) channel = connection.channel() channel.exchange_declare(exchange='logs_fanout',type='fanout') # 随机建立队列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # exchange和队列作绑定 channel.queue_bind(exchange='logs_fanout',queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()
import pika #建立链接 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) #建立频道 channel = connection.channel() #建立exchange channel.exchange_declare(exchange='logs_fanout',type='fanout') message = '123' #发布 channel.basic_publish(exchange='logs_fanout',routing_key='',body=message) print("[x] Sent %r"%message) connection.close()
消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。
channel.basic_qos(prefetch_count=1) 表示谁来谁取,再也不按照奇偶数排列
#!/usr/bin/env python # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) channel = connection.channel() # make message persistent channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print 'ok' ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='hello', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
发布订阅
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给全部的订阅者,而消息队列中的数据被消费一次便消失。因此,RabbitMQ实现发布和订阅时,会为每个订阅者建立一个队列,而发布者发布消息时,会将消息放置在全部相关队列中。
exchange type = fanout
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
关键字发送
exchange type = direct
以前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 断定应该将数据发送至指定队列。
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs_test',type='direct') result = channel.queue_declare(exclusive=True) queque_name = result.method.queue severities = ['error','info','warning'] for severity in severities: channel.queue_bind(exchange='direct_logs_test',queue=queque_name,routing_key=severity) print('[*] Waiting for logs. To exit press CTRL+C') def calback(ch,method,properties,body): print("[x] %r:%r"%(method.routing_key,body)) channel.basic_consume(calback,queue=queque_name,no_ack=True) channel.start_consuming()
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) channet = connection.channel() channet.exchange_declare(exchange='direct_logs_test',type='direct') result = channet.queue_declare(exclusive=True) queque_name = result.method.queue severities = ['error',] for severity in severities: channet.queue_bind(exchange='direct_logs_test',queue=queque_name,routing_key=severity) print('[*] Waiting for logs. To exit press CTRL+C') def calback(ch,method,properties,body): print("[x] %r:%r"%(method.routing_key,body)) channet.basic_consume(calback,queue=queque_name,no_ack=True) channet.start_consuming()
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs_test',type='direct') severity = 'error' message = '123' channel.basic_publish(exchange='direct_logs_test',routing_key=severity,body=message) print("[x] Sent %r:%r"%(severity,message)) connection.close()
模糊匹配
exchange type = topic
在topic类型下,可让队列绑定几个模糊的关键字,以后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
发送者路由值 队列中 old.boy.python old.* -- 不匹配 old.boy.python old.# -- 匹配
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
Demo:
##############producer.py################ import pika import sys #建立链接connection到localhost con = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #建立虚拟链接channel cha = con.channel() #建立队列anheng,durable参数为真时,队列将持久化;exclusive为真时,创建临时队列 result=cha.queue_declare(queue='anheng',durable=True,exclusive=False) #建立名为yanfa,类型为fanout的exchange,其余类型还有direct和topic,若是指定durable为真,exchange将持久化 cha.exchange_declare(durable=False, exchange='yanfa', type='direct',) #绑定exchange和queue,result.method.queue获取的是队列名称 cha.queue_bind(exchange='yanfa', queue=result.method.queue, routing_key='',) #公平分发,使每一个consumer在同一时间最多处理一个message,收到ack前,不会分配新的message cha.basic_qos(prefetch_count=) #发送信息到队列‘anheng' message = ' '.join(sys.argv[:]) #消息持久化指定delivery_mode=; cha.basic_publish(exchange='', routing_key='anheng', body=message, properties=pika.BasicProperties( delivery_mode = , )) print '[x] Sent %r' % (message,) #关闭链接 con.close() ################consumer.py################## # 博客 http://www.cnblogs.com/duanv/ import pika #创建链接connection到localhost con = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #建立虚拟链接channel cha = con.channel() #建立队列anheng result=cha.queue_declare(queue='anheng',durable=True) #建立名为yanfa,类型为fanout的交换机,其余类型还有direct和topic cha.exchange_declare(durable=False, exchange='yanfa', type='direct',) #绑定exchange和queue,result.method.queue获取的是队列名称 cha.queue_bind(exchange='yanfa', queue=result.method.queue, routing_key='',) #公平分发,使每一个consumer在同一时间最多处理一个message,收到ack前,不会分配新的message cha.basic_qos(prefetch_count=) print ' [*] Waiting for messages. To exit press CTRL+C' #定义回调函数 def callback(ch, method, properties, body): print " [x] Received %r" % (body,) ch.basic_ack(delivery_tag = method.delivery_tag) cha.basic_consume(callback, queue='anheng', no_ack=False,) cha.start_consuming()
1、概念:
Connection: 一个TCP的链接。Producer和Consumer都是经过TCP链接到RabbitMQ Server的。程序的起始处就是创建这个TCP链接。
Channels: 虚拟链接。创建在上述的TCP链接中。数据流动都是在Channel中进行的。通常状况是程序起始创建TCP链接,第二步就是创建这个Channel。
2、队列:
首先创建一个Connection,而后创建Channels,在channel上创建队列
创建时指定durable参数为真,队列将持久化;指定exclusive为真,队列为临时队列,关闭consumer后该队列将再也不存在,通常状况下创建临时队列并不指定队列名称,rabbitmq将随机起名,经过result.method.queue来获取队列名:
result = channel.queue_declare(exclusive=True)
result.method.queue
区别:durable是队列持久化与否,若是为真,队列将在rabbitmq服务重启后仍存在,若是为假,rabbitmq服务重启前不会消失,与consumer关闭与否无关;
而exclusive是创建临时队列,当consumer关闭后,该队列就会被删除
3、exchange和bind
Exchange中durable参数指定exchange是否持久化,exchange参数指定exchange名称,type指定exchange类型。Exchange类型有direct,fanout和topic。
Bind是将exchange与queue进行关联,exchange参数和queue参数分别指定要进行bind的exchange和queue,routing_key为可选参数。
Exchange的三种模式:
Direct:
任何发送到Direct Exchange的消息都会被转发到routing_key中指定的Queue 1.通常状况可使用rabbitMQ自带的Exchange:””(该Exchange的名字为空字符串); 2.这种模式下不须要将Exchange进行任何绑定(bind)操做; 3.消息传递时须要一个“routing_key”,能够简单的理解为要发送到的队列名字; 4.若是vhost中不存在routing_key中指定的队列名,则该消息会被抛弃。 Demo中虽然声明了一个exchange='yanfa'和queue='anheng'的bind,可是在后面发送消息时并无使用该exchange和bind,而是采用了direct的模式,没有指定exchange,而是指定了routing_key的名称为队列名,消息将发送到指定队列。 若是一个exchange 声明为direct,而且bind中指定了routing_key,那么发送消息时须要同时指明该exchange和routing_key.
Fanout:
任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的全部Queue上 1.能够理解为路由表的模式 2.这种模式不须要routing_key 3.这种模式须要提早将Exchange与Queue进行绑定,一个Exchange能够绑定多个Queue,一个Queue能够同多个Exchange进行绑定。 4.若是接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。 Demo中建立了一个将一个exchange和一个queue进行fanout类型的bind.可是发送信息时没有用到它,若是要用到它,只要在发送消息时指定该exchange的名称便可,该exchange就会将消息发送到全部和它bind的队列中。在fanout模式下,指定的routing_key是无效的 。
Topic:
任何发送到Topic Exchange的消息都会被转发到全部关心routing_key中指定话题的Queue上 1.这种模式较为复杂,简单来讲,就是每一个队列都有其关心的主题,全部的消息都带有一个“标题”(routing_key),Exchange会将消息转发到全部关注主题能与routing_key模糊匹配的队列。 2.这种模式须要routing_key,也许要提早绑定Exchange与Queue。 3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心全部涉及log的消息(一个routing_key为”MQ.log.error”的消息会被转发到该队列)。 4.“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,没法与“log.warn.timeout”匹配;可是“log.#”能与上述二者匹配。 5.一样,若是Exchange没有发现可以与routing_key匹配的Queue,则会抛弃此消息。
4、任务分发
1.Rabbitmq的任务是循环分发的,若是开启两个consumer,producer发送的信息是轮流发送到两个consume的。 2.在producer端使用cha.basic_publish()来发送消息,其中body参数就是要发送的消息,properties=pika.BasicProperties(delivery_mode = 2,)启用消息持久化,能够防止RabbitMQ Server 重启或者crash引发的数据丢失。 3.在接收端使用cha.basic_consume()无限循环监听,若是设置no-ack参数为真,每次Consumer接到数据后,而无论是否处理完成,RabbitMQ Server会当即把这个Message标记为完成,而后从queue中删除了。为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。为了保证数据能被正确处理而不只仅是被Consumer收到,那么咱们不能采用no-ack。而应该是在处理完数据后发送ack。 在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ能够去安全的删除它了。若是Consumer退出了可是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的状况下数据也不会丢失。 这里并无用到超时机制。RabbitMQ仅仅经过Consumer的链接中断来确认该Message并无被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来作数据处理。 Demo的callback方法中ch.basic_ack(delivery_tag = method.delivery_tag)告诉rabbitmq消息已经正确处理。若是没有这条代码,Consumer退出时,Message会从新分发。而后RabbitMQ会占用愈来愈多的内存,因为RabbitMQ会长时间运行,所以这个“内存泄漏”是致命的。去调试这种错误,能够经过一下命令打印un-acked Messages: sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged 4.公平分发:设置cha.basic_qos(prefetch_count=1),这样RabbitMQ就会使得每一个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。
5、注意:
生产者和消费者都应该声明创建队列,网上教程上说第二次建立若是参数和第一次不同,那么该操做虽然成功,可是queue的属性并不会被修改。
可能由于版本问题,在个人测试中若是第二次声明创建的队列属性和第一次不彻底相同,将报相似这种错406, "PRECONDITION_FAILED - parameters for queue 'anheng' in vhost '/' not equivalent"
若是是exchange第二次建立属性不一样,将报这种错406, "PRECONDITION_FAILED - cannot redeclare exchange 'yanfa' in vhost '/' with different type, durable, internal or autodelete value"
若是第一次声明创建队列也出现这个错误,说明以前存在名字相同的队列且本次声明的某些属性和以前声明不一样,可经过命令sudo rabbitmqctl list_queues查看当前有哪些队列。解决方法是声明创建另外一名称的队列或删除原有队列,若是原有队列是非持久化的,可经过重启rabbitmq服务删除原有队列,若是原有队列是持久化的,只能删除它所在的vhost,而后再重建vhost,再设置vhost的权限(先确认该vhost中没有其余有用队列)。
用户受权
用户管理:
建立用户 create user '用户名'@'IP地址' identified by '密码'; 删除用户 drop user '用户名'@'IP地址'; 修改用户 rename user '用户名'@'IP地址'; to '新用户名'@'IP地址';; 修改密码 set password for '用户名'@'IP地址' = Password('新密码') PS:用户权限相关数据保存在mysql数据库的user表中,因此也能够直接对其进行操做(不建议)
受权管理:
show grants for '用户'@'IP地址' -- 查看权限 grant 权限 on 数据库.表 to '用户'@'IP地址' -- 受权 revoke 权限 on 数据库.表 from '用户'@'IP地址' -- 取消权限
all privileges 除grant外的全部权限 select 仅查权限 select,insert 查和插入权限 ... usage 无访问权限 alter 使用alter table alter routine 使用alter procedure和drop procedure create 使用create table create routine 使用create procedure create temporary tables 使用create temporary tables create user 使用create user、drop user、rename user和revoke all privileges create view 使用create view delete 使用delete drop 使用drop table execute 使用call和存储过程 file 使用select into outfile 和 load data infile grant option 使用grant 和 revoke index 使用index insert 使用insert lock tables 使用lock table process 使用show full processlist select 使用select show databases 使用show databases show view 使用show view update 使用update reload 使用flush shutdown 使用mysqladmin shutdown(关闭MySQL) super 使用change master、kill、logs、purge、master和set global。还容许mysqladmin调试登录 replication client 服务器位置的访问 replication slave 由复制从属使用
对于目标数据库以及内部其余: 数据库名.* 数据库中的全部 数据库名.表 指定数据库中的某张表 数据库名.存储过程 指定数据库中的存储过程 *.* 全部数据库
用户名@IP地址 用户只能在改IP下才能访问 用户名@192.168.1.% 用户只能在改IP段下才能访问(通配符%表示任意) 用户名@% 用户能够再任意IP下访问(默认IP地址为%)
grant all privileges on db1.tb1 TO '用户名'@'IP' grant select on db1.* TO '用户名'@'IP' grant select,insert on *.* TO '用户名'@'IP' revoke select on db1.tb1 from '用户名'@'IP'
表操做
一、建立表
是否可空,null表示空,非字符串 not null - 不可空 null - 可空
默认值,建立列时能够指定默认值,当插入数据时若是未主动设置,则自动添加默认值 create table tb1( nid int not null defalut 2, num int not null )
自增,若是为某列设置自增列,插入数据时无需设置此列,默认将自增(表中只能有一个自增列) create table tb1( nid int not null auto_increment primary key, num int null ) 或 create table tb1( nid int not null auto_increment, num int null, index(nid) ) 注意:1、对于自增列,必须是索引(含主键)。 2、对于自增能够设置步长和起始值 show session variables like 'auto_inc%'; set session auto_increment_increment=2; set session auto_increment_offset=10; shwo global variables like 'auto_inc%'; set global auto_increment_increment=2; set global auto_increment_offset=10;
主键,一种特殊的惟一索引,不容许有空值,若是主键使用单个列,则它的值必须惟一,若是是多列,则其组合必须惟一。 create table tb1( nid int not null auto_increment primary key, num int null ) 或 create table tb1( nid int not null, num int not null, primary key(nid,num) )
外键,一个特殊的索引,只能是指定内容 creat table color( nid int not null primary key, name char(16) not null ) create table fruit( nid int not null primary key, smt char(32) null , color_id int not null, constraint fk_cc foreign key (color_id) references color(nid) )
二、删除表
drop table 表名
三、清空表
delete from 表名 truncate table 表名
四、修改表
添加列:alter table 表名 add 列名 类型 删除列:alter table 表名 drop column 列名 修改列: alter table 表名 modify column 列名 类型; -- 类型 alter table 表名 change 原列名 新列名 类型; -- 列名,类型 添加主键: alter table 表名 add primary key(列名); 删除主键: alter table 表名 drop primary key; alter table 表名 modify 列名 int, drop primary key; 添加外键:alter table 从表 add constraint 外键名称(形如:FK_从表_主表) foreign key 从表(外键字段) references 主表(主键字段); 删除外键:alter table 表名 drop foreign key 外键名称 修改默认值:ALTER TABLE testalter_tbl ALTER i SET DEFAULT 1000; 删除默认值:ALTER TABLE testalter_tbl ALTER i DROP DEFAULT;
五、基本数据类型
MySQL的数据类型大体分为:数值、时间和字符串
http://www.runoob.com/mysql/mysql-data-types.html
Windows能够安装Navicat for mysql操做数据库。
insert into 表 (列名,列名...) values (值,值,值...) insert into 表 (列名,列名...) values (值,值,值...),(值,值,值...) insert into 表 (列名,列名...) select (列名,列名...) from 表
delete from 表 delete from 表 where id=1 and name='alex'
update 表 set name = 'alex' where id>1
select * from 表 select * from 表 where id > 1 select nid,name,gender as gg from 表 where id > 1
1、条件 select * from 表 where id > 1 and name != 'alex' and num = 12; select * from 表 where id between 5 and 16; select * from 表 where id in (11,22,33) select * from 表 where id not in (11,22,33) select * from 表 where id in (select nid from 表) 2、通配符 select * from 表 where name like 'ale%' - ale开头的全部(多个字符串) select * from 表 where name like 'ale_' - ale开头的全部(一个字符) 3、限制 select * from 表 limit 5; - 前5行 select * from 表 limit 4,5; - 从第4行开始的5行 select * from 表 limit 5 offset 4 - 从第4行开始的5行 4、排序 select * from 表 order by 列 asc - 根据 “列” 从小到大排列 select * from 表 order by 列 desc - 根据 “列” 从大到小排列 select * from 表 order by 列1 desc,列2 asc - 根据 “列1” 从大到小排列,若是相同则按列2从小到大排序 5、分组 select num from 表 group by num select num,nid from 表 group by num,nid select num,nid from 表 where nid > 10 group by num,nid order nid desc select num,nid,count(*),sum(score),max(score),min(score) from 表 group by num,nid select num from 表 group by num having max(id) > 10 特别的:group by 必须在where以后,order by以前 6、连表 无对应关系则不显示 select A.num, A.name, B.name from A,B Where A.nid = B.nid 无对应关系则不显示 select A.num, A.name, B.name from A inner join B on A.nid = B.nid A表全部显示,若是B中无对应关系,则值为null select A.num, A.name, B.name from A left join B on A.nid = B.nid B表全部显示,若是B中无对应关系,则值为null select A.num, A.name, B.name from A right join B on A.nid = B.nid 7、组合 组合,自动处理重合 select nickname from A union select name from B 组合,不处理重合 select nickname from A union all select name from B
pymsql是Python中操做MySQL的模块,其使用方法和MySQLdb几乎相同。
1、下载安装:
pip3 install pymysql
2、使用
一、执行SQL
#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql # 建立链接 conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1') # 建立游标 cursor = conn.cursor() # 执行SQL,并返回收影响行数 effect_row = cursor.execute("update hosts set host = '1.1.1.2'") # 执行SQL,并返回受影响行数 #effect_row = cursor.execute("update hosts set host = '1.1.1.2' where nid > %s", (1,)) # 执行SQL,并返回受影响行数 #effect_row = cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)]) # 提交,否则没法保存新建或者修改的数据 conn.commit() # 关闭游标 cursor.close() # 关闭链接 conn.close()
二、获取新建立数据自增ID
#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1') cursor = conn.cursor() cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)]) conn.commit() cursor.close() conn.close() # 获取最新自增ID new_id = cursor.lastrowid
三、获取查询数据
#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1') cursor = conn.cursor() cursor.execute("select * from hosts") # 获取第一行数据 row_1 = cursor.fetchone() # 获取前n行数据 # row_2 = cursor.fetchmany(3) # 获取全部数据 # row_3 = cursor.fetchall() conn.commit() cursor.close() conn.close()
注:在fetch数据时按照顺序进行,可使用cursor.scroll(num,mode)来移动游标位置,如:
四、fetch数据类型
关于默认获取的数据是元祖类型,若是想要或者字典类型的数据,即:
#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1') # 游标设置为字典类型 cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) r = cursor.execute("call p1()") result = cursor.fetchone() conn.commit() cursor.close() conn.close()
SQLAlchemy是Python编程语言下的一款ORM框架,该框架创建在数据库API之上,使用关系对象映射进行数据库操做,简言之即是:将对象转换成SQL,而后使用数据API执行SQL并获取执行结果。
Dialect用于和数据API进行交流,根据配置文件的不一样调用不一样的数据库API,从而实现对数据库的操做,如:
MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html
步骤一:
使用 Engine/ConnectionPooling/Dialect 进行数据库操做,Engine使用ConnectionPooling链接数据库,而后再经过Dialect执行SQL语句。
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5) engine.execute( "INSERT INTO ts_test (a, b) VALUES ('2', 'v1')" ) engine.execute( "INSERT INTO ts_test (a, b) VALUES (%s, %s)", ((555, "v1"),(666, "v1"),) ) engine.execute( "INSERT INTO ts_test (a, b) VALUES (%(id)s, %(name)s)", id=999, name="v1" ) result = engine.execute('select * from ts_test') result.fetchall()
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5) # 事务操做 with engine.begin() as conn: conn.execute("insert into table (x, y, z) values (1, 2, 3)") conn.execute("my_special_procedure(5)") conn = engine.connect() # 事务操做 with conn.begin(): conn.execute("some statement", {'x':5, 'y':10})
注:查看数据库链接:show status like 'Threads%';
步骤二:
使用 Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 进行数据库操做。Engine使用Schema Type建立一个特定的结构对象,以后经过SQL Expression Language将该对象转换成SQL语句,而后经过 ConnectionPooling 链接数据库,再而后经过 Dialect 执行SQL,并获取结果。
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey metadata = MetaData() user = Table('user', metadata, Column('id', Integer, primary_key=True), Column('name', String(20)), ) color = Table('color', metadata, Column('id', Integer, primary_key=True), Column('name', String(20)), ) engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5) metadata.create_all(engine) # metadata.clear() # metadata.remove()
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey metadata = MetaData() user = Table('user', metadata, Column('id', Integer, primary_key=True), Column('name', String(20)), ) color = Table('color', metadata, Column('id', Integer, primary_key=True), Column('name', String(20)), ) engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5) conn = engine.connect() # 建立SQL语句,INSERT INTO "user" (id, name) VALUES (:id, :name) conn.execute(user.insert(),{'id':7,'name':'seven'}) conn.close() # sql = user.insert().values(id=123, name='wu') # conn.execute(sql) # conn.close() # sql = user.delete().where(user.c.id > 1) # sql = user.update().values(fullname=user.c.name) # sql = user.update().where(user.c.name == 'jack').values(name='ed') # sql = select([user, ]) # sql = select([user.c.id, ]) # sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id) # sql = select([user.c.name]).order_by(user.c.name) # sql = select([user]).group_by(user.c.name) # result = conn.execute(sql) # print result.fetchall() # conn.close()
更多内容详见:
http://www.jianshu.com/p/e6bba189fcbd
http://docs.sqlalchemy.org/en/latest/core/expression_api.html
注:SQLAlchemy没法修改表结构,若是须要可使用SQLAlchemy开发者开源的另一个软件Alembic来完成。
步骤三:
使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 全部组件对数据进行操做。根据类建立对象,对象转换成SQL,执行SQL。
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5) Base = declarative_base() class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(50)) # 寻找Base的全部子类,按照子类的结构在数据库中生成对应的数据表信息 # Base.metadata.create_all(engine) Session = sessionmaker(bind=engine) session = Session() # ########## 增 ########## # u = User(id=2, name='sb') # session.add(u) # session.add_all([ # User(id=3, name='sb'), # User(id=4, name='sb') # ]) # session.commit() # ########## 删除 ########## # session.query(User).filter(User.id > 2).delete() # session.commit() # ########## 修改 ########## # session.query(User).filter(User.id > 2).update({'cluster_id' : 0}) # session.commit() # ########## 查 ########## # ret = session.query(User).filter_by(name='sb').first() # ret = session.query(User).filter_by(name='sb').all() # print ret # ret = session.query(User).filter(User.name.in_(['sb','bb'])).all() # print ret # ret = session.query(User.name.label('name_label')).all() # print ret,type(ret) # ret = session.query(User).order_by(User.id).all() # print ret # ret = session.query(User).order_by(User.id)[1:3] # print ret # session.commit()
2、ORM功能使用
使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 全部组件对数据进行操做。根据类建立对象,对象转换成SQL,执行SQL。
一、建立表
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5) Base = declarative_base() # 建立单表 class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32)) extra = Column(String(16)) __table_args__ = ( UniqueConstraint('id', 'name', name='uix_id_name'), Index('ix_id_name', 'name', 'extra'), ) # 一对多 class Favor(Base): __tablename__ = 'favor' nid = Column(Integer, primary_key=True) caption = Column(String(50), default='red', unique=True) class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) favor_id = Column(Integer, ForeignKey("favor.nid")) # 多对多 class Group(Base): __tablename__ = 'group' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) port = Column(Integer, default=22) class Server(Base): __tablename__ = 'server' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) class ServerToGroup(Base): __tablename__ = 'servertogroup' nid = Column(Integer, primary_key=True, autoincrement=True) server_id = Column(Integer, ForeignKey('server.id')) group_id = Column(Integer, ForeignKey('group.id')) def init_db(): Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine)
二、操做表
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5) Base = declarative_base() # 建立单表 class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32)) extra = Column(String(16)) __table_args__ = ( UniqueConstraint('id', 'name', name='uix_id_name'), Index('ix_id_name', 'name', 'extra'), ) def __repr__(self): return "%s-%s" %(self.id, self.name) # 一对多 class Favor(Base): __tablename__ = 'favor' nid = Column(Integer, primary_key=True) caption = Column(String(50), default='red', unique=True) def __repr__(self): return "%s-%s" %(self.nid, self.caption) class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) favor_id = Column(Integer, ForeignKey("favor.nid")) # 与生成表结构无关,仅用于查询方便 favor = relationship("Favor", backref='pers') # 多对多 class ServerToGroup(Base): __tablename__ = 'servertogroup' nid = Column(Integer, primary_key=True, autoincrement=True) server_id = Column(Integer, ForeignKey('server.id')) group_id = Column(Integer, ForeignKey('group.id')) group = relationship("Group", backref='s2g') server = relationship("Server", backref='s2g') class Group(Base): __tablename__ = 'group' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) port = Column(Integer, default=22) # group = relationship('Group',secondary=ServerToGroup,backref='host_list') class Server(Base): __tablename__ = 'server' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) def init_db(): Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine) Session = sessionmaker(bind=engine) session = Session()
增
obj = Users(name="alex0", extra='sb') session.add(obj) session.add_all([ Users(name="alex1", extra='sb'), Users(name="alex2", extra='sb'), ]) session.commit()
删
session.query(Users).filter(Users.id > 2).delete()
session.commit()
改
session.query(Users).filter(Users.id > 2).update({"name" : "099"}) session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False) session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate") session.commit()
查
ret = session.query(Users).all() ret = session.query(Users.name, Users.extra).all() ret = session.query(Users).filter_by(name='alex').all() ret = session.query(Users).filter_by(name='alex').first()
其余
# 条件 ret = session.query(Users).filter_by(name='alex').all() ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all() ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() ret = session.query(Users).filter(Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all() from sqlalchemy import and_, or_ ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() ret = session.query(Users).filter( or_( Users.id < 2, and_(Users.name == 'eric', Users.id > 3), Users.extra != "" )).all() # 通配符 ret = session.query(Users).filter(Users.name.like('e%')).all() ret = session.query(Users).filter(~Users.name.like('e%')).all() # 限制 ret = session.query(Users)[1:2] # 排序 ret = session.query(Users).order_by(Users.name.desc()).all() ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 分组 from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() # 连表 ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() ret = session.query(Person).join(Favor).all() ret = session.query(Person).join(Favor, isouter=True).all() # 组合 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all()
paramiko模块,基于SSH用于链接远程服务器并执行相关操做。
1、安装
pip3 install paramiko
2、使用
SSHClient
用于链接远程服务器并执行基本命令
基于用户名密码链接:
import paramiko # 建立SSH对象 ssh = paramiko.SSHClient() # 容许链接不在know_hosts文件中的主机 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 链接服务器 ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', password='123') # 执行命令 stdin, stdout, stderr = ssh.exec_command('ls') # 获取命令结果 result = stdout.read() # 关闭链接 ssh.close()