Saltar al contenido

python concurrent.futures.ProcessPoolExecutor: Rendimiento de .submit () vs .map ()

Recabamos en el mundo online para así traerte la respuesta para tu dilema, si continúas con alguna pregunta puedes dejar la pregunta y te contestaremos con gusto.

Visión general:

Hay 2 partes en mi respuesta:

  • La parte 1 muestra cómo ganar más velocidad con @ niemmi’s ProcessPoolExecutor.map() solución.
  • La parte 2 muestra cuando ProcessPoolExecutorsubclases de .submit() y .map() producir tiempos de cálculo no equivalentes.

================================================ =====================

Parte 1: Más aceleración para ProcessPoolExecutor.map ()

Fondo:
Esta sección se basa en @ niemmi’s .map() solución, que en sí misma es excelente. Mientras investigaba un poco sobre su esquema de discretización para comprender mejor cómo interactúa con el argumento de .map () fragmentaciones, encontré esta interesante solución.

Considero la definición de @ niemmi de chunk = nmax // workers para ser una definición de chunksize, es decir, un tamaño más pequeño del rango de números reales (tarea dada) que debe abordar cada trabajador en el grupo de trabajadores. Ahora, esta definición se basa en el supuesto de que si una computadora tiene x número de trabajadores, dividir la tarea equitativamente entre cada trabajador dará como resultado un uso óptimo de cada trabajador y, por lo tanto, la tarea total se completará más rápido. Por lo tanto, el número de fragmentos para dividir una tarea determinada siempre debe ser igual al número de trabajadores del grupo. Sin embargo, ¿es correcta esta suposición?

Proposición: Aquí, propongo que la suposición anterior no siempre conduce al tiempo de cálculo más rápido cuando se usa con ProcessPoolExecutor.map(). Bastante, discretizar una tarea en una cantidad mayor que la cantidad de trabajadores de la piscina puede conducir a una aceleración, es decir, a una finalización más rápida de una tarea determinada.

Experimentar: He modificado el código de @ niemmi para permitir que el número de tareas discretizadas supere el número de trabajadores del grupo. Este código se proporciona a continuación y se utiliza para determinar el número de veces que aparece el número 5 en el rango de números de 0 a 1E8. He ejecutado este código usando 1, 2, 4 y 6 trabajadores de grupo y para varias proporciones de número de tareas discretizadas frente al número de trabajadores de grupo. Para cada escenario, se realizaron 3 corridas y se tabularon los tiempos de cálculo. “Acelerar“se define aquí como el tiempo de cálculo promedio utilizando el mismo número de fragmentos y trabajadores del grupo durante el tiempo de cálculo promedio cuando el número de tareas discretizadas es mayor que el número de trabajadores del grupo.

Recomendaciones:

nchunk sobre nworkers

  1. La figura de la izquierda muestra el tiempo de cálculo que tomaron todos los escenarios mencionados en la sección del experimento. Demuestra que el calcular el tiempo empleado por número de trozos / número de trabajadores = 1 es siempre mayor que el tiempo de cálculo que toma número de trozos> número de trabajadores. Es decir, el primer caso es siempre menos eficiente que el segundo.

  2. La figura de la derecha muestra que se obtuvo una aceleración de 1,2 veces o más cuando el número de trozos / número de trabajadores alcanzar un valor umbral de 14 o más. Es interesante observar que la tendencia de aceleración también ocurrió cuando ProcessPoolExecutor.map() se ejecutó con 1 trabajador.

Conclusión: Al personalizar el número de tareas discretas que ProcessPoolExecutor.map () `debería usar para resolver una tarea determinada, es prudente asegurarse de que este número sea mayor que el número de trabajadores del grupo, ya que esta práctica acorta el tiempo de cálculo.

código concurrent.futures.ProcessPoolExecutor.map (). (solo partes revisadas)

def _concurrent_map(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunksize * i for i in range(num_of_chunks))
        cstop = (chunksize * i if i != num_of_chunks else nmax
                 for i in range(1, num_of_chunks + 1))
        futures = executor.map(_findmatch, cstart, cstop,
                               itertools.repeat(number))
        # 2.2. Consolidate result as a list and return this list.
        for future in futures:
            #print('type(future)=',type(future))
            for f in future:
                if f:
                    try:
                        found.append(f)
                    except:
                        print_exc()
        foundsize = len(found)
        end = time() - start
        print('n within statement of def _concurrent(nmax, number):')
        print("found 0 in 1:.4fsec".format(foundsize, end))
    return found

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 4     # Pool of workers
    chunks_vs_workers = 14 # A factor of =>14 can provide optimum performance  
    num_of_chunks = chunks_vs_workers * workers

    start = time()
    a = _concurrent_map(nmax, number, workers, num_of_chunks)
    end = time() - start
    print('n main')
    print('nmax=, workers=, num_of_chunks='.format(
          nmax, workers, num_of_chunks))
    print('workers = ', workers)
    print("found 0 in 1:.4fsec".format(len(a),end))

================================================ =====================

Parte 2: El tiempo de cálculo total desde el uso de las subclases de ProcessPoolExecutor .submit () y .map () puede ser diferente cuando se devuelve una lista de resultados ordenada / ordenada.

Fondo: He enmendado tanto el .submit() y .map() códigos para permitir una comparación “manzana a manzana” de su tiempo de cómputo y la capacidad de visualizar el tiempo de cómputo del código principal, el tiempo de cómputo del método _concurrent llamado por el código principal para realizar las operaciones concurrentes, y el cómputo tiempo para cada tarea / trabajador discretizado llamado por el método _concurrent. Además, el método concurrente en estos códigos se estructuró para devolver una lista ordenada y desordenada del resultado directamente del objeto futuro de .submit() y el iterador de .map(). El código fuente se proporciona a continuación (Espero que te ayude.).

Experimentos Estos dos códigos recientemente mejorados se usaron para realizar el mismo experimento descrito en la Parte 1, salvo que solo se consideraron 6 trabajadores de grupo y el python incorporado list y sorted Se utilizaron métodos para devolver una lista ordenada y desordenada de los resultados a la sección principal del código, respectivamente.

Recomendaciones:.submit vs .map más lista vs ordenados

  1. A partir del resultado del método _concurrent, podemos ver los tiempos de cálculo del método _concurrent utilizado para crear todos los objetos Future de ProcessPoolExecutor.submit(), y para crear el iterador de ProcessPoolExecutor.map(), en función del número de tareas discretizadas sobre el número de trabajadores del grupo, son equivalentes. Este resultado simplemente significa que el ProcessPoolExecutor subclases .submit() y .map() son igualmente eficientes / rápidos.
  2. Comparando los tiempos de cálculo de main y su método _concurrent, podemos ver que main se ejecutó más tiempo que su método _concurrent. Esto es de esperar ya que su diferencia de tiempo refleja la cantidad de tiempos de cómputo del list y sorted métodos (y el de los otros métodos incluidos dentro de estos métodos). Claramente visto, el list El método tomó menos tiempo de cálculo para devolver una lista de resultados que el sorted método. Los tiempos de cálculo promedio del list El método para los códigos .submit () y .map () fue similar, en ~ 0.47seg. El tiempo de cálculo promedio del método ordenado para los códigos .submit () y .map () fue 1.23sec y 1.01sec, respectivamente. En otras palabras, el list El método se realizó 2,62 veces y 2,15 veces más rápido que sorted método para los códigos .submit () y .map (), respectivamente.
  3. No está claro por qué sorted método generó una lista ordenada de
    .map() más rápido que desde .submit(), ya que el número de tareas discretizadas aumentó más que el número de trabajadores del grupo, salvo cuando el número de tareas discretizadas fue igual al número de trabajadores del grupo. Dicho esto, estos hallazgos muestran que la decisión de utilizar el sistema igualmente rápido .submit() o .map() las subclases se pueden gravar con el método ordenado. Por ejemplo, si la intención es generar una lista ordenada en el menor tiempo posible, se debe preferir el uso de ProcessPoolExecutor.map () sobre ProcessPoolExecutor.submit() como .map() puede permitir el tiempo de cálculo total más corto.
  4. El esquema de discretización mencionado en la Parte 1 de mi respuesta se muestra aquí para acelerar el desempeño de ambos .submit() y .map() subclases. La cantidad de aceleración puede ser hasta un 20% en el caso en que el número de tareas discretizadas iguale el número de trabajadores de la piscina.

Código .map () mejorado

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
from time import time
from itertools import repeat, chain 


def _findmatch(nmin, nmax, number):
    '''Function to find the occurence of number in range nmin to nmax and return
       the found occurences in a list.'''
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    #print("n def _findmatch 0:<10 1:<10 2:<3 found 3:8 in 4:.4fsec".
    #      format(nmin, nmax, number, len(match),end))
    return match

def _concurrent(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a concurrent
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunksize * i for i in range(num_of_chunks))
        cstop = (chunksize * i if i != num_of_chunks else nmax
                 for i in range(1, num_of_chunks + 1))
        futures = executor.map(_findmatch, cstart, cstop, repeat(number))
    end = time() - start
    print('n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
    print("found in 0:.4fsec".format(end))
    return list(chain.from_iterable(futures)) #Return an unordered result list
    #return sorted(chain.from_iterable(futures)) #Return an ordered result list

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers
    chunks_vs_workers = 30 # A factor of =>14 can provide optimum performance 
    num_of_chunks = chunks_vs_workers * workers

    start = time()
    found = _concurrent(nmax, number, workers, num_of_chunks)
    end = time() - start
    print('n main')
    print('nmax=, workers=, num_of_chunks='.format(
          nmax, workers, num_of_chunks))
    #print('found = ', found)
    print("found 0 in 1:.4fsec".format(len(found),end))    

Código .submit () mejorado.

Este código es el mismo que el código .map, excepto que reemplaza el método _concurrent con lo siguiente:

def _concurrent(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
       find the occurrences of a given number in a number range in a concurrent
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    futures = []
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(num_of_chunks):
            cstart = chunksize * i
            cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
    end = time() - start
    print('n within statement of def _concurrent_submit(nmax, number, workers, num_of_chunks):')
    print("found in 0:.4fsec".format(end))
    return list(chain.from_iterable(f.result() for f in cf.as_completed(
        futures))) #Return an unordered list
    #return list(chain.from_iterable(f.result() for f in cf.as_completed(
    #    futures))) #Return an ordered list

================================================ =====================

Estás comparando manzanas con naranjas aquí. Cuando usas map tu produces todo el 1E8 números y transferirlos a procesos de trabajo. Esto lleva mucho tiempo en comparación con la ejecución real. Cuando usas submit simplemente crea 6 conjuntos de parámetros que se transfieren.

Si cambias map para operar con el mismo principio obtendrás números cercanos entre sí:

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found 0 in 1:.4fsec".format(len(match),end))
    return match

def _concurrent_map(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunk * i for i in range(workers))
        cstop = (chunk * i if i != workers else nmax for i in range(1, workers + 1))
        futures = executor.map(_findmatch, cstart, cstop, itertools.repeat(number))

        # 2.3. Consolidate result as a list and return this list.
        for future in futures:
            for f in future:
                try:
                    found.append(f)
                except:
                    print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent(nmax, number):')
        print("found 0 in 1:.4fsec".format(foundsize, end))
    return found

Puede mejorar el rendimiento del envío utilizando as_completed correctamente. Para un iterable de futuros dado, devolverá un iterador que yield futuros en el orden en que se completan.

También puede omitir la copia de los datos a otro array y use itertools.chain.from_iterable para combinar los resultados de futuros a iterables individuales:

import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc
from itertools import chain

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found 0 in 1:.4fsec".format(len(match),end))
    return match

def _concurrent_map(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))

    return chain.from_iterable(f.result() for f in cf.as_completed(futures))

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    start = time()
    a = _concurrent_map(nmax, number, workers)
    end = time() - start
    print('n main')
    print('workers = ', workers)
    print("found 0 in 1:.4fsec".format(sum(1 for x in a),end))

Comentarios y puntuaciones

Nos puedes añadir valor a nuestro contenido participando con tu experiencia en las críticas.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)


Tags :

Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *