Solución:
Considere usar el struct
función para agrupar las columnas juntas antes de recopilarlas como una lista:
import org.apache.spark.sql.functions.{collect_list, struct}
import sqlContext.implicits._
val df = Seq(
("john", "tomato", 1.99),
("john", "carrot", 0.45),
("bill", "apple", 0.99),
("john", "banana", 1.29),
("bill", "taco", 2.59)
).toDF("name", "food", "price")
df.groupBy($"name")
.agg(collect_list(struct($"food", $"price")).as("foods"))
.show(false)
Salidas:
+----+---------------------------------------------+
|name|foods |
+----+---------------------------------------------+
|john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
|bill|[[apple,0.99], [taco,2.59]] |
+----+---------------------------------------------+
La forma más sencilla de hacer esto como DataFrame
es recopilar primero dos listas y luego usar una UDF
para zip
las dos listas juntas. Algo como:
import org.apache.spark.sql.functions.{collect_list, udf}
import sqlContext.implicits._
val zipper = udf[Seq[(String, Double)], Seq[String], Seq[Double]](_.zip(_))
val df = Seq(
("john", "tomato", 1.99),
("john", "carrot", 0.45),
("bill", "apple", 0.99),
("john", "banana", 1.29),
("bill", "taco", 2.59)
).toDF("name", "food", "price")
val df2 = df.groupBy("name").agg(
collect_list(col("food")) as "food",
collect_list(col("price")) as "price"
).withColumn("food", zipper(col("food"), col("price"))).drop("price")
df2.show(false)
# +----+---------------------------------------------+
# |name|food |
# +----+---------------------------------------------+
# |john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
# |bill|[[apple,0.99], [taco,2.59]] |
# +----+---------------------------------------------+
Quizás una mejor manera que la zip
función (ya que UDF y UDAF son muy malos para el rendimiento) es envolver las dos columnas en Struct
.
Esto probablemente también funcionaría:
df.select('name, struct('food, 'price).as("tuple"))
.groupBy('name)
.agg(collect_list('tuple).as("tuples"))
¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)