Saltar al contenido

¿Cómo hacer CopyMerge en Hadoop 3.0?

Nuestros investigadores estrellas agotaron sus reservas de café, buscando noche y día por la solución, hasta que Lucián encontró el resultado en Gitea por lo tanto en este momento la compartimos con nosotros.

Solución:

Se ha eliminado el método FileUtil#copyMerge. Vea los detalles del cambio principal:

https://issues.apache.org/jira/browse/HADOOP-12967

https://issues.apache.org/jira/browse/HADOOP-11392

Puedes usar getmerge

Uso: hadoop fs -getmerge [-nl]

Toma un directorio de origen y un archivo de destino como entrada y concatena archivos en src en el archivo local de destino. Opcionalmente, se puede configurar -nl para habilitar la adición de un carácter de nueva línea (LF) al final de cada archivo. -skip-empty-file se puede usar para evitar caracteres de nueva línea no deseados en el caso de archivos vacíos.

Ejemplos:

hadoop fs -getmerge -nl /src /opt/output.txt
hadoop fs -getmerge -nl /src/file1.txt /src/file2.txt /output.txt

Código de salida: devuelve 0 en caso de éxito y distinto de cero en caso de error.

https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html#getmerge

Como FileUtil.copyMerge() ha quedado obsoleto y eliminado de la API a partir de la versión 3, una solución simple consiste en volver a implementarlo nosotros mismos.

Aquí está el Java Implementación original de versiones anteriores.

Aquí hay un Scala volver a escribir:

import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem, Path
import org.apache.hadoop.io.IOUtils
import java.io.IOException

def copyMerge(
    srcFS: FileSystem, srcDir: Path,
    dstFS: FileSystem, dstFile: Path,
    deleteSource: Boolean, conf: Configuration
): Boolean = 

  if (dstFS.exists(dstFile))
    throw new IOException(s"Target $dstFile already exists")

  // Source path is expected to be a directory:
  if (srcFS.getFileStatus(srcDir).isDirectory()) 

    val outputFile = dstFS.create(dstFile)
    Try 
      srcFS
        .listStatus(srcDir)
        .sortBy(_.getPath.getName)
        .collect 
          case status if status.isFile() =>
            val inputFile = srcFS.open(status.getPath())
            Try(IOUtils.copyBytes(inputFile, outputFile, conf, false))
            inputFile.close()
        
    
    outputFile.close()

    if (deleteSource) srcFS.delete(srcDir, true) else true
  
  else false

Tuve la misma pregunta y tuve que volver a implementar copyMerge (sin embargo, en PySpark, pero usando las mismas llamadas API que copyMerge original).

No tengo idea de por qué no hay una funcionalidad equivalente en Hadoop 3. Tenemos que fusionar archivos de un directorio HDFS a un archivo HDFS con mucha frecuencia.

Aquí está la implementación en pySpark a la que me referí anteriormente https://github.com/Tagar/stuff/blob/master/copyMerge.py

No se te olvide dar visibilidad a este ensayo si te fue útil.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *