python的线程池两种实现方式

原生支持

参考:https://www.jianshu.com/p/b9b3d66aa0be

from concurrent.futures import ThreadPoolExecutor,as_completed,wait

def call(params):
    pass
    # do something

max_workers = 2
executor = ThreadPoolExecutor(max_workers=max_workers)
executor.submit(call, (1))  # submit是非阻塞的



1、max_workers参数来设置线程池中最多能同时运行的线程数目
2、submit()不是阻塞的,而是立即返回
3、done()方法判断该任务是否结束
4、cancel()方法可以取消提交的任务,如果任务已经在线程池中运行了,就取消不了
5、result()方法可以获取任务的返回值。查看内部代码,发现这个方法是阻塞的

# as_completed()方法是一个生成器,在没有任务完成的时候,会阻塞,
# 在有某个任务完成的时候,会yield这个任务,就能执行for循环下面的语句,然后继续阻塞住,循环到所有的任务结束
for future in as_completed(all_task):
    data = future.result()


wait方法可以让主线程阻塞
all_task = [executor.submit(get_html, (url)) for url in urls]
wait(all_task, return_when=ALL_COMPLETED)

自己封装

在实际应用中发现原生的线程池会屏蔽异常显示,导致出现一些问题,不知道怎么回事,所以有了以下代码

# -*-coding-8-*-
import traceback
import threading
import time

TH_LOCK = threading.RLock()  # 线程锁
TH_RUNNING_BOX = []  # 正在运行的线程
PROGRESS = 0  # 完成数


class ThreadPoolExecutor:
    tasks_count = 0  # 任务数
    done_count = 0  # 完成数
    running_count = 0  # 正在执行的线程数
    max_workers = 2  # 线程池最大执行数量
    __all_thread_box = []  # 所有待运行的线程

    def __init__(self, max_workers=None):
        if max_workers:
            self.max_workers = max_workers

    def submit(self, target, *args):
        def wrapper(func, *args):
            global TH_LOCK, TH_RUNNING_BOX, PROGRESS
            try:
                th_run = threading.Thread(target=func, args=args)
                th_run.start()
                th_run.join()
            except Exception as e:
                print(e)
                traceback.print_exc()
            finally:
                TH_LOCK.acquire()
                TH_RUNNING_BOX.remove(threading.current_thread())
                PROGRESS += 1
                TH_LOCK.release()

        self.__all_thread_box.append(threading.Thread(target=wrapper, args=(target,) + args))

    def run(self):
        global TH_RUNNING_BOX, PROGRESS
        self.tasks_count = len(self.__all_thread_box)
        while len(TH_RUNNING_BOX) > 0 or len(self.__all_thread_box) > 0:
            rm_th = []  # 准备运行的线程
            for th in self.__all_thread_box:
                if len(TH_RUNNING_BOX) >= self.max_workers:
                    break
                TH_RUNNING_BOX.append(th)
                th.start()
                rm_th.append(th)
            # 删除已经运行了的线程
            for rm in rm_th:
                self.__all_thread_box.remove(rm)
            time.sleep(0.001)
            self.done_count = PROGRESS
            self.running_count = len(TH_RUNNING_BOX)


if '__main__' == __name__:
    pass


发表评论

能用钱来说话的地方总想讲人情来回避矛盾,最后必然是人情留不住,经济上也受损失