#1.进程和线程 队列: 一、进程之间的通讯: q = multiprocessing.Queue() 二、进程池之间的通讯: q = multiprocessing.Manager().Queue() 三、线程之间的通讯: q = queue.Queue() ##1.功能数据库
##2.定义的不一样安全
##3.区别bash
##4.优缺点 线程和进程在使用上各有优缺点:线程执行开销小,但不利于资源的管理和保护;而进程正相反。 #2.同步的概念 ##1.多线程开发可能遇到的问题 假设两个线程t1和t2都要对num=0进行增1运算,t1和t2都各对num修改10次,num的最终的结果应该为20。 可是因为是多线程访问,有可能出现下面状况: 在num=0时,t1取得num=0。此时系统把t1调度为”sleeping”状态,把t2转换为”running”状态,t2也得到num=0。而后t2对获得的值进行加1并赋给num,使得num=1。而后系统又把t2调度为”sleeping”,把t1转为”running”。线程t1又把它以前获得的0加1后赋值给num。这样,明明t1和t2都完成了1次加1工做,但结果仍然是num=1。多线程
from threading import Thread
import time
g_num = 0
def test1():
global g_num
for i in range(1000000):
g_num += 1
print("---test1---g_num=%d"%g_num)
def test2():
global g_num
for i in range(1000000):
g_num += 1
print("---test2---g_num=%d"%g_num)
p1 = Thread(target=test1)
p1.start()
# time.sleep(3) #取消屏蔽以后 再次运行程序,结果的不一样
p2 = Thread(target=test2)
p2.start()
print("---g_num=%d---"%g_num)
复制代码
运行结果却不是2000000:并发
---g_num=129699---
---test2---g_num=1126024
---test1---g_num=1135562
复制代码
取消屏蔽以后,再次运行结果以下:函数
---test1---g_num=1000000
---g_num=1025553---
---test2---g_num=2000000
复制代码
问题产生的缘由就是没有控制多个线程对同一资源的访问,对数据形成破坏,使得线程运行的结果不可预期。这种现象称为“线程不安全”。 ##2.同步ui
##3.解决线程不安全的方法 能够经过线程同步来解决spa
#3.互斥锁线程
#建立锁
mutex = threading.Lock()
#锁定
mutex.acquire([blocking])
#释放
mutex.release()
复制代码
其中,锁定方法acquire能够有一个blocking参数。code
from threading import Thread, Lock
import time
g_num = 0
def test1():
global g_num
for i in range(1000000):
#True表示堵塞 即若是这个锁在上锁以前已经被上锁了,那么这个线程会在这里一直等待到解锁为止
#False表示非堵塞,即无论本次调用可以成功上锁,都不会卡在这,而是继续执行下面的代码
mutexFlag = mutex.acquire(True)
if mutexFlag:
g_num += 1
mutex.release()
print("---test1---g_num=%d"%g_num)
def test2():
global g_num
for i in range(1000000):
mutexFlag = mutex.acquire(True) #True表示堵塞
if mutexFlag:
g_num += 1
mutex.release()
print("---test2---g_num=%d"%g_num)
#建立一个互斥锁
#这个锁默认是未上锁的状态
mutex = Lock()
p1 = Thread(target=test1)
p1.start()
p2 = Thread(target=test2)
p2.start()
print("---g_num=%d---"%g_num)
复制代码
运行结果:
---g_num=19446---
---test1---g_num=1699950
---test2---g_num=2000000
复制代码
加入互斥锁后,运行结果与预期相符。 咱们能够模拟一下卖票的程序:
# Python主要经过标准库中的threading包来实现多线程
import threading
import time
import os
def doChore(): # 做为间隔 每次调用间隔0.5s
time.sleep(0.5)
def booth(tid):
global i
global lock
while True:
lock.acquire() # 获得一个锁,锁定
if i != 0:
i = i - 1 # 售票 售出一张减小一张
print(tid, ':now left:', i) # 剩下的票数
doChore()
else:
print("Thread_id", tid, " No more tickets")
os._exit(0) # 票售完 退出程序
lock.release() # 释放锁
doChore()
#全局变量
i = 15 # 初始化票数
lock = threading.Lock() # 建立锁
def main():
# 总共设置了3个线程
for k in range(3):
# 建立线程; Python使用threading.Thread对象来表明线程
new_thread = threading.Thread(target=booth, args=(k,))
# 调用start()方法启动线程
new_thread.start()
if __name__ == '__main__':
main()
复制代码
运行结果:
0 :now left: 14
1 :now left: 13
0 :now left: 12
2 :now left: 11
1 :now left: 10
0 :now left: 9
1 :now left: 8
2 :now left: 7
0 :now left: 6
1 :now left: 5
2 :now left: 4
0 :now left: 3
2 :now left: 2
0 :now left: 1
1 :now left: 0
Thread_id 2 No more tickets
复制代码
#4.多线程-非共享数据 对于多线程中全局变量和局部变量是否共享
#coding=utf-8
import threading
import time
class MyThread(threading.Thread):
# 重写 构造方法
def __init__(self,num,sleepTime):
threading.Thread.__init__(self)
self.num = num
self.sleepTime = sleepTime
def run(self):
self.num += 1
time.sleep(self.sleepTime)
print('线程(%s),num=%d'%(self.name, self.num))
if __name__ == '__main__':
mutex = threading.Lock()
t1 = MyThread(100,5)
t1.start()
t2 = MyThread(200,1)
t2.start()
复制代码
运行结果:
线程(Thread-2),num=201
线程(Thread-1),num=101
复制代码
import threading
from time import sleep
def test(sleepTime):
num = 1
sleep(sleepTime)
num+=1
print('---(%s)--num=%d'%(threading.current_thread(), num))
if __name__ == '__main__':
t1 = threading.Thread(target = test,args=(5,))
t2 = threading.Thread(target = test,args=(1,))
t1.start()
t2.start()
复制代码
运行结果:
---(<Thread(Thread-2, started 10876)>)--num=2
---(<Thread(Thread-1, started 7484)>)--num=2
复制代码
#5.同步应用
from threading import Thread,Lock
from time import sleep
class Task1(Thread):
def run(self):
while True:
if lock1.acquire():
print("------Task 1 -----")
sleep(0.5)
lock2.release()
class Task2(Thread):
def run(self):
while True:
if lock2.acquire():
print("------Task 2 -----")
sleep(0.5)
lock3.release()
class Task3(Thread):
def run(self):
while True:
if lock3.acquire():
print("------Task 3 -----")
sleep(0.5)
lock1.release()
#使用Lock建立出的锁默认没有“锁上”
lock1 = Lock()
#建立另一把锁,而且“锁上”
lock2 = Lock()
lock2.acquire()
#建立另一把锁,而且“锁上”
lock3 = Lock()
lock3.acquire()
t1 = Task1()
t2 = Task2()
t3 = Task3()
t1.start()
t2.start()
t3.start()
复制代码
运行结果:
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
...........`
复制代码
#6.生产者与消费者模式
import threading,time
from queue import Queue
class Producer(threading.Thread):
def run(self):
global queue
count = 0
while True:
if queue.qsize() < 1000:
for i in range(100):
count = count +1
msg = '生成产品'+str(count)
queue.put(msg)
print(msg)
time.sleep(0.5)
class Consumer(threading.Thread):
def run(self):
global queue
while True:
if queue.qsize() > 100:
for i in range(3):
msg = self.name + '消费了 '+queue.get()
print(msg)
time.sleep(1)
if __name__ == '__main__':
queue = Queue()
for i in range(500):
queue.put('初始产品'+str(i))
for i in range(2):
p = Producer()
p.start()
for i in range(5):
c = Consumer()
c.start()
复制代码
运行结果:
生成产品1
生成产品2
生成产品1
生成产品3
生成产品2
生成产品4
Thread-3消费了 初始产品0
生成产品3
生成产品5
Thread-3消费了 初始产品1
生成产品4
生成产品6
Thread-4消费了 初始产品2
Thread-3消费了 初始产品3
生成产品5
生成产品7
Thread-4消费了 初始产品4
生成产品6
生成产品8
Thread-5消费了 初始产品5
Thread-4消费了 初始产品6
............
复制代码
此时就出现生产者与消费者的问题 ##1.Queue的说明 1.对于Queue,在多线程通讯之间扮演重要的角色 2.添加数据到队列中,使用put()方法 3.从队列中取数据,使用get()方法 4.判断队列中是否还有数据,使用qsize()方法 ##2.生产者消费者模式的说明
def process_student(name):
std = Student(name)
# std是局部变量,可是每一个函数都要用它,所以必须传进去:
do_task_1(std)
do_task_2(std)
def do_task_1(std):
do_subtask_1(std)
do_subtask_2(std)
def do_task_2(std):
do_subtask_2(std)
do_subtask_2(std)
复制代码
说明:用局部变量也有问题,由于每一个线程处理不一样的Student对象,不能共享。 ###2.使用全局字典的方法
import threading
# 建立字典对象:
myDict={}
def process_student():
# 获取当前线程关联的student:
std = myDict[threading.current_thread()]
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
# 绑定ThreadLocal的student:
myDict[threading.current_thread()] = name
process_student()
t1 = threading.Thread(target=process_thread, args=('yongGe',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('老王',), name='Thread-B')
t1.start()
t2.start()
复制代码
运行结果;
Hello, yongGe (in Thread-A)
Hello, 老王 (in Thread-B)
复制代码
这种方式理论上是可行的,它最大的优势是消除了std对象在每层函数中的传递问题,可是,每一个函数获取std的代码有点low。 ###3.使用ThreadLocal的方法
import threading
# 建立全局ThreadLocal对象:
local_school = threading.local()
def process_student():
# 获取当前线程关联的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
# 绑定ThreadLocal的student:
local_school.student = name
process_student()
t1 = threading.Thread(target=process_thread, args=('erererbai',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('老王',), name='Thread-B')
t1.start()
t2.start()
复制代码
运行结果:
Hello, erererbai (in Thread-A)
Hello, 老王 (in Thread-B)
复制代码
说明: 全局变量local_school就是一个ThreadLocal对象,每一个Thread对它均可以读写student属性,但互不影响。你能够把local_school当作全局变量,但每一个属性如local_school.student都是线程的局部变量,能够任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。 能够理解为全局变量local_school是一个dict,不但能够用local_school.student,还能够绑定其余变量,如local_school.teacher等等。 ThreadLocal最经常使用的地方就是为每一个线程绑定一个数据库链接,HTTP请求,用户身份信息等,这样一个线程的全部调用到的处理函数均可以很是方便地访问这些资源。