1.动态导入模块html
2.粘包问题python
3.paramkio(ssh连接模块)linux
4.多线程git
5.GIL锁github
6.互诉锁windows
7.递归锁缓存
8.Semaphore(信号量)服务器
9.事件(多线程标志位)网络
10.队列(queue)多线程
11.生产者消费者模型
12.多进程
13.进程之间通信
14.进程之间数据共享
15.进程池
16. 协程
17.事件驱动
18.堵塞IO 非堵塞,同步IO,异步IO
aa.py def test(): print("ok") class C: def __init__(self): self.name = 'abc' __import__ data = __import__('day5.aa') a =data.aa a.test() b =data.aa.C() print(b.name) -------------------------------------------- import importlib aa = importlib.import_module('day5.aa') print(aa.C().name) aa.test() --------------------------------------------- conn.send(str(len(cmd_res.encode())).encode("utf-8")) -------------------------------------------------------------
while True: cmd = input(">>:").strip() if len(cmd) == 0:continue if cmd.startwith("get"): clinet.send(cmd.encode()) file_toal_size = int(server_response.decode()) received_size = 0 filename = cmd.split()[1] f = open(filename,'wb') m = hashlib.md5() while received_size < file_toal_size: if file_toal_size - received_size > 1024: size = 1024 else: size = file_toal_size - received_size data = client.recv(size) received_size += len(data) f.write(data) else: new_file_md5 = m.hexdigest() -------------------------------------------- import socketserver class MyTCPHandler(socketserver.BaseRequestHandler): def handler(self): while True: try: self.data = self.request.recv(1024).strip() print("{}wrote:".format(self.client_address[0])) print(self.data) self.request.send(self.data.upper()) except ConnectionAbortedError as e: print("ree",e) break if __name__ == "__main__": HOST,PORT = "localhost",9999
paramiko模块安装
http://blog.csdn.net/qwertyupoiuytr/article/details/54098029
#!/usr/bin/env python # _*_ encoding:utf-8 _*_ import paramiko #建立SSH对象 ssh = paramiko.SSHClient() #容许连接不在know_host文件主机中 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) #连接服务器 ssh.connect(hostname='192.168.80.11',port=22,username='root',password='123.com') #执行命令 stdin,stdout,stderr = ssh.exec_command('df') #获取结果 res,err = stdout.read(),stderr.read() resilt = res if res else err print(resilt) #关闭链接 ssh.close() ------------------------------------------------------- transport = paramiko.Transport(('192.168.80.11',22)) transport.connect(username='root',password='123.com') ssh = paramiko.SSHClient() ssh._transport = transport stdin,stdout,stderr = ssh.exec_command('df') print(stdout.read()) transport.close()
transport = paramiko.Transport(('192.168.80.11',22)) transport.connect(username='root',password='123.com') sftp = paramiko.SFTPClient.from_transport(transport) #上传文件 sftp.put('windows.txt','/root/win.txt') #下载文件 #sftp.get('linux.txt','linux.txt') transport.close()
linux 拷贝公钥 ssh-copy-id "root@192.168.80.11" #指定公钥 private_key = paramiko.RSAKey.from_private_key_file('id_rsa') ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname='192.168.80.11',port=22,username='root',pkey=private_key) stdin,stdout,stderr = ssh.exec_command('df') result = stdout.read() print(result) ssh.close()
private_key = paramiko.RSAKey.from_private_key_file('id_rsa') transport = paramiko.Transport(('192.168.80.11',22)) transport.connect(username='root',pkey=private_key) sftp = paramiko.SFTPClient.from_transport(transport) sftp.put('windows.txt','/root/windows.txt') sftp.get('linux.txt','linux.txt') transport.close()
io 操做不沾用cpu
计算占用cpu,1+1
python多线程 不适合cpu密集操做型的任务,适合io密集型的任务操做
def run(n): print('task',n) time.sleep(2) t1=threading.Thread(target=run,args=('t1',)) t2=threading.Thread(target=run,args=('t2',)) t1.start() t2.start() ############ import threading import time class MyThread(threading.Thread): def __init__(self,n): super(MyThread,self).__init__() self.n = n def run(self): print("runnint task",self.n) t1 = MyThread("t1") t1.start() ################ start_time = time.time() t_objs = [] def run(n): print('task',n) time.sleep(2) for i in range(50): t = threading.Thread(target=run,args=('t-%s'%i,)) t.start() t_objs.append(t) for t in t_objs: t.join() print('--------------all-------------') print("cost",time.time() - start_time)
print('-all--',threading.current_thread(),threading.active_count())
import threading import time start_time = time.time() t_objs = [] def run(n): print('task',n) time.sleep(2) for i in range(50): t = threading.Thread(target=run,args=('t-%s'%i,)) t.setDaemon(True) #把子线程变成守护线程(守护线程,主线程执行完推出,不等待守护线程执行结束) t.start() t_objs.append(t) for t in t_objs: t.join() print('--------------all-------------',threading.current_thread(),threading.active_count()) print("cost",time.time() - start_time)
python只能执行一个进程,因此在执的多进程工做室,是利用上下切花来完成的 由于python是调用C语言原始的进程接口,不能够调整 进程工做的顺序,在同一时间内只有一个进程在处理数据
互诉锁 防止上下切换覆盖数据 import threading import time def run(n): lock.acpuire() #加锁 global num num += 1 time.sleep(1) lock.release() #释放锁 lock = threading.Lock() #调用锁 num = 0 t_objs = [] for i in range(50): t = threading.Thread(target=run,args=("t-%s"%i,)) t.start() t_objs.append(t) for t in t_objs: t.join() print('num',num)
递归锁 防止锁顺序错乱 import threading,time def run1(): print("grab the first part dara") lock.acquire() global num num +=1 lock.release() return num def run2(): print("grab the second part data") lock.acquire() global num2 num2+=2 lock.release() return num2 def run3(): lock.acquire() res = run1() print("----run1 run2-----") res2 = run2() lock.release() print(res,res2) if __name__ == '__main__': num,num2 = 0,0 lock = threading.RLock() for i in range(10): t = threading.Thread(target=run3) t.start() while threading.active_count() !=1: print(threading.active_count()) else: print(num,num2)
互斥锁 同时只容许一个线程更改数据,而Semaphore是同时容许必定数量的线程更改数据 ,好比厕全部3个坑,那最多只容许3我的上厕所,后面的人只能等里面有人出来了才能再进去。 import threading,time def run(n): semaphore.acquire() time.sleep(1) print("run the threading:%s\n"%n) semaphore.release() if __name__ == '__main__': semaphore = threading.BoundedSemaphore(5) #容许5个线程同时运行 for i in range(20): t = threading.Thread(target=run,args=(i,)) t.start() while threading.active_count() !=1: pass else: print('----all-------')
import threading,time event = threading.Event() def lighter(): count = 0 event.set() while True: if count >5 and count <10: event.clear() print("\033[41m--->红灯\033[0m") elif count >10: event.set() count = 0 else: print("\033[42m--->绿灯\033[0m") time.sleep(1) count +=1 def car(name): while True: if event.is_set(): print("[%s] running..."% name) time.sleep(1) else: print("[%s]sees red light waiting.."%name) event.wait() print("\033[34m[%s] green ligth is on start going....\033[0m"%name) light = threading.Thread(target=lighter,) light.start() car1 = threading.Thread(target=car,args=("Tesla",)) car1.start()
解耦,使程序直接耦合,提升程序效率,一个进程修改不影响其余进程
q = queue.Queue() # q = queue.Queue(maxsize=3) 设置队列数量 q.put(1) 传数据 q.put(2) q.put(3) print(q.qsize()) 查看队列大小 print(q.get()) 取数据 print(q.get()) # print(q.get_nowait()) 取数据为空时不会卡住 # print(q.get(block=False)) 设置false取数据为空时不会卡住 print(q.get(timeout=1)) 设置其数据时间为1秒
import queue q = queue.LifoQueue() q.put(1) q.put(2) q.put(2) print(q.get()) print(q.get()) print(q.get())
import queue q = queue.PriorityQueue() q.put((-1,"a")) q.put((3,"b")) q.put((6,"c")) print(q.get()) print(q.get()) print(q.get())
import threading import queue q = queue.Queue() def producer(): for i in range(10): q.put("骨头%s"%i) print("开始等待骨头被取走。。。") q.join() print("全部骨头被取完了。。。") def consumer(n): while q.qsize() >0: print("%s 取到"%n,q.get()) q.task_done() p = threading.Thread(target=producer,) p.start() # b = threading.Thread(target=consumer,args=("abc",)) # b.start() consumer("abc")
def run(name): time.sleep(2) print("hello",name) if __name__ == '__main__': for i in range(10): p = multiprocessing.Process(target=run,args=("bob %s"%i,)) p.start() def thread_run(): print(threading.get_ident()) #返回当前线程的“线程标识符 def run(name): time.sleep(2) print("hello",name) t = threading.Thread(target=thread_run,) t.start() if __name__ == '__main__': for i in range(10): p = multiprocessing.Process(target=run,args=("bob %s"%i,)) p.start()
from multiprocessing import Process,Queue def f(qq): qq.put([42,None,'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f,args=(q,)) p.start() print(q.get()) p.join() #################### from multiprocessing import Process import os def info(title): print(title) print('module name',__name__) print('parent process',os.getppid()) 打印父进程id print('process id:',os.getpid()) 打印子进程id print("\n\n") def f(name): info('\033[31mfunction f\033[0m') print('hello',name) if __name__ == '__main__': info('\033[32;1mmain process line\033[0m') p = Process(target=f,args=('bob',)) p.start() p.join() #Pipes #Pipe是经过管道传送和接受数据的 from multiprocessing import Process,Pipe def f(conn): conn.send([42,None,"hello"]) conn.close() if __name__ == '__main__': parent_conn,child_conn = Pipe() p = Process(target=f,args=(child_conn,)) p.start() print(parent_conn.recv()) p.join()
from multiprocessing import Process,Manager import os def f(d,l): d[os.getpid()] = os.getpid() l.append(os.getpid()) print(l) print(d) if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(5)) p_list =[] for i in range(10): p = Process(target=f,args=(d,l,)) p.start() p_list.append(p) for res in p_list: res.join() # print(d) # print(l) ---------------------------------------------- from multiprocessing import Process,Lock def f(l,i): l.acquire() try: print('hello world',i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f,args=(lock,num)).start()
from multiprocessing import process,Pool,freeze_support (windows 须要加,freeze_support) import time,os def Foo(i): time.sleep(2) print('in process',os.getpid()) return i + 100 def Bar(arg): print('-->exec done:',arg,os.getpid()) if __name__ == '__main__': pool = Pool(processes=3) 容许进程池同时放入5个进程 print("主进程",os.getpid()) for i in range(10): pool.apply_async(func=Foo,args=(i,),callback=Bar) #同步(并行) (callback方法 执行完Foo执行Bar 避免重复的长链接) #pool.apply(func=Foo,args=(i,)) 串行 print('end') pool.close() pool.join()
import time import queue def consumer(name): print("--->starting eating baozi...") while True: new_baozi = yield print("[%s] is eating baozi %s" % (name,new_baozi)) #time.sleep(1) def producer(): r = con.__next__() r = con2.__next__() n = 0 while n < 5: n +=1 con.send(n) con2.send(n) print("\033[32;1m[producer]\033[0m is making baozi %s" %n ) if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") producer() ######################### from greenlet import greenlet def test1(): print(12) #2 gr2.switch() #切换 print(34) #4 gr2.switch() #切换 def test2(): print(56) #3 gr1.switch() #切换 print(78) #5 gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch() #1
协程切换原理 遇到IO操做就切换,执行时间短的先执行,(IO 为等待时间) import gevent def foo(): print('foo 1') #1 gevent.sleep(2) print('foo 2') #6 def bar(): print('bar 1') #2 gevent.sleep(1) print('bar 2') #5 def func3(): print("func 1") #3 gevent.sleep(0) print('func 2') #4 gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), gevent.spawn(func3), ])
#!/usr/bin/env python # _*_ encoding:utf-8 _*_ from greenlet import greenlet from urllib import request import gevent,time from gevent import monkey monkey.patch_all() #把当前程序全部的IO操做作上标记(不然gevent没法识别) def f(url): print('GET: %s'% url) resp = request.urlopen(url) data = resp.read() print('%d bytes received from %s'%(len(data),url)) urls = ['https://www.python.org/ ', 'https://www.yahoo.com/ ', 'https://github.com/ '] time_start = time.time() for url in urls: f(url) print("cost",time.time() - time_start) async_time_start = time.time() gevent.joinall([ gevent.spawn(f,'https://www.python.org/ '), gevent.spawn(f,'https://www.yahoo.com/ '), gevent.spawn(f,'https://github.com/ '),]) print("cost",time.time() - async_time_start)
server #!/usr/bin/env python # _*_ encoding:utf-8 _*_ import sys import socket import time import gevent from gevent import socket,monkey monkey.patch_all() def server(port): s = socket.socket() s.bind(('localhost',port)) s.listen(500) while True: cli,addr = s.accept() gevent.spawn(handle_request,cli) def handle_request(conn): try: while True: data = conn.recv(1024) print('recv:',data) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) except Exception as ex: print(ex) finally: conn.close() if __name__ == '__main__': server(8001) clinet import socket HOST = 'localhost' PORT = 8001 s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect((HOST,PORT)) while True: msg = bytes(input(">>"),encoding='utf8') s.sendall(msg) data = s.recv(1024) print('Received',data) s.close()
对事件作处理 例如:点击鼠标 放到一个时间列表 按键盘放到一个时间列表 有一个进程来处理
http://www.cnblogs.com/alex3714/articles/5876749.html
服务收到一个事件,会放到对应的列表里面,文件描述符就是对应的索引,而索引对应是文件句柄(文件对象)
程序不能够直接调用系内核,程序打开文件都是调用内核来完成的,例如拷贝文件,是先拷贝到内核缓存区 而后再拷贝到io
一个进程正在执行,另外一个进程在等待,就形成了堵塞。
当用户进程发出read操做时,若是kernel中的数据尚未准备好,那么它并不会block用户进程,
而是马上返回一个error。从用户进程角度讲 ,它发起一个read操做后,并不须要等待,
而是立刻就获得了一个结果。用户进程判断结果是一个error时,它就知道数据尚未准备好,
因而它能够再次发送read操做。一旦kernel中的数据准备好了,而且又再次收到了用户进程的system call,
那么它立刻就将数据拷贝到了用户内存,而后返回。
因此,nonblocking IO的特色是用户进程须要不断的主动询问kernel数据好了没有。
IO multiplexing就是咱们说的select,poll,epoll,
有些地方也称这种IO方式为event driven IO。
select/epoll的好处就在于单个process就能够同时处理多个网络链接的IO。
它的基本原理就是select,poll,
epoll这个function会不断的轮询所负责的全部socket,
当某个socket有数据到达了,就通知用户进程。
当用户进程调用了select,那么整个进程会被block,
而同时,kernel会“监视”全部select负责的socket,
当任何一个socket中的数据准备好了,select就会返回。
这个时候用户进程再调用read操做,将数据从kernel拷贝到用户进程。
因此,I/O 多路复用的特色是经过一种机制一个进程能同时等待多个文件描述符,
而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就能够返回。
select,poll,epoll是IO多路复中监视数据
select
例若有100个连接过来 内核检测到其中只有两个有数据,内核不会告诉select
所以select 须要本身循环查找消耗事件,
select的一个缺点在于单个进程可以监视的文件描述符的数量存在最大限制,在Linux上通常为1024,
不过能够经过修改宏定义甚至从新编译内核的方式提高这一限制。
poll
poll在1986年诞生于System V Release 3,
它和select在本质上没有多大差异,可是poll没有最大文件描述符数量的限制。
epoll
它几乎具有了以前所说的一切优势,
被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
例若有100个连接过来 内核检测到其中只有两个有数据,
内核会直接告诉epoll只有两个有数据 就不用本身查找。
用户进程发起read操做以后,马上就能够开始去作其它的事。
而另外一方面,从kernel的角度,当它受到一个asynchronous read以后,
首先它会马上返回,因此不会对用户进程产生任何block。而后,
kernel会等待数据准备完成,而后将数据拷贝到用户内存,
当这一切都完成以后,kernel会给用户进程发送一个signal,告诉它read操做完成了。
阻塞IO,非阻塞IO,IO多路复用:都为同步IO
异步IO:异步IO
IO多路复用之select模式
1 #!/usr/bin/env python 2 # _*_ encoding:utf-8 _*_ 3 4 import socket 5 import queue 6 import select 7 8 msg_dic = {} 9 10 server = socket.socket() 11 server.bind(('localhost',9000)) 12 server.listen(1000) 13 14 #不阻塞 15 server.setblocking(False) 16 17 inputs = [server,] 18 outputs = [] 19 20 while True: #新链接 ,下次循环执行,异常 21 readable,writeable,execeptional=select.select(inputs,outputs,inputs) #io多路复用select模式 22 for r in readable: 23 if r is server: #表明来了一个新链接 24 conn,addr = server.accept() 25 print("来了新链接",addr) 26 inputs.append(conn) #是由于这个新建的连接尚未发送数据过来,如今就接收, 27 #因此要想实现这个客户端发送数据来时server能知道,就让server再监测这个conn. 28 29 msg_dic[conn]= queue.Queue() #初始化一个队列,后面存要返给这个客户端的数据 30 else: 31 data = r.recv(1024) 32 print('收到数据',data) 33 msg_dic[r].put(data) #把新来的连接添加队列 34 outputs.append(r) #添加outputs下次循环执行 35 36 for w in writeable: #要返回给客户端的连接列表 37 data_to_client = msg_dic[w].get() #取队列数据 38 w.send(data_to_client) #发送数据 39 outputs.remove(w) #确保下次循环的时候writeabke,不反回已经处理完的连接 40 41 for e in execeptional: 42 if e in outputs: 43 outputs.remove(e) #删除 44 45 inputs.remove(e) #删除 46 47 del msg_dic[e] #删除 48 49 server
1 import socket 2 3 HOST = 'localhost' 4 PORT = 9000 5 s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 6 s.connect((HOST,PORT)) 7 8 while True: 9 msg = bytes(input(">>"),encoding='utf8') 10 s.sendall(msg) 11 data = s.recv(1024) 12 print('Received',data) 13 14 s.close()
IO多路复用之epoll模式
1 #!/usr/bin/env python 2 # _*_ encoding:utf-8 _*_ 3 import selectors 4 import socket 5 6 sel = selectors.DefaultSelector() 7 8 def accept(sock,mask): 9 conn,addr = sock.accept() 10 conn.setblocking(False) 11 sel.register(conn,selectors.EVENT_READ,read) 12 13 def read(conn,mask): 14 data = conn.recv(1000) 15 if data: 16 print('echoing', repr(data), 'to', conn) 17 conn.send(data) 18 else: 19 print('closing', conn) 20 sel.unregister(conn) 21 conn.close() 22 23 24 25 sock = socket.socket() 26 sock.bind(('localhost',9999)) 27 sock.listen(100) 28 sock.setblocking(False) #设置非堵塞 29 sel.register(sock,selectors.EVENT_READ,accept) #注册 30 31 while True: 32 events = sel.select() #默认堵塞,有活动连接就返回活动连接列表 33 for key,mask in events: #有连接过来 34 callback = key.data #accept 35 callback(key.fileobj,mask) #执行accept函数 key.fileobj=conn
1 import socket 2 import sys 3 4 messages = ['This is the mess', 5 'It will be sent', 6 'in parts', 7 ] 8 9 server_address = ('localhost',9999) 10 11 socks = [socket.socket(socket.AF_INET,socket.SOCK_STREAM) for i in range(5)] 12 13 for s in socks: 14 s.connect(server_address) 15 16 for message in messages: 17 for s in socks: 18 s.send(message.encode()) 19 print('send %s %s' % s.getsockname(),message) 20 21 for s in socks: 22 data = s.recv(1024) 23 print('recv %s %s'% s.getsockname(),data) 24 if not data: 25 print('not data %s %s'%s.getsockname(),data) 26 s.close()