Jairo, parte de nuestro staff, nos ha hecho el favor de escribir esta crónica porque controla a la perfección dicho tema.
Solución:
Descubrí la forma correcta de calcular un promedio móvil / móvil usando este stackoverflow:
Funciones de Spark Window: rango entre fechas
La idea básica es convertir su columna de marca de tiempo en segundos, y luego puede usar la función rangeBetween en la clase pyspark.sql.Window para incluir las filas correctas en su ventana.
Aquí está el ejemplo resuelto:
%pyspark
from pyspark.sql import functions as F
from pyspark.sql.window import Window
#function to calculate number of seconds from number of days
days = lambda i: i * 86400
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
(13, "2017-03-15T12:27:18+00:00"),
(25, "2017-03-18T11:27:18+00:00")],
["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))
df = df.withColumn('rolling_average', F.avg("dollars").over(w))
Esto da como resultado la columna exacta de promedios móviles que estaba buscando:
dollars timestampGMT rolling_average
17 2017-03-10 15:27:18.0 17.0
13 2017-03-15 12:27:18.0 15.0
25 2017-03-18 11:27:18.0 19.0
Agregaré una variación que personalmente encontré muy útil. Espero que alguien también lo encuentre útil:
Si desea agrupar, dentro de los respectivos grupos, calcule la media móvil:
Ejemplo del marco de datos:
from pyspark.sql.window import Window
from pyspark.sql import functions as func
df = spark.createDataFrame([("tshilidzi", 17.00, "2018-03-10T15:27:18+00:00"),
("tshilidzi", 13.00, "2018-03-11T12:27:18+00:00"),
("tshilidzi", 25.00, "2018-03-12T11:27:18+00:00"),
("thabo", 20.00, "2018-03-13T15:27:18+00:00"),
("thabo", 56.00, "2018-03-14T12:27:18+00:00"),
("thabo", 99.00, "2018-03-15T11:27:18+00:00"),
("tshilidzi", 156.00, "2019-03-22T11:27:18+00:00"),
("thabo", 122.00, "2018-03-31T11:27:18+00:00"),
("tshilidzi", 7000.00, "2019-04-15T11:27:18+00:00"),
("ash", 9999.00, "2018-04-16T11:27:18+00:00")
],
["name", "dollars", "timestampGMT"])
# we need this timestampGMT as seconds for our Window time frame
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
df.show(10000, False)
Producción:
+---------+-------+---------------------+
|name |dollars|timestampGMT |
+---------+-------+---------------------+
|tshilidzi|17.0 |2018-03-10 17:27:18.0|
|tshilidzi|13.0 |2018-03-11 14:27:18.0|
|tshilidzi|25.0 |2018-03-12 13:27:18.0|
|thabo |20.0 |2018-03-13 17:27:18.0|
|thabo |56.0 |2018-03-14 14:27:18.0|
|thabo |99.0 |2018-03-15 13:27:18.0|
|tshilidzi|156.0 |2019-03-22 13:27:18.0|
|thabo |122.0 |2018-03-31 13:27:18.0|
|tshilidzi|7000.0 |2019-04-15 13:27:18.0|
|ash |9999.0 |2018-04-16 13:27:18.0|
+---------+-------+---------------------+
Para calcular la media móvil en función de la name
y aún mantener todas las filas:
#create window by casting timestamp to long (number of seconds)
w = (Window()
.partitionBy(col("name"))
.orderBy(F.col("timestampGMT").cast('long'))
.rangeBetween(-days(7), 0))
df2 = df.withColumn('rolling_average', F.avg("dollars").over(w))
df2.show(100, False)
Producción:
+---------+-------+---------------------+------------------+
|name |dollars|timestampGMT |rolling_average |
+---------+-------+---------------------+------------------+
|ash |9999.0 |2018-04-16 13:27:18.0|9999.0 |
|tshilidzi|17.0 |2018-03-10 17:27:18.0|17.0 |
|tshilidzi|13.0 |2018-03-11 14:27:18.0|15.0 |
|tshilidzi|25.0 |2018-03-12 13:27:18.0|18.333333333333332|
|tshilidzi|156.0 |2019-03-22 13:27:18.0|156.0 |
|tshilidzi|7000.0 |2019-04-15 13:27:18.0|7000.0 |
|thabo |20.0 |2018-03-13 17:27:18.0|20.0 |
|thabo |56.0 |2018-03-14 14:27:18.0|38.0 |
|thabo |99.0 |2018-03-15 13:27:18.0|58.333333333333336|
|thabo |122.0 |2018-03-31 13:27:18.0|122.0 |
+---------+-------+---------------------+------------------+
Vale la pena señalar que si no le importan las fechas exactas, pero desea tener disponible el promedio de los últimos 30 días, puede usar la función rowsBetween de la siguiente manera:
w = Window.orderBy('timestampGMT').rowsBetween(-7, 0)
df = eurPrices.withColumn('rolling_average', F.avg('dollars').over(w))
Dado que ordena por fechas, tomará las últimas 7 apariciones. Te ahorras todo el casting.