Saltar al contenido

Cómo hacer que la primera fila sea el encabezado al leer un archivo en PySpark y convertirlo a Pandas Dataframe

este problema se puede solucionar de diferentes formas, pero te dejamos la resolución más completa para nosotros.

Solución:

Hay un par de formas de hacerlo, dependiendo de la estructura exacta de sus datos. Como no das ningún detalle, intentaré mostrarlo usando un archivo de datos. nyctaxicab.csv que puedes descargar.

Si su archivo está en csv formato, debe utilizar el correspondiente spark-csv paquete, proporcionado por Databricks. No es necesario descargarlo explícitamente, simplemente ejecute pyspark como sigue:

$ pyspark --packages com.databricks:spark-csv_2.10:1.3.0

y luego

>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.types import *
>>> sqlContext = SQLContext(sc)

>>> df = sqlContext.read.load('file:///home/vagrant/data/nyctaxisub.csv', 
                      format='com.databricks.spark.csv', 
                      header='true', 
                      inferSchema='true')

>>> df.count()
249999

El archivo tiene 250 000 filas, incluido el encabezado, por lo que 249 999 es el número correcto de registros reales. Aquí está el esquema, como lo infiere automáticamente el paquete:

>>> df.dtypes
[('_id', 'string'),
 ('_rev', 'string'),
 ('dropoff_datetime', 'string'),
 ('dropoff_latitude', 'double'),
 ('dropoff_longitude', 'double'),
 ('hack_license', 'string'),
 ('medallion', 'string'),
 ('passenger_count', 'int'),
 ('pickup_datetime', 'string'),
 ('pickup_latitude', 'double'),
 ('pickup_longitude', 'double'),
 ('rate_code', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('trip_distance', 'double'),
 ('trip_time_in_secs', 'int'),
 ('vendor_id', 'string')]

Puedes ver más detalles en mi publicación de blog relevante.

Si, por cualquier motivo, no puede utilizar el spark-csv paquete, tendrá que restar la primera fila de los datos y luego usarla para construir su esquema. Esta es la idea general, y puede encontrar nuevamente un ejemplo completo con detalles del código en otra publicación de mi blog:

>>> taxiFile = sc.textFile("file:///home/ctsats/datasets/BDU_Spark/nyctaxisub.csv")
>>> taxiFile.count()
250000
>>> taxiFile.take(5)
[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"',
 u'"29b3f4a30dea6688d4c289c9672cb996","1-ddfdec8050c7ef4dc694eeeda6c4625e","2013-01-11 22:03:00",+4.07033460000000E+001,-7.40144200000000E+001,"A93D1F7F8998FFB75EEF477EB6077516","68BC16A99E915E44ADA7E639B4DD5F59",2,"2013-01-11 21:48:00",+4.06760670000000E+001,-7.39810790000000E+001,1,,+4.08000000000000E+000,900,"VTS"',
 u'"2a80cfaa425dcec0861e02ae44354500","1-b72234b58a7b0018a1ec5d2ea0797e32","2013-01-11 04:28:00",+4.08190960000000E+001,-7.39467470000000E+001,"64CE1B03FDE343BB8DFB512123A525A4","60150AA39B2F654ED6F0C3AF8174A48A",1,"2013-01-11 04:07:00",+4.07280540000000E+001,-7.40020370000000E+001,1,,+8.53000000000000E+000,1260,"VTS"',
 u'"29b3f4a30dea6688d4c289c96758d87e","1-387ec30eac5abda89d2abefdf947b2c1","2013-01-11 22:02:00",+4.07277180000000E+001,-7.39942860000000E+001,"2D73B0C44F1699C67AB8AE322433BDB7","6F907BC9A85B7034C8418A24A0A75489",5,"2013-01-11 21:46:00",+4.07577480000000E+001,-7.39649810000000E+001,1,,+3.01000000000000E+000,960,"VTS"',
 u'"2a80cfaa425dcec0861e02ae446226e4","1-aa8b16d6ae44ad906a46cc6581ffea50","2013-01-11 10:03:00",+4.07643050000000E+001,-7.39544600000000E+001,"E90018250F0A009433F03BD1E4A4CE53","1AFFD48CC07161DA651625B562FE4D06",5,"2013-01-11 09:44:00",+4.07308080000000E+001,-7.39928280000000E+001,1,,+3.64000000000000E+000,1140,"VTS"']

# Construct the schema from the header 
>>> header = taxiFile.first()
>>> header
u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"'
>>> schemaString = header.replace('"','')  # get rid of the double-quotes
>>> schemaString
u'_id,_rev,dropoff_datetime,dropoff_latitude,dropoff_longitude,hack_license,medallion,passenger_count,pickup_datetime,pickup_latitude,pickup_longitude,rate_code,store_and_fwd_flag,trip_distance,trip_time_in_secs,vendor_id'
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(',')]
>>> schema = StructType(fields)

# Subtract header and use the above-constructed schema:
>>> taxiHeader = taxiFile.filter(lambda l: "_id" in l) # taxiHeader needs to be an RDD - the string we constructed above will not do the job
>>> taxiHeader.collect() # for inspection purposes only
[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"']
>>> taxiNoHeader = taxiFile.subtract(taxiHeader)
>>> taxi_df = taxiNoHeader.toDF(schema)  # Spark dataframe
>>> import pandas as pd
>>> taxi_DF = taxi_df.toPandas()  # pandas dataframe 

Por brevedad, aquí todas las columnas terminan siendo del tipo stringpero en la publicación del blog muestro en detalle y explico cómo puede refinar aún más los tipos de datos (y nombres) deseados para campos específicos.

La respuesta simple sería establecer header='true'

P.ej:

df = spark.read.csv('housing.csv', header='true')

o

df = spark.read.option("header","true").format("csv").schema(myManualSchema).load("maestraDestacados.csv")

Una forma más de hacerlo es a continuación,

log_txt = sc.textFile(file_path)
header = log_txt.first() #get the first row to a variable
fields = [StructField(field_name, StringType(), True) for field_name in header] #get the types of header variable fields
schema = StructType(fields) 
filter_data = log_txt.filter(lambda row:row != header) #remove the first row from or else there will be duplicate rows 
df = spark.createDataFrame(filter_data, schema=schema) #convert to pyspark DF
df.show()

Si haces scroll puedes encontrar las explicaciones de otros creadores, tú aún eres capaz mostrar el tuyo si dominas el tema.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *