Solución:
Puede hacer coincidir patrones en el escribe de la columna (usando el DataFrame esquema) para decidir si analizar la cadena en una marca de tiempo o simplemente usar la marca de tiempo tal como está, y usar el unix_timestamp
función para hacer la conversión real:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
// preparing some example data - df1 with String type and df2 with Timestamp type
val df1 = Seq(("a", "2016-02-01"), ("b", "2016-02-02")).toDF("key", "date")
val df2 = Seq(
("a", new Timestamp(new SimpleDateFormat("yyyy-MM-dd").parse("2016-02-01").getTime)),
("b", new Timestamp(new SimpleDateFormat("yyyy-MM-dd").parse("2016-02-02").getTime))
).toDF("key", "date")
// If column is String, converts it to Timestamp
def normalizeDate(df: DataFrame): DataFrame = {
df.schema("date").dataType match {
case StringType => df.withColumn("date", unix_timestamp($"date", "yyyy-MM-dd").cast("timestamp"))
case _ => df
}
}
// after "normalizing", you can assume date has Timestamp type -
// both would print the same thing:
normalizeDate(df1).rdd.map(r => r.getAs[Timestamp]("date")).foreach(println)
normalizeDate(df2).rdd.map(r => r.getAs[Timestamp]("date")).foreach(println)
Aquí hay algunas cosas que puede probar:
(1) Empiece a utilizar la función inferSchema durante la carga si tiene una versión que la admita. Esto hará que Spark figurara el tipo de datos de las columnas, esto no funciona en todos los escenarios. También mire los datos de entrada, si tiene citas, le aconsejo que agregue un argumento adicional para dar cuenta de ellas durante la carga.
val inputDF = spark.read.format("csv").option("header","true").option("inferSchema","true").load(fileLocation)
(2) Para identificar el tipo de datos de una columna, puede usar el siguiente código, colocará todos los nombres de columna y los tipos de datos en sus propias matrices de cadenas.
val columnNames : Array[String] = inputDF.columns
val columnDataTypes : Array[String] = inputDF.schema.fields.map(x=>x.dataType).map(x=>x.toString)