Solución:
Esto parece funcionar, aunque realmente debería estar integrado en pandas.
import pandas as pd
from joblib import Parallel, delayed
import multiprocessing
def tmpFunc(df):
df['c'] = df.a + df.b
return df
def applyParallel(dfGrouped, func):
retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
return pd.concat(retLst)
if __name__ == '__main__':
df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
print 'parallel version: '
print applyParallel(df.groupby(df.index), tmpFunc)
print 'regular version: '
print df.groupby(df.index).apply(tmpFunc)
print 'ideal version (does not work): '
print df.groupby(df.index).applyParallel(tmpFunc)
La respuesta de Ivan es excelente, pero parece que se puede simplificar un poco, eliminando también la necesidad de depender de joblib:
from multiprocessing import Pool, cpu_count
def applyParallel(dfGrouped, func):
with Pool(cpu_count()) as p:
ret_list = p.map(func, [group for name, group in dfGrouped])
return pandas.concat(ret_list)
Por cierto: esto no puede reemplazar alguna groupby.apply (), pero cubrirá los casos típicos: por ejemplo, debería cubrir los casos 2 y 3 en la documentación, mientras que debería obtener el comportamiento del caso 1 dando el argumento axis=1
a la final pandas.concat()
llama.
EDITAR: los documentos cambiaron; la versión anterior se puede encontrar aquí, en cualquier caso, estoy copiando los tres ejemplos a continuación.
case 1: group DataFrame apply aggregation function (f(chunk) -> Series) yield DataFrame, with group axis having group labels
case 2: group DataFrame apply transform function ((f(chunk) -> DataFrame with same indexes) yield DataFrame with resulting chunks glued together
case 3: group Series apply function with f(chunk) -> DataFrame yield DataFrame with result of chunks glued together
Tengo un truco que uso para obtener la paralelización en Pandas. Divido mi marco de datos en fragmentos, pongo cada fragmento en el elemento de una lista y luego uso los bits paralelos de ipython para hacer una aplicación paralela en la lista de marcos de datos. Luego volví a armar la lista usando pandas concat
función.
Sin embargo, esto no es de aplicación general. Me funciona porque la función que quiero aplicar a cada fragmento del marco de datos tarda aproximadamente un minuto. Y separar y juntar mis datos no lleva tanto tiempo. Así que esto es claramente una torpeza. Dicho esto, aquí hay un ejemplo. Estoy usando el cuaderno Ipython, así que verás. %%time
magia en mi código:
## make some example data
import pandas as pd
np.random.seed(1)
n=10000
df = pd.DataFrame({'mygroup' : np.random.randint(1000, size=n),
'data' : np.random.rand(n)})
grouped = df.groupby('mygroup')
Para este ejemplo, voy a hacer ‘fragmentos’ basados en el grupo anterior, pero no tiene que ser así como se fragmentan los datos. Aunque es un patrón bastante común.
dflist = []
for name, group in grouped:
dflist.append(group)
configurar los bits paralelos
from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()
lview.block = True
escribir una función tonta para aplicar a nuestros datos
def myFunc(inDf):
inDf['newCol'] = inDf.data ** 10
return inDf
ahora ejecutemos el código en serie y luego en paralelo. serial primero:
%%time
serial_list = map(myFunc, dflist)
CPU times: user 14 s, sys: 19.9 ms, total: 14 s
Wall time: 14 s
ahora paralelo
%%time
parallel_list = lview.map(myFunc, dflist)
CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s
Wall time: 1.56 s
luego solo se necesitan unos pocos ms para fusionarlos nuevamente en un marco de datos
%%time
combinedDf = pd.concat(parallel_list)
CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms
Wall time: 300 ms
Estoy ejecutando 6 motores IPython en mi MacBook, pero puede ver que reduce el tiempo de ejecución a 2 segundos desde 14 segundos.
Para simulaciones estocásticas de ejecución realmente prolongada, puedo usar el backend de AWS activando un clúster con StarCluster. La mayor parte del tiempo, sin embargo, paralelo solo en 8 CPU en mi MBP.