人机矛盾: CPU利用率低css
磁带存储+批处理:下降数据的读取时间,提升CPU的利用率html
多道操做系统------在一个任务遇到IO的时候主动让出CPU,给其余任务使用python
多道技术: 1.产生背景:针对单核,实现并发 ps: 如今的主机通常是多核,那么每一个核都会利用多道技术 有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再从新调度,会被调度到4个 cpu中的任意一个,具体由操做系统调度算法决定。 2.空间上的复用:如内存中同时有多道程序 3.时间上的复用:复用一个cpu的时间片 强调:遇到io切,占用cpu时间过长也切,核心在于切以前将进程的状态保存下来,这样 才能保证下次切换回来时,能基于上次切走的位置继续运行
分时操做系统-------给时间分片,让多个任务轮流使用CPUlinux
每一个程序分配一个时间片,轮转使用CPU,切换须要时间,下降CPU利用率,提升用户体验ios
通用操做系统-------分时操做系统 + 多道操做系统 + 实时操做系统git
操做系统负责什么?redis
调度进程前后执行的顺序 控制执行的时间等等
资源的分配算法
进程:数据库
进程和程序的区别: 程序只是一个文件 进程是这个文件被CPU运行起来了
操做系统调度进程的算法:json
并行与并发:
就绪(Ready)状态
当进程已分配到除CPU之外的全部必要的资源,只要得到处理机即可当即执行,这时的进程状态称为就绪状态。
执行/运行(Running)
状态当进程已得到处理机,其程序正在处理机上执行,此时的进程状态称为执行状态。
阻塞(Blocked)状态
正在执行的进程,因为等待某个事件发生而没法执行时,便放弃处理机而处于阻塞状态。引发进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能知足、等待信件(信号)等。
同步异步:
所谓同步就是一个任务的完成须要依赖另一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态能够保持一致。
所谓异步是不须要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工做,依赖的任务也当即执行,只要本身完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务没法肯定,因此它是不可靠的任务序列。
进程:
# 建立进程 时间开销大 # 销毁进程 时间开销大 # 进程之间切换 时间开销大
线程:
线程是进程的一部分,每一个进程中至少有一个线程 能被CPU调度的最小单位 一个进程中的多个线程是能够共享这个进程的数据的 —— 数据共享 线程的建立、销毁、切换 开销远远小于进程 —— 开销小
进程:是计算机中最小的资源分配单位(进程是负责圈资源)
线程:是计算机中能被CPU调度的最小单位 (线程是负责执行具体代码的)
os.getpid()
:获取当前进程pid
os.getppid()
:获取父级进程pid,能够建立子进程,在pycharm中启动的全部py程序都是pycharm的子进程
import os
import time
from multiprocessing import Process
# multiprocessing多进程模块Process类
def func():
print('start',os.getpid())
time.sleep(1)
print('end',os.getpid())
if __name__ == '__main__':
p = Process(target=func) # 将函数封装到类,建立一个要开启func进程的对象
p.start() # 异步 调用开启进程的方法 可是并不等待这个进程真的开启
print('main :',os.getpid())
#main : 11436
#start 9860
#end 9860
操做系统建立进程的方式不一样 windows操做系统执行开启进程的代码 实际上新的子进程须要经过import父进程的代码来完成数据的导入工做 因此有一些内容咱们只但愿在父进程中完成,就写在if __name__ == '__main__':下面 ios linux操做系统建立进程 fork,拷贝的方式
主进程和子进程之间的关系
父进程会等待着全部的子进程结束以后才结束,为了回收资源
主进程代码执行完毕:
# 主进程负责回收子进程的资源
# 若是子进程执行结束,父进程没有回收资源,那么这个子进程会变成一个僵尸进程
# 主进程的结束逻辑
# 主进程的代码结束
# 全部的子进程结束
# 给子进程回收资源
# 主进程结束
# 主进程怎么知道子进程结束了的呢?
# 基于网络、文件
import time
from multiprocessing import Process
def send_mail():
time.sleep(3)
print('发送了一封邮件')
if __name__ == '__main__':
p = Process(target=send_mail)
p.start() # 异步 非阻塞
# time.sleep(5)
print('join start')
p.join() # 同步 阻塞 直到p对应的进程结束以后才结束阻塞
print('5000封邮件已发送完毕')
#join start
#发送了一封邮件
#5000封邮件已发送完毕
import time
import random
from multiprocessing import Process
def send_mail(a):
time.sleep(random.random())
print('发送了一封邮件',a)
if __name__ == '__main__':
l = []
for i in range(10):
p = Process(target=send_mail,args=(i,))#向子进程传参数,用元组
p.start()
l.append(p) #回收多个子进程资源,先添指列表,最后统一处理
for p in l:p.join()
# 阻塞 直到上面的十个进程都结束
print('5000封邮件已发送完毕')
发送了一封邮件 5
发送了一封邮件 4
发送了一封邮件 3
......
5000封邮件已发送完毕
补充:Windows开启进程,因为建立机制,必须采用此方式.
print([__name__])
if __name__ == '__main__':
# 控制当这个py文件被看成脚本直接执行的时候,就执行这里面的代码
# 当这个py文件被看成模块导入的时候,就不执行这里面的代码
print('hello hello')
# __name__ == '__main__'
# 执行的文件就是__name__所在的文件
# __name__ == '文件名'
# __name__所在的文件被导入执行的时候
守护进程
随着主进程的代码结束而结束的,全部的子进程都必须在主进程结束以前结束,由主进程来负责回收资源
p.daemon = True
其余方法:
p.is_alive() 判断进程是否活着 p.terminate() # 能够解释异步非阻塞, 关闭须要时间,并不等到返回结束进程结果,会变僵尸
def son1():
while True:
print('is alive')
time.sleep(0.5)
if __name__ == '__main__':
p = Process(target=son1)
p.start() # 异步 非阻塞
print(p.is_alive())
time.sleep(1)
p.terminate() # 异步的 非阻塞
print(p.is_alive()) # 进程还活着 由于操做系统还没来得及关闭进程
time.sleep(0.01)
print(p.is_alive()) # 操做系统已经响应了咱们要关闭进程的需求,再去检测的时候,获得的结果是进程已经结束了
使用面向对象方式开启进程
import os
import time
from multiprocessing import Process
class MyProcecss2(Process): #必须继承Process
def run(self): #必需要有run方法,重写process的run,start自动调用run
while True:
print('is alive')
time.sleep(0.5)
class MyProcecss1(Process):
def __init__(self,x,y): #传参数要定义init函数
self.x = x
self.y = y
super().__init__() #要导入父类的初始化参数
def run(self):
print(self.x,self.y,os.getpid())
for i in range(5):
print('in son2')
time.sleep(1)
if __name__ == '__main__':
mp = MyProcecss1(1,2)
mp.daemon = True
mp.start()
print(mp.is_alive())
mp.terminate()
# mp2 = MyProcecss2()
# mp2.start()
# print('main :',os.getpid())
# time.sleep(1)
Process操做进程的方法
# p.start() 开启进程 异步非阻塞 # p.terminate() 结束进程 异步非阻塞 # p.join() 同步阻塞 # p.isalive() 获取当前进程的状态 # daemon = True 设置为守护进程,守护进程永远在主进程的代码结束以后自动结束
锁
# 1.若是在一个并发的场景下,涉及到某部份内容 # 是须要修改一些全部进程共享数据资源 # 须要加锁来维护数据的安全 # 2.在数据安全的基础上,才考虑效率问题 # 3.同步存在的意义 # 数据的安全性 # 在主进程中实例化 lock = Lock() # 把这把锁传递给子进程 # 在子进程中 对须要加锁的代码 进行 with lock: # with lock至关于lock.acquire()和lock.release() # 在进程中须要加锁的场景 # 共享的数据资源(文件、数据库) # 对资源进行修改、删除操做 # 加锁以后可以保证数据的安全性 可是也下降了程序的执行效率
mport time
import json
from multiprocessing import Process,Lock
def search_ticket(user):
with open('ticket_count') as f:
dic = json.load(f)
print('%s查询结果 : %s张余票'%(user,dic['count']))
def buy_ticket(user,lock):
# with lock:
# lock.acquire() # 给这段代码加上一把锁
time.sleep(0.02)
with open('ticket_count') as f:
dic = json.load(f)
if dic['count'] > 0:
print('%s买到票了'%(user))
dic['count'] -= 1
else:
print('%s没买到票' % (user))
time.sleep(0.02)
with open('ticket_count','w') as f:
json.dump(dic,f)
# lock.release() # 给这段代码解锁
def task(user, lock):
search_ticket(user)
with lock:
buy_ticket(user, lock)
if __name__ == '__main__':
lock = Lock()
for i in range(10):
p = Process(target=task,args=('user%s'%i,lock))
p.start()
进程之间通讯IPC
进程之间的通讯 - IPC(inter process communication) 第三方:redis,memcache,kafka,rabbitmq 特色:并发需求,高可用,断电保存数据,解耦
from multiprocessing import Queue,Process
# 先进先出
def func(exp,q):
ret = eval(exp)
q.put({ret,2,3})
q.put(ret*2)
q.put(ret*4)
if __name__ == '__main__':
q = Queue()
Process(target=func,args=('1+2+3',q)).start()
print(q.get())
print(q.get())
print(q.get())
# Queue基于 天生就是数据安全的
# 文件家族的socket pickle lock
# pipe 管道(不安全的) = 文件家族的socket pickle
# 队列 = 管道 + 锁
# from multiprocessing import Pipe
# pip = Pipe()
# pip.send()
# pip.recv()
import queue
# from multiprocessing import Queue
# q = Queue(5)
# q.put(1)
# q.put(2)
# q.put(3)
# q.put(4)
# q.put(5) # 当队列为满的时候再向队列中放数据 队列会阻塞
# print('5555555')
# try:
# q.put_nowait(6) # 当队列为满的时候再向队列中放数据 会报错而且会丢失数据
# except queue.Full:
# pass
# print('6666666')
#
# print(q.get())
# print(q.get())
# print(q.get()) # 在队列为空的时候会发生阻塞
# print(q.get()) # 在队列为空的时候会发生阻塞
# print(q.get()) # 在队列为空的时候会发生阻塞
# try:
# print(q.get_nowait()) # 在队列为空的时候 直接报错
# except queue.Empty:pass
什么是生产者消费者模型? # 把一个产生数据而且处理数据的过程解耦 # 让生产的数据的过程和处理数据的过程达到一个工做效率上的平衡 # 中间的容器,在多进程中咱们使用队列或者可被join的队列,作到控制数据的量 # 当数据过剩的时候,队列的大小会控制这生产者的行为 # 当数据严重不足的时候,队列会控制消费者的行为 # 而且咱们还能够经过按期检查队列中元素的个数来调节生产者消费者的个数
第一种方式:
import time
import random
from multiprocessing import Process,Queue
def producer(q,name,food):
for i in range(10):
time.sleep(random.random())
fd = '%s%s'%(food,i)
q.put(fd)
print('%s生产了一个%s'%(name,food))
def consumer(q,name):
while True:
food = q.get()
if not food:break
time.sleep(random.randint(1,3))
print('%s吃了%s'%(name,food))
def cp(c_count,p_count):
q = Queue(10)
for i in range(c_count):
Process(target=consumer, args=(q, 'alex')).start()
p_l = []
for i in range(p_count):
p1 = Process(target=producer, args=(q, 'wusir', '泔水'))
p1.start()
p_l.append(p1)
for p in p_l:p.join()
for i in range(c_count):
q.put(None)
if __name__ == '__main__':
cp(2,3)
流程:消费者开启进程get,生产者开启进程put,加入队列,所有结束后(jion),队列put(None),消费者get到空终止
第二种方式:
import time
import random
from multiprocessing import JoinableQueue,Process
def producer(q,name,food):
for i in range(10):
time.sleep(random.random())
fd = '%s%s'%(food,i)
q.put(fd)
print('%s生产了一个%s'%(name,food))
q.join()
def consumer(q,name):
while True:
food = q.get()
time.sleep(random.random())
print('%s吃了%s'%(name,food))
q.task_done()
if __name__ == '__main__':
jq = JoinableQueue()
p =Process(target=producer,args=(jq,'wusir','泔水'))
p.start()
c = Process(target=consumer,args=(jq,'alex'))
c.daemon = True
c.start()
p.join()
JoinableQueue一样经过multiprocessing使用。
建立队列的另一个类:
JoinableQueue([maxsize]):这就像是一个Queue对象,但队列容许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
参数介绍:
maxsize是队列中容许最大项数,省略则无大小限制。
方法介绍:
JoinableQueue的实例p除了与Queue对象相同的方法以外还具备:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。若是调用此方法的次数大于从队列中删除项目的数量,将引起ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中全部的项目均被处理。阻塞将持续到队列中的每一个项目均调用q.task_done()方法为止
from multiprocessing import Manager,Process,Lock
def func(dic,lock):
with lock:
dic['count'] -= 1
if __name__ == '__main__':
# m = Manager()
with Manager() as m:
l = Lock()
dic = m.dict({'count':100})
p_l = []
for i in range(100):
p = Process(target=func,args=(dic,l))
p.start()
p_l.append(p)
for p in p_l:p.join()
print(dic)
mulprocessing中有一个manager类,封装了全部和进程相关的 数据共享 数据传递相关的数据类型,可是对于 字典 列表这一类的数据操做的时候会产生数据不安全, 须要加锁解决问题,而且须要尽可能少的使用这种方式.
GIL锁:全局解释器锁,cpython解释器中特殊的垃圾回收机制,致使了在同一个进程中多个线程不能同时利用多核 —— python的多线程只能是并发不能是并行
因此使用所线程并不影响高io型的操做,只会对高计算型的程序由效率上的影响
主线程何时结束?等待全部子线程结束以后才结束 主线程若是结束了,主进程也就结束了
# multiprocessing 是彻底仿照这threading的类写的
from threading import Thread
def func():
print('start son thread')
# 启动线程 start
Thread(target=func).start()
# 开启多个子线程
def func(i):
print('start son thread',i)
time.sleep(1)
print('end son thread',i)
for i in range(10):
Thread(target=func,args=(i,)).start()
print('main')
# join方法 阻塞 直到子线程执行结束
import time
import os
from threading import Thread
def func(i):
print('start son thread',i)
time.sleep(1)
print('end son thread',i,os.getpid())
t_l = []
for i in range(10):
t = Thread(target=func,args=(i,))
t.start()
t_l.append(t)
for t in t_l:t.join()
print('子线程执行完毕')
# 使用面向对象的方式启动线程
class MyThread(Thread):
def __init__(self,i):
self.i = i
super().__init__()
def run(self):
print('start',self.i,self.ident)
time.sleep(1)
print('end',self.i)
for i in range(10):
t = MyThread(i)
t.start()
print(t.ident) #线程id
# 线程里的一些其余方法
from threading import current_thread,enumerate,active_count
def func(i):
t = current_thread() #当前线程对象
print('start son thread',i,t.ident)
time.sleep(1)
print('end son thread',i,os.getpid())
t = Thread(target=func,args=(1,))
t.start()
print(t.ident)
print(current_thread().ident) # 水性杨花 在哪个线程里,current_thread()获得的就是这个当前线程的信息
print(enumerate()) #活着的线程列表
print(active_count()) # =====len(enumerate())
terminate 结束进程,在线程中不能从主线程结束一个子线程
# 守护线程
import time
from threading import Thread
def son1():
while True:
time.sleep(0.5)
print('in son1')
def son2():
for i in range(5):
time.sleep(1)
print('in son2')
t =Thread(target=son1)
t.daemon = True
t.start()
Thread(target=son2).start()
time.sleep(3)
# 守护线程一直等到全部的非守护线程都结束以后才结束
# 除了守护了主线程的代码以外也会守护子线程
线程锁
即使是线程 即使有GIL 也会出现数据不安全的问题
# 1.操做的是全局变量 # 2.作一下操做 # += -= *= /+ 先计算再赋值才容易出现数据不安全的问题 # 包括 lst[0] += 1 dic['key']-=1
a = 0
def add_f(lock):
global a
for i in range(200000):
with lock:
a += 1
def sub_f(lock):
global a
for i in range(200000):
with lock:
a -= 1
from threading import Thread,Lock
lock = Lock()
t1 = Thread(target=add_f,args=(lock,))
t1.start()
t2 = Thread(target=sub_f,args=(lock,))
t2.start()
t1.join()
t2.join()
print(a)
加锁会影响程序的执行效率,可是保证了数据的安全
互斥锁是锁中的一种:在同一个线程中,不能连续acquire屡次
import time
from threading import Lock
class A:
__instance = None
lock = Lock()
def __new__(cls, *args, **kwargs):
with cls.lock:
if not cls.__instance:
time.sleep(0.1)
cls.__instance = super().__new__(cls)
return cls.__instance
def __init__(self,name,age):
self.name = name
self.age = age
def func():
a = A('alex', 84)
print(a)
from threading import Thread
for i in range(10):
t = Thread(target=func)
t.start()
递归锁
from threading import RLock
# rlock = RLock()
# rlock.acquire()
# print('*'*20)
# rlock.acquire()
# print('-'*20)
# rlock.acquire()
# print('*'*20)
优势:在同一个线程中,能够连续acuqire屡次不会被锁住
缺点:占用了更多资源
死锁现象:在某一些线程中出现陷入阻塞而且永远没法结束阻塞的状况就是死锁现象
1.多把锁+交替使用
2.互斥锁在一个线程中连续acquire
避免方法:在一个线程中只有一把锁,而且每一次acquire以后都要release
解决方法:能够用递归锁解决,也能够经过优化代码逻辑解决.
import time
from threading import RLock,Thread
# noodle_lock = RLock()
# fork_lock = RLock()
noodle_lock = fork_lock = RLock()
print(noodle_lock,fork_lock)
def eat1(name,noodle_lock,fork_lock):
noodle_lock.acquire()
print('%s抢到面了'%name)
fork_lock.acquire()
print('%s抢到叉子了' % name)
print('%s吃了一口面'%name)
time.sleep(0.1)
fork_lock.release()
print('%s放下叉子了' % name)
noodle_lock.release()
print('%s放下面了' % name)
def eat2(name,noodle_lock,fork_lock):
fork_lock.acquire()
print('%s抢到叉子了' % name)
noodle_lock.acquire()
print('%s抢到面了'%name)
print('%s吃了一口面'%name)
time.sleep(0.1)
noodle_lock.release()
print('%s放下面了' % name)
fork_lock.release()
print('%s放下叉子了' % name)
lst = ['alex','wusir','taibai','yuan']
Thread(target=eat1,args=(lst[0],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=(lst[1],noodle_lock,fork_lock)).start()
Thread(target=eat1,args=(lst[2],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=(lst[3],noodle_lock,fork_lock)).start()
互斥锁解决
import time
from threading import Lock,Thread
lock = Lock()
def eat1(name,noodle_lock,fork_lock):
lock.acquire()
print('%s抢到面了'%name)
print('%s抢到叉子了' % name)
print('%s吃了一口面'%name)
time.sleep(0.1)
print('%s放下叉子了' % name)
print('%s放下面了' % name)
lock.release()
def eat2(name,noodle_lock,fork_lock):
lock.acquire()
print('%s抢到叉子了' % name)
print('%s抢到面了'%name)
print('%s吃了一口面'%name)
time.sleep(0.1)
print('%s放下面了' % name)
print('%s放下叉子了' % name)
lock.release()
lst = ['alex','wusir','taibai','yuan']
Thread(target=eat1,args=(lst[0],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=(lst[1],noodle_lock,fork_lock)).start()
Thread(target=eat1,args=(lst[2],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=(lst[3],noodle_lock,fork_lock)).start()
先进先出队列
from queue import Queue
后进先出队列---栈
from queue import LifoQueue
优先级队列
自动的排序 抢票的用户级别 100000 100001 告警级别
from queue import PriorityQueue
pq = PriorityQueue()
pq.put((10,'alex'))
pq.put((6,'wusir'))
pq.put((20,'yuan'))
print(pq.get())
print(pq.get())
预先的开启固定个数的进程数,当任务来临的时候,直接提交给已经开好的进程,让这个进程去执行就能够了,节省了进程,线程的开启关闭的切换时间,而且减轻了操做系统调度的负担.
开启步骤
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
# 建立一个池子
tp = ThreadPoolExecutor(池中线程/进程的个数)
# 异步提交任务
ret = tp.submit(函数,参数1,参数2....)
# 获取返回值
ret.result()
# 在异步的执行完全部任务以后,主线程/主进程才开始执行的代码
tp.shutdown() 阻塞 直到全部的任务都执行完毕
# 关闭池以后就不能继续提交任务,而且会阻塞,直到已经提交的任务完成
# map方法
ret = tp.map(func,iterable) #迭代获取iterable中的内容,做为func的参数,让子线程来执行对应的任务
for i in ret: 每个都是任务的返回值
# 回调函数
ret.add_done_callback(函数名)
# 要在ret对应的任务执行完毕以后,直接继续执行add_done_callback绑定的函数中的内容,而且ret的结果会做为参数返回给绑定的函数
带参数及返回值
def func(i,name):
print('start',os.getpid())
time.sleep(random.randint(1,3))
print('end', os.getpid())
return '%s * %s'%(i,os.getpid())
if __name__ == '__main__':
p = ProcessPoolExecutor(5)
ret_l = []
for i in range(10):
ret = p.submit(func,i,'alex')
ret_l.append(ret)
for ret in ret_l:
print('ret-->',ret.result()) # ret.result() 同步阻塞
print('main',os.getpid())
回调函数
import requests
from concurrent.futures import ThreadPoolExecutor
def get_page(url):
res = requests.get(url)
return {'url':url,'content':res.text}
def parserpage(ret): #必须有参数
dic = ret.result()
print(dic)
tp = ThreadPoolExecutor(5)
url_lst = [
'http://www.baidu.com', # 3
'http://www.cnblogs.com', # 1
'http://www.douban.com', # 1
'http://www.tencent.com',
'http://www.cnblogs.com/Eva-J/articles/8306047.html',
'http://www.cnblogs.com/Eva-J/articles/7206498.html',
]
ret_l = []
for url in url_lst:
ret = tp.submit(get_page,url)
ret_l.append(ret)
ret.add_done_callback(parserpage)
回调函数add_done_callback # 执行完子线程任务以后直接调用对应的回调函数 # 爬取网页 须要等待数据传输和网络上的响应高IO的 -- 子线程 # 分析网页 没有什么IO操做 -- 这个操做不必在子线程完成,交给回调函数
是单独开启线程进程仍是池? # 若是只是开启一个子线程作一件事情,就能够单独开线程 # 有大量的任务等待程序去作,要达到必定的并发数,开启线程池 # 根据你程序的io操做也能够断定是用池仍是不用池? # socket的server 大量的阻塞io recv recvfrom socketserver # 爬虫的时候 池
池的总结
hreadPoolExecutor中的几个经常使用方法 # tp = ThreadPoolExecutor(cpu*5) # obj = tp.submit(须要在子线程执行的函数名,参数) # obj # 1.获取返回值 obj.result() 是一个阻塞方法 # 2.绑定回调函数 obj.add_done_callback(子线程执行完毕以后要执行的代码对应的函数) # ret = tp.map(须要在子线程执行的函数名,iterable) # 1.迭代ret,老是能获得全部的返回值 # shutdown # tp.shutdown()
进程和线程中的锁
# 全部在线程中能工做的基本都不能在进程中工做 # 在进程中可以使用的基本在线程中也可使用
在多进程中启动多线程
在多进程里启动多线程
import os
from multiprocessing import Process
from threading import Thread
def tfunc():
print(os.getpid())
def pfunc():
print('pfunc-->',os.getpid())
Thread(target=tfunc).start()
if __name__ == '__main__':
Process(target=pfunc).start()
# 协程: # 用户级别的,由咱们本身写的python代码来控制切换的 # 是操做系统不可见的 # 在Cpython解释器下 - 协程和线程都不能利用多核,都是在一个CPU上轮流执行 # 因为多线程自己就不能利用多核 # 因此即使是开启了多个线程也只能轮流在一个CPU上执行 # 协程若是把全部任务的IO操做都规避掉,只剩下须要使用CPU的操做 # 就意味着协程就能够作到题高CPU利用率的效果 # 多线程和协程 # 线程 切换须要操做系统,开销大,操做系统不可控,给操做系统的压力大 # 操做系统对IO操做的感知更加灵敏 # 协程 切换须要python代码,开销小,用户操做可控,彻底不会增长操做系统的压力 # 用户级别可以对IO操做的感知比较低
gevent模块开启协程
import time
print('-->',time.sleep)
import gevent
from gevent import monkey
monkey.patch_all()
def eat():
print('wusir is eating')
print('in eat: ')
time.sleep(1) #遇到阻塞让出CPU
return 'wusir finished eat'
def sleep():
print('小马哥 is sleeping')
time.sleep(1)
print('小马哥 finished sleep')
g_l=[]
for i in range(10): # 创造十个协程任务
g1 = gevent.spawn(eat)
g_l.append(g1)
g2 = gevent.spawn(sleep) # 创造一个协程任务
g2.join() # 阻塞 直到g1任务完成为止
gevent.joinall(g_l) #jionall后面加包含gevent对象的列表
for i in g_l:
print(i.value) #value取值
asyncio模块
# 起一个任务
# async def demo(): # 协程方法
# print('start')
# await asyncio.sleep(1) # 阻塞
# print('end')
#
# loop = asyncio.get_event_loop() # 建立一个事件循环
# loop.run_until_complete(demo()) # 把demo任务丢到事件循环中去执行
# 启动多个任务,而且没有返回值
# async def demo(): # 协程方法
# print('start')
# await asyncio.sleep(1) # 阻塞
# print('end')
#
# loop = asyncio.get_event_loop() # 建立一个事件循环
# wait_obj = asyncio.wait([demo(),demo(),demo()])
# loop.run_until_complete(wait_obj)
# 启动多个任务而且有返回值
# async def demo(): # 协程方法
# print('start')
# await asyncio.sleep(1) # 阻塞
# print('end')
# return 123
#
# loop = asyncio.get_event_loop()
# t1 = loop.create_task(demo())
# t2 = loop.create_task(demo())
# tasks = [t1,t2]
# wait_obj = asyncio.wait([t1,t2])
# loop.run_until_complete(wait_obj)
# for t in tasks:
# print(t.result())
# 谁先回来先取谁的结果
# import asyncio
# async def demo(i): # 协程方法
# print('start')
# await asyncio.sleep(10-i) # 阻塞
# print('end')
# return i,123
#
# async def main():
# task_l = []
# for i in range(10):
# task = asyncio.ensure_future(demo(i))
# task_l.append(task)
# for ret in asyncio.as_completed(task_l):
# res = await ret
# print(res)
#
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())
# import asyncio
#
# async def get_url():
# reader,writer = await asyncio.open_connection('www.baidu.com',80)
# writer.write(b'GET / HTTP/1.1\r\nHOST:www.baidu.com\r\nConnection:close\r\n\r\n')
# all_lines = []
# async for line in reader:
# data = line.decode()
# all_lines.append(data)
# html = '\n'.join(all_lines)
# return html
#
# async def main():
# tasks = []
# for url in range(20):
# tasks.append(asyncio.ensure_future(get_url()))
# for res in asyncio.as_completed(tasks):
# result = await res
# print(result)
#
#
# if __name__ == '__main__':
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main()) # 处理一个任务