线程是最小的工做单位,一个应用程序至少有一个进程,一个进程至少有一个线程。
应用场景:
IO密集型:使用线程
计算密集型:使用进程
GIL: 全局解释器锁,保证同一进程中只有一个线程同时被调度。python
线程的基本使用:git
def task(arg): time.sleep(arg) print(arg) for i in range(5): t = threading.Thread(target=task,args=[i,]) # t.setDaemon(True) # 主线程终止,不等待子线程 # t.setDaemon(False) t.start() # t.join() # 一直等 # t.join(1) # 等待最大时间
锁:
1.只有一我的使用锁:github
import threading import time lock = threading.Lock() v =10 def task(arg): time.sleep(1) lock.acquire() global v v -= 1 print(v) lock.release() for i in range(10): t = threading.Thread(target=task,args=[i,]) t.start()
2.多人使用锁:并发
lock = threading.BoundedSemaphore(3) v =10 def task(arg): lock.acquire() time.sleep(1) global v v -= 1 print(v) lock.release() for i in range(10): t = threading.Thread(target=task,args=[i,]) t.start()
3.事件锁(全部人解脱锁的限制)app
lock = threading.Event() def task(arg): time.sleep(1) lock.wait() print(arg) for i in range(10): t = threading.Thread(target=task,args=[i,]) t.start() while True: value = input(">>>") if value == '1': lock.set()
4.自定义解锁个数(为所欲为锁)socket
lock = threading.Condition() def task(arg): time.sleep(1) lock.acquire() lock.wait() print("线程:%s" % arg) lock.release() for i in range(10): t = threading.Thread(target=task,args=[i,]) t.start() while True: value = input(">>") lock.acquire() lock.notify(int(value)) lock.release()
线程池:
模式一: 直接处理ui
def task(url): """ 任务执行两个操做:下载;保存本地 """ # response中封装了Http请求响应的全部数据 # - response.url 请求的URL # - response.status_code 响应状态码 # - response.text 响应内容(字符串格式) # - response.content 响应内容(字节格式) # 下载 response = requests.get(url) # 下载内容保存至本地 f = open('a.log','wb') f.write(response.content) f.close() pool = ThreadPoolExecutor(2) url_list = [ 'http://www.oldboyedu.com', 'http://www.autohome.com.cn', 'http://www.baidu.com', ] for url in url_list: print('开始请求',url) # 去链接池中获取连接 pool.submit(task,url)
模式二:分步处理url
def save(future): """ 只作保存 # future中包含response """ response = future.result() # 下载内容保存至本地 f = open('a.log','wb') f.write(response.content) f.close() def task(url): """ 只作下载 requests """ # response中封装了Http请求响应的全部数据 # - response.url 请求的URL # - response.status_code 响应状态码 # - response.text 响应内容(字符串格式) # - response.content 响应内容(字节格式) # 下载 response = requests.get(url) return response pool = ThreadPoolExecutor(2) url_list = [ 'http://www.oldboyedu.com', 'http://www.autohome.com.cn', 'http://www.baidu.com', ] for url in url_list: print('开始请求',url) # 去链接池中获取连接 # future中包含response future = pool.submit(task,url) # 下载成功后,自动调用save方法 future.add_done_callback(save)
进程的基本使用:spa
from multiprocessing import Process import time def task(arg): time.sleep(arg) print(arg) if __name__ == '__main__': for i in range(10): p = Process(target=task,args=(i,)) p.daemon = True # p.daemon = False p.start() p.join(1) print('主进程最后...')
进程之间的数据共享:经过Array(‘类型’,长度) 或者Manager().list() / Manager().dict()线程
from multiprocessing import Process,Array,Manager from threading import Thread ''' # 验证进程之间数据不共享 def task(num,li): li.append(num) print(li) if __name__ == "__main__": v = [] for i in range(10): p = Process(target=task,args=(i,v,)) p.start() ''' ''' # 进程间数据共享方式一: def task(num,li): li[num] = 1 print(list(li)) if __name__ == "__main__": v = Array('i',10) for i in range(10): p = Process(target=task,args=(i,v,)) p.start() ''' # 进程间数据共享方式二:经过socket def task(num,li): li.append(num) print(li) if __name__ == '__main__': v = Manager().list() # v = Manager().dict() for i in range(10): p = Process(target=task,args=(i,v,)) p.start() p.join()
进程池的使用:
from concurrent.futures import ProcessPoolExecutor def call(arg): data = arg.result() print(data) def task(arg): return arg + 100 if __name__ == "__main__": pool = ProcessPoolExecutor(10) for i in range(100): obj = pool.submit(task,i) obj.add_done_callback(call)
协程永远是一个进程在执行,是对线程的分片处理。
greenlet: python自带的协程模块
# 协程 from greenlet import greenlet def test1(): print(12) gr2.switch() print(34) gr2.switch() def test2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch() # 根据协程二次开发:协程+IO from gevent import monkey; monkey.patch_all() import gevent import requests def f(url): response = requests.get(url) print(response.url,response.status_code) gevent.joinall([ gevent.spawn(f, 'http://www.oldboyedu.com/'), gevent.spawn(f, 'http://www.baidu.com/'), gevent.spawn(f, 'http://github.com/'), ])
IO多路复用,用于监听多个socket对象是否变化(可读,可写,发送错误)
import socket import select # IO多路复用:8002,8001 # ############### 基于select实现服务端的“伪”并发 ############### sk1 = socket.socket() sk1.bind(('127.0.0.1',8001,)) sk1.listen(5) sk2 = socket.socket() sk2.bind(('127.0.0.1',8002,)) sk2.listen(5) inputs = [sk1,sk2,] w_inputs = [] while True: # IO多路复用,同时监听多个socket对象 # - select,内部进行循环操做(1024) 主动查看 # - poll, 内部进行循环操做 主动查看 # - epoll, 被动告知 r,w,e = select.select(inputs,w_inputs,inputs,0.05) # r = [sk2,] # r = [sk1,] # r = [sk1,sk2] # r = [] # r = [conn,] # r = [sk1,Wconn] #######? for obj in r: if obj in [sk1,sk2]: # 新链接捡来了... print('新链接来了:',obj) conn,addr = obj.accept() inputs.append(conn) else: # 有链接用户发送消息来了.. print('有用户发送数据了:',obj) try: data = obj.recv(1024) except Exception as ex: data = "" if data: w_inputs.append(obj) # obj.sendall(data) else: obj.close() inputs.remove(obj) w_inputs.remove(obj) for obj in w: obj.sendall(b'ok') w_inputs.remove(obj)
模拟socketserver:
import socket import select import threading def process_request(conn): while True: v = conn.recv(1024) conn.sendall(b'1111') sk1 = socket.socket() sk1.bind(('127.0.0.1',8001,)) sk1.listen(5) inputs=[sk1,] while True: # IO多路复用,同时监听多个socket对象 # - select,内部进行循环操做(1024) 主动查看 # - poll, 内部进行循环操做 主动查看 # - epoll, 被动告知 r,w,e = select.select(inputs,[],inputs,0.05) for obj in r: if obj in sk1: # conn客户端的socket conn,addr = obj.accept() t = threading.Thread(target=process_request,args=(conn,)) t.start()