Esta cuestión se puede solucionar de diversas formas, por lo tanto te dejamos la que para nosotros es la resolución más completa.
Solución:
Hice este script, funcionó para mis 10 marcos de datos pandas
from pyspark.sql.types import *
# Auxiliar functions
def equivalent_type(f):
if f == 'datetime64[ns]': return TimestampType()
elif f == 'int64': return LongType()
elif f == 'int32': return IntegerType()
elif f == 'float64': return FloatType()
else: return StringType()
def define_structure(string, format_type):
try: typo = equivalent_type(format_type)
except: typo = StringType()
return StructField(string, typo)
# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
columns = list(pandas_df.columns)
types = list(pandas_df.dtypes)
struct_list = []
for column, typo in zip(columns, types):
struct_list.append(define_structure(column, typo))
p_schema = StructType(struct_list)
return sqlContext.createDataFrame(pandas_df, p_schema)
Puedes verlo también en esta esencia.
Con esto solo tienes que llamar spark_df = pandas_to_spark(pandas_df)
Debe asegurarse de que las columnas del marco de datos de pandas sean apropiadas para el tipo que está infiriendo Spark. Si su marco de datos de pandas enumera algo como:
pd.info()
RangeIndex: 5062 entries, 0 to 5061
Data columns (total 51 columns):
SomeCol 5062 non-null object
Col2 5062 non-null object
Y obtienes ese error, prueba:
df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str)
Ahora asegúrate .astype(str)
es en realidad el tipo que desea que sean esas columnas. Básicamente, cuando el código Java subyacente intenta inferir el tipo de un objeto en Python, usa algunas observaciones y hace una suposición, si esa suposición no se aplica a todos los datos en la (s) columna (s) que está tratando de convertir de pandas a chispa fallará.
Los errores relacionados con el tipo se pueden evitar mediante imponer un esquema como sigue:
Nota: se creó un archivo de texto (test.csv) con los datos originales (como arriba) y se insertaron nombres de columna hipotéticos (“col1”, “col2”, …, “col25”).
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
pdDF = pd.read_csv("test.csv")
contenido del marco de datos de pandas:
col1 col2 col3 col4 col5 col6 col7 col8 ...
0 10000001 1 0 1 12:35 OK 10002 1 ...
1 10000001 2 0 1 12:36 OK 10002 1 ...
2 10000002 1 0 4 12:19 PA 10003 1 ...
A continuación, cree el esquema:
from pyspark.sql.types import *
mySchema = StructType([ StructField("col1", LongType(), True)
,StructField("col2", IntegerType(), True)
,StructField("col3", IntegerType(), True)
,StructField("col4", IntegerType(), True)
,StructField("col5", StringType(), True)
,StructField("col6", StringType(), True)
,StructField("col7", IntegerType(), True)
,StructField("col8", IntegerType(), True)
,StructField("col9", IntegerType(), True)
,StructField("col10", IntegerType(), True)
,StructField("col11", StringType(), True)
,StructField("col12", StringType(), True)
,StructField("col13", IntegerType(), True)
,StructField("col14", IntegerType(), True)
,StructField("col15", IntegerType(), True)
,StructField("col16", IntegerType(), True)
,StructField("col17", IntegerType(), True)
,StructField("col18", IntegerType(), True)
,StructField("col19", IntegerType(), True)
,StructField("col20", IntegerType(), True)
,StructField("col21", IntegerType(), True)
,StructField("col22", IntegerType(), True)
,StructField("col23", IntegerType(), True)
,StructField("col24", IntegerType(), True)
,StructField("col25", IntegerType(), True)])
Nota: True
(implica anulable permitido)
crea el marco de datos de pyspark:
df = spark.createDataFrame(pdDF,schema=mySchema)
confirme que el marco de datos de pandas ahora es un marco de datos de pyspark:
type(df)
producción:
pyspark.sql.dataframe.DataFrame
Aparte:
Para abordar el comentario de Kate a continuación, para imponer un esquema general (String), puede hacer lo siguiente:
df=spark.createDataFrame(pdDF.astype(str))