$ cat ~/articles/85 _

python的线程池两种实现方式

作者:jaifire 2019-04-21 11:39 977 阅读

原生支持

参考:https://www.jianshu.com/p/b9b3d66aa0be

from concurrent.futures import ThreadPoolExecutor,as_completed,wait

def call(params):     pass     # do something

max_workers = 2 executor = ThreadPoolExecutor(max_workers=max_workers) executor.submit(call, (1))  # submit是非阻塞的

1、max_workers参数来设置线程池中最多能同时运行的线程数目 2、submit()不是阻塞的,而是立即返回 3、done()方法判断该任务是否结束 4、cancel()方法可以取消提交的任务,如果任务已经在线程池中运行了,就取消不了 5、result()方法可以获取任务的返回值。查看内部代码,发现这个方法是阻塞的

 as_completed()方法是一个生成器,在没有任务完成的时候,会阻塞,

 在有某个任务完成的时候,会yield这个任务,就能执行for循环下面的语句,然后继续阻塞住,循环到所有的任务结束

for future in as_completed(all_task):     data = future.result()

wait方法可以让主线程阻塞 all_task = [executor.submit(get_html, (url)) for url in urls] wait(all_task, return_when=ALL_COMPLETED)

自己封装

在实际应用中发现原生的线程池会屏蔽异常显示,导致出现一些问题,不知道怎么回事,所以有了以下代码

# --coding-8--
import traceback
import threading
import time

TH_LOCK = threading.RLock()  # 线程锁 TH_RUNNING_BOX = []  # 正在运行的线程 PROGRESS = 0  # 完成数

class ThreadPoolExecutor:     tasks_count = 0  # 任务数     done_count = 0  # 完成数     running_count = 0  # 正在执行的线程数     max_workers = 2  # 线程池最大执行数量     __all_thread_box = []  # 所有待运行的线程

    def init(self, max_workers=None):         if max_workers:             self.max_workers = max_workers

    def submit(self, target, args):         def wrapper(func, args):             global TH_LOCK, TH_RUNNING_BOX, PROGRESS             try:                 th_run = threading.Thread(target=func, args=args)                 th_run.start()                 th_run.join()             except Exception as e:                 print(e)                 traceback.print_exc()             finally:                 TH_LOCK.acquire()                 TH_RUNNING_BOX.remove(threading.current_thread())                 PROGRESS += 1                 TH_LOCK.release()

        self.__all_thread_box.append(threading.Thread(target=wrapper, args=(target,) + args))

    def run(self):         global TH_RUNNING_BOX, PROGRESS         self.tasks_count = len(self.all_thread_box)         while len(TH_RUNNING_BOX) > 0 or len(self.all_thread_box) > 0:             rm_th = []  # 准备运行的线程             for th in self.__all_thread_box:                 if len(TH_RUNNING_BOX) >= self.max_workers:                     break                 TH_RUNNING_BOX.append(th)                 th.start()                 rm_th.append(th)             # 删除已经运行了的线程             for rm in rm_th:                 self.__all_thread_box.remove(rm)             time.sleep(0.001)             self.done_count = PROGRESS             self.running_count = len(TH_RUNNING_BOX)

if 'main' == name:     pass