Intenta comprender el código bien antes de adaptarlo a tu trabajo si tquieres aportar algo puedes decirlo en los comentarios.
Solución:
Si desea dividir su marco de datos en dos diferentes, haga dos selecciones en él con las diferentes columnas que desee.
val sourceDf = spark.read.csv(...)
val df1 = sourceDF.select("first column", "second column", "third column")
val df2 = sourceDF.select("first column", "second column", "third column")
Tenga en cuenta que esto, por supuesto, significa que sourceDf se evaluaría dos veces, por lo que si puede caber en la memoria distribuida y usa la mayoría de las columnas en ambos marcos de datos, podría ser una buena idea almacenarlo en caché. Si tiene muchas columnas adicionales que no necesita, entonces puede hacer una selección primero para seleccionar las columnas que necesitará para que almacene todos esos datos adicionales en la memoria.
Digamos que nuestro marco de datos principal tiene ‘norte’ columnas
podemos crear ‘X’ Child DataFrames (Consideremos 2 en nuestro caso).
Las columnas para el marco de datos secundario se pueden elegir según lo desee de cualquiera de las columnas del marco de datos principal.
Considere que la fuente tiene 10 columnas y queremos dividirnos 2 tramas de datos que contiene columnas a las que se hace referencia desde el marco de datos principal.
Las columnas para el marco de datos secundario se pueden decidir usando el Seleccione API de marco de datos
val parentDF = spark.read.format("csv").load("/path of the CSV file")
val Child1_DF = parentDF.select("col1","col2","col3","col9","col10").show()
val child2_DF = parentDF.select("col5", "col6","col7","col8","col1","col2").show()
Tenga en cuenta que el recuento de columnas en los marcos de datos secundarios puede diferir en longitud y será menor que el recuento de columnas del marco de datos principal.
también podemos referirnos a los nombres de las columnas sin mencionar los nombres reales usando los índices posicionales de la columna deseada del marco de datos principal
Primero importe los implícitos de chispa que actúan como una clase de ayuda para el uso de la notación $ para acceder a las columnas usando los índices posicionales
import spark.implicits._
import org.apache.spark.sql.functions._
val child3_DF = parentDF.select("_c0","_c1","_c2","_c8","_c9").show()
también podemos seleccionar la columna basándonos en ciertas condiciones. Digamos que queremos que solo se seleccionen columnas con números pares en el marco de datos secundario. Incluso nos referimos a columnas incluso indexadas y el índice comienza desde ‘0’
val parentColumns = parentDF.columns.toList
res0: List[String] = List(_c0, _c1, _c2, _c3, _c4, _c5, _c6, _c7,_c8,_c9)
val evenParentColumns = res0.zipWithIndex.filter(_._2 % 2 == 0).map( _._1).toSeq
res1: scala.collection.immutable.Seq[String] = List(_c0, _c2, _c4, _c6,_c8)
Ahora alimente estas columnas para que se seleccionen desde parentDF. Tenga en cuenta que la API de selección necesita argumentos de tipo seq. Así que convertimos “evenParentColumns” en la colección Seq
val child4_DF = parentDF.select(res1.head, res1.tail:_*).show()
Esto mostrará las columnas indexadas pares del marco de datos principal.
| _c0 | _c2 | _c4 |_c6 |_c8 |
|ITE00100554|TMAX|null| mi| 1 |
|TE00100554 |TMIN|null| mi| 4 |
|GM000010962|PRCP|null| mi| 7 |
Así que ahora nos quedan las columnas pares en el marco de datos
Del mismo modo, también podemos aplicar otras operaciones a la columna Dataframe como se muestra a continuación
val child5_DF = parentDF.select($"_c0", $"_c8" + 1).show()
Entonces, de muchas maneras, como se mencionó, podemos seleccionar las columnas en el marco de datos.
Hay múltiples opciones (especialmente en Scala) para seleccionar un subconjunto de columnas de ese marco de datos. Las siguientes líneas seleccionarán las dos columnas. colA
y colB
:
import spark.implicits._
import org.apache.spark.sql.functions.col, column, expr
inputDf.select(col("colA"), col("colB"))
inputDf.select(inputDf.col("colA"), inputDf.col("colB"))
inputDf.select(column("colA"), column("colB"))
inputDf.select(expr("colA"), expr("colB"))
// only available in Scala
inputDf.select($"colA", $"colB")
inputDf.select('colA, 'colB) // makes use of Scala's Symbol
// selecting columns based on a given iterable of Strings
val selectedColumns: Seq[Column] = Seq("colA", "colB").map(c => col(c))
inputDf.select(selectedColumns: _*)
// select the first or last 2 columns
inputDf.selectExpr(inputDf.columns.take(2): _*)
inputDf.selectExpr(inputDf.columns.takeRight(2): _*)
el uso de $
es posible ya que Scala proporciona una clase implícita que convierte una cadena en una columna usando el método $
:
implicit class StringToColumn(val sc : scala.StringContext) extends scala.AnyRef
def $(args : scala.Any*) : org.apache.spark.sql.ColumnName = /* compiled code */
Por lo general, cuando desee derivar un DataFrame a varios DataFrames, podría mejorar su rendimiento si persist
el DataFrame original antes de crear los demás. Al final puedes unpersist
la trama de datos original.
Manten eso en mente columnas no se resuelven en el momento de la compilación, sino solo cuando se comparan con los nombres de columna de su catálogo, lo que sucede durante la fase de análisis de la ejecución de la consulta. En caso de que necesite un tipo de seguridad más fuerte, puede crear un Dataset
.
Para completar, aquí está el csv para probar el código anterior:
// csv file:
// colA,colB,colC
// 1,"foo","bar"
val inputDf = spark.read.format("csv").option("header", "true").load(csvFilePath)
// resulting DataFrame schema
root
|-- colA: string (nullable = true)
|-- colB: string (nullable = true)
|-- colC: string (nullable = true)
Aquí puedes ver las comentarios y valoraciones de los usuarios
Recuerda que tienes la opción de añadir una tasación si diste con la contestación.