Solución:
Antes de continuar: esta operación es otra groupByKey
. Si bien tiene múltiples aplicaciones legítimas, es relativamente costoso, así que asegúrese de usarlo solo cuando sea necesario.
No es una solución exactamente concisa o eficiente, pero puede usar UserDefinedAggregateFunction
introducido en Spark 1.5.0:
object GroupConcat extends UserDefinedAggregateFunction {
def inputSchema = new StructType().add("x", StringType)
def bufferSchema = new StructType().add("buff", ArrayType(StringType))
def dataType = StringType
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer.update(0, ArrayBuffer.empty[String])
}
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0))
buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
}
def evaluate(buffer: Row) = UTF8String.fromString(
buffer.getSeq[String](0).mkString(","))
}
Uso de ejemplo:
val df = sc.parallelize(Seq(
("username1", "friend1"),
("username1", "friend2"),
("username2", "friend1"),
("username2", "friend3")
)).toDF("username", "friend")
df.groupBy($"username").agg(GroupConcat($"friend")).show
## +---------+---------------+
## | username| friends|
## +---------+---------------+
## |username1|friend1,friend2|
## |username2|friend1,friend3|
## +---------+---------------+
También puede crear un contenedor de Python como se muestra en Spark: ¿Cómo mapear Python con Scala o funciones definidas por el usuario de Java?
En la práctica, puede ser más rápido extraer RDD, groupByKey
, mkString
y reconstruir DataFrame.
Puede obtener un efecto similar combinando collect_list
función (Spark> = 1.6.0) con concat_ws
:
import org.apache.spark.sql.functions.{collect_list, udf, lit}
df.groupBy($"username")
.agg(concat_ws(",", collect_list($"friend")).alias("friends"))
Puedes probar la función collect_list
sqlContext.sql("select A, collect_list(B), collect_list(C) from Table1 group by A
O puede registrar un UDF algo como
sqlContext.udf.register("myzip",(a:Long,b:Long)=>(a+","+b))
y puedes usar esta función en la consulta
sqlConttext.sql("select A,collect_list(myzip(B,C)) from tbl group by A")
Aquí hay una función que puede usar en PySpark:
import pyspark.sql.functions as F
def group_concat(col, distinct=False, sep=','):
if distinct:
collect = F.collect_set(col.cast(StringType()))
else:
collect = F.collect_list(col.cast(StringType()))
return F.concat_ws(sep, collect)
table.groupby('username').agg(F.group_concat('friends').alias('friends'))
En SQL:
select username, concat_ws(',', collect_list(friends)) as friends
from table
group by username