Saltar al contenido

Cómo encontrar la mediana y los cuantiles usando Spark

Basta ya de buscar por todo internet porque llegaste al espacio correcto, contamos con la solución que buscas pero sin problema.

Solución:

Trabajo en proceso

SPARK-30569 – Agregar funciones DSL invocando percentile_approx

Spark 2.0+:

Puedes usar approxQuantile método que implementa el algoritmo de Greenwald-Khanna:

Pitón:

df.approxQuantile("x", [0.5], 0.25)

Scala:

df.stat.approxQuantile("x", Array(0.5), 0.25)

donde el último parámetro es un error relativo. Cuanto menor sea el número, más precisos serán los resultados y el cálculo será más caro.

Desde Spark 2.2 (SPARK-14352) admite la estimación en varias columnas:

df.approxQuantile(["x", "y", "z"], [0.5], 0.25)

y

df.approxQuantile(Array("x", "y", "z"), Array(0.5), 0.25)

Los métodos subyacentes también se pueden usar en la agregación de SQL (tanto global como a tientas) usando approx_percentile función:

> SELECT approx_percentile(10.0, array(0.5, 0.4, 0.1), 100);
 [10.0,10.0,10.0]
> SELECT approx_percentile(10.0, 0.5, 100);
 10.0

Chispa <2.0

Pitón

Como mencioné en los comentarios, lo más probable es que no valga la pena tanto alboroto. Si los datos son relativamente pequeños como en su caso, simplemente recopile y calcule la mediana localmente:

import numpy as np

np.random.seed(323)
rdd = sc.parallelize(np.random.randint(1000000, size=700000))

%time np.median(rdd.collect())
np.array(rdd.collect()).nbytes

Tarda alrededor de 0,01 segundos en mi computadora de pocos años y alrededor de 5,5 MB de memoria.

Si los datos son mucho más grandes, la clasificación será un factor limitante, por lo que en lugar de obtener un valor exacto, probablemente sea mejor muestrear, recopilar y calcular localmente. Pero si realmente quieres usar Spark, algo como esto debería funcionar (si no arruiné nada):

from numpy import floor
import time

def quantile(rdd, p, sample=None, seed=None):
    """Compute a quantile of order p ∈ [0, 1]
    :rdd a numeric rdd
    :p quantile(between 0 and 1)
    :sample fraction of and rdd to use. If not provided we use a whole dataset
    :seed random number generator seed to be used with sample
    """
    assert 0 <= p <= 1
    assert sample is None or 0 < sample <= 1

    seed = seed if seed is not None else time.time()
    rdd = rdd if sample is None else rdd.sample(False, sample, seed)

    rddSortedWithIndex = (rdd.
        sortBy(lambda x: x).
        zipWithIndex().
        map(lambda (x, i): (i, x)).
        cache())

    n = rddSortedWithIndex.count()
    h = (n - 1) * p

    rddX, rddXPlusOne = (
        rddSortedWithIndex.lookup(x)[0]
        for x in int(floor(h)) + np.array([0L, 1L]))

    return rddX + (h - floor(h)) * (rddXPlusOne - rddX)

Y algunas pruebas:

np.median(rdd.collect()), quantile(rdd, 0.5)
## (500184.5, 500184.5)
np.percentile(rdd.collect(), 25), quantile(rdd, 0.25)
## (250506.75, 250506.75)
np.percentile(rdd.collect(), 75), quantile(rdd, 0.75)
(750069.25, 750069.25)

Finalmente definamos la mediana:

from functools import partial
median = partial(quantile, p=0.5)

Hasta ahora todo va bien, pero se necesitan 4,66 s en modo local sin ninguna comunicación de red. Probablemente haya una forma de mejorar esto, pero ¿por qué molestarse?

Independiente del idioma (Colmena UDAF):

Si utiliza HiveContext también puede utilizar UDAF de Hive. Con valores integrales:

rdd.map(lambda x: (float(x), )).toDF(["x"]).registerTempTable("df")

sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df")

Con valores continuos:

sqlContext.sql("SELECT percentile(x, 0.5) FROM df")

En percentile_approx puede pasar un argumento adicional que determina una cantidad de registros a utilizar.

Agregar una solución si solo desea un método RDD y no desea pasar a DF. Este fragmento puede obtener un percentil para un RDD del doble.

Si ingresa el percentil como 50, debe obtener la mediana requerida. Avíseme si hay algún caso de esquina que no se haya tenido en cuenta.

/**
  * Gets the nth percentile entry for an RDD of doubles
  *
  * @param inputScore : Input scores consisting of a RDD of doubles
  * @param percentile : The percentile cutoff required (between 0 to 100), e.g 90%ile of [1,4,5,9,19,23,44] = ~23.
  *                     It prefers the higher value when the desired quantile lies between two data points
  * @return : The number best representing the percentile in the Rdd of double
  */    
  def getRddPercentile(inputScore: RDD[Double], percentile: Double): Double = 
    val numEntries = inputScore.count().toDouble
    val retrievedEntry = (percentile * numEntries / 100.0 ).min(numEntries).max(0).toInt


    inputScore
      .sortBy  case (score) => score 
      .zipWithIndex()
      .filter  case (score, index) => index == retrievedEntry 
      .map  case (score, index) => score 
      .collect()(0)
  

Aquí está el método que utilicé usando funciones de ventana (con pyspark 2.2.0).

from pyspark.sql import DataFrame

class median():
    """ Create median class with over method to pass partition """
    def __init__(self, df, col, name):
        assert col
        self.column=col
        self.df = df
        self.name = name

    def over(self, window):
        from pyspark.sql.functions import percent_rank, pow, first

        first_window = window.orderBy(self.column)                                  # first, order by column we want to compute the median for
        df = self.df.withColumn("percent_rank", percent_rank().over(first_window))  # add percent_rank column, percent_rank = 0.5 coressponds to median
        second_window = window.orderBy(pow(df.percent_rank-0.5, 2))                 # order by (percent_rank - 0.5)^2 ascending
        return df.withColumn(self.name, first(self.column).over(second_window))     # the first row of the window corresponds to median

def addMedian(self, col, median_name):
    """ Method to be added to spark native DataFrame class """
    return median(self, col, median_name)

# Add method to DataFrame class
DataFrame.addMedian = addMedian

Luego llame al método addMedian para calcular la mediana de col2:

from pyspark.sql import Window

median_window = Window.partitionBy("col1")
df = df.addMedian("col2", "median").over(median_window)

Finalmente, puede agrupar si es necesario.

df.groupby("col1", "median")

Aquí tienes las comentarios y puntuaciones

Si posees algún titubeo y capacidad de aumentar nuestro ensayo puedes escribir una reseña y con deseo lo analizaremos.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *