原生支持
from concurrent.futures import ThreadPoolExecutor,as_completed,waitdef call(params): pass # do something
max_workers = 2 executor = ThreadPoolExecutor(max_workers=max_workers) executor.submit(call, (1)) # submit是非阻塞的
1、max_workers参数来设置线程池中最多能同时运行的线程数目 2、submit()不是阻塞的,而是立即返回 3、done()方法判断该任务是否结束 4、cancel()方法可以取消提交的任务,如果任务已经在线程池中运行了,就取消不了 5、result()方法可以获取任务的返回值。查看内部代码,发现这个方法是阻塞的
as_completed()方法是一个生成器,在没有任务完成的时候,会阻塞,
在有某个任务完成的时候,会yield这个任务,就能执行for循环下面的语句,然后继续阻塞住,循环到所有的任务结束
for future in as_completed(all_task): data = future.result()
wait方法可以让主线程阻塞 all_task = [executor.submit(get_html, (url)) for url in urls] wait(all_task, return_when=ALL_COMPLETED)
自己封装
在实际应用中发现原生的线程池会屏蔽异常显示,导致出现一些问题,不知道怎么回事,所以有了以下代码
# --coding-8-- import traceback import threading import timeTH_LOCK = threading.RLock() # 线程锁 TH_RUNNING_BOX = [] # 正在运行的线程 PROGRESS = 0 # 完成数
class ThreadPoolExecutor: tasks_count = 0 # 任务数 done_count = 0 # 完成数 running_count = 0 # 正在执行的线程数 max_workers = 2 # 线程池最大执行数量 __all_thread_box = [] # 所有待运行的线程
def init(self, max_workers=None): if max_workers: self.max_workers = max_workers
def submit(self, target, args): def wrapper(func, args): global TH_LOCK, TH_RUNNING_BOX, PROGRESS try: th_run = threading.Thread(target=func, args=args) th_run.start() th_run.join() except Exception as e: print(e) traceback.print_exc() finally: TH_LOCK.acquire() TH_RUNNING_BOX.remove(threading.current_thread()) PROGRESS += 1 TH_LOCK.release()
self.__all_thread_box.append(threading.Thread(target=wrapper, args=(target,) + args))
def run(self): global TH_RUNNING_BOX, PROGRESS self.tasks_count = len(self.all_thread_box) while len(TH_RUNNING_BOX) > 0 or len(self.all_thread_box) > 0: rm_th = [] # 准备运行的线程 for th in self.__all_thread_box: if len(TH_RUNNING_BOX) >= self.max_workers: break TH_RUNNING_BOX.append(th) th.start() rm_th.append(th) # 删除已经运行了的线程 for rm in rm_th: self.__all_thread_box.remove(rm) time.sleep(0.001) self.done_count = PROGRESS self.running_count = len(TH_RUNNING_BOX)
if 'main' == name: pass