Saltar al contenido

Iterar filas y columnas en el dataframe de Spark

este problema se puede abordar de diversas formas, pero nosotros te compartimos la que para nosotros es la solución más completa.

Solución:

Considera que tienes un Dataframe como abajo

+-----+------+---+
| name|sector|age|
+-----+------+---+
| Andy|   aaa| 20|
|Berta|   bbb| 30|
|  Joe|   ccc| 40|
+-----+------+---+

Para hacer un bucle en tu Marco de datos y extraer los elementos de la Marco de datospuede elegir uno de los siguientes enfoques.

Enfoque 1 – Bucle usando foreach

Bucle de un marco de datos directamente usando foreach bucle no es posible. Para hacer esto, primero debe definir el esquema del marco de datos usando case class y luego debe especificar este esquema en el marco de datos.

import spark.implicits._
import org.apache.spark.sql._
case class cls_Employee(name:String, sector:String, age:Int)
val df = Seq(cls_Employee("Andy","aaa", 20), cls_Employee("Berta","bbb", 30), cls_Employee("Joe","ccc", 40)).toDF()
df.as[cls_Employee].take(df.count.toInt).foreach(t => println(s"name=$t.name,sector=$t.sector,age=$t.age"))

Por favor, vea el resultado a continuación:

ingrese la descripción de la imagen aquí

Enfoque 2 – Bucle usando rdd

Usar rdd.collect encima de tu Marco de datos. los row variable contendrá cada fila de Marco de datos de rdd tipo de fila. Para obtener cada elemento de una fila, use row.mkString(",") que contendrá el valor de cada fila en valores separados por comas. Usando split función (función incorporada) puede acceder a cada valor de columna de rdd fila con índice.

for (row <- df.rdd.collect)
   
    var name = row.mkString(",").split(",")(0)
    var sector = row.mkString(",").split(",")(1)
    var age = row.mkString(",").split(",")(2)   

Tenga en cuenta que hay dos inconvenientes de este enfoque.
1. Si hay un , en el valor de la columna, los datos se dividirán incorrectamente en la columna adyacente.

2. rdd.collect es un action que devuelve todos los datos a la memoria del controlador, donde la memoria del controlador podría no ser tan grande para almacenar los datos, lo que terminaría con la falla de la aplicación.

Recomendaría usar Enfoque 1.

Enfoque 3: usar where y select

Puedes usar directamente where y select que realizará un bucle interno y encontrará los datos. Dado que no debería arrojar una excepción fuera del límite del índice, se usa una condición if

if(df.where($"name" === "Andy").select(col("name")).collect().length >= 1)
    name = df.where($"name" === "Andy").select(col("name")).collect()(0).get(0).toString

Enfoque 4: uso de tablas temporales

Puede registrar el marco de datos como tentable, que se almacenará en la memoria de Spark. Luego puede usar una consulta de selección como otra base de datos para consultar los datos y luego recopilarlos y guardarlos en una variable

df.registerTempTable("student")
name = sqlContext.sql("select name from student where name='Andy'").collect()(0).toString().replace("[","").replace("]","")

Puedes convertir Row a Seq con toSeq. Una vez vuelto a Seq puede iterar sobre él como de costumbre con foreach, map o lo que necesites

    sqlDF.foreach  row => 
           row.toSeq.foreachcol => println(col) 
    

Producción:

Berta
bbb
30
Joe
Andy
aaa
20
ccc
40

Deberías usar mkString en tu Row:

sqlDF.foreach  row =>
  println(row.mkString(",")) 

Pero tenga en cuenta que esto se imprimirá dentro de las JVM de los ejecutores, por lo que normalmente no verá la salida (a menos que trabaje con master = local)

Finalizando este artículo puedes encontrar los informes de otros programadores, tú igualmente tienes la libertad de dejar el tuyo si lo crees conveniente.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *