Solución:
Puede paralelizar esto con Dask.dataframe.
>>> dmaster = dd.from_pandas(master, npartitions=4)
>>> dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave), name="my_value"))
>>> dmaster.compute()
original my_value
0 this is a nice sentence 2
1 this is another one 3
2 stackoverflow is nice 1
Además, debe pensar en las compensaciones entre el uso de subprocesos y procesos aquí. Es casi seguro que su coincidencia de cadenas difusas no libera el GIL, por lo que no obtendrá ningún beneficio del uso de múltiples subprocesos. Sin embargo, el uso de procesos hará que los datos se serialicen y se muevan por su máquina, lo que podría ralentizar un poco las cosas.
Puede experimentar entre el uso de subprocesos y procesos o un sistema distribuido administrando el get=
argumento de palabra clave para el compute()
método.
import dask.multiprocessing
import dask.threaded
>>> dmaster.compute(get=dask.threaded.get) # this is default for dask.dataframe
>>> dmaster.compute(get=dask.multiprocessing.get) # try processes instead
Estoy trabajando en algo similar y quería proporcionar una solución de trabajo más completa para cualquier otra persona con la que se pueda encontrar con esta pregunta. @MRocklin lamentablemente tiene algunos errores de sintaxis en los fragmentos de código proporcionados. No soy un experto en Dask, por lo que no puedo comentar sobre algunas consideraciones de rendimiento, pero esto debería cumplir su tarea tal como lo sugirió @MRocklin. Esto esta usando Dask versión 0.17.2 y Pandas versión 0.22.0:
import dask.dataframe as dd
import dask.multiprocessing
import dask.threaded
from fuzzywuzzy import fuzz
import pandas as pd
master= pd.DataFrame({'original':['this is a nice sentence',
'this is another one',
'stackoverflow is nice']})
slave= pd.DataFrame({'name':['hello world',
'congratulations',
'this is a nice sentence ',
'this is another one',
'stackoverflow is nice'],'my_value': [1,2,3,4,5]})
def fuzzy_score(str1, str2):
return fuzz.token_set_ratio(str1, str2)
def helper(orig_string, slave_df):
slave_df['score'] = slave_df.name.apply(lambda x: fuzzy_score(x,orig_string))
#return my_value corresponding to the highest score
return slave_df.loc[slave_df.score.idxmax(),'my_value']
dmaster = dd.from_pandas(master, npartitions=4)
dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave),meta=('x','f8'))
Luego, obtenga sus resultados (como en esta sesión de intérprete):
In [6]: dmaster.compute(get=dask.multiprocessing.get)
Out[6]:
original my_value
0 this is a nice sentence 3
1 this is another one 4
2 stackoverflow is nice 5
Estas respuestas se basan en una API anterior. Algún código más nuevo:
dmaster = dd.from_pandas(master, npartitions=4)
dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave),meta=('x','f8'))
dmaster.compute(scheduler="processes")
Personalmente, me desharía de esa llamada de aplicación a fuzzy_score en la función auxiliar y simplemente realizaría la operación allí.
Puede modificar el planificador con estos consejos.