一、操做系统可以进行运算调度的最小单位,即程序执行的最小单位java
二、进程负责程序所必须的资源分配(文本区域、数据区域、堆栈区域),一个进程中也常常须要同时作多件事,即要同时运行多个‘子任务’,这些子任务即线程。python
线程是每个进程中的单一顺序控制流 ,其包含在进程中,是进程的实际运做单位(进程是线程的容器)linux
⼀个程序中⾄少要有⼀个进程,⼀个进程中⾄少要有⼀个线程编程
线程不可以独⽴执⾏,必须依存在进程中windows
三、线程基本不占用系统资源,其只拥有在运行过程当中必不可少的资源(如程序计数器、一组寄存器和栈)安全
四、同一个进程中的全部线程都共享此进程所拥有的所有资源,多线程
一个进程中的线程共享相同的内存单元/内存地址空间——>能够访问相同的变量和对象,并且它们从同一堆中分配对象——>通讯、数据交换、同步操做并发
五、线程之间的通讯主要经过共享所属进程的资源app
线程间的通讯是在同一地址空间上进行的,因此不须要额外的通讯机制,这就使得通讯更简便并且信息传递的速度也更快dom
六、线程的上下文切换很快,资源开销较少,可是相对于进程而言,不够安全,在多个线程共同操做进程的某一资源时,可能会丢失数据
七、线程和进程之间的区别
- 新状态:线程对象已经建立,还未调用
start()
方法。
- 可运行状态:当线程有资格运行,但调度程序尚未把它选定为运行线程时线程所处的状态。当
start()
方法调用时,线程首先进入可运行状态。在线程运行以后或者从阻塞、等待或睡眠状态回来后,也返回到可运行状态。
- 运行状态:线程调度程序从可运行池中选择一个线程做为当前线程时线程所处的状态。这也是线程进入运行状态的惟一方式。
- 等待/阻塞/睡眠状态:这是线程有资格运行时它所处的状态。实际上这个三状态组合为一种,其共同点是:线程仍旧是活的(可运行的),可是当前没有条件运行。可是若是某件事件出现,他可能返回到可运行状态。
- 死亡态:当线程的
run()
方法完成时就认为它死去。这个线程对象也许是活的,可是,它已经不是一个单独执行的线程。线程一旦死亡,就不能复生。若是在一个死去的线程上调用start()
方法,会抛出java.lang.IllegalThreadStateException
异常。
Python中的多线程能够并发,但不能并行(同一个进程下的多个线程不能被多个cpu同时执行),原因就是GIL全局解释器锁,致使同一时间内只有一个线程在执行
python 文件的执行流程为:
- 操做系统先将python解释器和须要执行的py文件由硬盘加载到内存中,开辟一个进程空间
- 此进程即便得python解释器首先将py文件中的代码指令经过编译器编译成字节码
- 编译完成的c的字节码经过虚拟机转换为机器码由cpu执行
这个执行流程便是py文件执行进程中的主线程
若Python中的多线程并行,则每一个线程都要执行上述过程,从而同一时间须要多个CPU同时执行转换而来的机器码,极大限度的提升执行效率。但众所周知,Python是由荷兰人吉多·范罗苏姆 (Guido van Rossum)于1989年圣诞节期间开发的一个新的脚本解释程序,而双核cpu是2005年才被广泛应用的,即在当时的条件下,Cpython执行多线程时应用不了多核。故为了避免多个线程并发执行而形成数据的不完整以及线程的不安全,龟叔在python的解释器中加上了互斥锁——全局解释器锁(GIL锁),即便得Cpython在全部线程进入解释器以前加了一个全局解释器锁,当执行完当前py文件后才释放该锁,这便致使了python中同一时间内只有一个线程在执行
注:若想使得多线程并行,能够用多进程间接实现线程的并行,或者更换解释器为Pypy、Ppython。
from threading import Thread
from threading import Thread import os def func(num): print('当前线程{},所归属的进程id号{}'.format(os.getpid(), num)) for i in range(10): # 异步建立10个子线程 t = Thread(target=func, args=(i,)) t.start() # 主线程执行任务 print(os.getpid())
这种方法付只须要重写
threading.Thread
类的run
方法,而后调用start()
开启线程就能够了
from threading import Thread import time class MyThread(Thread): def __init__(self, name): # 手动调用父类的构造方法 super().__init__() self.name = name def run(self): time.sleep(1) print("当前线程正在执行runing ... ", self.name) if __name__ == "__main__": t = MyThread("机器今天会再次爆炸么?") t.start() print("主线程执行结束 ... ")
t.is_alive() 检测线程是否仍然存在
t.setName() 设置线程名字
t.getName() 获取线程名字
from threading import Thread import time def func(): time.sleep(1) if __name__ == "__main__": t = Thread(target=func) t.start() print(t , type(t)) print(t.is_alive()) # False print(t.getName()) t.setName("xboyww") print(t.getName())
- currentThread().ident 查看线程id号
- enumerate() 返回目前正在运行的线程列表
- activeCount() 返回目前正在运行的线程数量
from threading import Thread import time from threading import currentThread from threading import enumerate from threading import activeCount # 1.currentThread().ident 查看线程id号 def func(): print("子线程id", currentThread().ident, os.getpid()) if __name__ == "__main__": Thread(target=func).start() print("主线程id", currentThread().ident, os.getpid()) # 2.enumerate() 返回目前正在运行的线程列表 def func(): print("子线程id", currentThread().ident, os.getpid()) time.sleep(0.5) if __name__ == "__main__": for i in range(10): Thread(target=func).start() lst = enumerate() # 子线程10 + 主线程1个 = 11 print(lst ,len(lst)) # 3.activeCount() 返回目前正在运行的线程数量 print(activeCount())
默认若是一个线程短期内能够完成更多的任务,就不会建立额外的新的线程,以节省资源
from concurrent.futures import ThreadPoolExecutor from threading import current_thread as cthread def func(i): print("thread ... start", cthread().ident, i) time.sleep(3) print("thread ... end", i) return cthread().ident if __name__ == "__main__": lst = [] setvar = set() # (1) 建立线程池对象 """限制线程池最多建立os.cpu_count() * 5 = 线程数,全部任务全由这几个线程完成,不会额外建立线程""" tp = ThreadPoolExecutor() # 个人电脑40个线程并发 # (2) 异步提交任务 for i in range(100): res = tp.submit(func, i) lst.append(res) # (3) 获取返回值 for i in lst: setvar.add(i.result()) # (4) 等待全部子线程执行结束 tp.shutdown() print(len(setvar), setvar) print("主线程执行结束 ... ")
守护线程 : 等待全部线程所有执行完毕以后,再本身终止,守护的是全部线程
线程对象.setDaemon(True)
from threading import Thread import time def func1(): while True: time.sleep(0.5) print("我是func1") def func2(): print("我是func2 start ... ") time.sleep(3) print("我是func2 end ... ") t1 = Thread(target=func1) t2 = Thread(target=func2) # 在start调用以前,设置守护线程 t1.setDaemon(True) t1.start() t2.start() print("主线程执行结束 ... ")
同步意味着顺序、统一的时间轴
场景1:是指完成事务的逻辑,先执行第一个事务,若是阻塞了,会一直等待,直到这个事务完成,再执行第二个事务,协同步调,按预约的前后次序进行运行
场景2:一个任务的完成须要依赖另一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列
异步则意味着乱序、效率优先的时间轴
处理调用这个事务以后,不会等待这个事务的处理结果,直接处理第二个事务去了,经过状态、回调来通知调用者处理结果
对于I/O相关的程序来讲,异步编程能够大幅度的提升系统的吞吐量,由于在某个I/O操做的读写过程当中,系统能够先去处理其它的操做(一般是其它的I/O操做)
不肯定执行顺序
程序中有了IO操做,就会发生阻塞,必需要输入/输出一个字符串,不然代码不往下执行
程序中没有任何耗时操做,无需等待正常往下执行
同步阻塞 :效率低,cpu利用不充分
异步阻塞 :好比socketserver,能够同时链接多个,可是彼此都有recv
同步非阻塞:没有相似input的代码,从上到下执行.默认的正常状况代码
异步非阻塞:效率是最高的,cpu过分充分,过分发热, 需液冷
假设有A、B两个任务,则串行、并行、并发的区别如图所示。
A和B两个任务运行在一个CPU线程上,在A任务执行完以前不能够执行B。即,在整个程序的运行过程当中,仅存在一个运行上下文,即一个调用栈一个堆。程序会按顺序执行每一个指令。
并行指两个或两个以上任务同一时刻被不一样的cpu执行。在多道程序环境下,并行性使多个程序同一时刻可在不一样CPU上同时执行。好比,A和B两个任务能够同时运行在不一样的CPU线程上,效率较高,但受限于CPU线程数,若是任务数量超过了CPU线程数,那么每一个线程上的任务仍然是顺序执行的。
并发指多个线程在宏观(相对于较长的时间区间而言)上表现为同时执行,而其实是轮流穿插着执行,并发的实质是一个物理CPU在若干道程序之间多路复用,其目的是提升有限物理资源的运行效率。 并发与并行串行并非互斥的概念,若是是在一个CPU线程上启用并发,那么天然就仍是串行的,而若是在多个线程上启用并发,那么程序的执行就能够是既并发
因为一个进程中的多个线程享进程中的资源,因此可能形成多个线程同时修改一个变量的状况(即线程⾮安全),可能形成数据混乱,故须要进⾏同步控制,即线程同步。
能够经过延时肯定多线程的执行顺序,但不推荐。
import threading import time def work1(nums): nums.append(44) print('-----in work1-----', nums) def work2(nums): time.sleep(1) # 延时一会保证另外一线程执行 print('-----in work2-----', nums) g_nums = [11, 22, 33] t1 = threading.Thread(target=work1, args=(g_nums,)) t1.start() t2 = threading.Thread(target=work2, args=(g_nums,)) t2.start()
互斥锁保证了每次只有⼀个线程操做共享数据,从⽽保证了多线程状况下数据的安全性(原子性),能够实现线程同步。
互斥锁为资源引入一个状态:锁定/非锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”状态,其余线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其余的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程操做共享数据,从而保证了多线程状况下数据的安全性。
尽可能使用一把锁解决问题,不要互相嵌套,不然容易死锁
import threading num = 0 def test1(): global num # 调用Lock对象的acquire()方法得到锁时,这把锁进入“locked”状态 # 若是此时另外一个线程2试图得到这个锁,该线程2就会变为同步阻塞状态 if mutex.acquire(): for i in range(1000): num += 1 # 调用Lock对象的release()方法释放锁以后,该锁进入“unlocked”状态。 mutex.release() def test2(): global num # 线程调度程序继续从处于同步阻塞状态的线程中选择一个来得到锁,并使得该线程进入运行(running)状态 if mutex.acquire(): for i in range(1000): num += 1 mutex.release() mutex = threading.Lock() p1 = threading.Thread(target=test1) p1.start() p2 = threading.Thread(target=test2) p2.start() print(num)
在多个线程间共享多个资源的时候, 若是两个线程分别占有⼀部分资源而且同时等待对⽅的资源, 就会形成死锁
在多线程程序中,死锁问题很大一部分是因为线程同时获取多个锁形成的。如一个线程获取了第一个锁,而后在获取第二个锁的时候发生阻塞,那么这个线程就可能阻塞其余线程的执行,从而致使整个程序假死
import threading import time class MyThread1(threading.Thread): def run(self): # 线程1被 A 锁——>锁定 if mutexA.acquire(): print(self.name + '---do1---up---') time.sleep(1) if mutexB.acquire(): print(self.name + '---do1---down---') mutexB.release() # 线程1被 A 锁释放的前提是:线程1 抢到 B 锁 mutexA.release() class MyThread2(threading.Thread): def run(self): time.sleep(1) # 线程2被 B 锁——>锁定 if mutexB.acquire(): print(self.name + '---do2---up---') if mutexA.acquire(): print(self.name + '---do2---down---') mutexA.release() # 线程2被 B 锁释放的前提是:线程2 抢到 A 锁 mutexB.release() if __name__ == '__main__': mutexA = threading.Lock() mutexB = threading.Lock() t1 = MyThread1() t2 = MyThread2() t1.start() t2.start() # Thread-1---do1---up--- # Thread-2---do2---up--- # 程序卡死 # 线程1不释放A锁 # 线程2不释放B锁
用于快速解决项目因死锁问题不能正常运行的场景,用来处理异常死锁的
import threading import time class MyThread1(threading.Thread): def run(self): if mutexA.acquire(): print(self.name + '---do1---up---') time.sleep(1) if mutexB.acquire(): print(self.name + '---do1---down---') mutexB.release() mutexA.release() class MyThread2(threading.Thread): def run(self): time.sleep(1) if mutexB.acquire(): print(self.name + '---do2---up---') if mutexA.acquire(): print(self.name + '---do2---down---') mutexA.release() mutexB.release() if __name__ == '__main__': mutexA = threading.RLock() mutexB = threading.RLock() t1 = MyThread1() t2 = MyThread2() t1.start() t2.start() # Thread-1---do1---up--- # Thread-1---do1---down--- # Thread-2---do2---up--- # Thread-2---do2---down---
信号量
semaphore
:用于控制同一时间内能够操做进程资源的线程数量的一把锁,简言之信号量是用来控制线程并发数的一把锁,也能够实现线程同步。使用场景:在读写文件的时候,通常只有一个线程在写,而读能够有多个线程同时进行,若是须要限制同时读文件的线程个数,这时候就能够用到信号量了(若是用互斥锁,就是限制同一时刻只能有一个线程读取文件)
import time import threading def foo(se): se.acquire() time.sleep(2) print("ok") se.release() if __name__ == "__main__": # 设置同一时间内能够有5个线程并发 se = threading.Semaphore(5) for i in range(20): t1 = threading.Thread(target=foo, args=(se,)) t1.start() # 此时能够控制同时进入的线程数
经过3种类型的队列来实现线程同步,都实现了锁原语(能够理解为原⼦操做, 即要么不作, 要么就作完) , 可以在多线程中直接使⽤
# 基本使用 from queue import Queue # put 存 # get 取 # put_nowait 存,超出了队列长度,报错 # get_nowait 取,没数据取不出来,报错 # linux windows 线程中put_nowait,get_nowait都支持 """先进先出,后进后出""" # maxsize为一个整数,表示队列的最大条目数,可用来限制内存的使用。 # 一旦队列满,插入将被阻塞直到队列中存在空闲空间。若是maxsize小于等于0,队列大小为无限。maxsize默认为0 q = Queue(maxsize=0) q.put(1) q.put(2) print(q.get()) print(q.get()) # 取不出来,阻塞 # print(q.get()) print(q.get_nowait()) q2 = Queue(3) q2.put(11) q2.put(22) q2.put(33) # 放不进去了,阻塞 # q2.put(44) q2.put_nowait(44)
import threading import time from queue import Queue class Pro(threading.Thread): def run(self): global queue count = 0 while True: if queue.qsize() < 1000: for i in range(100): count = count + 1 msg = '生成产品' + str(count) queue.put(msg) # 队列中添加新产品 print(msg) time.sleep(1) class Con(threading.Thread): def run(self): global queue while True: if queue.qsize() > 100: for i in range(3): msg = self.name + '消费了' + queue.get() print(msg) time.sleep(1) if __name__ == "__main__": queue = Queue() # 建立一个队列。线程中能用,进程中不能使用 for i in range(500): # 建立500个产品放到队列里 queue.put('初始产品' + str(i)) # 字符串放进队列 for i in range(2): # 建立了两个线程 p = Pro() p.start() for i in range(5): # 5个线程 c = Con() c.start()
# LifoQueue 先进后出,后进先出(按照栈的特色设计) from queue import LifoQueue lq = LifoQueue(3) lq.put(11) lq.put(22) lq.put(33) # print(lq.put_nowait(444)) print(lq.get()) print(lq.get()) print(lq.get())
# PriorityQueue 按照优先级顺序排序 (默认从小到大排序) from queue import PriorityQueue # 若是都是数字,默认从小到大排序 pq = PriorityQueue() pq.put(13) pq.put(3) pq.put(20) print(pq.get()) print(pq.get()) print(pq.get()) # 若是都是字符串 """若是是字符串,按照ascii编码排序""" pq1 = PriorityQueue() pq1.put("chinese") pq1.put("america") pq1.put("latinos") pq1.put("blackman") print(pq1.get()) print(pq1.get()) print(pq1.get()) print(pq1.get()) # 要么全是数字,要么全是字符串,不能混合 error """ pq2 = PriorityQueue() pq2.put(13) pq2.put("aaa") pq2.put("拟稿") """ pq3 = PriorityQueue() # 默认按照元组中的第一个元素排序 pq3.put( (20,"wangwen") ) pq3.put( (18,"wangzhen") ) pq3.put( (30,"weiyilin") ) pq3.put( (40,"xiechen") ) print(pq3.get()) print(pq3.get()) print(pq3.get()) print(pq3.get())
进程(线程)之间若是直接通讯,可能会出现两个问题
解决方式,找一个缓冲区来中转数据即生产者——消费者模式
回调函数:
把函数当成参数传递给另一个函数
在当前函数执行完毕以后,最后调用一下该参数(函数),这个函数就是回调函数功能:
打印状态: a属性
支付状态: b属性
退款状态: c属性
转帐的状态: d属性
把想要的相关成员或者相关逻辑写在自定义的函数中
支付宝接口在正常执行以后,会调用自定义的函数,来执行相应的逻辑
那么这个函数就是回调函数
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from threading import current_thread as cthread import os, time def func1(i): print("Process start ... ", os.getpid()) time.sleep(0.5) print("Process end ... ", i) return "*" * i def func2(i): print("thread start ... ", cthread().ident) time.sleep(0.5) print("thread end ... ", i) return "*" * i def call_back1(obj): print("<==回调函数callback进程号:===>", os.getpid()) print(obj.result()) def call_back2(obj): print("<==回调函数callback线程号:===>", cthread().ident) print(obj.result()) # (1) 进程池的回调函数: 由主进程执行调用完成 if __name__ == "__main__": p = ProcessPoolExecutor(5) for i in range(1, 11): res = p.submit(func1, i) # 进程对象.add_done_callback(回调函数) ''' add_done_callback 能够把res本对象和回调函数自动传递到函数里来 ''' res.add_done_callback(call_back1) p.shutdown() print("主进程执行结束 ... ", os.getpid()) # (2) 线程池的回调函数: 由当前子线程执行调用完成 if __name__ == "__main__": tp = ThreadPoolExecutor(5) for i in range(1, 11): res = tp.submit(func2, i) # 线程对象.add_done_callback(回调函数) ''' add_done_callback 能够把res本对象和回调函数自动传递到函数里来 ''' res.add_done_callback(call_back2) tp.shutdown() print("主线程执行结束 ... ", cthread().ident)
from multiprocessing import Pool import random import time def download(f): for i in range(1, 4): print(f"{f}下载文件{i}") time.sleep(random.randint(1, 3)) return "下载完成" def alterUser(msg): print(msg) if __name__ == "__main__": p = Pool(3) # 当func执行完毕后,return的东西会给到回调函数callback p.apply_async(func=download, args=("线程1",), callback=alterUser) p.apply_async(func=download, args=("线程2",), callback=alterUser) p.apply_async(func=download, args=("线程3",), callback=alterUser) p.close() p.join()