Saltar al contenido

Reemplazo de Spark SQL para la función agregada GROUP_CONCAT de MySQL

Solución:

Antes de continuar: esta operación es otra groupByKey. Si bien tiene múltiples aplicaciones legítimas, es relativamente costoso, así que asegúrese de usarlo solo cuando sea necesario.


No es una solución exactamente concisa o eficiente, pero puede usar UserDefinedAggregateFunction introducido en Spark 1.5.0:

object GroupConcat extends UserDefinedAggregateFunction {
    def inputSchema = new StructType().add("x", StringType)
    def bufferSchema = new StructType().add("buff", ArrayType(StringType))
    def dataType = StringType
    def deterministic = true 

    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, ArrayBuffer.empty[String])
    }

    def update(buffer: MutableAggregationBuffer, input: Row) = {
      if (!input.isNullAt(0)) 
        buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
    }

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
    }

    def evaluate(buffer: Row) = UTF8String.fromString(
      buffer.getSeq[String](0).mkString(","))
}

Uso de ejemplo:

val df = sc.parallelize(Seq(
  ("username1", "friend1"),
  ("username1", "friend2"),
  ("username2", "friend1"),
  ("username2", "friend3")
)).toDF("username", "friend")

df.groupBy($"username").agg(GroupConcat($"friend")).show

## +---------+---------------+
## | username|        friends|
## +---------+---------------+
## |username1|friend1,friend2|
## |username2|friend1,friend3|
## +---------+---------------+

También puede crear un contenedor de Python como se muestra en Spark: ¿Cómo mapear Python con Scala o funciones definidas por el usuario de Java?

En la práctica, puede ser más rápido extraer RDD, groupByKey, mkString y reconstruir DataFrame.

Puede obtener un efecto similar combinando collect_list función (Spark> = 1.6.0) con concat_ws:

import org.apache.spark.sql.functions.{collect_list, udf, lit}

df.groupBy($"username")
  .agg(concat_ws(",", collect_list($"friend")).alias("friends"))

Puedes probar la función collect_list

sqlContext.sql("select A, collect_list(B), collect_list(C) from Table1 group by A

O puede registrar un UDF algo como

sqlContext.udf.register("myzip",(a:Long,b:Long)=>(a+","+b))

y puedes usar esta función en la consulta

sqlConttext.sql("select A,collect_list(myzip(B,C)) from tbl group by A")

Aquí hay una función que puede usar en PySpark:

import pyspark.sql.functions as F

def group_concat(col, distinct=False, sep=','):
    if distinct:
        collect = F.collect_set(col.cast(StringType()))
    else:
        collect = F.collect_list(col.cast(StringType()))
    return F.concat_ws(sep, collect)


table.groupby('username').agg(F.group_concat('friends').alias('friends'))

En SQL:

select username, concat_ws(',', collect_list(friends)) as friends
from table
group by username
¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *