Saltar al contenido

¿Cómo consultar la columna de datos JSON usando Spark DataFrames?

Esta división fue analizado por especialistas así garantizamos la exactitud de nuestro tutorial.

Chispa> = 2,4

Si es necesario, el esquema se puede determinar utilizando schema_of_json función (tenga en cuenta que esto supone que una fila arbitraria es un representante válido del esquema).

import org.apache.spark.sql.functions.lit, schema_of_json, from_json
import collection.JavaConverters._

val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]().asJava))

Chispa> = 2,1

Puedes usar from_json función:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("k", StringType, true), StructField("v", DoubleType, true)
))

df.withColumn("jsonData", from_json($"jsonData", schema))

Chispa> = 1.6

Puedes usar get_json_object que toma una columna y una ruta:

import org.apache.spark.sql.functions.get_json_object

val exprs = Seq("k", "v").map(
  c => get_json_object($"jsonData", s"$$.$c").alias(c))

df.select($"*" +: exprs: _*)

y extrae campos a cadenas individuales que se pueden convertir a los tipos esperados.

El path El argumento se expresa mediante la sintaxis de puntos, con $. que denota la raíz del documento (ya que el código anterior usa string interpolación $ tiene que ser escapado, por lo tanto $$.).

Chispa <= 1,5:

¿Es esto posible actualmente?

Hasta donde yo sé, no es directamente posible. Puedes probar algo similar a esto:

val df = sc.parallelize(Seq(
  ("1", """"k": "foo", "v": 1.0""", "some_other_field_1"),
  ("2", """"k": "bar", "v": 3.0""", "some_other_field_2")
)).toDF("key", "jsonData", "blobData")

yo asumo eso blob El campo no se puede representar en JSON. De lo contrario, omitirá dividir y unir:

import org.apache.spark.sql.Row

val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
val jsons = sqlContext.read.json(df.drop("blobData").map
  case Row(key: String, json: String) =>
    s""""key": "$key", "jsonData": $json"""
) 

val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
parsed.printSchema

// root
//  |-- jsonData: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: double (nullable = true)
//  |-- key: long (nullable = true)
//  |-- blobData: string (nullable = true)

Un enfoque alternativo (más barato, aunque más complejo) es utilizar una UDF para analizar JSON y generar un struct o map columna. Por ejemplo, algo como esto:

import net.liftweb.json.parse

case class KV(k: String, v: Int)

val parseJson = udf((s: String) => 
  implicit val formats = net.liftweb.json.DefaultFormats
  parse(s).extract[KV]
)

val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
parsed.show

// +---+--------------------+------------------+----------+
// |key|            jsonData|          blobData|parsedJSON|
// +---+--------------------+------------------+----------+
// |  1|{"k": "foo", "v":...|some_other_field_1|   [foo,1]|
// |  2|{"k": "bar", "v":...|some_other_field_2|   [bar,3]|
// +---+--------------------+------------------+----------+

parsed.printSchema

// root
//  |-- key: string (nullable = true)
//  |-- jsonData: string (nullable = true)
//  |-- blobData: string (nullable = true)
//  |-- parsedJSON: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: integer (nullable = false)

La respuesta de zero323 es completa pero pierde un enfoque que está disponible en Spark 2.1+ y es más simple y más robusto que usar schema_of_json():

import org.apache.spark.sql.functions.from_json

val json_schema = spark.read.json(df.select("jsonData").as[String]).schema
df.withColumn("jsonData", from_json($"jsonData", json_schema))

Aquí está el equivalente de Python:

from pyspark.sql.functions import from_json

json_schema = spark.read.json(df.select("jsonData").rdd.map(lambda x: x[0])).schema
df.withColumn("jsonData", from_json("jsonData", json_schema))

El problema con schema_of_json(), como señala zero323, es que inspecciona un solo string y deriva un esquema de eso. Si tiene datos JSON con esquemas variados, entonces el esquema del que obtiene schema_of_json() no reflejará lo que obtendría si fusionara los esquemas de todos los datos JSON en su DataFrame. Analizando esos datos con from_json() entonces producirá una gran cantidad de null o valores vacíos donde el esquema devuelto por schema_of_json() no coincide con los datos.

Al utilizar la capacidad de Spark para derivar un esquema JSON completo a partir de un RDD de cadenas JSON, podemos garantizar que se puedan analizar todos los datos JSON.

Ejemplo: schema_of_json() vs. spark.read.json()

Aquí hay un ejemplo (en Python, el código es muy similar para Scala) para ilustrar la diferencia entre derivar el esquema de un solo elemento con schema_of_json() y derivarlo de todos los datos usando spark.read.json().

>>> df = spark.createDataFrame(
...     [
...         (1, '"a": true'),
...         (2, '"a": "hello"'),
...         (3, '"b": 22'),
...     ],
...     schema=['id', 'jsonData'],
... )

a tiene un valor booleano en una fila y un string valor en otro. El esquema combinado para a establecería su tipo en string. b sería un número entero.

Veamos cómo se comparan los diferentes enfoques. Primero el schema_of_json() Acercarse:

>>> json_schema = schema_of_json(df.select("jsonData").take(1)[0][0])
>>> parsed_json_df = df.withColumn("jsonData", from_json("jsonData", json_schema))
>>> parsed_json_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- jsonData: struct (nullable = true)
 |    |-- a: boolean (nullable = true)

>>> parsed_json_df.show()
+---+--------+
| id|jsonData|
+---+--------+
|  1|  [true]|
|  2|    null|
|  3|      []|
+---+--------+

Como puede ver, el esquema JSON que derivamos era muy limitado. "a": "hello" no se pudo analizar como un booleano y se devolvió null, y "b": 22 simplemente se eliminó porque no estaba en nuestro esquema.

Ahora con spark.read.json():

>>> json_schema = spark.read.json(df.select("jsonData").rdd.map(lambda x: x[0])).schema
>>> parsed_json_df = df.withColumn("jsonData", from_json("jsonData", json_schema))
>>> parsed_json_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- jsonData: struct (nullable = true)
 |    |-- a: string (nullable = true)
 |    |-- b: long (nullable = true)

>>> parsed_json_df.show()
+---+--------+
| id|jsonData|
+---+--------+
|  1| [true,]|
|  2|[hello,]|
|  3|  [, 22]|
+---+--------+

Aquí tenemos todos nuestros datos conservados y con un esquema completo que da cuenta de todos los datos. "a": true fue lanzado como un string para que coincida con el esquema de "a": "hello".

La principal desventaja de usar spark.read.json() es que Spark escaneará todos sus datos para derivar el esquema. Dependiendo de la cantidad de datos que tenga, esa sobrecarga podría ser significativa. Si tu saber que todos sus datos JSON tienen un esquema consistente, está bien seguir adelante y usar schema_of_json() contra un solo elemento. Si tiene variabilidad de esquema pero no desea escanear todos sus datos, puede configurar samplingRatio a algo menos que 1.0 en tu llamada a spark.read.json() para mirar un subconjunto de los datos.

Aquí están los documentos para spark.read.json(): API de Scala / API de Python

El from_json La función es exactamente lo que estás buscando. Su código se verá algo así como:

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()

//You can define whatever struct type that your json states
val schema = StructType(Seq(
  StructField("key", StringType, true), 
  StructField("value", DoubleType, true)
))

df.withColumn("jsonData", from_json(col("jsonData"), schema))

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)


Tags :

Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *