Recabamos en el mundo online para así traerte la respuesta para tu dilema, si continúas con alguna pregunta puedes dejar la pregunta y te contestaremos con gusto.
Visión general:
Hay 2 partes en mi respuesta:
- La parte 1 muestra cómo ganar más velocidad con @ niemmi’s
ProcessPoolExecutor.map()
solución. - La parte 2 muestra cuando
ProcessPoolExecutor
subclases de.submit()
y.map()
producir tiempos de cálculo no equivalentes.
================================================ =====================
Parte 1: Más aceleración para ProcessPoolExecutor.map ()
Fondo:
Esta sección se basa en @ niemmi’s .map()
solución, que en sí misma es excelente. Mientras investigaba un poco sobre su esquema de discretización para comprender mejor cómo interactúa con el argumento de .map () fragmentaciones, encontré esta interesante solución.
Considero la definición de @ niemmi de chunk = nmax // workers
para ser una definición de chunksize, es decir, un tamaño más pequeño del rango de números reales (tarea dada) que debe abordar cada trabajador en el grupo de trabajadores. Ahora, esta definición se basa en el supuesto de que si una computadora tiene x número de trabajadores, dividir la tarea equitativamente entre cada trabajador dará como resultado un uso óptimo de cada trabajador y, por lo tanto, la tarea total se completará más rápido. Por lo tanto, el número de fragmentos para dividir una tarea determinada siempre debe ser igual al número de trabajadores del grupo. Sin embargo, ¿es correcta esta suposición?
Proposición: Aquí, propongo que la suposición anterior no siempre conduce al tiempo de cálculo más rápido cuando se usa con ProcessPoolExecutor.map()
. Bastante, discretizar una tarea en una cantidad mayor que la cantidad de trabajadores de la piscina puede conducir a una aceleración, es decir, a una finalización más rápida de una tarea determinada.
Experimentar: He modificado el código de @ niemmi para permitir que el número de tareas discretizadas supere el número de trabajadores del grupo. Este código se proporciona a continuación y se utiliza para determinar el número de veces que aparece el número 5 en el rango de números de 0 a 1E8. He ejecutado este código usando 1, 2, 4 y 6 trabajadores de grupo y para varias proporciones de número de tareas discretizadas frente al número de trabajadores de grupo. Para cada escenario, se realizaron 3 corridas y se tabularon los tiempos de cálculo. “Acelerar“se define aquí como el tiempo de cálculo promedio utilizando el mismo número de fragmentos y trabajadores del grupo durante el tiempo de cálculo promedio cuando el número de tareas discretizadas es mayor que el número de trabajadores del grupo.
Recomendaciones:
-
La figura de la izquierda muestra el tiempo de cálculo que tomaron todos los escenarios mencionados en la sección del experimento. Demuestra que el calcular el tiempo empleado por número de trozos / número de trabajadores = 1 es siempre mayor que el tiempo de cálculo que toma número de trozos> número de trabajadores. Es decir, el primer caso es siempre menos eficiente que el segundo.
-
La figura de la derecha muestra que se obtuvo una aceleración de 1,2 veces o más cuando el número de trozos / número de trabajadores alcanzar un valor umbral de 14 o más. Es interesante observar que la tendencia de aceleración también ocurrió cuando
ProcessPoolExecutor.map()
se ejecutó con 1 trabajador.
Conclusión: Al personalizar el número de tareas discretas que ProcessPoolExecutor.map () `debería usar para resolver una tarea determinada, es prudente asegurarse de que este número sea mayor que el número de trabajadores del grupo, ya que esta práctica acorta el tiempo de cálculo.
código concurrent.futures.ProcessPoolExecutor.map (). (solo partes revisadas)
def _concurrent_map(nmax, number, workers, num_of_chunks):
'''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
find the occurrences of a given number in a number range in a parallelised
manner.'''
# 1. Local variables
start = time()
chunksize = nmax // num_of_chunks
futures = []
found =[]
#2. Parallelization
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
# 2.1. Discretise workload and submit to worker pool
cstart = (chunksize * i for i in range(num_of_chunks))
cstop = (chunksize * i if i != num_of_chunks else nmax
for i in range(1, num_of_chunks + 1))
futures = executor.map(_findmatch, cstart, cstop,
itertools.repeat(number))
# 2.2. Consolidate result as a list and return this list.
for future in futures:
#print('type(future)=',type(future))
for f in future:
if f:
try:
found.append(f)
except:
print_exc()
foundsize = len(found)
end = time() - start
print('n within statement of def _concurrent(nmax, number):')
print("found 0 in 1:.4fsec".format(foundsize, end))
return found
if __name__ == '__main__':
nmax = int(1E8) # Number range maximum.
number = str(5) # Number to be found in number range.
workers = 4 # Pool of workers
chunks_vs_workers = 14 # A factor of =>14 can provide optimum performance
num_of_chunks = chunks_vs_workers * workers
start = time()
a = _concurrent_map(nmax, number, workers, num_of_chunks)
end = time() - start
print('n main')
print('nmax=, workers=, num_of_chunks='.format(
nmax, workers, num_of_chunks))
print('workers = ', workers)
print("found 0 in 1:.4fsec".format(len(a),end))
================================================ =====================
Parte 2: El tiempo de cálculo total desde el uso de las subclases de ProcessPoolExecutor .submit () y .map () puede ser diferente cuando se devuelve una lista de resultados ordenada / ordenada.
Fondo: He enmendado tanto el .submit()
y .map()
códigos para permitir una comparación “manzana a manzana” de su tiempo de cómputo y la capacidad de visualizar el tiempo de cómputo del código principal, el tiempo de cómputo del método _concurrent llamado por el código principal para realizar las operaciones concurrentes, y el cómputo tiempo para cada tarea / trabajador discretizado llamado por el método _concurrent. Además, el método concurrente en estos códigos se estructuró para devolver una lista ordenada y desordenada del resultado directamente del objeto futuro de .submit()
y el iterador de .map()
. El código fuente se proporciona a continuación (Espero que te ayude.).
Experimentos Estos dos códigos recientemente mejorados se usaron para realizar el mismo experimento descrito en la Parte 1, salvo que solo se consideraron 6 trabajadores de grupo y el python incorporado list
y sorted
Se utilizaron métodos para devolver una lista ordenada y desordenada de los resultados a la sección principal del código, respectivamente.
Recomendaciones:
- A partir del resultado del método _concurrent, podemos ver los tiempos de cálculo del método _concurrent utilizado para crear todos los objetos Future de
ProcessPoolExecutor.submit()
, y para crear el iterador deProcessPoolExecutor.map()
, en función del número de tareas discretizadas sobre el número de trabajadores del grupo, son equivalentes. Este resultado simplemente significa que elProcessPoolExecutor
subclases.submit()
y.map()
son igualmente eficientes / rápidos. - Comparando los tiempos de cálculo de main y su método _concurrent, podemos ver que main se ejecutó más tiempo que su método _concurrent. Esto es de esperar ya que su diferencia de tiempo refleja la cantidad de tiempos de cómputo del
list
ysorted
métodos (y el de los otros métodos incluidos dentro de estos métodos). Claramente visto, ellist
El método tomó menos tiempo de cálculo para devolver una lista de resultados que elsorted
método. Los tiempos de cálculo promedio dellist
El método para los códigos .submit () y .map () fue similar, en ~ 0.47seg. El tiempo de cálculo promedio del método ordenado para los códigos .submit () y .map () fue 1.23sec y 1.01sec, respectivamente. En otras palabras, ellist
El método se realizó 2,62 veces y 2,15 veces más rápido quesorted
método para los códigos .submit () y .map (), respectivamente. - No está claro por qué
sorted
método generó una lista ordenada de
.map()
más rápido que desde.submit()
, ya que el número de tareas discretizadas aumentó más que el número de trabajadores del grupo, salvo cuando el número de tareas discretizadas fue igual al número de trabajadores del grupo. Dicho esto, estos hallazgos muestran que la decisión de utilizar el sistema igualmente rápido.submit()
o.map()
las subclases se pueden gravar con el método ordenado. Por ejemplo, si la intención es generar una lista ordenada en el menor tiempo posible, se debe preferir el uso de ProcessPoolExecutor.map () sobreProcessPoolExecutor.submit()
como.map()
puede permitir el tiempo de cálculo total más corto. - El esquema de discretización mencionado en la Parte 1 de mi respuesta se muestra aquí para acelerar el desempeño de ambos
.submit()
y.map()
subclases. La cantidad de aceleración puede ser hasta un 20% en el caso en que el número de tareas discretizadas iguale el número de trabajadores de la piscina.
Código .map () mejorado
#!/usr/bin/python3.5
# -*- coding: utf-8 -*-
import concurrent.futures as cf
from time import time
from itertools import repeat, chain
def _findmatch(nmin, nmax, number):
'''Function to find the occurence of number in range nmin to nmax and return
the found occurences in a list.'''
start = time()
match=[]
for n in range(nmin, nmax):
if number in str(n):
match.append(n)
end = time() - start
#print("n def _findmatch 0:<10 1:<10 2:<3 found 3:8 in 4:.4fsec".
# format(nmin, nmax, number, len(match),end))
return match
def _concurrent(nmax, number, workers, num_of_chunks):
'''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
find the occurrences of a given number in a number range in a concurrent
manner.'''
# 1. Local variables
start = time()
chunksize = nmax // num_of_chunks
#2. Parallelization
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
# 2.1. Discretise workload and submit to worker pool
cstart = (chunksize * i for i in range(num_of_chunks))
cstop = (chunksize * i if i != num_of_chunks else nmax
for i in range(1, num_of_chunks + 1))
futures = executor.map(_findmatch, cstart, cstop, repeat(number))
end = time() - start
print('n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
print("found in 0:.4fsec".format(end))
return list(chain.from_iterable(futures)) #Return an unordered result list
#return sorted(chain.from_iterable(futures)) #Return an ordered result list
if __name__ == '__main__':
nmax = int(1E8) # Number range maximum.
number = str(5) # Number to be found in number range.
workers = 6 # Pool of workers
chunks_vs_workers = 30 # A factor of =>14 can provide optimum performance
num_of_chunks = chunks_vs_workers * workers
start = time()
found = _concurrent(nmax, number, workers, num_of_chunks)
end = time() - start
print('n main')
print('nmax=, workers=, num_of_chunks='.format(
nmax, workers, num_of_chunks))
#print('found = ', found)
print("found 0 in 1:.4fsec".format(len(found),end))
Código .submit () mejorado.
Este código es el mismo que el código .map, excepto que reemplaza el método _concurrent con lo siguiente:
def _concurrent(nmax, number, workers, num_of_chunks):
'''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
find the occurrences of a given number in a number range in a concurrent
manner.'''
# 1. Local variables
start = time()
chunksize = nmax // num_of_chunks
futures = []
#2. Parallelization
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
# 2.1. Discretise workload and submit to worker pool
for i in range(num_of_chunks):
cstart = chunksize * i
cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
futures.append(executor.submit(_findmatch, cstart, cstop, number))
end = time() - start
print('n within statement of def _concurrent_submit(nmax, number, workers, num_of_chunks):')
print("found in 0:.4fsec".format(end))
return list(chain.from_iterable(f.result() for f in cf.as_completed(
futures))) #Return an unordered list
#return list(chain.from_iterable(f.result() for f in cf.as_completed(
# futures))) #Return an ordered list
================================================ =====================
Estás comparando manzanas con naranjas aquí. Cuando usas map
tu produces todo el 1E8
números y transferirlos a procesos de trabajo. Esto lleva mucho tiempo en comparación con la ejecución real. Cuando usas submit
simplemente crea 6 conjuntos de parámetros que se transfieren.
Si cambias map
para operar con el mismo principio obtendrás números cercanos entre sí:
def _findmatch(nmin, nmax, number):
'''Function to find the occurrence of number in range nmin to nmax and return
the found occurrences in a list.'''
print('n def _findmatch', nmin, nmax, number)
start = time()
match=[]
for n in range(nmin, nmax):
if number in str(n):
match.append(n)
end = time() - start
print("found 0 in 1:.4fsec".format(len(match),end))
return match
def _concurrent_map(nmax, number, workers):
'''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
find the occurrences of a given number in a number range in a parallelised
manner.'''
# 1. Local variables
start = time()
chunk = nmax // workers
futures = []
found =[]
#2. Parallelization
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
# 2.1. Discretise workload and submit to worker pool
cstart = (chunk * i for i in range(workers))
cstop = (chunk * i if i != workers else nmax for i in range(1, workers + 1))
futures = executor.map(_findmatch, cstart, cstop, itertools.repeat(number))
# 2.3. Consolidate result as a list and return this list.
for future in futures:
for f in future:
try:
found.append(f)
except:
print_exc()
foundsize = len(found)
end = time() - start
print('within statement of def _concurrent(nmax, number):')
print("found 0 in 1:.4fsec".format(foundsize, end))
return found
Puede mejorar el rendimiento del envío utilizando as_completed
correctamente. Para un iterable de futuros dado, devolverá un iterador que yield
futuros en el orden en que se completan.
También puede omitir la copia de los datos a otro array y use itertools.chain.from_iterable
para combinar los resultados de futuros a iterables individuales:
import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc
from itertools import chain
def _findmatch(nmin, nmax, number):
'''Function to find the occurrence of number in range nmin to nmax and return
the found occurrences in a list.'''
print('n def _findmatch', nmin, nmax, number)
start = time()
match=[]
for n in range(nmin, nmax):
if number in str(n):
match.append(n)
end = time() - start
print("found 0 in 1:.4fsec".format(len(match),end))
return match
def _concurrent_map(nmax, number, workers):
'''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
find the occurrences of a given number in a number range in a parallelised
manner.'''
# 1. Local variables
chunk = nmax // workers
futures = []
found =[]
#2. Parallelization
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
# 2.1. Discretise workload and submit to worker pool
for i in range(workers):
cstart = chunk * i
cstop = chunk * (i + 1) if i != workers - 1 else nmax
futures.append(executor.submit(_findmatch, cstart, cstop, number))
return chain.from_iterable(f.result() for f in cf.as_completed(futures))
if __name__ == '__main__':
nmax = int(1E8) # Number range maximum.
number = str(5) # Number to be found in number range.
workers = 6 # Pool of workers
start = time()
a = _concurrent_map(nmax, number, workers)
end = time() - start
print('n main')
print('workers = ', workers)
print("found 0 in 1:.4fsec".format(sum(1 for x in a),end))
Comentarios y puntuaciones
Nos puedes añadir valor a nuestro contenido participando con tu experiencia en las críticas.