Python之路【第八篇】python实现线程池

线程池概念

什么是线程池?
诸如web服务器、数据库服务器、文件服务器和邮件服务器等许多服务器应用都面向处理来自某些远程来源的大量短小的任务。
构建服务器应用程序的一个过于简单的模型是:每当一个请求到达就建立一个新的服务对象,而后在新的服务对象中为请求服务。
但当有大量请求并发访问时,服务器不断的建立和销毁对象的开销很大。
因此提升服务器效率的一个手段就是尽量减小建立和销毁对象的次数,特别是一些很耗资源的对象建立和销毁,这样就引入了“池”的概念,
“池”的概念使得人们能够定制必定量的资源,而后对这些资源进行复用,而不是频繁的建立和销毁。

线程池是预先建立线程的一种技术
这些线程都是处于睡眠状态,即均为启动,不消耗CPU,而只是占用较小的内存空间。
当请求到来以后,缓冲池给此次请求分配一个空闲线程,把请求传入此线程中运行,进行处理。
当预先建立的线程都处于运行状态,即预制线程不够,线程池能够自由建立必定数量的新线程,用于处理更多的请求。
当系统比较闲的时候,也能够经过移除一部分一直处于停用状态的线程。

线程池的注意事项
虽然线程池是构建多线程应用程序的强大机制,但使用它并非没有风险的。在使用线程池时需注意线程池大小与性能的关系,注意并发风险、死锁、资源不足和线程泄漏等问题。
一、线程池大小。多线程应用并不是线程越多越好,须要根据系统运行的软硬件环境以及应用自己的特色决定线程池的大小。html

通常来讲,若是代码结构合理的话,线程数目与CPU 数量相适合便可。
若是线程运行时可能出现阻塞现象,可相应增长池的大小;若有必要可采用自适应算法来动态调整线程池的大小,以提升CPU 的有效利用率和系统的总体性能。
二、并发错误。多线程应用要特别注意并发错误,要从逻辑上保证程序的正确性,注意避免死锁现象的发生。
三、线程泄漏。这是线程池应用中一个严重的问题,当任务执行完毕而线程没能返回池中就会发生线程泄漏现象。python

线程池要点

线程池要点:web

线程池要点:
1、经过判断等待的任务数量和线程池中的最大值,取最小值来判断开启多少线程来工做
好比:
任务数是3,进程池最大20  ,那么我们只须要开启3个线程就好了。
任务数是500,进程池是20,那么我们只开20个线程就能够了。
取最小值

2、实现线程池正在运行,有一个查看的功能,查看一下如今线程里面活跃的线程是多少等待的是多少?

线程总共是多少,等待中多少,正在运行中多少
做用:
方便查看当前线程池状态
能获取到这个以后就能够当线程一直处于空闲状态

查看状态用:上下文管理来作,很是nice的一点

三、关闭线程

简单线程池实现算法

#!/usr/bin/env python
#-*- coding:utf-8 -*-
__author__ = 'luo_t'
import Queue
import threading
import time

'''
这个简单的例子的想法是经过:
一、利用Queue特性,在Queue里建立多个线程对象
二、那我执行代码的时候,去queue里去拿线程!
若是线程池里有可用的,直接拿。
若是线程池里没有可用,那就等。
三、线程执行完毕,归还给线程池
'''

class ThreadPool(object): #建立线程池类
    def __init__(self,max_thread=20):#构造方法,设置最大的线程数为20
        self.queue = Queue.Queue(max_thread) #建立一个队列
        for i in xrange(max_thread):#循环把线程对象加入到队列中
            self.queue.put(threading.Thread)
            #把线程的类名放进去,执行完这个Queue

    def get_thread(self):#定义方法从队列里获取线程
        return self.queue.get()

    def add_thread(self):#定义方法在队列里添加线程
        self.queue.put(threading.Thread)

pool = ThreadPool(10)

def func(arg,p):
    print arg
    time.sleep(2)
    p.add_thread() #当前线程执行完了,我在队列里加一个线程!

for i in xrange(300):
    thread = pool.get_thread() #线程池10个线程,每一次循环拿走一个!默认queue.get(),若是队列里没有数据就会等待。
    t = thread(target=func,args=(i,pool))
    t.start()


'''
self.queue.put(threading.Thread) 添加的是类不是对象,在内存中若是相同的类只占一分内存空间
而且若是这里存储的是对象的话每次都的新增都得在内存中开辟一段内存空间

还有若是是对象的话:下面的这个语句就不能这么调用了!
for i in xrange(300):
    thread = pool.get_thread()
    t = thread(target=func,args=(i,pool))
    t.start()
    经过查看源码能够知道,在thread的构造函数中:self.__args = args  self.__target = target  都是私有字段那么调用就应该这么写

for i in xrange(300):
    ret = pool.get_thread()
    ret._Thread__target = func
    ret._Thread__args = (i,pool)
    ret.start()
'''
simple_pool.py

复杂线程池须要知道的知识点数据库

#!/usr/bin/env python
#-*- coding:utf-8 -*-
__author__ = 'luo_t'

import Queue
obj = object() #object也是一个类,我建立了一个对象obj

q = Queue.Queue()
for i in range(10):
    print id(obj)#看萝卜号
    q.put(obj)
'''
这个队列里有10个萝卜(萝卜=obj),可是这10个萝卜只是个投影。
咱们在for循环的时候put到队列里,obj有变化吗?是否有新开辟空间?显然没有
'''
knowledge_point_1.py
#!/usr/bin/env python
#-*- coding:utf-8 -*-
__author__ = 'luo_t'
import contextlib
import threading
import time
import random

doing = []
def number(l2):
    while True:
        print len(l2)
        time.sleep(1)

t = threading.Thread(target=number,args=(doing,))  #开启一个线程,每一秒打印列表,当前工做中的线程数量
t.start()


#添加管理上下文的装饰器
@contextlib.contextmanager
def show(li,iterm):
    li.append(iterm)
    yield
    '''
    yield冻结此次操做,就出去了,with就会捕捉到,而后就会执行with下的代码块,当with下的代码块
    执行完毕后就会回来继续执行yield下面没有执行的代码块!
    而后就执行完毕了
    若是with代码块中的很是耗时,那么doing的长度是否是一直是1,说明他没执行完呢?咱们就能够获取到正在执行的数量,当他with执行完毕后
    执行yield的后续的代码块。把他移除后就为0了!
    '''
    li.remove(iterm)


def task(arg):
    with show(doing,1):#经过with管理上下文进行切换
        print len(doing)
        time.sleep(10) #等待10秒这里可使用random模块来操做~

for i in range(20): #开启20个线程执行
    temp = threading.Thread(target=task,args=(i,))
    temp.start()

'''
做用:咱们要记录正在工做的的列表
好比正在工做的线程我把加入到doing这个列表中,若是工做完成的把它从doing列表中移除。
经过这个机制,就能够获取如今正在执行的线程都有多少
'''
knowledge_point_2.py

线程池实现

#!/usr/bin/env python
#-*- coding:utf-8 -*-
__author__ = 'luo_t'
from Queue import Queue
import contextlib
import threading

WorkerStop = object()


class ThreadPool:
    workers = 0
    threadFactory = threading.Thread
    currentThread = staticmethod(threading.currentThread)

    def __init__(self, maxthreads=20, name=None):
        self.q = Queue(0) #这里建立一个队列,若是是0的话表示不限制,如今这个队列里放的是任务
        self.max = maxthreads #定义最大线程数
        self.name = name
        self.waiters = []#这两个是用来计数的
        self.working = []#这两个是用来技术的

    def start(self):
        #self.max 最大线程数
        #q.qisze(),任务个数
        needSize = self.q.qsize()
        while self.workers < min(self.max, needSize):#min(10,20)取最小值
            #wokers默认为0  【workers = 0】
            '''
            举例来讲:
            while self.workers < min(self.max, needSize):
            这个循环,好比最大线程为20,我们的任务个数为10,取最小值为10
            每次循环开1个线程,而且workers自增1,那么循环10次后,开了10个线程了workers = 10 ,那么workers就不小于10了
            就不开线程了,我线程开到最大了,大家这10个线程去消耗这10个任务去吧
            而且这里不阻塞,建立完线程就去执行了!
            每个线程都去执行_worker方法去了
            '''
            self.startAWorker()

    def startAWorker(self):
        self.workers += 1
        newThread = self.threadFactory(target=self._worker, name='shuaige') #建立一个线程并去执行_worker方法
        newThread.start()

    def callInThread(self, func, *args, **kw):
        self.callInThreadWithCallback(None, func, *args, **kw)

    def callInThreadWithCallback(self, onResult, func, *args, **kw):
        o = (func, args, kw, onResult)
        self.q.put(o)


    @contextlib.contextmanager
    def _workerState(self, stateList, workerThread):
        stateList.append(workerThread)
        try:
            yield
        finally:
            stateList.remove(workerThread)

    def _worker(self):
        ct = self.currentThread()
        o = self.q.get() #去队列里取任务,若是有任务就O就会有值,每一个任务是个元组,有方法,有参数
        while o is not WorkerStop:
            with self._workerState(self.working, ct):  #上下文切换
                function, args, kwargs, onResult = o
                del o
                try:
                    result = function(*args, **kwargs)
                    success = True
                except:
                    success = False
                    if onResult is None:
                        pass
                    else:
                        pass

                del function, args, kwargs

                if onResult is not None:
                    try:
                        onResult(success, result)
                    except:
                        #context.call(ctx, log.err)
                        pass

                del onResult, result

            with self._workerState(self.waiters, ct): #当线程工做完闲暇的时候,在去取任务执行
                o = self.q.get()

    def stop(self): #定义关闭线程方法
        while self.workers: #循环workers值
            self.q.put(WorkerStop) #在队列中增长一个信号~
            self.workers -= 1 #workers值-1 直到全部线程关闭


def show(arg):
    import time
    time.sleep(1)
    print arg


pool = ThreadPool(10)

#建立500个任务,队列里添加了500个任务
#每一个任务都是一个元组(方法名,动态参数,动态参数,默认为NoNe)
for i in range(100):
    pool.callInThread(show, i)

pool.start()  #队列添加完成以后,开启线程让线程一个一个去队列里去拿

pool.stop() #当上面的任务都执行完以后,线程中都在等待着在队列里去数据呢!
'''
咱们要关闭全部的线程,执行stop方法,首先workers这个值是当前的线程数量,咱们给线程发送一个信号“WorkerStop”
在线程的工做里:        while o is not WorkerStop:   若是线程获取到这个值就不执行了,而后这个线程while循环就中止了,等待
python的垃圾回收机制,回收。

而后在self.workers -= 1 ,那么全部的线程收到这个信号以后就会中止!!!
over~
'''

 

更多请参考:http://www.cnblogs.com/wupeiqi/articles/4839959.html服务器

相关文章
相关标签/搜索