Saltar al contenido

Cómo crear un DataFrame a partir de un archivo de texto en Spark

Haz todo lo posible por entender el código de forma correcta previamente a utilizarlo a tu trabajo y si ttienes algo que aportar puedes dejarlo en la sección de comentarios.

Solución:

Actualizar – a partir de Chispa 1.6simplemente puede usar la fuente de datos csv integrada:

spark: SparkSession = // create the Spark Session
val df = spark.read.csv("file.txt")

También puede usar varias opciones para controlar el análisis de CSV, por ejemplo:

val df = spark.read.option("header", "false").csv("file.txt")

Para la versión Spark < 1.6: La forma más fácil es usar spark-csv: inclúyalo en sus dependencias y siga el LÉAME, le permite configurar un delimitador personalizado (;), puede leer encabezados CSV (si los tiene) y puede inferir el esquema tipos (con el costo de un escaneo extra de los datos).

Alternativamente, si conoce el esquema, puede crear una clase de caso que lo represente y mapear sus elementos RDD en instancias de esta clase antes de transformarse en un DataFrame, por ejemplo:

case class Record(id: Int, name: String)

val myFile1 = myFile.map(x=>x.split(";")).map 
  case Array(id, name) => Record(id.toInt, name)
 

myFile1.toDF() // DataFrame will have columns "id" and "name"

He dado diferentes formas de crear DataFrame desde un archivo de texto.

val conf = new SparkConf().setAppName(appName).setMaster("local")
val sc = SparkContext(conf)

archivo de texto sin procesar

val file = sc.textFile("C:\vikas\spark\Interview\text.txt")
val fileToDf = file.map(_.split(",")).mapcase Array(a,b,c) => 
(a,b.toInt,c).toDF("name","age","city")
fileToDf.foreach(println(_))

sesión de chispa sin esquema

import org.apache.spark.sql.SparkSession
val sparkSess = 
SparkSession.builder().appName("SparkSessionZipsExample")
.config(conf).getOrCreate()

val df = sparkSess.read.option("header", 
"false").csv("C:\vikas\spark\Interview\text.txt")
df.show()

sesión de chispa con esquema

import org.apache.spark.sql.types._
val schemaString = "name age city"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, 
StringType, nullable=true))
val schema = StructType(fields)

val dfWithSchema = sparkSess.read.option("header", 
"false").schema(schema).csv("C:\vikas\spark\Interview\text.txt")
dfWithSchema.show()

usando el contexto sql

import org.apache.spark.sql.SQLContext

val fileRdd = 
sc.textFile("C:\vikas\spark\Interview\text.txt").map(_.split(",")).mapx 
=> org.apache.spark.sql.Row(x:_*)
val sqlDf = sqlCtx.createDataFrame(fileRdd,schema)
sqlDf.show()

Si desea utilizar el toDF método, tienes que convertir tu RDD de Array[String] en un RDD de una clase de caso. Por ejemplo, tienes que hacer:

case class Test(id:String,filed2:String)
val myFile = sc.textFile("file.txt")
val df= myFile.map( x => x.split(";") ).map( x=> Test(x(0),x(1)) ).toDF()

valoraciones y reseñas

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *