Después de consultar expertos en la materia, programadores de diversas áreas y profesores dimos con la respuesta al dilema y la plasmamos en este post.
Solución:
Siempre que use Spark versión 2.1 o superior, puede aprovechar el hecho de que podemos usar valores de columna como argumentos al usar pyspark.sql.functions.expr()
:
- crear un maniquí string de repetir comas con una longitud igual a
diffDays
- dividir esto string en
','
para convertirlo en un array de tamañodiffDays
- Usar
pyspark.sql.functions.posexplode()
para explotar esto array junto con sus índices - finalmente usar
pyspark.sql.functions.date_add()
para sumar el valor del índice número de días albookingDt
Código:
import pyspark.sql.functions as f
diffDaysDF.withColumn("repeat", f.expr("split(repeat(',', diffDays), ',')"))
.select("*", f.posexplode("repeat").alias("txnDt", "val"))
.drop("repeat", "val", "diffDays")
.withColumn("txnDt", f.expr("date_add(bookingDt, txnDt)"))
.show()
#+----------+----------+-------+----------+
#| arrivalDt| bookingDt|vyge_id| txnDt|
#+----------+----------+-------+----------+
#|2018-01-05|2018-01-01| 1000|2018-01-01|
#|2018-01-05|2018-01-01| 1000|2018-01-02|
#|2018-01-05|2018-01-01| 1000|2018-01-03|
#|2018-01-05|2018-01-01| 1000|2018-01-04|
#|2018-01-05|2018-01-01| 1000|2018-01-05|
#+----------+----------+-------+----------+
Bueno, puedes hacer lo siguiente.
Cree un marco de datos solo con fechas:
dates_df
# con todos los días entre el primero bookingDt
y última arrivalDt
y luego unir esos df con entre condiciones:
df.join(dates_df,
on=col('dates_df.dates').between(col('df.bookindDt'), col('dt.arrivalDt'))
.select('df.*', 'dates_df.dates')
Podría funcionar incluso más rápido que la solución con explode
sin embargo, debe averiguar cuál es la fecha de inicio y finalización de este df.
10 años df tendrá solo 3650 registros, no muchos de los que preocuparse.
Para Chispa 2.4+ La secuencia se puede utilizar para crear una array que contiene todas las fechas entre bookingDt
y arrivalDt
. Este array luego se puede explotar.
from pyspark.sql import functions as F
df = df
.withColumn('bookingDt', F.col('bookingDt').cast('date'))
.withColumn('arrivalDt', F.col('arrivalDt').cast('date'))
df.withColumn('txnDt', F.explode(F.expr('sequence(bookingDt, arrivalDt, interval 1 day)')))
.show()
Producción:
+-------+----------+----------+----------+
|vyge_id| bookingDt| arrivalDt| txnDt|
+-------+----------+----------+----------+
| 1000|2018-01-01|2018-01-05|2018-01-01|
| 1000|2018-01-01|2018-01-05|2018-01-02|
| 1000|2018-01-01|2018-01-05|2018-01-03|
| 1000|2018-01-01|2018-01-05|2018-01-04|
| 1000|2018-01-01|2018-01-05|2018-01-05|
+-------+----------+----------+----------+
Comentarios y puntuaciones
Puedes corroborar nuestro análisis dejando un comentario y valorándolo te damos la bienvenida.