with executor as e:
futures = {e.submit(fact, i):i for i in range(0,4000)}
我们将启动并行执行计算 4000 个数字的阶乘。他executor是管理线程(ThreadPoolExecutor)或进程(ProcessPoolExecutor)的人,我们甚至可以让他根据我们 CPU 的核数决定他应该使用的“工人”数量。
完整的例子:
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, wait
def fact(n):
res = 1
for i in range(2, n+1):
res *= i
return res
if __name__ == "__main__":
t0 = time.time()
with ThreadPoolExecutor(max_workers=50) as e:
fs = {e.submit(fact, i):i for i in range(4000)}
wait(fs)
t1 = time.time()
with ProcessPoolExecutor() as e:
fs = {e.submit(fact, i):i for i in range(4000)}
wait(fs)
t2 = time.time()
print(f"Ejecución con hilos: {t1-t0:.2f}s")
print(f"Ejecución con procesos: {t2-t1:.2f}s")
一般而言,带有进程的窗口中的时间非常糟糕,并且在某些情况下可能比带有线程的情况更糟。
“异步”工作的一种方法是随时向执行程序添加更多任务,如下所示:
import time
from collections import deque
from itertools import islice
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
def fact(n):
res = 1
for i in range(2, n+1):
res *= i
return res
if __name__ == "__main__":
with ProcessPoolExecutor() as e:
# metemos 1000 procesos
fs1 = { e.submit(fact, i):i for i in range(1000) }
print("Lanzados 1000 futuros")
# obtenemos los primeros 10 resultados
res10 = { fs1[f]:f.result() for f in islice(as_completed(fs1), 10)}
# metemos otros 100000 más
fs2 = { e.submit(fact, i):i for i in range(1000, 100000) }
print("Lanzados 100000 futuros más")
# obtenemos 10 resultados de la segunda tanda
res20 = { fs2[f]:f.result() for f in islice(as_completed(fs2), 1000, 1010)}
# obtenemos los últimos 10 resultados de la tanda anterior
res30 = { fs1[f]:f.result() for f in deque(as_completed(fs1), 10)}
# Espera para comprobar cómo sube el consumo de CPU
print("Espera de 3 segundos")
time.sleep(3)
# Cancelamos todos los procesos que no hayan acabado de la segunda tanda
for f in fs2:
f.cancel()
print("Cierre del ejecutor de procesos")
print()
for (n,res) in res10.items():
print(f"fact({n:4d}) = {str(res)[:80]}...")
print()
for (n,res) in res20.items():
print(f"fact({n:4d}) = {str(res)[:80]}...")
print()
for (n,res) in res30.items():
print(f"fact({n:4d}) = {str(res)[:80]}...")
异步运行任务的一种高效且可扩展的方法是使用像celery这样的队列库。使用这个库,您可以定义“工作人员”,即执行繁重任务的进程(而不是线程)。该解决方案的一个有趣方面是可以有许多工作人员(甚至在不同的服务器上)执行任务。
该解决方案的架构如下:
以下是您
redis
用作代理的简单应用程序。consumidor.py
发送消息到productor.py
我假设它
redis
正在工作并且virtualenv
已安装。使用这些命令,您可以安装所有依赖项:消费者和生产者都需要设置,所以我将它们存储在(我正在使用我的本地
config.py
数据库):1
redis
这是 的内容
productor.py
。它只公开一个任务 (ejecutar_tarea
),在打印结果之前需要 10 秒。这是为了看看这个延迟是如何影响消费者的:这是
consumidor.py
. 它所做的只是从控制台接收消息并将其发送给生产者:最好的测试方法是在控制台中,打开dos并激活虚拟环境。
生产者运行如下:
在这里,我从日志级别开始:
info
了解正在发生的事情的详细信息。celery 文档中详细介绍了启动 worker 时可以使用的各种选项。消费者通过运行执行:
向消费者写入消息时,您可以看到生产者收到它并在 10 秒后打印它。有趣的是,消费者不需要等待这 10 秒,而是可以立即处理另一个。如果生产者收到许多消息,那么它们将被排队。
总而言之,这个策略很容易设置并且效果很好。当需要更多的力量时,增加新的工人来帮助负荷是相对容易的。
尽管我之前发表了评论,但我将尝试使用Futures举一个例子:
“ futures”是并发执行代码的抽象,对于线程的执行(Threads)和进程的执行(Process)同样有用。为此,它使用了一个执行管理器,该管理器负责执行并将结果(“承诺”)传递给任何请求它们的人。
例如,我们要计算一个数字序列的阶乘。和
我们将启动并行执行计算 4000 个数字的阶乘。他
executor
是管理线程(ThreadPoolExecutor
)或进程(ProcessPoolExecutor
)的人,我们甚至可以让他根据我们 CPU 的核数决定他应该使用的“工人”数量。完整的例子:
一般而言,带有进程的窗口中的时间非常糟糕,并且在某些情况下可能比带有线程的情况更糟。
“异步”工作的一种方法是随时向执行程序添加更多任务,如下所示:
编辑:我已经简化了代码以使其更好地理解。
需要注意的是,它只
res20
存储第二批期货的结果futures2
。那时,如果其他任何事情都不需要它们,我们本可以取消第一轮未决期货的执行。PS:如果它不适合你,请确保你使用 python 3.6