Basta ya de buscar por todo internet porque llegaste al espacio correcto, contamos con la solución que buscas pero sin problema.
Solución:
Trabajo en proceso
SPARK-30569 – Agregar funciones DSL invocando percentile_approx
Spark 2.0+:
Puedes usar approxQuantile
método que implementa el algoritmo de Greenwald-Khanna:
Pitón:
df.approxQuantile("x", [0.5], 0.25)
Scala:
df.stat.approxQuantile("x", Array(0.5), 0.25)
donde el último parámetro es un error relativo. Cuanto menor sea el número, más precisos serán los resultados y el cálculo será más caro.
Desde Spark 2.2 (SPARK-14352) admite la estimación en varias columnas:
df.approxQuantile(["x", "y", "z"], [0.5], 0.25)
y
df.approxQuantile(Array("x", "y", "z"), Array(0.5), 0.25)
Los métodos subyacentes también se pueden usar en la agregación de SQL (tanto global como a tientas) usando approx_percentile
función:
> SELECT approx_percentile(10.0, array(0.5, 0.4, 0.1), 100);
[10.0,10.0,10.0]
> SELECT approx_percentile(10.0, 0.5, 100);
10.0
Chispa <2.0
Pitón
Como mencioné en los comentarios, lo más probable es que no valga la pena tanto alboroto. Si los datos son relativamente pequeños como en su caso, simplemente recopile y calcule la mediana localmente:
import numpy as np
np.random.seed(323)
rdd = sc.parallelize(np.random.randint(1000000, size=700000))
%time np.median(rdd.collect())
np.array(rdd.collect()).nbytes
Tarda alrededor de 0,01 segundos en mi computadora de pocos años y alrededor de 5,5 MB de memoria.
Si los datos son mucho más grandes, la clasificación será un factor limitante, por lo que en lugar de obtener un valor exacto, probablemente sea mejor muestrear, recopilar y calcular localmente. Pero si realmente quieres usar Spark, algo como esto debería funcionar (si no arruiné nada):
from numpy import floor
import time
def quantile(rdd, p, sample=None, seed=None):
"""Compute a quantile of order p ∈ [0, 1]
:rdd a numeric rdd
:p quantile(between 0 and 1)
:sample fraction of and rdd to use. If not provided we use a whole dataset
:seed random number generator seed to be used with sample
"""
assert 0 <= p <= 1
assert sample is None or 0 < sample <= 1
seed = seed if seed is not None else time.time()
rdd = rdd if sample is None else rdd.sample(False, sample, seed)
rddSortedWithIndex = (rdd.
sortBy(lambda x: x).
zipWithIndex().
map(lambda (x, i): (i, x)).
cache())
n = rddSortedWithIndex.count()
h = (n - 1) * p
rddX, rddXPlusOne = (
rddSortedWithIndex.lookup(x)[0]
for x in int(floor(h)) + np.array([0L, 1L]))
return rddX + (h - floor(h)) * (rddXPlusOne - rddX)
Y algunas pruebas:
np.median(rdd.collect()), quantile(rdd, 0.5)
## (500184.5, 500184.5)
np.percentile(rdd.collect(), 25), quantile(rdd, 0.25)
## (250506.75, 250506.75)
np.percentile(rdd.collect(), 75), quantile(rdd, 0.75)
(750069.25, 750069.25)
Finalmente definamos la mediana:
from functools import partial
median = partial(quantile, p=0.5)
Hasta ahora todo va bien, pero se necesitan 4,66 s en modo local sin ninguna comunicación de red. Probablemente haya una forma de mejorar esto, pero ¿por qué molestarse?
Independiente del idioma (Colmena UDAF):
Si utiliza HiveContext
también puede utilizar UDAF de Hive. Con valores integrales:
rdd.map(lambda x: (float(x), )).toDF(["x"]).registerTempTable("df")
sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df")
Con valores continuos:
sqlContext.sql("SELECT percentile(x, 0.5) FROM df")
En percentile_approx
puede pasar un argumento adicional que determina una cantidad de registros a utilizar.
Agregar una solución si solo desea un método RDD y no desea pasar a DF. Este fragmento puede obtener un percentil para un RDD del doble.
Si ingresa el percentil como 50, debe obtener la mediana requerida. Avíseme si hay algún caso de esquina que no se haya tenido en cuenta.
/**
* Gets the nth percentile entry for an RDD of doubles
*
* @param inputScore : Input scores consisting of a RDD of doubles
* @param percentile : The percentile cutoff required (between 0 to 100), e.g 90%ile of [1,4,5,9,19,23,44] = ~23.
* It prefers the higher value when the desired quantile lies between two data points
* @return : The number best representing the percentile in the Rdd of double
*/
def getRddPercentile(inputScore: RDD[Double], percentile: Double): Double =
val numEntries = inputScore.count().toDouble
val retrievedEntry = (percentile * numEntries / 100.0 ).min(numEntries).max(0).toInt
inputScore
.sortBy case (score) => score
.zipWithIndex()
.filter case (score, index) => index == retrievedEntry
.map case (score, index) => score
.collect()(0)
Aquí está el método que utilicé usando funciones de ventana (con pyspark 2.2.0).
from pyspark.sql import DataFrame
class median():
""" Create median class with over method to pass partition """
def __init__(self, df, col, name):
assert col
self.column=col
self.df = df
self.name = name
def over(self, window):
from pyspark.sql.functions import percent_rank, pow, first
first_window = window.orderBy(self.column) # first, order by column we want to compute the median for
df = self.df.withColumn("percent_rank", percent_rank().over(first_window)) # add percent_rank column, percent_rank = 0.5 coressponds to median
second_window = window.orderBy(pow(df.percent_rank-0.5, 2)) # order by (percent_rank - 0.5)^2 ascending
return df.withColumn(self.name, first(self.column).over(second_window)) # the first row of the window corresponds to median
def addMedian(self, col, median_name):
""" Method to be added to spark native DataFrame class """
return median(self, col, median_name)
# Add method to DataFrame class
DataFrame.addMedian = addMedian
Luego llame al método addMedian para calcular la mediana de col2:
from pyspark.sql import Window
median_window = Window.partitionBy("col1")
df = df.addMedian("col2", "median").over(median_window)
Finalmente, puede agrupar si es necesario.
df.groupby("col1", "median")
Aquí tienes las comentarios y puntuaciones
Si posees algún titubeo y capacidad de aumentar nuestro ensayo puedes escribir una reseña y con deseo lo analizaremos.