VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > Python基础教程 >
  • Python3标准库:concurrent.futures管理并发任务池

作者:@小灰灰
本文为作者原创,转载请注明出处:https://www.cnblogs.com/liuhui0308/p/12602053.html


回到顶部(go to top)

1. concurrent.futures管理并发任务池

concurrent.futures模块提供了使用工作线程或进程池运行任务的接口。线程和进程池的API是一样的,所以应用只做最小的修改就可以在线程和进程之间顺利地切换。

这个模块提供了两种类型的类与这些池交互。执行器(executor)用来管理工作线程或进程池,future用来管理工作线程或进程计算的结果。要使用一个工作线程或进程池,应用要创建适当的执行器类的一个实例,然后向它提交任务来运行。每个任务启动时,会返回一个Future实例。需要任务的结果时,应用可以使用Future阻塞,直到得到结果。目前已经提供了不同的API,可以很方便地等待任务完成,所以不需要直接管理Future对象。

1.1 利用基本线程池使用map()

ThreadPooLExecutor管理一组工作线程,当这些线程可用于完成更多工作时,可以向它们传入任务。下面的例子使用map()并发地从一个输入迭代器生成一组结果。这个任务使用time.sleep()暂停不同的时间,从而展示不论任务的执行顺序如何,map()总是根据输入按顺序返回值。


  1. from concurrent import futures
  2. import threading
  3. import time
  4.  
  5. def task(n):
  6. print('{}: sleeping {}'.format(
  7. threading.current_thread().name,
  8. n)
  9. )
  10. time.sleep(n / 10)
  11. print('{}: done with {}'.format(
  12. threading.current_thread().name,
  13. n)
  14. )
  15. return n / 10
  16.  
  17. ex = futures.ThreadPoolExecutor(max_workers=2)
  18. print('main: starting')
  19. results = ex.map(task, range(5, 0, -1))
  20. print('main: unprocessed results {}'.format(results))
  21. print('main: waiting for real results')
  22. real_results = list(results)
  23. print('main: results: {}'.format(real_results))

map()的返回值实际上是一种特殊类型的迭代器,它知道主程序迭代处理时要等待各个响应。

 1.2 调度单个任务

除了使用map(),还可以借助submit()利用一个执行器调度单个任务。然后可以使用返回的Future实例等待这个任务的结果。


  1. from concurrent import futures
  2. import threading
  3. import time
  4.  
  5. def task(n):
  6. print('{}: sleeping {}'.format(
  7. threading.current_thread().name,
  8. n)
  9. )
  10. time.sleep(n / 10)
  11. print('{}: done with {}'.format(
  12. threading.current_thread().name,
  13. n)
  14. )
  15. return n / 10
  16.  
  17. ex = futures.ThreadPoolExecutor(max_workers=2)
  18. print('main: starting')
  19. f = ex.submit(task, 5)
  20. print('main: future: {}'.format(f))
  21. print('main: waiting for results')
  22. result = f.result()
  23. print('main: result: {}'.format(result))
  24. print('main: future after result: {}'.format(f))

任务完成之后,Future的状态会改变,并得到结果。

1.3 按任意顺序等待任务

调用Future的result()方法会阻塞,直到任务完成(可能返回一个值,也可能抛出一个异常)或者撤销。可以使用map()按调度任务的顺序访问多个任务的结果。如果处理结果的顺序不重要,则可以使用as_completed()在每个任务完成时处理它的结果。


  1. from concurrent import futures
  2. import random
  3. import time
  4.  
  5. def task(n):
  6. time.sleep(random.random())
  7. return (n, n / 10)
  8.  
  9. ex = futures.ThreadPoolExecutor(max_workers=5)
  10. print('main: starting')
  11.  
  12. wait_for = [
  13. ex.submit(task, i)
  14. for i in range(5, 0, -1)
  15. ]
  16.  
  17. for f in futures.as_completed(wait_for):
  18. print('main: result: {}'.format(f.result()))

因为池中的工作线程与任务同样多,故而所有任务都可以启动。它们会按随机的顺序完成,所以每次运行这个示例程序时as_completed()生成的值都不同。

1.4 回调

要在任务完成时采取某个动作,不用显式地等待结果,可以使用add_done_callback()指示Future完成时要调用一个新函数。这个回调应当是有一个参数(Future实例)的callable函数。


  1. from concurrent import futures
  2. import time
  3.  
  4. def task(n):
  5. print('{}: sleeping'.format(n))
  6. time.sleep(0.5)
  7. print('{}: done'