[TOC]python
程序:例如XXXX.py这是程序,处于静态的。数据库
进程:一个程序运行起来后,代码+用到的资源称之为进程,它是操做系统分配资源的基本单元。缓存
在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;
在当代面向线程设计的计算机结构中,进程是线程的容器。bash
所谓同步就是一个任务的完成须要依赖另一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。
所谓异步是不须要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工做,依赖的任务也当即执行,只要本身完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务没法肯定,因此它是不可靠的任务序列。多线程
阻塞和非阻塞跟同步和异步无关,主要与程序等待消息通知时的状态有关。也就是说阻塞与非阻塞主要是从程序(线程)等待消息通知时的状态角度来说的。并发
1)并行,parallel 强调同一时刻同时执行
2)并发 concurrency 则指的一个时间段内去一块儿执行app
就绪态:运行的条件都已经慢去,正在等在cpu执行
执行态:cpu正在执行其功能
等待态:等待某些条件知足,例如一个程序sleep了,此时就处于等待态dom
multiprocessing模块就是跨平台版本的多进程模块,提供了一个Process类来表明一个进程对象,这个对象能够理解为是一个独立的进程,能够执行另外的事情异步
from multiprocessing import Process import time def run_process(): while True: print("子进程----2----") time.sleep(1) if __name__=='__main__': p = Process(target=run_process) # target指定目标函数 p.start() while True: print("主进程----1----") time.sleep(1)
Process语法:
Process([group [, target [, name [, args [, kwargs]]]]])async
参数--------------------------
target:若是传递了函数的引用,能够任务这个子进程就执行这里的代码
args:给target指定的函数传递的参数,以元组的方式传递
kwargs:给target指定的函数传递命名参数
name:给进程设定一个名字,能够不设定
group:指定进程组,大多数状况下用不到
Process建立的实例对象的经常使用方法:
方法--------------------------
start():启动子进程实例(建立子进程)
is_alive():判断进程子进程是否还在活着
join([timeout]):是否等待子进程执行结束,或等待多少秒
terminate():无论任务是否完成,当即终止子进程
Process建立的实例对象的经常使用属性:
属性-------------------------
name:当前进程的别名,默认为Process-N,N为从1开始递增的整数
pid:当前进程的pid(进程号)
from multiprocessing import Process import time import os def run_process(): while True: print("子进程----pid:{}----".format(os.getpid())) print() time.sleep(1) if __name__=='__main__': p = Process(target=run_process) p.start() while True: print("主进程----pid:{}----".format(os.getpid())) time.sleep(1)
from multiprocessing import Process import time import os def run_process(course, teacher, *args, **kwargs): while True: print("子进程----pid:{}----{}上{}课".format(os.getpid(), teacher, course)) print() time.sleep(1) if __name__=='__main__': p = Process(target=run_process, args=('语文',), kwargs={'teacher':'张三'}) p.start() while True: print("主进程----pid:{}----{}上{}课".format(os.getpid(),'李四','数学')) time.sleep(1)
from multiprocessing import Process import time import os num_list = [0 , 1, 3, 4, 5, 6, 7, 8, 9, 10] i = 3 def run_process1(): global i while i: print("子进程----pid:{}----".format(os.getpid())) num_list.pop() print(num_list) i = i - 1 time.sleep(1) def run_process2(): global i while i: print("子进程----pid:{}----".format(os.getpid())) num_list.append(i+1) print(num_list) i = i - 1 time.sleep(1) if __name__=='__main__': p = Process(target=run_process1) p.start() p = Process(target=run_process2) p.start()
输出
子进程----pid:10187---- [0, 1, 3, 4, 5, 6, 7, 8, 9] 子进程----pid:10188---- [0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 4] 子进程----pid:10187---- [0, 1, 3, 4, 5, 6, 7, 8] 子进程----pid:10188---- [0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 4, 3] 子进程----pid:10187---- [0, 1, 3, 4, 5, 6, 7] 子进程----pid:10188---- [0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 4, 3, 2]
可使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue自己是一个消息列队程序。
示例
from multiprocessing import Process, Queue import time, random def worker(q): """完成文件""" for i in range(10): file_num = random.randint(1, 100) print('已完成工做{}...'.format(file_num)) q.put(file_num) time.sleep(1) def boss(q): """查看文件""" while True: if not q.empty(): file_num = q.get(True) print('已查看工做{}...'.format(file_num)) time.sleep(1) else: break if __name__=='__main__': # 建立Queue,并传给各个子进程: q = Queue(5) pw = Process(target=worker, args=(q,)) pb = Process(target=boss, args=(q,)) pw.start() # pw.join() pb.start() # pb.join()
当须要建立的子进程数量很少时,能够直接利用multiprocessing中的Process动态成生多个进程,但若是是上百甚至上千个目标,手动的去建立进程的工做量巨大,此时就能够用multiprocessing模块提供的Pool方法。
初始化Pool时,能够指定一个最大进程数,当有新的请求提交到Pool中时,若是池尚未满,那么就会建立一个新的进程用来执行该请求;但若是池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用以前的进程来执行新的任务。
示例
from multiprocessing import Pool import os, time, random def worker(msg): t_start = time.time() print("start pro {},pid为{}".format(msg,os.getpid())) # random.random()随机生成0~1之间的浮点数 time.sleep(random.random()*2) t_stop = time.time() print(msg,"执行完毕,耗时{:.2f}" .format(t_stop-t_start)) # 定义一个进程池,最大进程数3 po = Pool(5) for i in range(0,10): # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,)) # 每次循环将会用空闲出来的子进程去调用目标 po.apply_async(worker,(i,)) print("----start----") time.sleep(10) po.close() # 关闭进程池,关闭后po再也不接收新的请求 po.join() # 等待po中全部子进程执行完成,必须放在close语句以后 print("-----end-----")
使用Pool建立进程,就须要使用multiprocessing.Manager()中的Queue()
示例
from multiprocessing import Pool, Manager import time, random def worker(q): """完成文件""" for i in range(10): file_num = random.randint(1, 100) print('已完成工做{}...'.format(file_num)) q.put(file_num) time.sleep(1) def boss(q): """查看文件""" while True: if not q.empty(): file_num = q.get(True) print('已查看工做{}...'.format(file_num)) time.sleep(1) else: break if __name__=='__main__': # 建立Queue,并传给各个子进程: q = Manager().Queue() po = Pool() po.apply_async(worker, (q,)) time.sleep(1) # 先让上面的任务向Queue存入数据,而后再让下面的任务开始从中取数据 po.apply_async(boss, (q,)) po.close() po.join()
如今操做系统提出进程的概念,每个进程都认为本身独占全部的计算机硬件资源。
进程就是独立的王国,进程间不能够随便的共享数据。
线程就是省份,同一个进程内的线程能够共享进程的资源,每个线程都拥有本身独立的堆栈。
线程一样有着相似进程的状态
1)运行态:该时刻,该线程正在占用CPU
2)就绪态:可随时转换为运行态,由于其余线程正在运行而暂停,该进程不占用CPU
3)阻塞态:除非某些外部事件发生,不然线程不能运行
Python线程的操做可使用threading模块,threading模块是对更底层thread作了一些包装的,能够更加方便的被使用。
Thread类:
def __init__(self, group=None, target=Nonoe, name=None, args=(), kwargs=None, daemon=None)
target 线程调用的对象,就是目标函数
name 为线程起个名字(能够重名,由于线程区分靠ID,不靠名字)
args,为目标函数传递实参,元组
kwargs, 为目标函数关键字传参,字典
threading的属性和方法
current_thread() 返回当前线程对象
main_thread() 返回主线程对象
active_count() 当前处于alive状态的线程个数
enumerate() 返回全部活着的线程的列表,不包括已经终止的和未开始的线程
get_ident() 返回当前线程的ID,非0整数
Thread实例的属性和方法
name 只是一个名字
ident 线程ID
is_alive() 返回线程是否活着
start() 启动线程,每个线程必须且只能执行该方法一次
run() 运行线程函数
import threading import time def worker(): for _ in range(10): time.sleep(0.5) print('start') print(threading.get_ident()) # 返回当前线程对象线程id print('Thread over') t = threading.Thread(target=worker) t.start()
import threading import time def finish_working(): for i in range(5): print("线程:{} --完成工做加{}".format(threading.currentThread(), i)) print(threading.current_thread()) time.sleep(1) if __name__ == "__main__": for i in range(5): t = threading.Thread(target=finish_working, name=str(i)) t.start() #启动线程,即让线程开始执行
经过使用threading模块能完成多任务的程序开发,为了让每一个线程的封装性更完美,因此使用threading模块时,每每会定义一个新的子类class,只要继承threading.Thread就能够了,而后重写run方法。
import threading import time class MyThread(threading.Thread): def run(self): print('run') super().run() def start(self): print('start') super().start() def worker1(): for _ in range(5): time.sleep(0.5) print('线程:{}-woring'.format(threading.currentThread())) print('Thread over') t = MyThread(target=worker1,name='w') t.start()
import time count = 100 def work1(): global count for i in range(3): count += 1 print("----in work1, g_num is {}---".format(count)) def work2(): global count print("----in work2, g_num is {}---".format(count)) print("---线程建立以前g_num is {}---".format(count)) t1 = Thread(target=work1) t1.start() #延时一会,保证t1线程中的事情作完 time.sleep(1) t2 = Thread(target=work2) t2.start()
输出
---线程建立以前g_num is 100--- ----in work1, g_num is 103--- ----in work2, g_num is 103---
线程同步,线程间协同,经过某种技术,让一个线程访问某些数据时,其余线程不能访问这些数据,直到该线程完成对数据的操做。
Event事件,是线程间通讯机制中最简单的实现,使用一个内部的标记flag,经过flag的True或False的变化来进行操做。
名称 | 含义 |
---|---|
set() | 标记设置为True |
clear() | 标记设置为False |
is_set() | 标记是否设置为True |
wait(timeout=None) | 设置等待标记为True的时长,None为无限等待。获得返回True, 未等到超时了返回False。 |
from threading import Event, Thread import time def boss(event:Event): """ 等待员工全部任务完成,点评 """ print("I'm boss, waiting for u.") event.wait() print('good job') def worker(event:Event, count=10): print("I am working for u") cups = [] while True: print('make 1') time.sleep(0.5) cups.append(1) if len(cups) >= count: event.set() break print('I finished my job. cups={}'.format(cups)) event = Event() w = Thread(target=worker, args=(event, )) b = Thread(target=boss, args=(event, )) w.start() time.sleep(1) b.start(
总结:
使用同一个event对象的标记flag
谁wait就是等到flag变为True,或者等到超时返回False,不限制等待的个数。
锁,凡是存在共享资源争抢的地方均可以使用锁,从而保证只有一个使用者均可以彻底使用这个资源。
示例 不加锁:
import threading cups = [] def worker(task=100): while True: count = len(cups) print(count) if count >= task: break cups.append(1) print('{}'.format(threading.current_thread())) print('I finished {} cups'.format(count)) for x in range(10): threading.Thread(target=worker, args=(100, )).start()
以上任务完成的数量会大于100,使用锁能够解决
示例
import logging import threading logging.basicConfig(level=logging.INFO) cups = [] # 实例一把锁 lock = threading.Lock() def worker(lock:threading.Lock,task=100): while True: lock.acquire() # 加锁 count = len(cups) logging.info(count) if count >= task: lock.release() #记得退出循环时释放锁 break cups.append(1) lock.release() # 释放锁 logging.info('{}'.format(threading.current_thread())) logging.info('I finished {} cups'.format(count)) for x in range(10): threading.Thread(target=worker, args=(lock, 100)).start()
通常来讲加锁后还有一些代码实现,在释放锁以前还有可能抛异常,一旦出现异常,锁是没法释放,可是当前线程可能由于这个异常被终止了,这就产生了死锁。
加锁、解锁经常使用语句:
1)使用try...finally语句保证锁的释放
2)with上下文管理,锁对象支持上下文管理
示例:
from threading import Thread, Lock import time, logging logging.basicConfig(level=logging.INFO) class Counter: def __init__(self): self.c = 0 self.lock = Lock() def inc(self): try: self.lock.acquire() self.c += 1 logging.info('add {}'.format(self.c)) finally: self.lock.release() def dec(self): try: self.lock.acquire() self.c -= 1 logging.info('sub {}'.format(self.c)) finally: self.lock.release() @property def value(self): with self.lock: return self.c def do(c:Counter, count=100): for _ in range(count): for i in range(-50, 50): if i < 0: c.dec() else: c.inc() c = Counter() c1 = 10 c2 = 10 for i in range(c1): Thread(target=do, args=(c,c2)).start() time.sleep(5) logging.info(c.value)
Condition 用于生产者、消费模型,为了解决生产者消费速度匹配问题。
构造方法Condition(lock=None), 能够传入一个lock或者RLock对象,默认是RLock。
名称 | 含义 |
---|---|
acquire(*args) | 获取锁 |
wait(self, timoout=None) | 等待或超时间 |
notify(n=1) | 唤醒至多指定个数的等待的线程,没有等待的线程没有操做 |
notiy_all() | 唤醒全部等待的线程 |
示例1 不使用Condition
import threading import logging import random logging.basicConfig(level=logging.INFO) class Dispatcher: def __init__(self, data=0): self.data = data self.event = threading.Event() def produce(self): for i in range(100): data = random.randint(1,100) self.data = data logging.info("produce--{}".format(self.data)) self.event.wait(1) def custom(self): while True: logging.info("curstom---{}".format(self.data)) self.event.wait(1) d = Dispatcher() p = threading.Thread(target=d.produce) c = threading.Thread(target=d.custom) c.start() p.start()
示例2 使用Condition
import threading import logging import random logging.basicConfig(level=logging.INFO) class Dispatcher: def __init__(self, data=0): self.data = data self.event = threading.Event() self.cond = threading.Condition() def produce(self): for i in range(100): data = random.randint(1,100) with self.cond: self.data = data self.cond.notify_all() logging.info('produce {}'.format(self.data)) self.event.wait(1) def custom(self): while True: with self.cond: self.cond.wait() logging.info('custom {}'.format(self.data)) self.event.wait(0.5) d = Dispatcher() p = threading.Thread(target=d.produce) c = threading.Thread(target=d.custom) c.start() p.start()
示例3 多个消费者
import threading import logging import random logging.basicConfig(level=logging.INFO, format='%(thread)d %(threadName)s %(message)s') class Dispatcher: def __init__(self, data=0): self.data = data self.event = threading.Event() self.cond = threading.Condition() def produce(self): for i in range(100): data = random.randint(1,100) with self.cond: self.data = data self.cond.notify(1) logging.info('pru {}'.format(self.data)) self.event.wait(1) def custom(self): while True: with self.cond: self.cond.wait() logging.info("线程{}--消费{}".format(threading.get_ident(), self.data)) self.event.wait(0.5) d = Dispatcher() p = threading.Thread(target=d.produce) c = threading.Thread(target=d.custom) c1 = threading.Thread(target=d.custom) c.start() c1.start() p.start()
总结:
Condition是用于生产者消费者模型中,解决生产者消费者速度匹配的问题。
采用通知机制,很是有效率。
使用方式:
使用Condition必须先acquire,用玩release,由于内部使用锁,默认使用RLock,最好的方式是使用with上下文。
消费者wait,等待通知。
生产者生产好消息,对消费者发通知,可使用notify或者notify_all方法。
名称 | 含义 |
---|---|
Barrier(parties, action=None, timeout=None) | 构建Barrier对象,指定参与方数目。timeout是wait方法未指定超时的默认值。 |
n_waiting | 当前在屏障中等待的线程数 |
parties | 各方数,就是须要多少个等待 |
wait(timeout=None) | 等待经过屏障,返回0到[线程数-1]的整数,每一个线程返回不一样。若是wait方法设置了超时,并超时发送,屏障将处于broken状态。 |
方法:
名称 | 含义 |
---|---|
broken | 若是屏障处于打破状态,返回True |
abort() | 将屏障至于broken状态,等待中的线程或者调用等待方法的线程都会抛出BrokenBarrierError异常, 直到reset方法来恢复屏障 |
reset() | 恢复屏障,从新开始拦截 |
示例
import threading import logging logging.basicConfig(level=logging.INFO, format='%(thread)d %(threadName)s %(message)s') def worker(barrier:threading.Barrier): logging.info('n_waiting={}'.format(barrier.n_waiting)) try: bid = barrier.wait() logging.info("after barrier {}".format(bid)) except threading.BrokenBarrierError: logging.info('Broken Barrier in {}'.format(threading.current_thread().name)) barrier = threading.Barrier(3) for i in range(5): #调整数字看结果 threading.Thread(target=worker, args=(barrier, )).start()
全部线程冲到了Barrier前等待,直到到达parties的数目,屏障打开,全部线程中止等待,继续执行。
再有线程wait,屏障就绪等到到达参数方数目。
Barrier应用场景:并发初始化全部线程都必须初始化完成后,才能继续工做,例如运行前加载数据、检查,若是这些工做没完成,就开始运行,将不能正常工做。10个线程作10种准备工做,每一个线程负责一种工做,只有这10个线程都完成后,才能继续工做,先完成的要等待后完成的线程。例如,启动一个程序,须要先加载磁盘文件、缓存预热、初始化链接池等工做。这些工做能够齐头并进,不过只有都知足了,程序才能继续向后执行。假设数据库链接失败,则初始化工做失败,就要about,屏障broken,全部线程收到异常退出。