Saltar al contenido

Spark Dataframe: cómo agregar una columna de índice: también conocido como índice de datos distribuidos

[*]Te recomendamos que revises esta respuesta en un entorno controlado antes de pasarlo a producción, un saludo.

Solución:

[*]Con Scala puedes usar:

import org.apache.spark.sql.functions._ 

df.withColumn("id",monotonicallyIncreasingId)

[*]Puede consultar este ejemplo y Scala docs.

[*]Con Pyspark puedes usar:

from pyspark.sql.functions import monotonically_increasing_id 

df_index = df.select("*").withColumn("id", monotonically_increasing_id())

[*]monótonamente_creciente_id – Se garantiza que el ID generado será monotónicamente creciente y único, pero no consecutivo.

[*]“Quiero agregar una columna del 1 al número de la fila”.

[*]Digamos que tenemos el siguiente DF

+--------+-------------+-------+
| userId | productCode | count |
+--------+-------------+-------+
|     25 |        6001 |     2 |
|     11 |        5001 |     8 |
|     23 |         123 |     5 |
+--------+-------------+-------+

[*]Para generar los IDs a partir de 1

val w = Window.orderBy("count")
val result = df.withColumn("index", row_number().over(w))

[*]Esto agregaría una columna de índice ordenada por valor creciente de conteo.

+--------+-------------+-------+-------+
| userId | productCode | count | index |
+--------+-------------+-------+-------+
|     25 |        6001 |     2 |     1 |
|     23 |         123 |     5 |     2 |
|     11 |        5001 |     8 |     3 |
+--------+-------------+-------+-------+

[*]NOTA : Los enfoques anteriores no dan un número de secuencia, pero sí dan una identificación creciente.

[*]Una forma sencilla de hacerlo y asegurarse de que el orden de los índices sea como se muestra a continuación. zipWithIndex.

[*]Data de muestra.

+-------------------+
|               Name|
+-------------------+
|     Ram Ghadiyaram|
|        Ravichandra|
|              ilker|
|               nick|
|             Naveed|
|      Gobinathan SP|
|Sreenivas Venigalla|
|     Jackela Kowski|
|   Arindam Sengupta|
|            Liangpi|
|             Omar14|
|        anshu kumar|
+-------------------+

    package com.example

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType, StructField, StructType
import org.apache.spark.sql.DataFrame, Row

/**
  * DistributedDataIndex : Program to index an RDD  with
  */
object DistributedDataIndex extends App with Logging 

  val spark = builder
    .master("local[*]")
    .appName(this.getClass.getName)
    .getOrCreate()

  import spark.implicits._

  val df = spark.sparkContext.parallelize(
    Seq("Ram Ghadiyaram", "Ravichandra", "ilker", "nick"
      , "Naveed", "Gobinathan SP", "Sreenivas Venigalla", "Jackela Kowski", "Arindam Sengupta", "Liangpi", "Omar14", "anshu kumar"
    )).toDF("Name")
  df.show
  logInfo("addColumnIndex here")
  // Add index now...
  val df1WithIndex = addColumnIndex(df)
    .withColumn("monotonically_increasing_id", monotonically_increasing_id)
  df1WithIndex.show(false)

  /**
    * Add Column Index to dataframe to each row
    */
  def addColumnIndex(df: DataFrame) = 
    spark.sqlContext.createDataFrame(
      df.rdd.zipWithIndex.map 
        case (row, index) => Row.fromSeq(row.toSeq :+ index)
      ,
      // Create schema for index column
      StructType(df.schema.fields :+ StructField("index", LongType, false)))
  

[*]resultado :

+-------------------+-----+---------------------------+
|Name               |index|monotonically_increasing_id|
+-------------------+-----+---------------------------+
|Ram Ghadiyaram     |0    |0                          |
|Ravichandra        |1    |8589934592                 |
|ilker              |2    |8589934593                 |
|nick               |3    |17179869184                |
|Naveed             |4    |25769803776                |
|Gobinathan SP      |5    |25769803777                |
|Sreenivas Venigalla|6    |34359738368                |
|Jackela Kowski     |7    |42949672960                |
|Arindam Sengupta   |8    |42949672961                |
|Liangpi            |9    |51539607552                |
|Omar14             |10   |60129542144                |
|anshu kumar        |11   |60129542145                |
+-------------------+-----+---------------------------+

[*]

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *