Este equipo de especialistas pasados muchos días de trabajo y recopilar de datos, dimos con la respuesta, esperamos que todo este artículo sea de gran utilidad para tu trabajo.
Solución:
A DataFrame
se define bien con una búsqueda en Google de “Definición de DataFrame”:
Un marco de datos es una tabla, o una estructura bidimensional similar a una matriz, en la que cada columna contiene medidas de una variable y cada fila contiene un caso.
Entonces, un DataFrame
tiene metadatos adicionales debido a su formato tabular, que le permite a Spark ejecutar ciertas optimizaciones en la consulta finalizada.
Un RDD
, por otro lado, es simplemente un Resilient Datribuido Dun conjunto que es más una caja negra de datos que no se pueden optimizar, ya que las operaciones que se pueden realizar contra él no están tan restringidas.
Sin embargo, puede pasar de un DataFrame a un RDD
a través de su rdd
método, y puede pasar de un RDD
a un DataFrame
(si el RDD está en formato tabular) a través del toDF
método
En general se recomienda utilizar un DataFrame
siempre que sea posible debido a la optimización de consultas incorporada.
Lo primero es
DataFrame
fue evolucionado deSchemaRDD
.
Sí … conversión entre Dataframe
y RDD
es absolutamente posible.
A continuación, se muestran algunos fragmentos de código de muestra.
df.rdd
esRDD[Row]
A continuación se muestran algunas de las opciones para crear un marco de datos.
-
1)
yourrddOffrow.toDF
se convierte enDataFrame
. -
2) Utilizando
createDataFrame
de contexto sqlval df = spark.createDataFrame(rddOfRow, schema)
donde el esquema puede ser de algunas de las opciones siguientes, como se describe en una buena publicación de SO.
De la clase de caso de scala y la api de reflexión de scalaimport org.apache.spark.sql.catalyst.ScalaReflection val schema = ScalaReflection.schemaFor[YourScalacaseClass].dataType.asInstanceOf[StructType]
O usando
Encoders
import org.apache.spark.sql.Encoders val mySchema = Encoders.product[MyCaseClass].schema
como se describe en Schema también se puede crear usando
StructType
y
StructField
val schema = new StructType() .add(StructField("id", StringType, true)) .add(StructField("col1", DoubleType, true)) .add(StructField("col2", DoubleType, true)) etc...
De hecho, ahora hay 3 API de Apache Spark …
-
RDD
API:
los
RDD
(Conjunto de datos distribuido resistente) La API ha estado en Spark desde la versión 1.0.los
RDD
API proporciona muchos métodos de transformación, comomap
(),
filter
(), yreduce
() para realizar cálculos sobre los datos. Cada uno de estos métodos da como resultado un nuevoRDD
que representa los datos transformados. Sin embargo, estos métodos solo definen las operaciones que se realizarán y las transformaciones no se realizan hasta que se llama a un método de acción. Ejemplos de métodos de acción soncollect
() y
saveAsObjectFile
().
Ejemplo de RDD:
rdd.filter(_.age > 21) // transformation
.map(_.last)// transformation
.saveAsObjectFile("under21.bin") // action
Ejemplo: filtrar por atributo con RDD
rdd.filter(_.age > 21)
-
DataFrame
API
Spark 1.3 introdujo un nuevo
DataFrame
API como parte de la iniciativa Project Tungsten que busca mejorar el rendimiento y la escalabilidad de Spark. losDataFrame
API introduce el concepto de un esquema para describir los datos, lo que permite que Spark administre el esquema y solo pase datos entre nodos, de una manera mucho más eficiente que usando la serialización de Java.los
DataFrame
API es radicalmente diferente de laRDD
API porque es una API para crear un plan de consulta relacional que luego puede ejecutar el optimizador Catalyst de Spark. La API es natural para los desarrolladores que están familiarizados con la creación de planes de consulta.
Ejemplo de estilo SQL:
df.filter("age > 21");
Limitaciones:
Debido a que el código se refiere a los atributos de datos por su nombre, el compilador no puede detectar ningún error. Si los nombres de los atributos son incorrectos, el error solo se detectará en tiempo de ejecución, cuando se crea el plan de consulta.
Otro inconveniente con el DataFrame
La API es que está muy centrada en Scala y, si bien es compatible con Java, el soporte es limitado.
Por ejemplo, al crear un DataFrame
de un existente RDD
de objetos Java, el optimizador Catalyst de Spark no puede inferir el esquema y asume que cualquier objeto en el DataFrame implementa el scala.Product
interfaz. Scala case class
funciona bien porque implementan esta interfaz.
-
Dataset
API
los
Dataset
API, lanzada como una vista previa de API en Spark 1.6, tiene como objetivo proporcionar lo mejor de ambos mundos; el familiar estilo de programación orientada a objetos y la seguridad de tipos en tiempo de compilación delRDD
API pero con los beneficios de rendimiento del optimizador de consultas de Catalyst. Los conjuntos de datos también utilizan el mismo mecanismo eficiente de almacenamiento fuera del montón que el
DataFrame
API.Cuando se trata de serializar datos, el
Dataset
API tiene el concepto de
codificadores que se traducen entre representaciones de JVM (objetos) y el formato binario interno de Spark. Spark tiene codificadores integrados que son muy avanzados en el sentido de que generan código de bytes para interactuar con datos fuera del montón y brindan acceso bajo demanda a atributos individuales sin tener que deserializar un objeto completo. Spark aún no proporciona una API para implementar codificadores personalizados, pero está previsto para una versión futura.Además, el
Dataset
La API está diseñada para funcionar igualmente bien con Java y Scala. Cuando se trabaja con objetos Java, es importante que sean totalmente compatibles con beans.
Ejemplo Dataset
Estilo API SQL:
dataset.filter(_.age < 21);
Evaluaciones dif. Entre DataFrame
Y DataSet
:
Flujo de nivel catalista ... (Desmitificando la presentación de DataFrame y Dataset de Spark Summit)
Para leer más ... artículo de databricks: una historia de tres API de Apache Spark: RDD frente a DataFrames y conjuntos de datos
Apache Spark proporciona tres tipos de API
- RDD
- Marco de datos
- Conjunto de datos
Aquí está la comparación de API entre RDD, Dataframe y Dataset.
RDD
La principal abstracción que proporciona Spark es un conjunto de datos distribuido resistente (RDD), que es una colección de elementos particionados en los nodos del clúster que se pueden operar en paralelo.
Características de RDD: -
-
Colección distribuida:
RDD utiliza operaciones MapReduce que se adoptan ampliamente para procesar y generar grandes conjuntos de datos con un algoritmo distribuido paralelo en un clúster. Permite a los usuarios escribir cálculos en paralelo, utilizando un conjunto de operadores de alto nivel, sin tener que preocuparse por la distribución del trabajo y la tolerancia a fallas.
-
Inmutable: RDD compuestos por una colección de registros que están particionados. Una partición es una unidad básica de paralelismo en un RDD, y cada partición es una división lógica de datos que es inmutable y se crea a través de algunas transformaciones en particiones existentes. La inmutabilidad ayuda a lograr consistencia en los cálculos.
-
Tolerante a fallos:
En el caso de que perdamos alguna partición de RDD, podemos reproducir la transformación en esa partición en el linaje para lograr el mismo cálculo, en lugar de hacer la replicación de datos en múltiples nodos. Esta característica es el mayor beneficio de RDD porque ahorra una gran cantidad de esfuerzos en la administración y replicación de datos y, por lo tanto, logra cálculos más rápidos. -
Evaluaciones perezosas: Todas las transformaciones en Spark son perezosas, ya que no calculan sus resultados de inmediato. En cambio, solo recuerdan las transformaciones aplicadas a algún conjunto de datos base. Las transformaciones solo se calculan cuando una acción requiere que se devuelva un resultado al programa controlador.
-
Transformaciones funcionales:
Los RDD admiten dos tipos de operaciones: transformaciones, que crean un nuevo conjunto de datos a partir de uno existente, y acciones, que devuelven un valor al programa controlador después de ejecutar un cálculo en el conjunto de datos. -
Formatos de procesamiento de datos:
Puede procesar de manera fácil y eficiente datos tanto estructurados como no estructurados.
-
Lenguajes de programación compatibles:
RDD API está disponible en Java, Scala, Python y R.
Limitaciones de RDD: -
-
Sin motor de optimización incorporado:
Cuando se trabaja con datos estructurados, los RDD no pueden aprovechar las ventajas de los optimizadores avanzados de Spark, incluido el optimizador de catalizador y el motor de ejecución de tungsteno. Los desarrolladores deben optimizar cada RDD en función de sus atributos. -
Manejo de datos estructurados:
A diferencia de Dataframe y conjuntos de datos, los RDD no infieren el esquema de los datos ingeridos y requieren que el usuario lo especifique.
Marcos de datos
Spark introdujo Dataframes en la versión Spark 1.3. Dataframe supera los desafíos clave que tenían los RDD.
Un DataFrame es una colección distribuida de datos organizados en columnas con nombre. Es conceptualmente equivalente a una tabla en una base de datos relacional o un R / Python Dataframe. Junto con Dataframe, Spark también introdujo el optimizador de catalizador, que aprovecha las funciones de programación avanzadas para crear un optimizador de consultas extensible.
Características del marco de datos: -
-
Colección distribuida de objeto de fila:
Un DataFrame es una colección distribuida de datos organizados en columnas con nombre. Es conceptualmente equivalente a una tabla en una base de datos relacional, pero con optimizaciones más ricas bajo el capó. -
Procesamiento de datos:
Procesamiento de formatos de datos estructurados y no estructurados (Avro, CSV, búsqueda elástica y Cassandra) y sistemas de almacenamiento (HDFS, tablas HIVE, MySQL, etc). Puede leer y escribir desde todas estas diversas fuentes de datos. -
Optimización usando optimizador de catalizador:
Impulsa tanto las consultas SQL como la API de DataFrame. El marco de datos utiliza el marco de transformación del árbol del catalizador en cuatro fases,1.Analyzing a logical plan to resolve references 2.Logical plan optimization 3.Physical planning 4.Code generation to compile parts of the query to Java bytecode.
-
Compatibilidad de la colmena:
Con Spark SQL, puede ejecutar consultas de Hive sin modificar en sus almacenes de Hive existentes. Reutiliza la interfaz de Hive y MetaStore y le brinda compatibilidad total con los datos, las consultas y las UDF de Hive existentes. -
Tungsteno:
Tungsten proporciona un backend de ejecución física que gestiona explícitamente la memoria y genera de forma dinámica un código de bytes para la evaluación de expresiones. -
Lenguajes de programación compatibles:
La API de Dataframe está disponible en Java, Scala, Python y R.
Limitaciones del marco de datos: -
- Seguridad de tipo en tiempo de compilación:
Como se discutió, la API de Dataframe no admite la seguridad del tiempo de compilación, lo que le limita la manipulación de datos cuando se desconoce la estructura. El siguiente ejemplo funciona durante el tiempo de compilación. Sin embargo, obtendrá una excepción de tiempo de ejecución al ejecutar este código.
Ejemplo:
case class Person(name : String , age : Int)
val dataframe = sqlContext.read.json("people.json")
dataframe.filter("salary > 10000").show
=> throws Exception : cannot resolve 'salary' given input age , name
Esto es un desafío especialmente cuando se trabaja con varios pasos de transformación y agregación.
- No se puede operar en el objeto de dominio (objeto de dominio perdido):
Una vez que haya transformado un objeto de dominio en un marco de datos, no podrá regenerarlo a partir de él. En el siguiente ejemplo, una vez que hayamos creado personDF a partir de personRDD, no recuperaremos el RDD original de la clase Person (RDD[Person]).
Ejemplo:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
personDF.rdd // returns RDD[Row] , does not returns RDD[Person]
API de conjuntos de datos
Dataset API es una extensión de DataFrames que proporciona una interfaz de programación orientada a objetos y con seguridad de tipos. Es una colección inmutable y fuertemente tipada de objetos que se asignan a un esquema relacional.
En el núcleo del conjunto de datos, la API es un nuevo concepto llamado codificador, que es responsable de convertir entre objetos JVM y representación tabular. La representación tabular se almacena utilizando el formato binario interno de Tungsteno de Spark, lo que permite operaciones con datos serializados y una mejor utilización de la memoria. Spark 1.6 viene con soporte para generar codificadores automáticamente para una amplia variedad de tipos, incluidos los tipos primitivos (por ejemplo, String, Integer, Long), clases de casos Scala y Java Beans.
Características del conjunto de datos: -
-
Ofrece lo mejor de RDD y Dataframe:
RDD (programación funcional, seguridad de tipos), DataFrame (modelo relacional, optimización de consultas, ejecución de tungsteno, clasificación y barajado) -
Codificadores:
Con el uso de codificadores, es fácil convertir cualquier objeto JVM en un conjunto de datos, lo que permite a los usuarios trabajar con datos estructurados y no estructurados a diferencia de Dataframe. -
Lenguajes de programación compatibles:
Actualmente, la API de conjuntos de datos solo está disponible en Scala y Java. Actualmente, Python y R no son compatibles con la versión 1.6. El soporte de Python está programado para la versión 2.0. -
Tipo de seguridad:
La API de conjuntos de datos proporciona seguridad en el tiempo de compilación que no estaba disponible en los marcos de datos. En el siguiente ejemplo, podemos ver cómo Dataset puede operar en objetos de dominio con funciones lambda de compilación.
Ejemplo:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
val ds:Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25)
// error : value salary is not a member of person
ds.rdd // returns RDD[Person]
- Interoperable: Los conjuntos de datos le permiten convertir fácilmente sus RDD y marcos de datos existentes en conjuntos de datos sin código repetitivo.
Limitación de API de conjuntos de datos: -
- Requiere conversión de tipo a String:
Consultar los datos de los conjuntos de datos actualmente requiere que especifiquemos los campos de la clase como una cadena. Una vez que hemos consultado los datos, nos vemos obligados a convertir la columna al tipo de datos requerido. Por otro lado, si usamos la operación de mapa en conjuntos de datos, no usará el optimizador de Catalyst.
Ejemplo:
ds.select(col("name").as[String], $"age".as[Int]).collect()
No es compatible con Python y R: a partir de la versión 1.6, los conjuntos de datos solo son compatibles con Scala y Java. La compatibilidad con Python se introducirá en Spark 2.0.
La API de conjuntos de datos ofrece varias ventajas sobre el RDD y la API de marco de datos existentes con una mejor seguridad de tipos y programación funcional. Con el desafío de los requisitos de conversión de tipos en la API, aún no tendrá la seguridad de tipos requerida y hará que su código sea frágil.
Sección de Reseñas y Valoraciones
Si te ha resultado de ayuda este post, sería de mucha ayuda si lo compartes con más programadores y nos ayudes a extender nuestro contenido.