Haz todo lo posible por entender el código de forma correcta previamente a utilizarlo a tu trabajo y si ttienes algo que aportar puedes dejarlo en la sección de comentarios.
Solución:
Actualizar – a partir de Chispa 1.6simplemente puede usar la fuente de datos csv integrada:
spark: SparkSession = // create the Spark Session
val df = spark.read.csv("file.txt")
También puede usar varias opciones para controlar el análisis de CSV, por ejemplo:
val df = spark.read.option("header", "false").csv("file.txt")
Para la versión Spark < 1.6: La forma más fácil es usar spark-csv: inclúyalo en sus dependencias y siga el LÉAME, le permite configurar un delimitador personalizado (;
), puede leer encabezados CSV (si los tiene) y puede inferir el esquema tipos (con el costo de un escaneo extra de los datos).
Alternativamente, si conoce el esquema, puede crear una clase de caso que lo represente y mapear sus elementos RDD en instancias de esta clase antes de transformarse en un DataFrame, por ejemplo:
case class Record(id: Int, name: String)
val myFile1 = myFile.map(x=>x.split(";")).map
case Array(id, name) => Record(id.toInt, name)
myFile1.toDF() // DataFrame will have columns "id" and "name"
He dado diferentes formas de crear DataFrame desde un archivo de texto.
val conf = new SparkConf().setAppName(appName).setMaster("local")
val sc = SparkContext(conf)
archivo de texto sin procesar
val file = sc.textFile("C:\vikas\spark\Interview\text.txt")
val fileToDf = file.map(_.split(",")).mapcase Array(a,b,c) =>
(a,b.toInt,c).toDF("name","age","city")
fileToDf.foreach(println(_))
sesión de chispa sin esquema
import org.apache.spark.sql.SparkSession
val sparkSess =
SparkSession.builder().appName("SparkSessionZipsExample")
.config(conf).getOrCreate()
val df = sparkSess.read.option("header",
"false").csv("C:\vikas\spark\Interview\text.txt")
df.show()
sesión de chispa con esquema
import org.apache.spark.sql.types._
val schemaString = "name age city"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName,
StringType, nullable=true))
val schema = StructType(fields)
val dfWithSchema = sparkSess.read.option("header",
"false").schema(schema).csv("C:\vikas\spark\Interview\text.txt")
dfWithSchema.show()
usando el contexto sql
import org.apache.spark.sql.SQLContext
val fileRdd =
sc.textFile("C:\vikas\spark\Interview\text.txt").map(_.split(",")).mapx
=> org.apache.spark.sql.Row(x:_*)
val sqlDf = sqlCtx.createDataFrame(fileRdd,schema)
sqlDf.show()
Si desea utilizar el toDF
método, tienes que convertir tu RDD
de Array[String]
en un RDD
de una clase de caso. Por ejemplo, tienes que hacer:
case class Test(id:String,filed2:String)
val myFile = sc.textFile("file.txt")
val df= myFile.map( x => x.split(";") ).map( x=> Test(x(0),x(1)) ).toDF()