Mantén la atención porque en esta sección hallarás la contestación que buscas.
Solución:
Spark/Scala probablemente ya tenga un método de ayuda/utilidad que hará esto por mí.
Estás bien. Spark ya tiene su propio esquema y código de inferencia de tipos de datos que utiliza para inferir el esquema de las fuentes de datos subyacentes (csv, json, etc.). Así que puede verlo para implementar el suyo propio (la implementación real está marcada como privada para Spark y es vinculado a RDD y clases internas, por lo que no se puede usar directamente desde un código fuera de Spark, pero debería darle una buena idea sobre cómo hacerlo).
Dado que csv es de tipo plano (y json puede tener una estructura anidada), la inferencia del esquema csv es relativamente más sencilla y debería ayudarlo con la tarea que está tratando de lograr anteriormente. Así que explicaré cómo funciona la inferencia csv (la inferencia json solo necesita tener en cuenta la estructura posiblemente anidada, pero la inferencia del tipo de datos es bastante análoga).
Con ese prólogo, lo que desea ver es el objeto CSVInferSchema. En particular, mira el infer
método que toma un RDD[Array[String]]
e inferir el tipo de datos para cada elemento del array en todo el RDD. La forma en que lo hace es: marca cada campo como NullType
para empezar y luego, a medida que itera sobre la siguiente fila de valores (Array[String]
) en el RDD
actualiza lo ya inferido DataType
a un nuevo DataType
si el nuevo DataType
es más específico. Esto está pasando aquí:
val rootTypes: Array[DataType] =
tokenRdd.aggregate(startType)(inferRowType(options), mergeRowTypes)
Ahora inferRowType
llamadas inferField
para cada uno de los campos de la fila. inferField
la implementación es lo que probablemente esté buscando: se necesita el tipo inferido hasta ahora para un campo en particular y el string valor del campo para la fila actual como parámetro. Luego devuelve el tipo inferido existente o si el nuevo tipo inferido es más específico que el nuevo tipo.
La sección relevante del código es la siguiente:
typeSoFar match
case NullType => tryParseInteger(field, options)
case IntegerType => tryParseInteger(field, options)
case LongType => tryParseLong(field, options)
case _: DecimalType => tryParseDecimal(field, options)
case DoubleType => tryParseDouble(field, options)
case TimestampType => tryParseTimestamp(field, options)
case BooleanType => tryParseBoolean(field, options)
case StringType => StringType
case other: DataType =>
throw new UnsupportedOperationException(s"Unexpected data type $other")
Tenga en cuenta que si el typeSoFar
es NullType, primero intenta analizarlo como Integer
pero tryParseInteger
call es una cadena de llamadas a análisis de tipo inferior. Entonces, si no puede analizar el valor como Integer, invocará tryParseLong
que en caso de falla invocará tryParseDecimal
que en caso de falla invocará tryParseDouble
guau tryParseTimestamp
guau tryParseBoolean
wofwi finalmente stringType
.
Por lo tanto, puede usar una lógica similar para implementar cualquiera que sea su caso de uso. (Si no necesita fusionar filas, simplemente implemente todos los tryParse*
métodos textualmente y simplemente invocar tryParseInteger
. No es necesario escribir su propia expresión regular).
Espero que esto ayude.
Sí, por supuesto que Spark tiene la magia que necesitas.
En Spark 2.x es CatalystSqlParser
objeto, definido aquí.
Por ejemplo:
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
CatalystSqlParser.parseDataType("string") // StringType
CatalystSqlParser.parseDataType("int") // IntegerType
Y así.
Pero, según tengo entendido, no es parte de la API pública y, por lo tanto, puede cambiar en las próximas versiones sin advertencias.
Así que puedes implementar tu método como:
def toSparkType(inputType: String): DataType = CatalystSqlParser.parseDataType(inputType)
Puedes añadir valor a nuestra información añadiendo tu veteranía en las observaciones.