Solución:
En pySpark
podrías hacer algo como esto, usando countDistinct()
:
from pyspark.sql.functions import col, countDistinct
df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns))
Similarmente en Scala
:
import org.apache.spark.sql.functions.countDistinct
import org.apache.spark.sql.functions.col
df.select(df.columns.map(c => countDistinct(col(c)).alias(c)): _*)
Si desea acelerar las cosas ante la posible pérdida de precisión, también puede usar approxCountDistinct()
.
Múltiples agregaciones serían bastante costosas de calcular. Sugiero que utilice métodos de aproximación en su lugar. En este caso, aproximando un recuento distinto:
val df = Seq((1,3,4),(1,2,3),(2,3,4),(2,3,5)).toDF("col1","col2","col3")
val exprs = df.columns.map((_ -> "approx_count_distinct")).toMap
df.agg(exprs).show()
// +---------------------------+---------------------------+---------------------------+
// |approx_count_distinct(col1)|approx_count_distinct(col2)|approx_count_distinct(col3)|
// +---------------------------+---------------------------+---------------------------+
// | 2| 2| 3|
// +---------------------------+---------------------------+---------------------------+
los approx_count_distinct
el método se basa en HyperLogLog bajo el capó.
los HyperLogLog algoritmo y su variante HyperLogLog ++ (implementado en Spark) se basa en lo siguiente inteligente observación.
Si los números se distribuyen uniformemente en un rango, entonces el recuento de elementos distintos se puede aproximar a partir del mayor número de ceros a la izquierda en la representación binaria de los números.
Por ejemplo, si observamos un número cuyos dígitos en forma binaria son de la forma 0…(k times)…01…1
, entonces podemos estimar que hay en el orden de 2 ^ k elementos en el conjunto. Esta es una estimación muy burda, pero se puede refinar con gran precisión con un algoritmo de dibujo.
En el artículo original se puede encontrar una explicación detallada de la mecánica detrás de este algoritmo.
Nota: A partir de Spark 1.6, cuando Spark llama SELECT SOME_AGG(DISTINCT foo)), SOME_AGG(DISTINCT bar)) FROM df
cada cláusula debe desencadenar una agregación separada para cada cláusula. Considerando que esto es diferente a SELECT SOME_AGG(foo), SOME_AGG(bar) FROM df
donde agregamos una vez. Por lo tanto, el rendimiento no será comparable al usar un count(distinct(_))
y approxCountDistinct
(o approx_count_distinct
).
Es uno de los cambios de comportamiento desde Spark 1.6 :
Con el planificador de consultas mejorado para consultas que tienen distintas agregaciones (SPARK-9241), el plan de una consulta que tiene una única agregación distinta se ha cambiado a una versión más sólida. Para volver al plan generado por el planificador de Spark 1.5, configure spark.sql.specializeSingleDistinctAggPlanning en true. (CHISPA-12077)
Referencia: Algoritmos aproximados en Apache Spark: HyperLogLog y Quantiles.
si solo desea contar para una columna en particular, lo siguiente podría ayudar. Aunque es una respuesta tardía. podría ayudar a alguien. (pyspark 2.2.0
probado)
from pyspark.sql.functions import col, countDistinct
df.agg(countDistinct(col("colName")).alias("count")).show()