目录python
from threading import Thread,Lock x = 0 mutex = Lock() def task(): global x for i in range(200000): x = x+1 """ 因为没有进程锁,程序切换比较快,例如: t1 的x 刚拿到0 x = 0+1 还没进行运算,保存了状态,就被cpu切换 t2 的 x 拿到0 进行了运算 x = 1 又切换到t1,x = 0+1 x = 1 产生数据安全的问题,因此结果就会随机 """ if __name__ == '__main__': t1 = Thread(target=task) t2 = Thread(target=task) t3 = Thread(target=task) t1.start() t2.start() t3.start() t1.join() t2.join() t3.join() print(x) ################# 337566 #比较随机,正确的应该是600000
from threading import Thread, Lock x = 0 mutex = Lock() def task(): mutex.acquire() # 线程锁 global x for i in range(200000): x = x + 1 mutex.release() if __name__ == '__main__': t1 = Thread(target=task) t2 = Thread(target=task) t3 = Thread(target=task) t1.start() t2.start() t3.start() t1.join() t2.join() t3.join() print(x) ########################## 600000
from threading import Thread,Lock import time mutex1 = Lock() mutex2 = Lock() class MyTreada(Thread): def run(self): self.task1() self.task2() def task1(self): mutex1.acquire() print(f'{self.name}抢到了锁1') mutex2.acquire() print(f'{self.name}抢到了锁2') mutex2.release() print(f'{self.name}释放了锁1') mutex1.release() print(f'{self.name}释放了锁2') def task2(self): mutex2.acquire() print(f'{self.name}抢到了锁2') time.sleep(1) mutex1.acquire() print(f'{self.name}抢到了锁2') mutex2.release() print(f'{self.name}释放了锁2') mutex1.release() print(f'{self.name}释放了锁1') for i in range(3): t = MyTreada() t.start() ################## Thread-1抢到了锁1 Thread-1抢到了锁2 Thread-1释放了锁1 Thread-1释放了锁2 Thread-1抢到了锁2 Thread-2抢到了锁1##########下面的线程被阻塞 ############################# """ 俩个线程 线程1拿到了锁头2,要想往下执行须要锁头1 线程2拿到了锁头1,想要往下执行须要锁头2 互相拿到了彼此往下执行的必要条件(锁头),互相不放弃手里的锁头,出现阻塞 """
解决方法,递归锁,在Python中为了支持在同一线程中屡次请求同一资源,python提供了可重入锁RLock。安全
mutexA=mutexB=threading.RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的状况,则counter继续加1,这期间全部其余线程都只能等待,等待该线程释放全部锁,即counter递减到0为止
from threading import Thread,RLock """ 递归锁:在同一个线程内能够被屡次acquire 释放:内部至关于维护了一个计数器,也就是说同一个线程acquire了几回就要release()几回 """ import time mutex1 = RLock() mutex2 = mutex1 class MyThreada(Thread): def run(self): self.task1() self.task2() def task1(self): mutex1.acquire() print(f'{self.name}抢到了锁1') mutex2.acquire() print(f'{self.name}抢到了锁2') mutex2.release() print(f'{self.name}释放了锁2') mutex1.release() print(f'{self.name}释放了锁1') def task2(self): mutex2.acquire() print(f'{self.name}抢到了锁2') time.sleep(1) mutex1.acquire() print(f'{self.name}抢到了锁1') mutex1.release() print(f'{self.name}释放了锁1') mutex2.release() print(f'{self.name}释放了锁2') for i in range(3): t = MyThreada() t.start() ############################# Thread-1抢到了锁1 Thread-1抢到了锁2 Thread-1释放了锁2 Thread-1释放了锁1 Thread-1抢到了锁2 Thread-1抢到了锁1 Thread-1释放了锁1 Thread-1释放了锁2 Thread-2抢到了锁1 Thread-2抢到了锁2 Thread-2释放了锁2 Thread-2释放了锁1 Thread-2抢到了锁2 Thread-2抢到了锁1 Thread-2释放了锁1 Thread-2释放了锁2 Thread-3抢到了锁1 Thread-3抢到了锁2 Thread-3释放了锁2 Thread-3释放了锁1 Thread-3抢到了锁2 Thread-3抢到了锁1 Thread-3释放了锁1 Thread-3释放了锁2
同进程的同样服务器
实现:定制了锁的个数,也就意味着最多有几个线程能够抢到锁头网络
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器+1;
调用release() 时内置计数器-1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其余线程调用release()。多线程
实例:(同时只有5个线程能够得到semaphore,便可以限制最大链接数为5):并发
from threading import Thread,currentThread,Semaphore import time def task(): sm.acquire() print(f'{currentThread().name}正在执行') time.sleep(3) sm.release() sm = Semaphore(5) for i in range(15): t = Thread(target=task) t.start() #################### Thread-1正在执行 Thread-2正在执行 Thread-3正在执行 Thread-4正在执行 Thread-5正在执行 Thread-7正在执行 Thread-6正在执行 Thread-8正在执行 Thread-9正在执行 Thread-10正在执行 Thread-11正在执行 Thread-13正在执行 Thread-12正在执行 Thread-14正在执行 Thread-15正在执行
与进程池是彻底不一样的概念,进程池Pool(4),最大只能产生4个进程,并且从头至尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程。app
由于cpython自带的垃圾回收机制不是线程安全的,因此要有GIL锁dom
致使了同一个进程下,同一时间只能由一个线程,没法利用多核优点异步
分析:socket
推荐使用多进程,利用cpu的多核优点,并行的计算(cpu主要负责计算)
下面例子,分析过程:
from threading import Thread from multiprocessing import Process import time def workl(): res = 0 for i in range(10000000): res*=i if __name__ == '__main__': t_list = [] start = time.time() for i in range(4): p = Process(target=workl) t_list.append(p) p.start() for p in t_list: p.join() end = time.time() print("多进程",end-start) ################ 多进程 1.082251787185669
from threading import Thread from multiprocessing import Process import time def workl(): res = 0 for i in range(10000000): res*=i if __name__ == '__main__': t_list = [] start = time.time() for i in range(4): t = Thread(target=workl) t_list.append(t) t.start() for t in t_list: t.join() end = time.time() print("多线程",end-start) ############### 多进程 2.43462872505188
推荐使用多线程解决,大部分时间都在io,而且开启一个线程要比开启进程的速度快的多
大部分的需求都是io密集型,由于大部分的软件都是基于网络的
from threading import Thread from multiprocessing import Process import time def work(): x = 1+1 time.sleep(5) if __name__ == '__main__': t_list = [] start = time.time() for i in range(4): t = Thread(target=work) t_list.append(t) t.start() for t in t_list: t.join() end = time.time() print("多线程",end-start) ################ 多线程 5.002916097640991
from multiprocessing import Process import time def work(): x = 1+1 time.sleep(5) if __name__ == '__main__': t_list = [] start = time.time() for i in range(4): p = Process(target=work) t_list.append(p) p.start() for p in t_list: p.join() end = time.time() print("多进程",end-start) ################# 多进程 5.1951446533203125
[TOC]
import queue q = queue.Queue() q.put('123') q.put('dd') print(q.get()) print(q.get()) ############### 123 dd
import queue q = queue.LifoQueue()#堆栈,后进先出 q.put('ocean') q.put('sku') q.put('chen') print(q.get()) print(q.get()) print(q.get()) ############## chen sku ocean
import queue q = queue.PriorityQueue() q.put((50,'chen'))#一般元组的第一个值是int类型 q.put((100,'sky')) q.put((1,'rocky')) print(q.get()) print(q.get()) print(q.get()) ############## (1, 'rocky') (50, 'chen') (100, 'sky')
import queue q = queue.PriorityQueue() q.put('chen') q.put('sky') q.put('rocky') print(q.get()) print(q.get()) print(q.get()) ################## chen rocky sky
import queue q = queue.PriorityQueue() q.put(50) q.put(100) q.put(1) print(q.get()) print(q.get()) print(q.get()) ############ 1 50 100
和time.sleep()不一样,线程定时器不会阻碍下面代码的执行,而time.sleep()会阻碍。
from threading import Timer import time def task(): print('线程执行了') time.sleep(2) print('线程结束了') t = Timer(4,task)###元组第一个数是多少秒开启线程 t.start() print("不会阻碍线程的执行") ######################## 不会阻碍线程的执行 线程执行了 线程结束了
服务端
import socket from threading import Thread def talk(conn): while True: try: msg = conn.recv(1024) if len(msg)==0:break conn.send(msg.upper()) except Exception: print('客户点关闭链接') break def server_demo(): server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.bind(("127.0.0.1",8008)) server.listen(5) while True: conn,addr = server.accept() t = Thread(target=talk,args=(conn,)) t.start() if __name__ == '__main__': server_demo()
客户端
import socket from threading import Thread,currentThread def client_dome(): client = socket.socket(socket.AF_INET,socket.SOCK_STREAM) client.connect(('127.0.0.1',8008)) while True: msg = f"{currentThread().name}" if len(msg)==0:continue client.send(msg.encode('utf8')) feedback = client.recv(1024) print(feedback.decode('utf8')) client.close() if __name__ == '__main__': for i in range(20): t = Thread(target=client_dome) t.start()
进程池和线程池
何时限制?
当并发的任务数量远远大于计算机所能承受的范围的时候,即没法一次性开启过多的任务数量,就应该考虑去限制进程数或者线程数,从而保证服务器不会崩溃。(荡机)
同步:提交一个任务,必须等待任务执行完毕(或者最后拿到返回值),才能执行下一行代码
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import currentThread from multiprocessing import current_process import time def task(i): print(f'{current_process().name}在执行任务{i}') time.sleep(1) return i**2 if __name__ == '__main__': pool = ProcessPoolExecutor(4)#池子里有四个进程 fu_list = [] for i in range(20): future = pool.submit(task,i)#task任务要作20次,4个线程负责作这些事情 print(future.result())#若是没有结果会一直等下去,致使了全部的任务都在串行 ################# Process-1在执行任务0 0 Process-2在执行任务1 1 Process-3在执行任务2 4 Process-4在执行任务3 9 Process-1在执行任务4 16 Process-2在执行任务5 25 Process-3在执行任务6 36 Process-4在执行任务7 49 Process-1在执行任务8 64 Process-2在执行任务9 81 Process-3在执行任务10 100 Process-4在执行任务11 121 Process-1在执行任务12 144 Process-2在执行任务13 169 Process-3在执行任务14 196 Process-4在执行任务15 225 Process-1在执行任务16 256 Process-2在执行任务17 289 Process-3在执行任务18 324 Process-4在执行任务19 361
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import currentThread from multiprocessing import current_process import time def task(i): print(f'{current_process().name}在执行任务{i}') time.sleep(1) return i**2 if __name__ == '__main__': pool = ProcessPoolExecutor(4)#池子里有四个进程 fu_list = [] for i in range(20): future = pool.submit(task,i)#task任务要作20次,4个线程负责作这些事情 # print(future.result())#若是没有结果会一直等下去,致使了全部的任务都在串行 fu_list.append(future) pool.shutdown()#关闭了池的入口,会等待全部的任务执行完,结束阻塞. for fu in fu_list: print(fu.result()) ########################### Process-1在执行任务0 Process-2在执行任务1 Process-3在执行任务2 Process-4在执行任务3 Process-1在执行任务4 Process-2在执行任务5 Process-3在执行任务6 Process-4在执行任务7 Process-1在执行任务8 Process-2在执行任务9 Process-3在执行任务10 Process-4在执行任务11 Process-1在执行任务12 Process-2在执行任务13 Process-3在执行任务14 Process-4在执行任务15 Process-1在执行任务16 Process-2在执行任务17 Process-3在执行任务18 Process-4在执行任务19 0 1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import currentThread from multiprocessing import current_process import time def task(i): print(f'{currentThread().name}在执行任务{i}') time.sleep(1) return i**2 if __name__ == '__main__': pool = ThreadPoolExecutor(4)#池子里有四个线程 fu_list = [] for i in range(20): future = pool.submit(task,i)#task任务要作20次,4个线程负责作这些事情 print(future.result())#若是没有结果会一直等下去,致使了全部的任务都在串行 ########################## ThreadPoolExecutor-0_0在执行任务0 0 ThreadPoolExecutor-0_0在执行任务1 1 ThreadPoolExecutor-0_1在执行任务2 4 ThreadPoolExecutor-0_0在执行任务3 9 ThreadPoolExecutor-0_2在执行任务4 16 ThreadPoolExecutor-0_1在执行任务5 25 ThreadPoolExecutor-0_3在执行任务6 36 ThreadPoolExecutor-0_0在执行任务7 49 ThreadPoolExecutor-0_2在执行任务8 64 ThreadPoolExecutor-0_1在执行任务9 81 ThreadPoolExecutor-0_3在执行任务10 100 ThreadPoolExecutor-0_0在执行任务11 121 ThreadPoolExecutor-0_2在执行任务12 144 ThreadPoolExecutor-0_1在执行任务13 169 ThreadPoolExecutor-0_3在执行任务14 196 ThreadPoolExecutor-0_0在执行任务15 225 ThreadPoolExecutor-0_2在执行任务16 256 ThreadPoolExecutor-0_1在执行任务17 289 ThreadPoolExecutor-0_3在执行任务18 324 ThreadPoolExecutor-0_0在执行任务19 361
异步:提交一个任务,不须要等待执行完毕,能够直接执行下一行代码。
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from threading import currentThread from multiprocessing import current_process import time def task(i): print(f'{current_process().name}在执行任务{i}') time.sleep(1) return i ** 2 def parse(future): print(future.result()) if __name__ == '__main__': pool = ProcessPoolExecutor(4) # 池子里有四个进程 for i in range(20): future = pool.submit(task, i) # task任务要作20次,4个线程负责作这些事情 # print(future.result())#若是没有结果会一直等下去,致使了全部的任务都在串行 future.add_done_callback(parse) # 为当前任务绑定了一个函数,在当前任务执行结束的时候会触发这个函数, # 会把future对象做为参数传给函数 # 这个称之为回调函数,处理完了回来就调用这个函数. ####################### Process-1在执行任务0 Process-2在执行任务1 Process-3在执行任务2 Process-4在执行任务3 Process-1在执行任务4 0 Process-2在执行任务5 1 Process-3在执行任务6 4 Process-4在执行任务7 9 Process-1在执行任务8 16 Process-2在执行任务9 25 Process-3在执行任务10 36 Process-4在执行任务11 49 Process-1在执行任务12 64 Process-2在执行任务13 81 Process-3在执行任务14 100 Process-4在执行任务15 121 Process-1在执行任务16 144 Process-2在执行任务17 169 Process-3在执行任务18 196 Process-4在执行任务19 225 256 289 324 361
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from threading import currentThread from multiprocessing import current_process import time def task(i): print(f'{currentThread().name}在执行任务{i}') time.sleep(1) return i ** 2 def parse(future): print(future.result()) if __name__ == '__main__': pool = ThreadPoolExecutor(4) # 池子里有四个线程 for i in range(20): future = pool.submit(task, i) # task任务要作20次,4个线程负责作这些事情 # print(future.result())#若是没有结果会一直等下去,致使了全部的任务都在串行 future.add_done_callback(parse) # 为当前任务绑定了一个函数,在当前任务执行结束的时候会触发这个函数, # 会把future对象做为参数传给函数 # 这个称之为回调函数,处理完了回来就调用这个函数. ############################## ThreadPoolExecutor-0_0在执行任务0 ThreadPoolExecutor-0_1在执行任务1 ThreadPoolExecutor-0_2在执行任务2 ThreadPoolExecutor-0_3在执行任务3 0 ThreadPoolExecutor-0_0在执行任务4 1 ThreadPoolExecutor-0_1在执行任务5 9 ThreadPoolExecutor-0_3在执行任务6 4 ThreadPoolExecutor-0_2在执行任务7 25 ThreadPoolExecutor-0_1在执行任务8 16 ThreadPoolExecutor-0_0在执行任务9 36 ThreadPoolExecutor-0_3在执行任务10 49 ThreadPoolExecutor-0_2在执行任务11 81 ThreadPoolExecutor-0_0在执行任务12 64 ThreadPoolExecutor-0_1在执行任务13 100 ThreadPoolExecutor-0_3在执行任务14 121 ThreadPoolExecutor-0_2在执行任务15 144 ThreadPoolExecutor-0_0在执行任务16 169 ThreadPoolExecutor-0_1在执行任务17 196 ThreadPoolExecutor-0_3在执行任务18 225 ThreadPoolExecutor-0_2在执行任务19 256 289 361 324
什么样的协程是有有意义的?
优势:
缺点:
对比多线程
本身要检测全部的io,只要有一个阻塞总体都会跟着阻塞
对比多进程
没法利用多核优点
为何要有协程?
本身控制切换的速度比操做系统控制切换的快,下降了单个线程的io时间。
import gevent import time def eat(): print("eat 1") gevent.sleep(2) print('eat 2') def play(): print("play 1") gevent.sleep(3) print("play 2") start = time.time() g1 = gevent.spawn(eat) g2 = gevent.spawn(play) g1.join() g2.join() end = time.time() print(end - start) ################### eat 1 play 1 eat 2 play 2 3.006275177001953
from gevent import monkey;monkey.patch_all()####为协程打补丁 # monkey.patch_all() import gevent,time def eat(): print('eat 1') time.sleep(2) print('eat 2') def play(): print('play 1 ') time.sleep(3) print('play 2') start = time.time() g1 = gevent.spawn(eat) g2 = gevent.spawn(play) g1.join() g2.join() end = time.time() print(end-start) ##################### eat 1 play 1 eat 2 play 2 3.004676580429077