##########################线程#################################
进程是自愿分配的最小单位,线程是CPU调度的最小单位,每个进程中至少有一个线程
线程属于进程,进程负责获取操做系统分配给个人资源,线程负责执行代码
进程的弊端:
开启和关闭 以及 切换 都会带来很大的时间开销
过多的进程还会形成操做系统调度的压力
进程和线程有什么关系?
1)地址空间和其余自愿(如打开文件):进程间相互独立,同一进程的各线程间共享.某进程内的线程在其余进程不可见
2)通讯: 进程间通讯IPC,线程间能够直接读写进程数据段(如全局变量)来进行通讯---须要进程同步和互斥手段的辅助
以保证数据的一致性
3)调度和切换:线程上下文切换比进程上下文切换要快得多
4)在多线程操做系统中,进程不是一个可执行的实体
线程的概念
进程是操做系统中最小的资源分配单位
线程是CPU调度的最小单位
线程进程之间的对比
线程不能独立存在,必须在一个进程里
线程的开启 关闭以及切换的开销要远远小于进程
同一个进程之间的多个线程之间数据共享
全局解释器锁GIL
使得一个进程中的多个线程不能充分的利用多核
从代码的角度上来看
多进程
开启和结束 时间开销大
切换的效率低
内存隔离
多线程
开启和结束 时间开销很是小
切换效率高
内存不隔离
线程的特色:
1)轻型实体 2) 独立调度和分派的基本单位 3)共享进程资源 4)可并发执行
Cputhon解释器下的全局解释器锁
在同一进程中的多个线程在同一时刻只能有一个线程访问CPU
多线程没法造成并行
锁的线程
Jpython 解释器就没有全局解释器锁
何时才会有到CPU
程序计算的时候
IO阻塞
是不会用到CPU的
在多个进程\线程同时访问一个数据的时候就会产生数据不安全的现象
多进程 访问文件
多线程
同时去访问一个数据
GIL 全局解释器锁
在同一个进程里的每个线程同一时间只能有一个线程访问cpu
尽可能不要设置全局变量
只要在多线程/进程之间用到全局变量 就加上锁
from threading import Lock,Thread
死锁
lock = Lock()
lock.acquire()
lock.acquire()
noodle = 100
def func(name,lock):
global noodle
lock.acquire()
noodle -= 1
lock.release()
print('%s吃到面了'%name)
if __name__ == '__main__':
lock = Lock() # 线程锁 互斥锁
t_lst = []
for i in range(10):
t = Thread(target=func,args=(i,lock))
t.start()
t_lst.append(t)
for t in t_lst:
t.join()
print(noodle)
科学家吃面问题
import time
from threading import Thread,Lock
lock = Lock()
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
noodle_lock.acquire()
print('%s拿到了面' % name)
fork_lock.acquire()
print('%s拿到了叉子' % name)
print('%s在吃面'%name)
time.sleep(0.5)
fork_lock.release() # 0.01
noodle_lock.release() # 0.01
def eat2(name):
fork_lock.acquire() # 0.01
print('%s拿到了叉子' % name) # 0.01
noodle_lock.acquire()
print('%s拿到了面' % name)
print('%s在吃面'%name)
time.sleep(0.5)
noodle_lock.release()
fork_lock.release()
eat_lst = ['alex','wusir','太白','yuan']
for name in eat_lst: # 8个子线程 7个线程 3个线程eat1,4个线程eat2
Thread(target=eat1,args=(name,)).start()
Thread(target=eat2,args=(name,)).start()
递归锁
from threading import RLock
rlock = RLock()
rlock.acquire()
print(1)
rlock.acquire()
print(2)
rlock.acquire()
print(3)
递归锁解决死锁问题
import time
from threading import Thread,RLock
lock = RLock()
def eat1(name):
lock.acquire()
print('%s拿到了面' % name)
lock.acquire()
print('%s拿到了叉子' % name)
print('%s在吃面'%name)
time.sleep(0.5)
lock.release() # 0.01
lock.release() # 0.01
def eat2(name):
lock.acquire() # 0.01
print('%s拿到了叉子' % name) # 0.01
lock.acquire()
print('%s拿到了面' % name)
print('%s在吃面'%name)
time.sleep(0.5)
lock.release()
lock.release()
eat_lst = ['alex','wusir','太白','yuan']
for name in eat_lst: # 8个子线程 7个线程 3个线程eat1,4个线程eat2
Thread(target=eat1,args=(name,)).start()
Thread(target=eat2,args=(name,)).start()
互斥锁解决死锁问题
import time
from threading import Thread,Lock
lock = Lock()
def eat1(name):
lock.acquire()
print('%s拿到了面' % name)
print('%s拿到了叉子' % name)
print('%s在吃面'%name)
time.sleep(0.5)
lock.release() # 0.01
def eat2(name):
lock.acquire() # 0.01
print('%s拿到了叉子' % name) # 0.01
print('%s拿到了面' % name)
print('%s在吃面'%name)
time.sleep(0.5)
lock.release()
eat_lst = ['alex','wusir','太白','yuan']
for name in eat_lst: # 8个子线程 7个线程 3个线程eat1,4个线程eat2
Thread(target=eat1,args=(name,)).start()
Thread(target=eat2,args=(name,)).start()
死锁
多把锁同时应用在多个线程中
互斥锁和递归锁哪一个好
递归锁 快速恢复服务
死锁问题的出现 是程序的设计或者逻辑的问题
还应该进一步的排除和重构逻辑来保证使用互斥锁也不会发生死锁
互斥锁和递归锁的区别
互斥锁 就是在一个线程中不能连续屡次ACQUIRE
递归锁 能够在同一个线程中acquire任意次,注意acquire多少次就须要release多少次
#####################threading模块 thread类###############
Thread类的其余方法
Thread实例对象的方法 # isAlive(): 返回线程是否活动的 # getName(): 返回线程名 #setName() : 设置线程名 threading 模块提供的一些方法: #threading.currentThread():返回当前的线程变量 #threading.enumerate(): 返回一个包含正在运行的线程的list,正在运行指线程启动后,结束前,不包括启动前和终止后的线程. #threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果
##信号量###
锁+计时器
和进程的同样, Semaphore 管理一个内置的计数器,
每当调用acquire()时 内置计数器 -1;
调用release()时 内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其余线程调用release()
池和信号量
import time
from multiprocessing import Semaphore,Process,Pool
def ktv1(sem,i):
sem.acquire()
i +=1
sem.release()
def ktv2(i):
i +=1
if __name__=='__main__':
sem = Semaphore(5)
start = time.time()
p_1 =[]
for i in range(100):
p = Process(target=ktv1,args=(sem,i))
p.start()
p_1.append(p)
for p in p_1:p.join()
print('###信号量',time.time() - start)
start = time.time()
p = Pool(5)
p_1 = []
for i in range(100):
ret = p.apply_async(func = ktv2,args=(sem,i))
p_1.append(ret)
p.close()
p.join()
print('***池',time.time() - start)
池 效率高
池子里面有几个一共就起几个
无论多少任务,池子的个数是固定的
开启进程和关闭进程这些事都是须要固定的开销
就不产生额外的时间开销
且进程池中的进程数控制的好,name操做系统的压力也小
信号量
有多少个任务就起多少进程/线程
能够帮助你减小操做系统切换的负担
可是并不能帮助你减小进/线程开启和关闭的时间
#####事件
和进程同样
wait
等 到时间内部的信号变成True就不阻塞了
set
设置信号变成True
clear
设置信号变成False
is_get
查看信号是否为True
#数据库链接 import time import random from threading import Event,Thread def check(e): """检测一下数据库的网络和个人网络是否通""" print('正在检测两台机器之间的网络状况....') time.sleep(random.randint(1,3)) e.set() def connet_db(e): e.wait() print('链接数据库...') print('链接数据库成功~~~') e = Event() Thread(target=connet_db,args=(e,)).start() Thread(target=check,args=(e,)).start()
######条件
使得线程等待,只有知足某条件时,才释放n个线程
Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,
除了提供与Lock相似的acquire和
release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,而后判断一些条件。
若是条件不知足则wait;若是
条件知足,进行一些处理改变条件后,经过notify方法通知其余线程,其余处于wait状态的线程接到通知后
会从新判断条件。不断的重
复这一过程,从而解决复杂的同步问题。
import threading def run(n): con.acquire() con.wait() print("run the thread: %s" % n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start() while True: inp = input('>>>') if inp == 'q': break con.acquire() con.notify(int(inp)) con.release() print('****')
##定时器
定时器,指定n秒后执行某个操做
from threading import Timer def func(): print('执行我啦') interval 时间间隔 Timer(0.2,func).start() # 定时器 建立线程的时候,就规定它多久以后去执行
####线程队列python
from multiprocessing import Queue,JoinableQueue #进程IPC队列
from queue import Queue #线程队列,先进先出的
from queue import LifoQueue #后进先出的
队列Queue
先进先出
自带锁, 数据安全
栈 LifoQueue
后进先出
自带锁,数据安全
lq = LifoQueue() ##里面若是带有数字, lq里面的值的数量不能超过这个数字,不然它会一直等着
lq.put(123)
lq.put(456)
lq.put('abc')
lq.put('cba')
lq.put('ccc')
lq.put('aaa')
print(lq)
print(lq.get())
print(lq.get())
print(lq.get())
print(lq.get())
print(lq.get())
from queue import PriorityQueue # 优先级队列
pq = PriorityQueue()
pq.put((10,'aaa')) ##里面是元组(不是必须),判断优先级,若是是字母根据ACCII码来判断大小
pq.put((5,'zzz'))
print(pq.get())
print(pq.get())
# #python标准模块--concurrent.futures
#1 介绍 concurrent.futures 模块提供了高度封装的异步调用接口(高度封装:省事) #进程池/线程池的同一使用方式 ThreadPoolExecutor: 线程池,提供异步调用 ProcessPoolExecutor: 进程池,提供异步调用 #2,基本方法 #submit(fn,*args,**kwargs) 异步提交任务 #map(func,*iterables,timeout = None ,chunksize = 1) 取代for 循环 submit的操做 #shutdown(wait = True) 至关于进程池的 pool.close()+pool.join()操做 wait = True , 等待池内全部任务执行完毕回收完资源后才继续 wait = False, 当即返回,并不会等待池内的任务执行完毕 但无论wait参数为什么值,整个程序都会等到全部任务执行完毕 submit和map必须在shutdown以前 #result(timeout = None) 取得结果 #add_done_callback(fn) 回调函数
import time from threading import currentThread,get_ident from concurrent.futures import ThreadPoolExecutor #帮助你启动线程池的类 from concurrent.futures import ProcessPoolExecutor #帮助你启动线程池的类 def func(i): time.sleep(1) print('in %s %s'%(i,currentThread())) return i**2 t = ThreadPoolExecutor(5) t.map(func,range(20)) for i in range(20): t.submit((func,i))
import time from threading import currentThread,get_ident from concurrent.futures import ThreadPoolExecutor #帮助你启动线程池的类 from concurrent.futures import ProcessPoolExecutor #帮助你启动线程池的类 def func(i): time.sleep(1) print('in %s %s'%(i,currentThread())) return i**2 def back(fn): print(fn.result(),currentThread()) 获取任务结果 t = ThreadPoolExecutor(20) ret_l = [] for i in range(20): ret = t.submit(func,i) ret_l.append(ret) t.shutdown() for ret in ret_l: print(ret.result()) print('main : ',currentThread())
import os import time from concurrent.futures import ProcessPoolExecutor # 帮助你启动线程池的类 def func(i): time.sleep(1) print('in %s %s'%(i,os.getpid())) return i**2 def back(fn): print(fn.result(),os.getpid()) if __name__ == '__main__': print('main : ',os.getpid()) t = ProcessPoolExecutor(20) for i in range(100): t.submit(func,i).add_done_callback(back)