Saltar al contenido

Paralelizar aplicar después de pandas groupby

Solución:

Esto parece funcionar, aunque realmente debería estar integrado en pandas.

import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

if __name__ == '__main__':
    df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
    print 'parallel version: '
    print applyParallel(df.groupby(df.index), tmpFunc)

    print 'regular version: '
    print df.groupby(df.index).apply(tmpFunc)

    print 'ideal version (does not work): '
    print df.groupby(df.index).applyParallel(tmpFunc)

La respuesta de Ivan es excelente, pero parece que se puede simplificar un poco, eliminando también la necesidad de depender de joblib:

from multiprocessing import Pool, cpu_count

def applyParallel(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    return pandas.concat(ret_list)

Por cierto: esto no puede reemplazar alguna groupby.apply (), pero cubrirá los casos típicos: por ejemplo, debería cubrir los casos 2 y 3 en la documentación, mientras que debería obtener el comportamiento del caso 1 dando el argumento axis=1 a la final pandas.concat() llama.

EDITAR: los documentos cambiaron; la versión anterior se puede encontrar aquí, en cualquier caso, estoy copiando los tres ejemplos a continuación.

case 1: group DataFrame apply aggregation function (f(chunk) -> Series) yield DataFrame, with group axis having group labels

case 2: group DataFrame apply transform function ((f(chunk) -> DataFrame with same indexes) yield DataFrame with resulting chunks glued together

case 3: group Series apply function with f(chunk) -> DataFrame yield DataFrame with result of chunks glued together

Tengo un truco que uso para obtener la paralelización en Pandas. Divido mi marco de datos en fragmentos, pongo cada fragmento en el elemento de una lista y luego uso los bits paralelos de ipython para hacer una aplicación paralela en la lista de marcos de datos. Luego volví a armar la lista usando pandas concat función.

Sin embargo, esto no es de aplicación general. Me funciona porque la función que quiero aplicar a cada fragmento del marco de datos tarda aproximadamente un minuto. Y separar y juntar mis datos no lleva tanto tiempo. Así que esto es claramente una torpeza. Dicho esto, aquí hay un ejemplo. Estoy usando el cuaderno Ipython, así que verás. %%time magia en mi código:

## make some example data
import pandas as pd

np.random.seed(1)
n=10000
df = pd.DataFrame({'mygroup' : np.random.randint(1000, size=n), 
                   'data' : np.random.rand(n)})
grouped = df.groupby('mygroup')

Para este ejemplo, voy a hacer ‘fragmentos’ basados ​​en el grupo anterior, pero no tiene que ser así como se fragmentan los datos. Aunque es un patrón bastante común.

dflist = []
for name, group in grouped:
    dflist.append(group)

configurar los bits paralelos

from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()
lview.block = True

escribir una función tonta para aplicar a nuestros datos

def myFunc(inDf):
    inDf['newCol'] = inDf.data ** 10
    return inDf

ahora ejecutemos el código en serie y luego en paralelo. serial primero:

%%time
serial_list = map(myFunc, dflist)
CPU times: user 14 s, sys: 19.9 ms, total: 14 s
Wall time: 14 s

ahora paralelo

%%time
parallel_list = lview.map(myFunc, dflist)

CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s
Wall time: 1.56 s

luego solo se necesitan unos pocos ms para fusionarlos nuevamente en un marco de datos

%%time
combinedDf = pd.concat(parallel_list)
 CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms
Wall time: 300 ms

Estoy ejecutando 6 motores IPython en mi MacBook, pero puede ver que reduce el tiempo de ejecución a 2 segundos desde 14 segundos.

Para simulaciones estocásticas de ejecución realmente prolongada, puedo usar el backend de AWS activando un clúster con StarCluster. La mayor parte del tiempo, sin embargo, paralelo solo en 8 CPU en mi MBP.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *