进程是cpu资源分配的最小单位,进程是正在进行的一个过程或者说一个任务。
线程是cpu调度的最小单位。
协程是一种用户态的轻量级线程,即协程是由用户程序本身控制调度的。(单线程下的并发)
进程线程区别:python
子进程结束后,子进程的状态信息仍然保存在系统中,直到父进程结束。(全部进程都会经历僵尸进程)mysql
父进程已经退出,但它的一个或多个子进程还在运行, 这些进程就会成为孤儿进程,由init进程收养。sql
守护进程会在主进程代码执行结束后就终止。安全
方式一:多线程
import time from multiprocessing import Process def task(name): # 子进程执行的任务 print('%s is running' % name) time.sleep(3) print('%s is done' % name) if __name__ == '__main__': # 实例化获得对象 p1 = Process(target=task, args=('子进程1',)) # 必须加逗号 p2 = Process(target=task, kwargs={'name': '子进程2'}) # 调用对象下的方法,开启进程 p1.start() # 仅是给操做系统发送信号, 由操做系统开启子进程 p2.start() print('主')
方式二:并发
import time from multiprocessing import Process class MyProcess(Process): def __init__(self, name): super().__init__() self.name = name def run(self): # 其实是start,可是必须叫run print('%s is running' % self.name) time.sleep(3) print('%s is done' % self.name) if __name__ == '__main__': p1 = MyProcess('子进程1') p2 = MyProcess('子进程2') p1.start() # 启动进程,并调用该子进程中的p.run() p2.start() print('主')
进程名:print(p.name)
进程PID: print(os.getpid)
print(p.pid)
父进程PID:print(os.getppid)
app
等待进程运行结束:p.join()
查看进程是否在运行:print(p.is_alive())
终止进程:p.terminate()
进程结束时间由操做系统决定。
终止进程但不进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程;若是p还保存了一个锁那么也将不会被释放,进而致使死锁。
开启守护进程:p.daemon = True
必须在p.start(
)以前设置,守护进程将被禁止建立子进程。dom
例子:异步
from multiprocessing import Process, Lock import time def task(name, lock): lock.acquire() print('%s 1' % name) time.sleep(1) print('%s 2' % name) time.sleep(1) print('%s 3' % name) lock.release() if __name__ == '__main__': lock = Lock() for i in range(3): p = Process(target=task, args=('进程%s' % i, lock)) p.start()
例子:socket
from multiprocessing import Queue que = Queue(3) que.put(1) que.put({'a': 'hello'}) que.put([1, 2, 3]) print(que.full()) # 判断队列是否已满 # que.put(4) #再放就阻塞住了 print(que.get()) print(que.get()) print(que.get()) print(que.empty()) # 队列是否空了 # print(que.get()) # 再取就阻塞住了
例子:
from multiprocessing import Process, JoinableQueue import time import random def producer(q, name, food): for i in range(2): res = '%s%s' % (food, i) time.sleep(random.randint(1, 3)) print('\033[34m%s 生产了 %s\033[0m' % (name, res)) q.put(res) # 入队 q.join() # 等到消费者把队列中的全部的数据都取走以后,生产者才结束 def consumer(q, name): while True: res = q.get() # 出队 time.sleep(random.randint(1, 3)) print('\033[32m%s 吃 %s\033[0m' % (name, res)) q.task_done() # 发送信号给q.join(),说明已经从队列中取走一个数据并处理完毕了 if __name__ == '__main__': que = JoinableQueue() # 3个生产者 p1 = Process(target=producer, args=(que, '生产者1', '包子')) p2 = Process(target=producer, args=(que, '生产者2', '饺子')) p3 = Process(target=producer, args=(que, '生产者3', '月饼')) # 2个消费者 c1 = Process(target=consumer, args=(que, '消费者1')) c2 = Process(target=consumer, args=(que, '消费者2')) c1.daemon = True # 将消费者设为守护进程 c2.daemon = True # 消费者在生产者结束后, 随主进程一块儿结束 l1 = [p1, p2, p3] l2 = [c1, c2] # 开始生产 for p in l1: p.start() # 开始消费 for c in l2: c.start() # 主进程等生产者p一、p二、p3结束 # 而p一、p二、p3在消费者把全部数据都取干净以后结束 for p in l1: p.join() print('主')
使用greenlet
库:
from greenlet import greenlet def eat(name): print('%s eat 1' % name) g2.switch('egon') # 传入第一次参数, 以后不用再传 print('%s eat 2' % name) g2.switch() def play(name): print('%s play 1' % name) g1.switch() print('%s play 2' % name) g1 = greenlet(eat) g2 = greenlet(play) g1.switch('egon') # 在第一次switch时传入参数,之后都不须要
使用gevent
库 (没法识别socket
time
等模块的阻塞,须要使用gevent自带的阻塞):
import gevent def eat(name): print('%s eat 1' % name) gevent.sleep(2) # gevent识别到阻塞,进行切换 print('%s eat 2' % name) def play(name): print('%s play 1' %name) gevent.sleep(1) print('%s play 2' %name) g1 = gevent.spawn(eat, 'egon') g2 = gevent.spawn(play, name='egon') g1.join() g2.join() # gevent.joinall([g1,g2]) # 等待列表中全部协程对象结束 print('主')
导入monkey
可识别socket
time
等阻塞:
from gevent import monkey; monkey.patch_all() # patch_all()必须放在导入socket、time等模块前,不然gevent没法识别socket、time的阻塞 import gevent import time def eat(): print('eat food 1') time.sleep(2) print('eat food 2') def play(): print('play 1') time.sleep(1) print('play 2') g1 = gevent.spawn(eat) g2 = gevent.spawn(play) gevent.joinall([g1, g2]) print('主')
导入模块:
from concurrent.futures import ProcessPoolExecutor # 进程池 from concurrent.futures import ThreadPoolExecutor # 线程池
建立进程/线程池:
executor = ProcessPoolExecutor(max_worker=3) # 进程池 executor = ThreadPoolExecutor(max_workers=3) #线程池
将进程/线程放入池内:
future = exector.submit(task, parm) """ executor.map(task, range(1,12)) 至关于: for i in range(11): exector.submit(task, i) """
关闭进程/线程池:
exector.shutdown() # 默认wait参数为True
wait=True
等待池内全部任务执行完毕回收完资源后再执行后续代码。
wait=False
不join,直接执行后续代码。
关闭进程/线程池后,不容许再向已关闭的进程/线程池内加入进程/线程。
拿到进程/线程运行结果:
print(future.result())
回调函数:
future.add_done_callback(func)
: future
的task
结束后,会自动把future对象当作参数传给回调函数func。
例子:
from concurrent.futures import ProcessPoolExecutor import time import random def task(name): print("%s is running" % name) time.sleep(random.randint(3, 5)) return name def func(future): name = future.result() print("%s's callback function" % name) if __name__ == '__main__': executor = ProcessPoolExecutor(5) futures = [] for i in range(3): future = executor.submit(task, "task"+str(i)) future.add_done_callback(func) futures.append(future) executor.shutdown(True) print("主")
阻塞程度:阻塞IO>非阻塞IO>多路转接IO>信号驱动IO>异步IO,效率是由低到高。
信号量也是一把锁,但同一时间能够被指定大小的任务获取。
模块导入:
from threading import Semaphore
设置计数器大小:
sm = Semaphore(value=1) # 计数器大小默认为1
两个主要的方法:
acquire() # 内置计数器-1, 当计数器为0时阻塞,等待其余线程调用release() release() # 内置计数器+1
例子:
from threading import Thread, Semaphore import threading import time def func(): sm.acquire() print('%s get sm' % threading.current_thread().getName()) time.sleep(3) sm.release() if __name__ == '__main__': sm = Semaphore(5) for _ in range(23): t = Thread(target=func) t.start()
Event对象:
用于线程间通讯,即程序中的其一个线程须要经过判断某个线程的状态来肯定本身下一步的操做,就用到了Event对象。
模块导入:
from threading import Event
建立Event对象:
event = Event()
相关方法:
event.isSet() # 返回event的状态值 event.wait([maxtime]) # 若是 event.isSet() == False将阻塞线程, [maxtime]-超时时间 event.set() # 设置event的状态值为True,全部阻塞池的线程激活进入就绪状态, 等待操做系统调度 event.clear() # 恢复event的状态值为False
例子:
from threading import Thread, Event import threading import time import random def conn_mysql(): count = 1 while not event.is_set(): if count > 3: print('\033[31m[%s]链接失败\033[0m' % threading.current_thread().getName()) exit(0) print('<%s>第%s次尝试链接' % (threading.current_thread().getName(), count)) event.wait(1) count += 1 print('<%s>链接成功' % threading.current_thread().getName()) def check_mysql(): print('\033[45m[%s]正在检查mysql\033[0m' % threading.current_thread().getName()) time.sleep(random.randint(1, 4)) event.set() if __name__ == '__main__': event = Event() conn1 = Thread(target=conn_mysql) conn2 = Thread(target=conn_mysql) check = Thread(target=check_mysql) conn1.start() conn2.start() check.start()
定时器Timer:指定n秒后执行某项操做(不会像sleep同样阻塞)
例子:
from threading import Timer def hello(): print("hello, world") t = Timer(1, hello) // 一秒后执行hello函数 t.start()
死锁现象: 指两个或两个以上的进程或线程在执行过程当中,因争夺资源而形成的一种互相等待的现象,这种永远在互相等待的进程称为死锁进程。
递归锁: RLock
能够连续acquire屡次。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源能够被屡次require。直到一个线程全部的acquire都被release,其余的线程才能得到资源。
例子:
from threading import Thread, RLock import time mutexA = mutexB = RLock() class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('\033[41m%s 拿到A锁\033[0m' % self.name) mutexB.acquire() print('\033[42m%s 拿到B锁\033[0m' % self.name) mutexB.release() print('\033[41m%s 释放A锁\033[0m' % self.name) mutexA.release() print('\033[42m%s 释放B锁\033[0m' % self.name) def func2(self): mutexB.acquire() print('\033[43m%s 拿到B锁\033[0m' % self.name) time.sleep(2) mutexA.acquire() print('\033[44m%s 拿到A锁\033[0m' % self.name) mutexA.release() print('\033[43m%s 释放B锁\033[0m' % self.name) mutexB.release() print('\033[44m%s 释放A锁\033[0m' % self.name) if __name__ == '__main__': for _ in range(4): t = MyThread() t.start()
cpython中引进GIL,保证同一时刻同一进程中只有一个线程被执行,获取锁并获取资源,避免了多线程并发执行,保证了线程的安全,但没法使用多核优点。
结论:
多线程用于IO密集型
多进程用于计算密集型