este problema se puede abordar de diversas formas, pero nosotros te compartimos la que para nosotros es la solución más completa.
Solución:
Considera que tienes un Dataframe
como abajo
+-----+------+---+
| name|sector|age|
+-----+------+---+
| Andy| aaa| 20|
|Berta| bbb| 30|
| Joe| ccc| 40|
+-----+------+---+
Para hacer un bucle en tu Marco de datos y extraer los elementos de la Marco de datospuede elegir uno de los siguientes enfoques.
Enfoque 1 – Bucle usando foreach
Bucle de un marco de datos directamente usando foreach
bucle no es posible. Para hacer esto, primero debe definir el esquema del marco de datos usando case class
y luego debe especificar este esquema en el marco de datos.
import spark.implicits._
import org.apache.spark.sql._
case class cls_Employee(name:String, sector:String, age:Int)
val df = Seq(cls_Employee("Andy","aaa", 20), cls_Employee("Berta","bbb", 30), cls_Employee("Joe","ccc", 40)).toDF()
df.as[cls_Employee].take(df.count.toInt).foreach(t => println(s"name=$t.name,sector=$t.sector,age=$t.age"))
Por favor, vea el resultado a continuación:
Enfoque 2 – Bucle usando rdd
Usar rdd.collect
encima de tu Marco de datos. los row
variable contendrá cada fila de Marco de datos de rdd
tipo de fila. Para obtener cada elemento de una fila, use row.mkString(",")
que contendrá el valor de cada fila en valores separados por comas. Usando split
función (función incorporada) puede acceder a cada valor de columna de rdd
fila con índice.
for (row <- df.rdd.collect)
var name = row.mkString(",").split(",")(0)
var sector = row.mkString(",").split(",")(1)
var age = row.mkString(",").split(",")(2)
Tenga en cuenta que hay dos inconvenientes de este enfoque.
1. Si hay un ,
en el valor de la columna, los datos se dividirán incorrectamente en la columna adyacente.
2. rdd.collect
es un action
que devuelve todos los datos a la memoria del controlador, donde la memoria del controlador podría no ser tan grande para almacenar los datos, lo que terminaría con la falla de la aplicación.
Recomendaría usar Enfoque 1.
Enfoque 3: usar where y select
Puedes usar directamente where
y select
que realizará un bucle interno y encontrará los datos. Dado que no debería arrojar una excepción fuera del límite del índice, se usa una condición if
if(df.where($"name" === "Andy").select(col("name")).collect().length >= 1)
name = df.where($"name" === "Andy").select(col("name")).collect()(0).get(0).toString
Enfoque 4: uso de tablas temporales
Puede registrar el marco de datos como tentable, que se almacenará en la memoria de Spark. Luego puede usar una consulta de selección como otra base de datos para consultar los datos y luego recopilarlos y guardarlos en una variable
df.registerTempTable("student")
name = sqlContext.sql("select name from student where name='Andy'").collect()(0).toString().replace("[","").replace("]","")
Puedes convertir Row
a Seq
con toSeq
. Una vez vuelto a Seq
puede iterar sobre él como de costumbre con foreach
, map
o lo que necesites
sqlDF.foreach row =>
row.toSeq.foreachcol => println(col)
Producción:
Berta
bbb
30
Joe
Andy
aaa
20
ccc
40
Deberías usar mkString
en tu Row
:
sqlDF.foreach row =>
println(row.mkString(","))
Pero tenga en cuenta que esto se imprimirá dentro de las JVM de los ejecutores, por lo que normalmente no verá la salida (a menos que trabaje con master = local)
Finalizando este artículo puedes encontrar los informes de otros programadores, tú igualmente tienes la libertad de dejar el tuyo si lo crees conveniente.