Python全栈开发之十一、进程和线程

1、线程

  多任务能够由多进程完成,也能够由一个进程内的多线程完成,一个进程内的全部线程,共享同一块内存python中建立线程比较简单,导入threading模块,下面来看一下代码中如何建立多线程。html

def f1(i):
    time.sleep(1)
    print(i)

if __name__ == '__main__':
    for i in range(5):
        t = threading.Thread(target=f1, args=(i,))
        t.start()
    print('start')          # 主线程等待子线程完成,子线程并发执行


>>start
>>2
>>1
>>3
>>0
>>4

  主线程从上到下执行,建立5个子线程,打印出'start',而后等待子线程执行完结束,若是想让线程要一个个依次执行完,而不是并发操做,那么就要使用join方法。下面来看一下代码前端

import threading
import time

def f1(i):
    time.sleep(1)
    print(i)

if __name__ == '__main__':
    for i in range(5):
        t = threading.Thread(target=f1, args=(i,))
        t.start()
        t.join()
    print('start')      # 线程从上到下依次执行,最后打印出start

>>0
>>1
>>2
>>3
>>4
>>start

  上面的代码不适用join的话,主线程会默认等待子线程结束,才会结束,若是不想让主线程等待子线程的话,能够子线程启动以前设置将其设置为后台线程,若是是后台线程,主线程执行过程当中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均中止,前台线程则相反,若果不加指定的话,默认为前台线程,下面从代码来看一下,如何设置为后台线程。例以下面的例子,主线程直接打印start,执行完后就结束,而不会去等待子线程,子线程中的数据也就不会打印出来python

import threading
import time

def f1(i):
    time.sleep(1)
    print(i)

if __name__ == '__main__':
    for i in range(5):
        t = threading.Thread(target=f1, args=(i,))
        t.setDaemon(True)
        t.start()

    print('start')      # 主线程不等待子线程

>> start 

  除此以外,本身还能够为线程自定义名字,经过 t = threading.Thread(target=f1, args=(i,), name='mythread{}'.format(i)) 中的name参数,除此以外,Thread还有一下一些方法git

  • t.getName() : 获取线程的名称
  • t.setName() : 设置线程的名称 
  • t.name : 获取或设置线程的名称
  • t.is_alive() : 判断线程是否为激活状态
  • t.isAlive() :判断线程是否为激活状态
  • t.isDaemon() : 判断是否为守护线程

2、线程锁

  因为线程是共享同一分内存的,因此若是操做同一份数据,很容易形成冲突,这时候就能够为线程加上一个锁了,这里咱们使用Rlock,而不使用Lock,由于Lock若是屡次获取锁的时候会出错,而RLock容许在同一线程中被屡次acquire,可是须要用n次的release才能真正释放所占用的琐,一个线程获取了锁在释放以前,其余线程只有等待。 github

import threading
G = 1
lock = threading.RLock()
def fun():
    lock.acquire()    # 获取锁
    global G
    G += 2
    print(G, threading.current_thread().name)
    lock.release()   # 释放锁
    return


for i in range(10):
    t = threading.Thread(target=fun, name='t-{}'.format(i))
    t.start()

3 t-0
5 t-1
7 t-2
9 t-3
11 t-4
13 t-5
15 t-6
17 t-7
19 t-8
21 t-9    

3、线程间通讯Event

Event是线程间通讯最间的机制之一,主要用于主线程控制其余线程的执行,主要用过wait,clear,set,这三个方法来实现的的,下面来看一个简单的例子,windows

import threading
import time

def f1(event):
    print('start:')
    event.wait()            # 阻塞在,等待 set
    print('end:')

if __name__ == '__main__':
    event_obj  = threading.Event()
    for i in range(5):
        t = threading.Thread(target=f1, args=(event_obj,))
        t.start()

    event_obj.clear()      # 清除标志位 
    inp = input('>>>>:')
    if inp == 'true':
        event_obj.set()   # 设置标志位

4、队列  

  能够简单的理解为一种先进先出的数据结构,好比用于生产者消费者模型,或者用于写线程池,以及前面写select的时候,读写分离时候可用队列存储数据等等,之后用到队列的地方不少,所以对于队列的用法要熟练掌握。下面首先来看一下队列提供了哪些用法数组

q = queue.Queue(maxsize=0)  # 构造一个先进显出队列,maxsize指定队列长度,为0时,表示队列长度无限制。

q.join()        # 等到队列为kong的时候,在执行别的操做
q.qsize()       # 返回队列的大小 (不可靠)
q.empty()       # 当队列为空的时候,返回True 不然返回False (不可靠)
q.full()        # 当队列满的时候,返回True,不然返回False (不可靠)
q.put(item, block=True, timeout=None)   # 将item放入Queue尾部,item必须存在,参数block默认为True,表示当队列满时,会等待
                        # 为False时为非阻塞,此时若是队列已满,会引起queue.Full 异常。 可选参数timeout,表示会阻塞设置的时间,
                        # 若是在阻塞时间里 队列仍是没法放入,则引起 queue.Full 异常

q.get(block=True, timeout=None)     #  移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,若是队列为空,则阻塞
                       #  阻塞的话若此时队列为空,则引起queue.Empty异常。 可选参数timeout,表示会阻塞设置的时间,
q.get_nowait()               #  等效于 get(item,block=False) 

下面用代码来简单的演示下,消费者生成者模型,只是简单的演示下。数据结构

message = queue.Queue(10)

def product(num):
    for i in range(num):
        message.put(i)
        print('将{}添加到队列中'.format(i))
        time.sleep(random.randrange(0, 1))


def consume(num):
    count = 0
    while count<num:
        i = message.get()
        print('将{}从队列取出'.format(i))
        time.sleep(random.randrange(1, 2))
        count += 1


t1 = threading.Thread(target=product, args=(10, ))
t1.start()

t2 = threading.Thread(target=consume, args=(10, ))
t2.start()

5、进程 

  线程的上一级就是进程,进程可包含不少线程,进程和线程的区别是进程间的数据不共享,多进程也能够用来处理多任务,不过多进程很消耗资源,计算型的任务最好交给多进程来处理,IO密集型最好交给多线程来处理,此外进程的数量应该和cpu的核心说保持一致。多线程

在windows中不能用fork来建立多进程,所以只能导入multiprocessing,来模拟多进程,下面首先来看一下怎么建立进程,你们能够先猜一下下面的结果是什么并发

l = []

def f(i):
    l.append(i)
    print('hi', l)

if __name__ == '__main__':
    for i in range(10):
        p = multiprocessing.Process(target=f, args=(i,))        # 数据不共享,建立10份 l列表
        p.start()

6、进程间数据共享  

进程间的数据是不共享的,可是我若是非要数据共享了,那么就须要用其余方式了 

一、Value,Array

def f(a, b):
    a.value = 3.111
    for i in range(len(b)):
        b[i] += 100

if __name__ == '__main__':
    num = Value('f', 3.333)        # 相似C语言中的 浮点型数
    l = Array('i', range(10))       # 相似C语言中的整形数组,长度为10
    print(num.value)
    print(l[:])

    p = Process(target=f, args=(num, l))
    p.start()
    p.join()
    print(num.value)              # 你们本身运行一下,看下两次打印结果是否同样
    print(l[:])

二、manage  

方式一,使用的都是C语言中的数据结构,若是你们对c不熟悉的话,用起来比较麻烦,方式2就能够支持python自带的数据,下面来看一下

from multiprocessing import Process,Manager

def Foo(dic, i):
    dic[i] = 100 + i
    print(dic.values())

if __name__ == '__main__':
    manage = Manager()
    dic = manage.dict()

    for i in range(2):
        p = Process(target=Foo, args=(dic, i))
        p.start()
        p.join()

7、进程池  

  实际应用中,并非每次执行任务的时候,都去建立多进程,而是维护了一个进程池,每次执行的时候,都去进程池取一个,若是进程池里面的进程取光了,就会阻塞在那里,直到进程池中有可用进程为止。首先来看一下进程池提供了哪些方法

  • apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,因为这个缘由,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一个变体,会返回一个结果对象。若是callback被指定,那么callback能够接收一个参数而后被调用,当结果准备好回调时会调用callback,调用失败时,则用error_callback替换callback。 Callbacks应被当即完成,不然处理结果的线程会被阻塞。

  • close() : 等待任务完成后在中止工做进程,阻止更多的任务提交到pool,待任务完成后,工做进程会退出。

  • terminate() : 无论任务是否完成,当即中止工做进程。在对pool对象进程垃圾回收的时候,会当即调用terminate()。

  • join() : 等待工做线程的退出,在调用join()前,必须调用close() or terminate()。这样是由于被终止的进程须要被父进程调用wait(join等价与wait,不然进程会成为僵尸进程。

下面来简单的看一下代码怎么用的

from multiprocessing import Pool
import time

def f1(i):
    time.sleep(1)
    # print(i)
    return i

def cb(i):
    print(i)

if __name__ == '__main__':
    poo = Pool(5)
    for i in range(20):
        # poo.apply(func=f1, args=(i,))   # 串行执行,排队执行 有join
        poo.apply_async(func=f1, args=(i,), callback=cb)  # 并发执行 主进程不等子进程,无join
    print('**********')

    poo.close()
    poo.join()

8、线程池 

  对于前面的进程池,python自带了一个模块Pool供咱们使用,可是对于线程池,则没有提供,所以须要咱们本身写,本身写的话,就须要用到队列,下面咱们来看一下本身怎么实现一个线程池,首先写一个最简单的版本。 

import threading
import time
import queue

class ThreadPool:
    def __init__(self, max_num=20):
        self.queue = queue.Queue(max_num)
        for i in range(max_num):
            self.add()

    def add(self):
        self.queue.put(threading.Thread)

    def get(self):
        return self.queue.get()

def f(tp, i):
    time.sleep(1)
    print(i)
    tp.add()

p = ThreadPool(10)
for i in range(20):
    thread = p.get()
    t = thread(target=f, args=(p, i))
    t.start()

上述代码写了一个线程池类,基本实现了线程池的功能,可是有不少缺点,没有实现回掉函数,每次执行任务的时候,任务处理函数每次执行完都须要自动执行对象的add方法,将线程对象添加到队列中去,并且类初始化的时候,一次性将全部的线程类都添加到队列中去了,总之上面的线程池虽然实现简单,可是实际上却有不少问题,下面来看一个真正意义上的线程池。

  在写代码以前,咱们先来看一下该怎么设计这样一个线程池,上面的线程池,咱们的队列中,存的是线程类,咱们每处理一个任务都实例化一个线程,而后执行完了以后,该线程就被丢弃了,这样有点不合适。咱们此次设计的时候,

  1. 队列中存的不是线程类,而是任务,咱们从队列中拿取的都是任务
  2. 每次执行任务的时候,不是都要生成一个线程,而是若是之前生成的线程有空闲的话,就用之前的线程
  3. 支持回掉机制,支持close,terminate

下面来一下代码是怎么实现的

import threading
import queue
import time
import contextlib

class ThreadingPool:
    def __init__(self, num):
        self.max = num
        self.terminal = False
        self.q = queue.Queue()
        self.generate_list = []         # 保存已经生成的线程
        self.free_list = []             # 保存那些已经完成任务的线程

    def run(self, func, args=None, callbk=None):
        self.q.put((func, args, callbk))            # 将任务信息做为一个元祖放到队列中去
        if len(self.free_list) == 0 and len(self.generate_list) < self.max:
           self.threadstart()

    def threadstart(self):
        t = threading.Thread(target=self.handel)
        t.start()

    def handel(self):
        current_thread = threading.current_thread()
        self.generate_list.append(current_thread)
        event = self.q.get()
        while event != 'stop':
            func, args, callbk = event
            flag = True
            try:
                ret = func(*args)
            except Exception as e:
                flag = False
                ret = e

            if callbk is not None:
                try:
                    callbk(ret)
                except Exception as e:
                    pass

            if not self.terminal:
                with self.auto_append_remove(current_thread):
                    event = self.q.get()
            else:
                event = 'stop'
        else:
            self.generate_list.remove(current_thread)

    def terminate(self):
        self.terminal = True

        while self.generate_list:
            self.q.put('stop')
        self.q.empty()

    def close(self):
        num = len(self.generate_list)
        while num:
            self.q.put('stop')
            num -= 1

    @contextlib.contextmanager
    def auto_append_remove(self, thread):
        self.free_list.append(thread)
        try:
            yield
        finally:
            self.free_list.remove(thread)

def f(i):
    # time.sleep(1)
    return i

def f1(i):
    print(i)

p = ThreadingPool(5)
for i in range(20):
    p.run(func=f, args=(i,), callbk=f1)

p.close()

9、协程 

协程,又称微线程,协程执行看起来有点像多线程,可是事实上协程就是只有一个线程,所以,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优点就越明显,此外由于只有一个线程,不须要多线程的锁机制,也不存在同时写变量冲突。协程的适用场景:当程序中存在大量不须要CPU的操做时(IO)下面来看一个利用协程例子

from gevent import monkey
import gevent
import requests

# 把标准库中的thread/socket等给替换掉
# 这样咱们在后面使用socket的时候能够跟日常同样使用,无需修改任何代码,可是它变成非阻塞的了.
monkey.patch_all()      # 猴子补丁

def f(url):
    print('GET: %s' % url)
    resp = requests.get(url)
    data = resp.text
    print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
])

上面的例子,利用协程,一个线程完成全部的请求,发出请求的时候,不会等待回复,而是一次性将全部的请求都发出求,收到一个回复就处理一个回复,这样一个线程就解决了全部的事情,效率极高。

10、小结 

这篇博文是pyton基础知识的最后一篇,后面会讲的博文会讲开始讲前端的知识,这里附上目录http://www.cnblogs.com/Wxtrkbc/p/5606048.html,之后会继续更新的,

相关文章
相关标签/搜索