对于操做系统来讲, 一个任务就是一个进程(Process)python
进程内的这些“子任务”称为线程(Thread)linux
真正的并行执行多任务只能在多核CPU上实现web
多任务的实现有3种方式:编程
多进程模式;windows
多线程模式;服务器
多进程+多线程模式网络
Python既支持多进程, 又支持多线程多线程
Unix/Linux操做系统提供了一个fork()系统调用,它很是特殊。普通的函数调用,调用一次,返回一次,可是fork()调用一次,返回两次, 由于操做系统自动把当前进程(称为父进程)复制了一份(称为子进程),而后,分别在父进程和子进程内返回。并发
子进程永远返回0, 而父进程返回子进程的ID, 进程只须要调用getppid()就能够拿到父进程的IDapp
在python中能够经过导入os模块来完成一些系统的调用
os.getpid()能够返回当前进程的pid
os.fork()能够调用fork系统调用, 只不过只是支持linux系列的系统
因为在windows上没法使用fork(), 因此在python中提供了模块multiprocessing来造成子进程
导入multiprocessing模块的方法是使用from multiprocessing import导入
利用process函数来建立一个子进程
第一个参数能够是用target用于传递一个函数, 用于生成进程以后调用该方法
第二个参数是args传递的剩余参数
使用start()方法来启动子进程
join()方法表示父进程要等待子进程执行完毕以后才能继续往下执行, 一般用于进程间的同步
具体的使用实例以下
from multiprocessing import Process
import os
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
要建立大量的进程就须要使用进程池
一样是multiprocessing模块下的, 可是使用的函数是Pool
具体是Pool()能够传入一个值用于设定子进程同时执行的数量, 返回一个进程池
Pool默认的大小是CPU的内核数量
进程池能够调用apply_async()函数来建立子进程, 一样第一个参数能够绑定一个方法, 第二个参数args
对Pool
对象调用join()
方法会等待全部子进程执行完毕,调用join()
以前必须先调用close()
,调用close()
以后就不能继续添加新的Process
了
具体建立代码
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
若是不只要建立执行子进程, 还须要控制进程的输入和输出, 那就须要使用subprocess模块
具体代码以下
import subprocess
print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)
进程之间还须要通讯, python经过Queue和Pipes来交换数据
下面是建立两个进程, 一个是往Queue里写入数据, 一个是从Queue里读数据
具体代码以下
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
# 父进程建立Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,没法等待其结束,只能强行终止:
pr.terminate()
一个进程至少有一个线程
线程是操做系统直接支持的执行单元
在python中提供两个模块进程线程的操做, 一个是_thread, 一个是threading
其中_thread是低级模块, threading是高级模块, 对_thread进程了封装, 通常只使用threading就行
启动一个线程就是把一个函数传入并建立Thread实例, 而后调用start()开始执行
因为任何进程默认就会启动一个线程,咱们把该线程称为主线程, 主线程又能够启动新的线程
Python的threading模块有个current_thread()函数,它永远返回当前线程的实例
主线程实例的名字叫MainThread,子线程的名字在建立时指定,咱们用LoopThread命名子线程
名字仅仅在打印时用来显示,彻底没有其余意义,若是不起名字Python就自动给线程命名为Thread-1,Thread-2……
具体代码以下
import time, threading
# 新线程执行的代码:
def loop():
print('thread %s is running...' % threading.current_thread().name)
n = 0
while n < 5:
n = n + 1
print('thread %s >>> %s' % (threading.current_thread().name, n))
time.sleep(1)
print('thread %s ended.' % threading.current_thread().name)
print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)
多线程和多进程的区别
多进程中, 同一个变量, 各自有一份拷贝, 互相不影响
多线程中, 全部变量都是有全部线程共享, 任何一个变量均可以被任何一个线程修改, 因此必定要注意同时修改一个变量的状况
所以可使用锁来实现对并发修改的控制
balance = 0
lock = threading.Lock()
def run_thread(n):
for i in range(100000):
# 先要获取锁:
lock.acquire()
try:
# 放心地改吧:
change_it(n)
finally:
# 改完了必定要释放锁:
lock.release()
通常地, 一个死循环线程会100%占用一个CPU, 若是有两个死循环线程的话, 就会监控到占用200%的CPU
可是在Python中, 因为GIL的限制, 一个进行当前的线程只能有一个
在多线程环境下, 每一个线程都有本身的数据, 且这些数据都是局部变量
可是大多时候, 一个进程的多个线程可能须要共用有个数据, 这个时候若是不断传递参数就显得臃肿, 建立一个全局变量经过键值对来保存尽管能够解决这一问题, 可是代码不够美观
所以能够在多线程中, 使用threading.local()建立一个ThreadLocal对象来当作那个全局变量
import threading
# 建立全局ThreadLocal对象:
local_school = threading.local()
def process_thread(name):
# 绑定ThreadLocal的student:
local_school.student = name
process_student()
def process_student():
# 获取当前线程关联的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
通常的多任务, 一般会设计 Master-Worker 模式来处理
Master用于分配任务, Worker用于执行任务, 通常多任务环境下有一个Master多个Worker
稳定性上:
多进程: 稳定性好, 一个子进程崩溃了不会影响主进程, 通常Master进程很低可能崩溃, Apache就是使用的多线程
多线程: 稳定性不如多进程, 一个子线程崩溃程序就会挂掉
资源开销上:
多进程: 进程开销大, 一个操做系统可以同时运行的进程是有限的
多线程: 线程开销小, 所以通常地处理速度也较快, IIS就是使用的多线程
关于线程切换
不管是多线程仍是多进程, 一旦数据过量, 效率就会下降
所以进程或者线程的切换, 都是须要时间的, 若是数量过多, 切换花费的时间就更多了
关于计算密集型和IO密集型
计算密集型主要消耗CPU资源, 所以任务切换的越频繁, 效率就越低, 通常计算密集型同时进行的数量至关于CPU核心数
相对的IO密集型就有所不一样, 因为IO操做(网络, 磁盘IO等)比较浪费时间, 此时python就颇有优点
关于异步IO
若是是同步IO的话, 那么在IO没有执行完毕以前程序是没法继续往下执行的
异步IO可使得程序在不用等待IO操做完成程序能够继续往下执行
现代操做系统对IO操做的支持已经作了巨大的改进, 利用异步IO可使得单线程模型执行多任务, 这也就是事件驱动模型
常见的异步IO的web服务器是Nginx, 单核CPU采用单线程进行, 多核CPU通常运行与CPU核心相同数量的进程数
在Python中, 单线程的异步编程模型就是协程
通常在Python中, 线程和进程通常会选择进程来编写代码
同时multiprocessing模块不但支持多进程, 还支持多进程分布到多台机器当中
共享消息队列的多线程的使用方法以下
import random, queue
from multiprocessing.managers import BaseManager
# 建立两个队列, 发送任务的队列 和 接受消息队列
task_queue = queue.Queue()
result_queue = queue.Queue()
# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
pass
# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 得到经过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
n = random.randint(0, 10000)
print('Put task %d...' % n)
task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
r = result.get(timeout=10)
print('Result: %s' % r)
# 关闭:
manager.shutdown()
print('master exit.')
获取消息队列的任务并执行的多进程以下
import time, sys, queue
from multiprocessing.managers import BaseManager
# 建立相似的QueueManager:
class QueueManager(BaseManager):
pass
# 因为这个QueueManager只从网络上获取Queue,因此注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 链接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的彻底一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络链接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n * n)
time.sleep(1)
result.put(r)
except Queue.Empty:
print('task queue is empty.')
# 处理结束:
print('worker exit.')