Después de mirar en diferentes repositorios y foros de internet al terminar nos encontramos con la resolución que te enseñaremos más adelante.
Solución:
Parece que su código está vinculado a E / S. Esto significa que el multiprocesamiento no ayudará: si pasa el 90% de su tiempo leyendo desde el disco, tener 7 procesos adicionales esperando en la siguiente lectura no ayudará en nada.
Y, mientras usa un módulo de lectura CSV (si el stdlib’s csv
o algo como NumPy o Pandas) puede ser una buena idea por simplicidad, es poco probable que marque una gran diferencia en el rendimiento.
Aun así, vale la pena comprobar que realmente están Límite de E / S, en lugar de solo adivinar. Ejecute su programa y vea si el uso de su CPU está cerca del 0% o cerca del 100% o un núcleo. Haz lo que Amadan sugirió en un comentario y ejecuta tu programa con solo pass
para el procesamiento y vea si eso corta el 5% del tiempo o el 70%. Incluso puede intentar comparar con un bucle sobre os.open
y os.read(1024*1024)
o algo y ver si es más rápido.
Dado que usa Python 2.x, Python se basa en la biblioteca stdio de C para adivinar cuánto almacenar en búfer a la vez, por lo que podría valer la pena forzarlo a almacenar más en búfer. La forma más sencilla de hacerlo es usar readlines(bufsize)
para algunos grandes bufsize
. (Puede probar con diferentes números y medirlos para ver dónde está el pico. En mi experiencia, por lo general, cualquier cosa entre 64K y 8MB es aproximadamente lo mismo, pero dependiendo de su sistema, eso puede ser diferente, especialmente si está, por ejemplo, leyendo fuera de un sistema de archivos de red con un gran rendimiento pero una latencia horrible que inunda el rendimiento frente a la latencia de la unidad física real y el almacenamiento en caché que hace el sistema operativo).
Así por ejemplo:
bufsize = 65536
with open(path) as infile:
while True:
lines = infile.readlines(bufsize)
if not lines:
break
for line in lines:
process(line)
Mientras tanto, suponiendo que esté en un sistema de 64 bits, puede intentar usar mmap
en lugar de leer el archivo en primer lugar. Esto ciertamente no es garantizado para ser mejor, pero mayo ser mejor, dependiendo de su sistema. Por ejemplo:
with open(path) as infile:
m = mmap.mmap(infile, 0, access=mmap.ACCESS_READ)
Una pitón mmap
es una especie de objeto extraño: actúa como un str
y como un file
al mismo tiempo, por lo que puede, por ejemplo, iterar manualmente el escaneo de nuevas líneas, o puede llamar readline
sobre él como si fuera un archivo. Ambos requerirán más procesamiento de Python que iterar el archivo como líneas o hacer por lotes readlines
(porque un bucle que estaría en C ahora está en Python puro … aunque tal vez puedas solucionarlo con re
, ¿o con una simple extensión de Cython?) … pero la ventaja de E / S de que el sistema operativo sepa lo que está haciendo con el mapeo puede hundir la desventaja de la CPU.
Desafortunadamente, Python no expone el madvise
llamada que usaría para modificar las cosas en un intento de optimizar esto en C (por ejemplo, establecer explícitamente MADV_SEQUENTIAL
en lugar de hacer que el kernel adivine o forzar páginas enormes transparentes), pero en realidad ctypes
la función fuera de libc
.
Sé que esta pregunta es antigua; pero quería hacer algo similar, creé un marco simple que te ayuda a leer y procesar un archivo grande en paralelo. Dejando lo que intenté como respuesta.
Este es el código, doy un ejemplo al final.
def chunkify_file(fname, size=1024*1024*1000, skiplines=-1):
"""
function to divide a large text file into chunks each having size ~= size so that the chunks are line aligned
Params :
fname : path to the file to be chunked
size : size of each chink is ~> this
skiplines : number of lines in the begining to skip, -1 means don't skip any lines
Returns :
start and end position of chunks in Bytes
"""
chunks = []
fileEnd = os.path.getsize(fname)
with open(fname, "rb") as f:
if(skiplines > 0):
for i in range(skiplines):
f.readline()
chunkEnd = f.tell()
count = 0
while True:
chunkStart = chunkEnd
f.seek(f.tell() + size, os.SEEK_SET)
f.readline() # make this chunk line aligned
chunkEnd = f.tell()
chunks.append((chunkStart, chunkEnd - chunkStart, fname))
count+=1
if chunkEnd > fileEnd:
break
return chunks
def parallel_apply_line_by_line_chunk(chunk_data):
"""
function to apply a function to each line in a chunk
Params :
chunk_data : the data for this chunk
Returns :
list of the non-None results for this chunk
"""
chunk_start, chunk_size, file_path, func_apply = chunk_data[:4]
func_args = chunk_data[4:]
t1 = time.time()
chunk_res = []
with open(file_path, "rb") as f:
f.seek(chunk_start)
cont = f.read(chunk_size).decode(encoding='utf-8')
lines = cont.splitlines()
for i,line in enumerate(lines):
ret = func_apply(line, *func_args)
if(ret != None):
chunk_res.append(ret)
return chunk_res
def parallel_apply_line_by_line(input_file_path, chunk_size_factor, num_procs, skiplines, func_apply, func_args, fout=None):
"""
function to apply a supplied function line by line in parallel
Params :
input_file_path : path to input file
chunk_size_factor : size of 1 chunk in MB
num_procs : number of parallel processes to spawn, max used is num of available cores - 1
skiplines : number of top lines to skip while processing
func_apply : a function which expects a line and outputs None for lines we don't want processed
func_args : arguments to function func_apply
fout : do we want to output the processed lines to a file
Returns :
list of the non-None results obtained be processing each line
"""
num_parallel = min(num_procs, psutil.cpu_count()) - 1
jobs = chunkify_file(input_file_path, 1024 * 1024 * chunk_size_factor, skiplines)
jobs = [list(x) + [func_apply] + func_args for x in jobs]
print("Starting the parallel pool for jobs ".format(len(jobs)))
lines_counter = 0
pool = mp.Pool(num_parallel, maxtasksperchild=1000) # maxtaskperchild - if not supplied some weird happend and memory blows as the processes keep on lingering
outputs = []
for i in range(0, len(jobs), num_parallel):
print("Chunk start = ", i)
t1 = time.time()
chunk_outputs = pool.map(parallel_apply_line_by_line_chunk, jobs[i : i + num_parallel])
for i, subl in enumerate(chunk_outputs):
for x in subl:
if(fout != None):
print(x, file=fout)
else:
outputs.append(x)
lines_counter += 1
del(chunk_outputs)
gc.collect()
print("All Done in time ", time.time() - t1)
print("Total lines we have = ".format(lines_counter))
pool.close()
pool.terminate()
return outputs
Digamos, por ejemplo, tengo un archivo en el que quiero contar el número de palabras en cada línea, entonces el procesamiento de cada línea se vería así
def count_words_line(line):
return len(line.strip().split())
y luego llame a la función como:
parallel_apply_line_by_line(input_file_path, 100, 8, 0, count_words_line, [], fout=None)
Al usar esto, obtengo una aceleración de ~ 8 veces en comparación con la lectura vanilla línea por línea en un archivo de muestra de tamaño ~ 20 GB en el que hago un procesamiento moderadamente complicado en cada línea.
Agradecemos que quieras añadir valor a nuestro contenido aportando tu veteranía en las aclaraciones.