Saltar al contenido

Error al convertir el marco de datos de Pandas en el marco de datos de Spark

Esta cuestión se puede solucionar de diversas formas, por lo tanto te dejamos la que para nosotros es la resolución más completa.

Solución:

Hice este script, funcionó para mis 10 marcos de datos pandas

from pyspark.sql.types import *

# Auxiliar functions
def equivalent_type(f):
    if f == 'datetime64[ns]': return TimestampType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)

# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types): 
      struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return sqlContext.createDataFrame(pandas_df, p_schema)

Puedes verlo también en esta esencia.

Con esto solo tienes que llamar spark_df = pandas_to_spark(pandas_df)

Debe asegurarse de que las columnas del marco de datos de pandas sean apropiadas para el tipo que está infiriendo Spark. Si su marco de datos de pandas enumera algo como:

pd.info()

RangeIndex: 5062 entries, 0 to 5061
Data columns (total 51 columns):
SomeCol                    5062 non-null object
Col2                       5062 non-null object

Y obtienes ese error, prueba:

df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str)

Ahora asegúrate .astype(str) es en realidad el tipo que desea que sean esas columnas. Básicamente, cuando el código Java subyacente intenta inferir el tipo de un objeto en Python, usa algunas observaciones y hace una suposición, si esa suposición no se aplica a todos los datos en la (s) columna (s) que está tratando de convertir de pandas a chispa fallará.

Los errores relacionados con el tipo se pueden evitar mediante imponer un esquema como sigue:

Nota: se creó un archivo de texto (test.csv) con los datos originales (como arriba) y se insertaron nombres de columna hipotéticos (“col1”, “col2”, …, “col25”).

import pyspark
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()

pdDF = pd.read_csv("test.csv")

contenido del marco de datos de pandas:

       col1     col2    col3    col4    col5    col6    col7    col8   ... 
0      10000001 1       0       1       12:35   OK      10002   1      ...
1      10000001 2       0       1       12:36   OK      10002   1      ...
2      10000002 1       0       4       12:19   PA      10003   1      ...

A continuación, cree el esquema:

from pyspark.sql.types import *

mySchema = StructType([ StructField("col1", LongType(), True)
                       ,StructField("col2", IntegerType(), True)
                       ,StructField("col3", IntegerType(), True)
                       ,StructField("col4", IntegerType(), True)
                       ,StructField("col5", StringType(), True)
                       ,StructField("col6", StringType(), True)
                       ,StructField("col7", IntegerType(), True)
                       ,StructField("col8", IntegerType(), True)
                       ,StructField("col9", IntegerType(), True)
                       ,StructField("col10", IntegerType(), True)
                       ,StructField("col11", StringType(), True)
                       ,StructField("col12", StringType(), True)
                       ,StructField("col13", IntegerType(), True)
                       ,StructField("col14", IntegerType(), True)
                       ,StructField("col15", IntegerType(), True)
                       ,StructField("col16", IntegerType(), True)
                       ,StructField("col17", IntegerType(), True)
                       ,StructField("col18", IntegerType(), True)
                       ,StructField("col19", IntegerType(), True)
                       ,StructField("col20", IntegerType(), True)
                       ,StructField("col21", IntegerType(), True)
                       ,StructField("col22", IntegerType(), True)
                       ,StructField("col23", IntegerType(), True)
                       ,StructField("col24", IntegerType(), True)
                       ,StructField("col25", IntegerType(), True)])

Nota: True (implica anulable permitido)

crea el marco de datos de pyspark:

df = spark.createDataFrame(pdDF,schema=mySchema)

confirme que el marco de datos de pandas ahora es un marco de datos de pyspark:

type(df)

producción:

pyspark.sql.dataframe.DataFrame

Aparte:

Para abordar el comentario de Kate a continuación, para imponer un esquema general (String), puede hacer lo siguiente:

df=spark.createDataFrame(pdDF.astype(str)) 

Reseñas y valoraciones del post

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *