"""思路1,将任务放在队列 1)建立队列:(初始化) 2)设置大小,线程池的最大容量 3)真实建立的线程 列表 4)空闲的线程数量2,着手开始处理任务 1)建立线程 2)空闲线程数量大于0,则再也不建立线程 3)建立线程池的数量 不能高于线程池的限制 4)根据任务个数判断 建立线程的数量 2)线程去队列中取任务 1)取任务包(任务包是一个元祖) 2)任务为空时,再也不取(终止)"""import timeimport threadingimport queuestopEvent = object() # 中止任务的标志class ThreadPool(object): def __init__(self, max_thread): # 建立任务队列,能够放无限个任务 self.queue = queue.Queue() # 指定最大线程数 self.max_thread = max_thread # 中止标志 self.terminal = False # 建立真实线程数 self.generate_list = [] # 空闲线程数 self.free_thread = [] def run(self, action, args, callback=None): """ 线程池执行一个任务 :param action:任务函数 :param args:任务参数 :param callback:执行完任务的回调函数,成功或者失败的返回值。 :return: """ # 线程池运行的条件:1) if len(self.free_thread) == 0 and len(self.generate_list) < self.max_thread: self.generate_thread() task = (action, args, callback) self.queue.put(task) def callback(self): """ 回调函数:循环取获取任务,并执行任务函数 :return: """ # 获取当前线程 current_thread = threading.current_thread() self.generate_list.append(current_thread) # 取任务并执行 event = self.queue.get() # 事件类型是任务 while event != stopEvent: # 重点是这个判断 使任务终止 # 解开任务包 ,(任务是一个元祖) # 执行任务 # 标记:执行任务前的状态,执行任务后的状态 action, args, callback = event try: ret = action(*args) success = True except Exception as x: success = False ret = x if callback is not None: try: callback(success, ret) except Exception as e: print(e) else: pass if not self.terminal: self.free_thread.append(current_thread) event = self.queue.get() self.free_thread.remove(current_thread) else: # 中止进行取任务 event = stopEvent else: # 不是元祖,不是任务,则清空当前线程,不在去取任务 self.generate_list.remove(current_thread) def generate_thread(self): """ 建立一个线程 :return: """ t = threading.Thread(target=self.callback) t.start() # 终止取任务 def terminals(self): """ 不管是否还有任务,终止线程 :return: """ self.terminal = True def close(self): """ 执行完全部的任务后,全部线程中止 :return: """ num = len(self.generate_list) self.queue.empty() while num: self.queue.put(stopEvent) num -= 1def test(pi): time.sleep(0.5) print(pi)pool = ThreadPool(10)for i in range(100): pool.run(action=test, args=(i,))pool.terminals()pool.close()