Saltar al contenido

¿La forma más rápida de procesar un archivo grande?

Después de mirar en diferentes repositorios y foros de internet al terminar nos encontramos con la resolución que te enseñaremos más adelante.

Solución:

Parece que su código está vinculado a E / S. Esto significa que el multiprocesamiento no ayudará: si pasa el 90% de su tiempo leyendo desde el disco, tener 7 procesos adicionales esperando en la siguiente lectura no ayudará en nada.

Y, mientras usa un módulo de lectura CSV (si el stdlib’s csv o algo como NumPy o Pandas) puede ser una buena idea por simplicidad, es poco probable que marque una gran diferencia en el rendimiento.

Aun así, vale la pena comprobar que realmente están Límite de E / S, en lugar de solo adivinar. Ejecute su programa y vea si el uso de su CPU está cerca del 0% o cerca del 100% o un núcleo. Haz lo que Amadan sugirió en un comentario y ejecuta tu programa con solo pass para el procesamiento y vea si eso corta el 5% del tiempo o el 70%. Incluso puede intentar comparar con un bucle sobre os.open y os.read(1024*1024) o algo y ver si es más rápido.


Dado que usa Python 2.x, Python se basa en la biblioteca stdio de C para adivinar cuánto almacenar en búfer a la vez, por lo que podría valer la pena forzarlo a almacenar más en búfer. La forma más sencilla de hacerlo es usar readlines(bufsize) para algunos grandes bufsize. (Puede probar con diferentes números y medirlos para ver dónde está el pico. En mi experiencia, por lo general, cualquier cosa entre 64K y 8MB es aproximadamente lo mismo, pero dependiendo de su sistema, eso puede ser diferente, especialmente si está, por ejemplo, leyendo fuera de un sistema de archivos de red con un gran rendimiento pero una latencia horrible que inunda el rendimiento frente a la latencia de la unidad física real y el almacenamiento en caché que hace el sistema operativo).

Así por ejemplo:

bufsize = 65536
with open(path) as infile: 
    while True:
        lines = infile.readlines(bufsize)
        if not lines:
            break
        for line in lines:
            process(line)

Mientras tanto, suponiendo que esté en un sistema de 64 bits, puede intentar usar mmap en lugar de leer el archivo en primer lugar. Esto ciertamente no es garantizado para ser mejor, pero mayo ser mejor, dependiendo de su sistema. Por ejemplo:

with open(path) as infile:
    m = mmap.mmap(infile, 0, access=mmap.ACCESS_READ)

Una pitón mmap es una especie de objeto extraño: actúa como un str y como un file al mismo tiempo, por lo que puede, por ejemplo, iterar manualmente el escaneo de nuevas líneas, o puede llamar readline sobre él como si fuera un archivo. Ambos requerirán más procesamiento de Python que iterar el archivo como líneas o hacer por lotes readlines (porque un bucle que estaría en C ahora está en Python puro … aunque tal vez puedas solucionarlo con re, ¿o con una simple extensión de Cython?) … pero la ventaja de E / S de que el sistema operativo sepa lo que está haciendo con el mapeo puede hundir la desventaja de la CPU.

Desafortunadamente, Python no expone el madvise llamada que usaría para modificar las cosas en un intento de optimizar esto en C (por ejemplo, establecer explícitamente MADV_SEQUENTIAL en lugar de hacer que el kernel adivine o forzar páginas enormes transparentes), pero en realidad ctypes la función fuera de libc.

Sé que esta pregunta es antigua; pero quería hacer algo similar, creé un marco simple que te ayuda a leer y procesar un archivo grande en paralelo. Dejando lo que intenté como respuesta.

Este es el código, doy un ejemplo al final.

def chunkify_file(fname, size=1024*1024*1000, skiplines=-1):
    """
    function to divide a large text file into chunks each having size ~= size so that the chunks are line aligned

    Params : 
        fname : path to the file to be chunked
        size : size of each chink is ~> this
        skiplines : number of lines in the begining to skip, -1 means don't skip any lines
    Returns : 
        start and end position of chunks in Bytes
    """
    chunks = []
    fileEnd = os.path.getsize(fname)
    with open(fname, "rb") as f:
        if(skiplines > 0):
            for i in range(skiplines):
                f.readline()

        chunkEnd = f.tell()
        count = 0
        while True:
            chunkStart = chunkEnd
            f.seek(f.tell() + size, os.SEEK_SET)
            f.readline()  # make this chunk line aligned
            chunkEnd = f.tell()
            chunks.append((chunkStart, chunkEnd - chunkStart, fname))
            count+=1

            if chunkEnd > fileEnd:
                break
    return chunks

def parallel_apply_line_by_line_chunk(chunk_data):
    """
    function to apply a function to each line in a chunk

    Params :
        chunk_data : the data for this chunk 
    Returns :
        list of the non-None results for this chunk
    """
    chunk_start, chunk_size, file_path, func_apply = chunk_data[:4]
    func_args = chunk_data[4:]

    t1 = time.time()
    chunk_res = []
    with open(file_path, "rb") as f:
        f.seek(chunk_start)
        cont = f.read(chunk_size).decode(encoding='utf-8')
        lines = cont.splitlines()

        for i,line in enumerate(lines):
            ret = func_apply(line, *func_args)
            if(ret != None):
                chunk_res.append(ret)
    return chunk_res

def parallel_apply_line_by_line(input_file_path, chunk_size_factor, num_procs, skiplines, func_apply, func_args, fout=None):
    """
    function to apply a supplied function line by line in parallel

    Params :
        input_file_path : path to input file
        chunk_size_factor : size of 1 chunk in MB
        num_procs : number of parallel processes to spawn, max used is num of available cores - 1
        skiplines : number of top lines to skip while processing
        func_apply : a function which expects a line and outputs None for lines we don't want processed
        func_args : arguments to function func_apply
        fout : do we want to output the processed lines to a file
    Returns :
        list of the non-None results obtained be processing each line
    """
    num_parallel = min(num_procs, psutil.cpu_count()) - 1

    jobs = chunkify_file(input_file_path, 1024 * 1024 * chunk_size_factor, skiplines)

    jobs = [list(x) + [func_apply] + func_args for x in jobs]

    print("Starting the parallel pool for  jobs ".format(len(jobs)))

    lines_counter = 0

    pool = mp.Pool(num_parallel, maxtasksperchild=1000)  # maxtaskperchild - if not supplied some weird happend and memory blows as the processes keep on lingering

    outputs = []
    for i in range(0, len(jobs), num_parallel):
        print("Chunk start = ", i)
        t1 = time.time()
        chunk_outputs = pool.map(parallel_apply_line_by_line_chunk, jobs[i : i + num_parallel])

        for i, subl in enumerate(chunk_outputs):
            for x in subl:
                if(fout != None):
                    print(x, file=fout)
                else:
                    outputs.append(x)
                lines_counter += 1
        del(chunk_outputs)
        gc.collect()
        print("All Done in time ", time.time() - t1)

    print("Total lines we have = ".format(lines_counter))

    pool.close()
    pool.terminate()
    return outputs

Digamos, por ejemplo, tengo un archivo en el que quiero contar el número de palabras en cada línea, entonces el procesamiento de cada línea se vería así

def count_words_line(line):
    return len(line.strip().split())

y luego llame a la función como:

parallel_apply_line_by_line(input_file_path, 100, 8, 0, count_words_line, [], fout=None)

Al usar esto, obtengo una aceleración de ~ 8 veces en comparación con la lectura vanilla línea por línea en un archivo de muestra de tamaño ~ 20 GB en el que hago un procesamiento moderadamente complicado en cada línea.

Agradecemos que quieras añadir valor a nuestro contenido aportando tu veteranía en las aclaraciones.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *