Python 自定义线程池

"""思路1,将任务放在队列    1)建立队列:(初始化)    2)设置大小,线程池的最大容量    3)真实建立的线程 列表    4)空闲的线程数量2,着手开始处理任务    1)建立线程        2)空闲线程数量大于0,则再也不建立线程        3)建立线程池的数量 不能高于线程池的限制        4)根据任务个数判断  建立线程的数量    2)线程去队列中取任务        1)取任务包(任务包是一个元祖)        2)任务为空时,再也不取(终止)"""import timeimport threadingimport queuestopEvent = object()  # 中止任务的标志class ThreadPool(object):    def __init__(self, max_thread):        # 建立任务队列,能够放无限个任务        self.queue = queue.Queue()        # 指定最大线程数        self.max_thread = max_thread        # 中止标志        self.terminal = False        # 建立真实线程数        self.generate_list = []        # 空闲线程数        self.free_thread = []    def run(self, action, args, callback=None):        """        线程池执行一个任务        :param action:任务函数        :param args:任务参数        :param callback:执行完任务的回调函数,成功或者失败的返回值。        :return:        """        # 线程池运行的条件:1)        if len(self.free_thread) == 0 and len(self.generate_list) < self.max_thread:            self.generate_thread()        task = (action, args, callback)        self.queue.put(task)    def callback(self):        """        回调函数:循环取获取任务,并执行任务函数        :return:        """        # 获取当前线程        current_thread = threading.current_thread()        self.generate_list.append(current_thread)        # 取任务并执行        event = self.queue.get()        # 事件类型是任务        while event != stopEvent:  # 重点是这个判断  使任务终止            # 解开任务包 ,(任务是一个元祖)            # 执行任务            # 标记:执行任务前的状态,执行任务后的状态            action, args, callback = event            try:                ret = action(*args)                success = True            except Exception as x:                success = False                ret = x            if callback is not None:                try:                    callback(success, ret)                except Exception as e:                    print(e)            else:                pass            if not self.terminal:                self.free_thread.append(current_thread)                event = self.queue.get()                self.free_thread.remove(current_thread)            else:                # 中止进行取任务                event = stopEvent        else:            # 不是元祖,不是任务,则清空当前线程,不在去取任务            self.generate_list.remove(current_thread)    def generate_thread(self):        """        建立一个线程        :return:        """        t = threading.Thread(target=self.callback)        t.start()    # 终止取任务    def terminals(self):        """        不管是否还有任务,终止线程        :return:        """        self.terminal = True    def close(self):        """        执行完全部的任务后,全部线程中止        :return:        """        num = len(self.generate_list)        self.queue.empty()        while num:            self.queue.put(stopEvent)            num -= 1def test(pi):    time.sleep(0.5)    print(pi)pool = ThreadPool(10)for i in range(100):    pool.run(action=test, args=(i,))pool.terminals()pool.close()
相关文章
相关标签/搜索