Esta es la solución más acertada que te podemos dar, sin embargo mírala pausadamente y analiza si es compatible a tu trabajo.
Solución:
¡Espero que esto ayude!
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql import Window
#sample data
a= sqlContext.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],
["Animal", "Enemy"])
a.show()
#convert list to a dataframe
rating = [5,4,1]
b = sqlContext.createDataFrame([(l,) for l in rating], ['Rating'])
#add 'sequential' index and join both dataframe to get the final result
a = a.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
b = b.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
final_df = a.join(b, a.row_idx == b.row_idx).
drop("row_idx")
final_df.show()
Aporte:
+------+-----+
|Animal|Enemy|
+------+-----+
| Dog| Cat|
| Cat| Dog|
| Mouse| Cat|
+------+-----+
La salida es:
+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
| Cat| Dog| 4|
| Dog| Cat| 5|
| Mouse| Cat| 1|
+------+-----+------+
Como mencionó @Tw UxTLi51Nus, si puede ordenar el DataFrame, digamos, por Animal, sin que esto cambie sus resultados, puede hacer lo siguiente:
def add_labels(indx):
return rating[indx-1] # since row num begins from 1
labels_udf = udf(add_labels, IntegerType())
a = spark.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],["Animal", "Enemy"])
a.createOrReplaceTempView('a')
a = spark.sql('select row_number() over (order by "Animal") as num, * from a')
a.show()
+---+------+-----+
|num|Animal|Enemy|
+---+------+-----+
| 1| Dog| Cat|
| 2| Cat| Dog|
| 3| Mouse| Cat|
+---+------+-----+
new_df = a.withColumn('Rating', labels_udf('num'))
new_df.show()
+---+------+-----+------+
|num|Animal|Enemy|Rating|
+---+------+-----+------+
| 1| Dog| Cat| 5|
| 2| Cat| Dog| 4|
| 3| Mouse| Cat| 1|
+---+------+-----+------+
Y luego suelte el num
columna:
new_df.drop('num').show()
+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
| Dog| Cat| 5|
| Cat| Dog| 4|
| Mouse| Cat| 1|
+------+-----+------+
Editar:
Otra forma, aunque quizás fea y un poco ineficiente, si no puede ordenar por una columna, es volver a rdd y hacer lo siguiente:
a = spark.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],["Animal", "Enemy"])
# or create the rdd from the start:
# a = spark.sparkContext.parallelize([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")])
a = a.rdd.zipWithIndex()
a = a.toDF()
a.show()
+-----------+---+
| _1| _2|
+-----------+---+
| [Dog,Cat]| 0|
| [Cat,Dog]| 1|
|[Mouse,Cat]| 2|
+-----------+---+
a = a.select(bb._1.getItem('Animal').alias('Animal'), bb._1.getItem('Enemy').alias('Enemy'), bb._2.alias('num'))
def add_labels(indx):
return rating[indx] # indx here will start from zero
labels_udf = udf(add_labels, IntegerType())
new_df = a.withColumn('Rating', labels_udf('num'))
new_df.show()
+---------+--------+---+------+
|Animal | Enemy|num|Rating|
+---------+--------+---+------+
| Dog| Cat| 0| 5|
| Cat| Dog| 1| 4|
| Mouse| Cat| 2| 1|
+---------+--------+---+------+
(No recomendaría esto si tienes muchos datos)
Espero que esto ayude, buena suerte!
Puede que me equivoque, pero creo que la respuesta aceptada no funcionará. monotonically_increasing_id
solo garantiza que los identificadores serán únicos y crecientes, no que serán consecutivos. Por lo tanto, usarlo en dos marcos de datos diferentes probablemente creará dos columnas muy diferentes, y la unión en su mayoría regresará vacía.
inspirándonos en esta respuesta https://stackoverflow.com/a/48211877/7225303 a una pregunta similar, podríamos cambiar la respuesta incorrecta a:
from pyspark.sql.window import Window as W
from pyspark.sql import functions as F
a= sqlContext.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],
["Animal", "Enemy"])
a.show()
+------+-----+
|Animal|Enemy|
+------+-----+
| Dog| Cat|
| Cat| Dog|
| Mouse| Cat|
+------+-----+
#convert list to a dataframe
rating = [5,4,1]
b = sqlContext.createDataFrame([(l,) for l in rating], ['Rating'])
b.show()
+------+
|Rating|
+------+
| 5|
| 4|
| 1|
+------+
a = a.withColumn("idx", F.monotonically_increasing_id())
b = b.withColumn("idx", F.monotonically_increasing_id())
windowSpec = W.orderBy("idx")
a = a.withColumn("idx", F.row_number().over(windowSpec))
b = b.withColumn("idx", F.row_number().over(windowSpec))
a.show()
+------+-----+---+
|Animal|Enemy|idx|
+------+-----+---+
| Dog| Cat| 1|
| Cat| Dog| 2|
| Mouse| Cat| 3|
+------+-----+---+
b.show()
+------+---+
|Rating|idx|
+------+---+
| 5| 1|
| 4| 2|
| 1| 3|
+------+---+
final_df = a.join(b, a.idx == b.idx).drop("idx")
+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
| Dog| Cat| 5|
| Cat| Dog| 4|
| Mouse| Cat| 1|
+------+-----+------+
Si conservas algún reparo y disposición de avanzar nuestro enunciado eres capaz de ejecutar una crítica y con deseo lo leeremos.