Saltar al contenido

Paralelización de Pandas df.iterrows()

Anduvimos buscando por todo el mundo online para tener para ti la respuesta a tu problema, en caso de alguna difcultad deja la duda y te respondemos porque estamos para ayudarte.

Solución:

Como dijo @Khris en su comentario, debe dividir su marco de datos en algunos fragmentos grandes e iterar sobre cada fragmento en paralelo. Puede dividir arbitrariamente el marco de datos en fragmentos de tamaño aleatorio, pero tiene más sentido dividir el marco de datos en fragmentos de igual tamaño según la cantidad de procesos que planea usar. Afortunadamente, alguien más ya ha descubierto cómo hacer esa parte por nosotros:

# don't forget to import
import pandas as pd
import multiprocessing

# create as many processes as there are CPUs on your machine
num_processes = multiprocessing.cpu_count()

# calculate the chunk size as an integer
chunk_size = int(df.shape[0]/num_processes)

# this solution was reworked from the above link.
# will work even if the length of the dataframe is not evenly divisible by num_processes
chunks = [df.ix[df.index[i:i + chunk_size]] for i in range(0, df.shape[0], chunk_size)]

Esto crea una lista que contiene nuestro marco de datos en fragmentos. Ahora necesitamos pasarlo a nuestro grupo junto con una función que manipulará los datos.

def func(d):
   # let's create a function that squares every value in the dataframe
   return d * d

# create our pool with `num_processes` processes
pool = multiprocessing.Pool(processes=num_processes)

# apply our function to each chunk in the list
result = pool.map(func, chunks)

En este punto, result habrá una lista que contenga cada fragmento después de que haya sido manipulado. En este caso, todos los valores se han elevado al cuadrado. El problema ahora es que el marco de datos original no se ha modificado, por lo que debemos reemplazar todos sus valores existentes con los resultados de nuestro grupo.

for i in range(len(result)):
   # since result[i] is just a dataframe
   # we can reassign the original dataframe based on the index of each chunk
   df.ix[result[i].index] = result[i]

Ahora, mi función para manipular mi marco de datos está vectorizada y probablemente habría sido más rápida si simplemente la hubiera aplicado a la totalidad de mi marco de datos en lugar de dividirla en partes. Sin embargo, en su caso, su función iteraría sobre cada fila de cada fragmento y luego devolvería el fragmento. Esto le permite procesar num_process filas a la vez.

def func(d):
   for row in d.iterrow():
      idx = row[0]
      k = row[1]['Chromosome']
      start,end = row[1]['Bin'].split('-')

      sequence = sequence_from_coordinates(k,1,start,end) #slow download form http
      d.set_value(idx,'GC%',gc_content(sequence,percent=False,verbose=False))
      d.set_value(idx,'G4 repeats', sum([len(list(i)) for i in g4_scanner(sequence)]))
      d.set_value(idx,'max flexibility',max([item[1] for item in dna_flex(sequence,verbose=False)]))
   # return the chunk!
   return d

Luego, reasigna los valores en el marco de datos original y ha paralelizado con éxito este proceso.

¿Cuántos procesos debo usar?

Tu rendimiento óptimo va a depender de la respuesta a esta pregunta. Mientras que “TODOS LOS PROCESOS!!!!” es una respuesta, una mejor respuesta es mucho más matizada. Después de cierto punto, lanzar más procesos a un problema en realidad crea más gastos generales de lo que vale. Esto se conoce como la Ley de Amdahl. Nuevamente, somos afortunados de que otros ya hayan abordado esta pregunta por nosotros:

  1. Límite de proceso de grupo de multiprocesamiento de Python
  2. ¿Cuántos procesos debo ejecutar en paralelo?

Un buen valor predeterminado es usar multiprocessing.cpu_count()que es el comportamiento predeterminado de multiprocessing.Pool. De acuerdo con la documentación “Si los procesos son Ninguno, entonces se usa el número devuelto por cpu_count()”. Por eso me puse num_processes al principio a multiprocessing.cpu_count(). De esta manera, si cambia a una máquina más robusta, obtendrá los beneficios sin tener que cambiar el num_processes variable directamente.

Una forma más rápida (alrededor del 10% en mi caso):

Principales diferencias con la respuesta aceptada: uso pd.concat y np.array_split para dividir y unir el marco de datos.

import multiprocessing
import numpy as np


def parallelize_dataframe(df, func):
    num_cores = multiprocessing.cpu_count()-1  #leave one free to not freeze machine
    num_partitions = num_cores #number of partitions to split dataframe
    df_split = np.array_split(df, num_partitions)
    pool = multiprocessing.Pool(num_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

donde func es la función a la que desea aplicar df. Utilizar partial(func, arg=arg_val) por más de un argumento.

Considere usar dask.dataframe, como se muestra en este ejemplo para una pregunta similar: https://stackoverflow.com/a/53923034/4340584

import dask.dataframe as ddf
df_dask = ddf.from_pandas(df, npartitions=4)   # where the number of partitions is the number of cores you want to use
df_dask['output'] = df_dask.apply(lambda x: your_function(x), meta=('str')).compute(scheduler='multiprocessing')

Sección de Reseñas y Valoraciones

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *