Ref: Python3 多线程html
Ref: Python3之多进程 # python中的多线程没法利用多核优点python
更多的提升效率的策略,请参见:[Pandas] 01 - A guy based on NumPygit
线程在执行过程当中与进程仍是有区别的。 1. 每一个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。 2. 可是线程不可以独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。 3. 每一个线程都有他本身的一组CPU寄存器,称为线程的上下文,该上下文反映了线程上次运行该线程的CPU寄存器的状态。 4. 指令指针 和 堆栈指针寄存器 是线程上下文中两个最重要的寄存器,线程老是在进程获得上下文中运行的,这些地址都用于标志拥有线程的进程地址空间中的内存。
Ref: https://github.com/giampaolo/psutilgithub
from multiprocessing import cpu_count
print(cpu_count())
Python3 经过两个标准库 _thread 和 threading 提供对线程的支持。安全
_thread 提供了低级别的、原始的线程以及一个简单的锁,它相比于 threading 模块的功能仍是比较有限的。多线程
提供了低级别,原始的线程以及一个简单的锁。app
#!/usr/bin/python3 import _thread import time # 为线程定义一个函数 def print_time( threadName, delay): count = 0 while count < 5: time.sleep(delay) count += 1 print ("%s: %s" % ( threadName, time.ctime(time.time()) ))
----------------------------------------------------------------
# 建立两个线程,参数是:函数名 以及对应的参数 try: _thread.start_new_thread( print_time, ("Thread-1", 2, ) ) _thread.start_new_thread( print_time, ("Thread-2", 4, ) ) except: print ("Error: 没法启动线程")
# 让主线程不要提早结束 while 1: pass
采用了线程类的手法,该方法比较 engineering。dom
#!/usr/bin/python3 import threading import time exitFlag = 0
# 线程类 class myThread (threading.Thread):
def __init__(self, threadID, name, counter): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.counter = counter
def run(self): print ("开始线程:" + self.name) print_time(self.name, self.counter, 5) print ("退出线程:" + self.name)
----------------------------------------------------------------
def print_time(threadName, delay, counter): while counter: if exitFlag: threadName.exit() time.sleep(delay) print ("%s: %s" % (threadName, time.ctime(time.time()))) counter -= 1
----------------------------------------------------------------
# (1) 建立新 线程'类‘ thread1 = myThread(1, "Thread-1", 1) thread2 = myThread(2, "Thread-2", 2) # (2) 启动新线程 thread1.start() thread2.start()
# (3) 等待全部线程结束 thread1.join() thread2.join() print ("退出主线程")
使用 Thread 对象的 Lock 和 Rlock 能够实现简单的线程同步,这两个对象都有 acquire 方法和 release 方法;ide
对于那些须要每次只容许一个线程操做的数据,能够将其操做放到 acquire 和 release 方法之间。函数
#!/usr/bin/python3 import threading import time class myThread (threading.Thread):
def __init__(self, threadID, name, counter): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.counter = counter
def run(self): print ("开启线程: " + self.name)
------------------------------------------------------- threadLock.acquire() # <---- print_time(self.name, self.counter, 3)
threadLock.release() # <---- -------------------------------------------------------
# 做为线程共享资源 def print_time(threadName, delay, counter): while counter: time.sleep(delay) print ("%s: %s" % (threadName, time.ctime(time.time()))) counter -= 1
threadLock= threading.Lock() threads = []
# (1) 建立新线程 thread1 = myThread(1, "Thread-1", 1) thread2 = myThread(2, "Thread-2", 2) # (2) 开启新线程 thread1.start() thread2.start() # (3) 等待线程 threads.append(thread1) threads.append(thread2) for t in threads: t.join() print ("退出主线程")
不添加setDaemon时,主线程和子线程分别在执行,约在主线程执行完5秒后子线程也执行完毕。
添加setDaemon的话,主进程执行完后不会等待 “做为守护线程” 的子进程,以下代码中,不会给child thread留有运行的机会。
import threading import time from datetime import datetime class MyThread(threading.Thread): def __init__(self, id): threading.Thread.__init__(self) self.id = id def run(self): time.sleep(5) print "子线程动做",threading.current_thread().name, datetime.now() if __name__ == "__main__": t1 = MyThread(999) t1.setDaemon(True) # 添加守护线程! t1.start() for i in range(5): print "主线程动做",threading.current_thread().name, datetime.now()
只是添加了join函数一行代码,咱们发现主线程和子线程执行的顺序就改变了。
主线程会等待子线程。
if __name__ == "__main__": t1 = MyThread(999) t1.start() t1.join() # 添加join函数! for i in range(5): print "主线程动做",threading.current_thread().name, datetime.now()
Output: 等待child执行完,再执行join()以后main thread的内容。
child thread Thread-4 2019-09-26 17:50:16.049128 main thread MainThread 2019-09-26 17:50:16.050622 main thread MainThread 2019-09-26 17:50:16.050930 main thread MainThread 2019-09-26 17:50:16.051079 main thread MainThread 2019-09-26 17:50:16.051915 main thread MainThread 2019-09-26 17:50:16.05206
主线程一直等待所有的子线程结束以后,主线程自身才结束,程序退出。(其实守护线程的设置就没用了)
if __name__ == "__main__": t1 = MyThread(999) t1.setDaemon(True) # 添加守护线程! t1.start() t1.join() # 添加join函数! for i in range(5): print "主线程动做",threading.current_thread().name, datetime.now()
GIL 的全名是 the Global Interpreter Lock (全局解释锁),是常规 python 解释器(固然,有些解释器没有)的核心部件。
GIL 是 Python 解释器正确运行的保证,Python 语言自己没有提供任何机制访问它。但在特定场合,咱们仍有办法下降它对效率的影响。
经过cpython启动多进程,能 "绕过" GIL。
from multiprocessing import Process def spawn_n_processes(n, target): threads = [] for _ in range(n): thread = Process(target=target) thread.start() threads.append(thread) for thread in threads: thread.join()
经过 cpython 执行以上程序。
def test(target, number=10, spawner=spawn_n_threads): """ 分别启动 1, 2, 3, 4 个控制流,重复 number 次,计算运行耗时 """ for n in (1, 2, 3, 4, ): start_time = time() for _ in range(number): spawner(n, target) end_time = time() print('Time elapsed with {} branch(es): {:.6f} sec(s)'.format(n, end_time - start_time)) test(fib, spawner=spawn_n_processes)
Python 的 “Queue 模块” 中提供了同步的、线程安全的队列类,包括
三种队列均提供以下方法:
这些队列都实现了锁原语,可以在多线程中直接使用,可使用队列来实现线程间的同步。
Queue 模块中的经常使用方法:
import Queue
From: python多线程-queue队列类型优先级队列,FIFO,LIFO
默认队列:Queue.Queue()
#coding=utf8 import Queue queuelist = Queue.Queue() for i in range(5): if not queuelist.full(): queuelist.put(i) print "put list : %s ,now queue size is %s "%(i,queuelist.qsize()) while not queuelist.empty(): print "get list : %s , now queue size is %s"%(queuelist.get(),queuelist.qsize())
Output:
put list : 0 ,now queue size is 1 put list : 1 ,now queue size is 2 put list : 2 ,now queue size is 3 put list : 3 ,now queue size is 4 put list : 4 ,now queue size is 5 get list : 0 , now queue size is 4 get list : 1 , now queue size is 3 get list : 2 , now queue size is 2 get list : 3 , now queue size is 1 get list : 4 , now queue size is 0
原本是个stack,非要叫成是LIFO队列,汗~
#coding=utf8 import Queue queuelist = Queue.LifoQueue() for i in range(5): if not queuelist.full(): queuelist.put(i) print "put list : %s ,now queue size is %s "%(i,queuelist.qsize()) while not queuelist.empty(): print "get list : %s , now queue size is %s"%(queuelist.get(),queuelist.qsize())
Output:
put list : 0 ,now queue size is 1 put list : 1 ,now queue size is 2 put list : 2 ,now queue size is 3 put list : 3 ,now queue size is 4 put list : 4 ,now queue size is 5 get list : 4 , now queue size is 4 get list : 3 , now queue size is 3 get list : 2 , now queue size is 2 get list : 1 , now queue size is 1 get list : 0 , now queue size is 0
put方法的参数是个元组 (<优先级> ,<value>)。
#coding=utf8 import queue as Queue import random queuelist = Queue.PriorityQueue() for i in range(5): if not queuelist.full(): x=random.randint(1,20) y=random.randint(1,20) print x queuelist.put((x,y)) while not queuelist.empty(): print "get list : %s , now queue size is %s"%(queuelist.get(),queuelist.qsize())
Output:
11 5 10 7 10 get list : (5, 10) , now queue size is 4 get list : (7, 10) , now queue size is 3 get list : (10, 10) , now queue size is 2 get list : (10, 10) , now queue size is 1 get list : (11, 10) , now queue size is 0
内容:一个队,三个检票口 (三个线程)
锁机制:不能同时“取”,因此取的过程须要加“锁”。
#coding=utf8 import Queue import threading import time exitsingle = 0 class myThread(threading.Thread):
def __init__(self, threadname, queuelist): threading.Thread.__init__(self) self.threadname = threadname self.queuelist = queuelist def run(self): print "Starting queue %s"%self.threadname queue_enter(self.threadname, self.queuelist) # 每个线程从管道中”取数据“ time.sleep(1) print "close " + self.threadname
def queue_enter(threadname, queuelist): while not exitsingle: queueLock.acquire() if not workQueue.empty(): data = queuelist.get() queueLock.release() # 取完就能够释放“锁”
print "%s check ticket %s" % (threadname, data) else: queueLock.release() time.sleep(1)
####################################################
# 初始化
#################################################### threadList = ["list-1", "list-2", "list-3"] queueLock = threading.Lock() workQueue = Queue.Queue() threads = [] queueLock.acquire() for num in range(100001,100020): workQueue.put(num) # 计入“票的编号” queueLock.release()
print "start .."
# 三个线程从一个管道里取数据,但不能同时取 for name in threadList: thread = myThread( name, workQueue) thread.start() threads.append(thread) while not workQueue.empty(): pass exitsingle = 1 for t in threads: t.join() print "stop enter.."
但这里貌似少了lock相关,具体可参考以上两个栗子。
#!/usr/bin/python3 # -*- coding: utf-8 -*- # @Author : # @File : text.py # @Software : PyCharm # @description : XXX from queue import Queue import random import threading import time # Producer thread class Producer(threading.Thread):
def __init__(self, t_name, queue): threading.Thread.__init__(self, name=t_name) self.data = queue
def run(self): for i in range(5): print("%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), i)) self.data.put(i) time.sleep(random.randrange(10) / 5) print("%s: %s finished!" % (time.ctime(), self.getName())) # Consumer thread class Consumer(threading.Thread):
def __init__(self, t_name, queue): threading.Thread.__init__(self, name=t_name) self.data = queue
def run(self): for i in range(5): val = self.data.get() print("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val)) time.sleep(random.randrange(10)) print("%s: %s finished!" % (time.ctime(), self.getName()))
# Main thread def main(): queue = Queue() producer = Producer('Pro.', queue) consumer = Consumer('Con.', queue)
producer.start() consumer.start() producer.join() consumer.join() print('All threads terminate!') if __name__ == '__main__': main()
End.