Day-12: 进程和线程

  • 进程和线程

  在操做系统看来,一个任务就是一个进程,而一个进程内部若是要作多个任务就是有多个线程。一个进程至少有一个线程。python

  真正的并行执行任务是由多个CUP分别执行任务,实际中是由,操做系统轮流让各个任务交替执行,任务1执行0.01秒,任务2执行0.01秒,以后再依次切换。linux

  Python中支持两种模式:windows

  多进程模式服务器

  多线程模式网络

 

  • 多进程

  Linux操做系统下,提供了一个fork()系统调用。调用一次fork(),返回两次,由于操做系统自动把当前的进程(做为父进程)复制了一份(称为子进程),而后子进程返回0,父进程返回子进程的ID。多线程

# multiprocessing.py
import os

print 'Process (%s) start...' % os.getpid()
pid = os.fork()
if pid==0:
    print 'I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())
else:
    print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

  因为windows下没有fork()调用,提供了multiprocessing模块进行跨平台版本的多进程模块。并发

用Process类表明建立进程对象,传入一个执行函数和函数的参数。以后再用start()方法启动,jion()方法能够等待子进程结束后再继续往下进行,一般用于进程间的同步。app

from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print 'Run child process %s (%s)...' % (name, os.getpid())

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Process(target=run_proc, args=('test',))
    print 'Process will start.'
    p.start()
    p.join()
    print 'Process end.'
Parent process 928.
Process will start.
Run child process test (929)...
Process end.

  Pool进程池建立多个子进程dom

  对Pool对象建立多个子进程后,用close()方法结束建立,再用join()方法等待全部子进程执行完毕。在每一个子进程中会随机休眠一段时间,其余的子进程在这段休眠时间里就会调用。异步

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print 'Run task %s (%s)...' % (name, os.getpid())
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print 'Task %s runs %0.2f seconds.' % (name, (end - start))

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print 'Waiting for all subprocesses done...'
    p.close()
    p.join()
    print 'All subprocesses done.'
Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.

  进程间通讯

  Python的miltiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。

  在父进程中建立两个子进程,一个往Queue中写数据,一个从Queue里读数据。

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    for value in ['A', 'B', 'C']:
        print 'Put %s to queue...' % value
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    while True:
        value = q.get(True)
        print 'Get %s from queue.' % value

if __name__=='__main__':
    # 父进程建立Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,没法等待其结束,只能强行终止:
    pr.terminate()
Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
  • 多线程

  Python中提供两个模块,thread是低级模块,threading是高级模块,对thread进行了封装。

import time, threading

# 新线程执行的代码:
def loop():
    print 'thread %s is running...' % threading.current_thread().name
    n = 0
    while n < 5:
        n = n + 1
        print 'thread %s >>> %s' % (threading.current_thread().name, n)
        time.sleep(1)
    print 'thread %s ended.' % threading.current_thread().name

print 'thread %s is running...' % threading.current_thread().name
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print 'thread %s ended.' % threading.current_thread().name
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.

  多进程和多线程最大的不一样在于,多进程中,同一个变量,各自有一份拷贝存在于每一个进程中,互不影响,而多线程中,全部变量都由全部线程共享,因此,任何一个变量均可以被任何一个线程修改。所以,线程之间共享数据最大的危险在于多个线程同时该变一个变量,把内容给改乱了。  

  所以得加上一把锁lock

balance = 0
lock = threading.Lock()

def run_thread(n):
    for i in range(100000):
        # 先要获取锁:
        lock.acquire()
        try:
            # 放心地改吧:
            change_it(n)
        finally:
            # 改完了必定要释放锁:
            lock.release()

   当多个线程同时执行lock.acquire()时,只有一个线程能成功地获取锁,而后继续执行代码,其余线程就继续等待直到得到锁为止。

  可是,这样实际上就不是并行处理了。

  Python的多进程因为存在GIL锁的问题,因此多线程实际上不能有效利用多核。多线程的并发在Python中是无用的。

  • ThreadLocal

  全局变量local_school就是一个ThreadLoacl对象,每一个Thread对它均可以读写student属性,可是互不影响。能够把local_school当作全局变量,但每一个属性如local_school.student都是线程的局部变量,能够任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。

import threading

# 建立全局ThreadLocal对象:
local_school = threading.local()

def process_student():
    print 'Hello, %s (in %s)' % (local_school.student, threading.current_thread().name)

def process_thread(name):
    # 绑定ThreadLocal的student:
    local_school.student = name
    process_student()

t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
  • 进程vs线程

  多进程的优势是稳定性高,一个崩溃,不会影响其余的进程,可是,代价大,在linux下,调用fork还能够,可是windows下进程开销巨大。

  多线程模式比多进程快一点,可是也快不了多少,缺点十分明显,因为共享进程的内存,一个线程崩了,就都崩了。

  计算密集型和IO密集型:

  计算密集型会消耗大量的CPU资源,代码的运行效率就相当重要,Python等脚本语言运行效率低,不适合。

  IO密集型涉及到网络、磁盘IO的任务,它们的CUP消耗较少,任务的主要时间在等待IO操做完成,CUP效率没法彻底使用,因此适合开发效率高的语言。

  现代操做系统对IO操做进行了巨大的改进,支持异步IO。利用异步IO,就能够用单进程模型来执行多任务,这种全新的模型称为事件驱动型。

  • 分布式进程

  多台电脑协助工做,一台电脑做为调度者,依靠网络通讯,将任务分布到其余电脑的进程中。

  经过manager模块把Queue经过网络暴露出去,让其余机器的进程能够访问Queue

服务器继承中,负责启动Queue,把Queue注册到网络上,而后往Queue里面写入任务:

# taskmanager.py

import random, time, Queue
from multiprocessing.managers import BaseManager

# 发送任务的队列:
task_queue = Queue.Queue()
# 接收结果的队列:
result_queue = Queue.Queue()

# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
    pass

# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey='abc')
# 启动Queue:
manager.start()
# 得到经过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
# 关闭:
manager.shutdown()

在另外一台机器上启动任务进程:

# taskworker.py

import time, sys, Queue
from multiprocessing.managers import BaseManager

# 建立相似的QueueManager:
class QueueManager(BaseManager):
    pass

# 因为这个QueueManager只从网络上获取Queue,因此注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 链接到服务器,也就是运行taskmanager.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与taskmanager.py设置的彻底一致:
m = QueueManager(address=(server_addr, 5000), authkey='abc')
# 从网络链接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 处理结束:
print('worker exit.')

服务进程启动以下:

$ python taskmanager.py 
Put task 3411...
Put task 1605...
Put task 1398...
Put task 4729...
Put task 5300...
Put task 7471...
Put task 68...
Put task 4219...
Put task 339...
Put task 7866...
Try get results...

工做进程启动以下:

$ python taskworker.py 127.0.0.1
Connect to server 127.0.0.1...
run task 3411 * 3411...
run task 1605 * 1605...
run task 1398 * 1398...
run task 4729 * 4729...
run task 5300 * 5300...
run task 7471 * 7471...
run task 68 * 68...
run task 4219 * 4219...
run task 339 * 339...
run task 7866 * 7866...
worker exit.

等到工做进程结束后,服务进程以下:

Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956

  注意Queue的做用是来传递任务和接受结果的,每一个任务的描述量要尽可能小。好比发送一个处理日志文件的任务,不要发送几百兆的日志文件自己,而是发送日志文件存放的完整路径,由Worker进程再去共享的磁盘上读取文件。

 注:本文为学习廖雪峰Python入门整理后的笔记

相关文章
相关标签/搜索
本站公众号
   欢迎关注本站公众号,获取更多信息