Esta es el arreglo más acertada que encomtrarás aportar, sin embargo mírala pausadamente y valora si se puede adaptar a tu proyecto.
Solución:
Me las arreglé para que esto funcionara con la última versión de fastparquet & s3fs. A continuación se muestra el código para el mismo:
import s3fs
import fastparquet as fp
s3 = s3fs.S3FileSystem()
fs = s3fs.core.S3FileSystem()
#mybucket/data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
s3_path = "mybucket/data_folder/*/*/*.parquet"
all_paths_from_s3 = fs.glob(path=s3_path)
myopen = s3.open
#use s3fs as the filesystem
fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
#convert to pandas dataframe
df = fp_obj.to_pandas()
créditos a Martin por señalarme en la dirección correcta a través de nuestra conversación
nótese bien : Esto sería más lento que usar pyarrow, según el punto de referencia. Actualizaré mi respuesta una vez que se implemente el soporte de s3fs en pyarrow a través de ARROW-1213
Hice una evaluación comparativa rápida en iteraciones individuales con pyarrow y una lista de archivos enviados como un globo a fastparquet. fastparquet es más rápido con s3fs vs pyarrow + mi código hackish. Pero creo que pyarrow + s3fs será más rápido una vez implementado.
El código y los puntos de referencia se encuentran a continuación:
>>> def test_pq():
... for current_file in list_parquet_files:
... f = fs.open(current_file)
... df = pq.read_table(f).to_pandas()
... # following code is to extract the serial_number & cur_date values so that we can add them to the dataframe
... #probably not the best way to split :)
... elements_list=current_file.split('/')
... for item in elements_list:
... if item.find(date_partition) != -1:
... current_date = item.split('=')[1]
... elif item.find(dma_partition) != -1:
... current_dma = item.split('=')[1]
... df['serial_number'] = current_dma
... df['cur_date'] = current_date
... list_.append(df)
... frame = pd.concat(list_)
...
>>> timeit.timeit('test_pq()',number =10,globals=globals())
12.078817503992468
>>> def test_fp():
... fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
... df = fp_obj.to_pandas()
>>> timeit.timeit('test_fp()',number =10,globals=globals())
2.961556333000317
Actualización 2019
Después de todos los RP, se han resuelto problemas como Arrow-2038 y Fast Parquet – PR # 182.
Leer archivos de parquet con Pyarrow
# pip install pyarrow
# pip install s3fs
>>> import s3fs
>>> import pyarrow.parquet as pq
>>> fs = s3fs.S3FileSystem()
>>> bucket = 'your-bucket-name'
>>> path = 'directory_name' #if its a directory omit the traling /
>>> bucket_uri = f's3://bucket/path'
's3://your-bucket-name/directory_name'
>>> dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
>>> table = dataset.read()
>>> df = table.to_pandas()
Leer archivos de parquet con Fast parquet
# pip install s3fs
# pip install fastparquet
>>> import s3fs
>>> import fastparquet as fp
>>> bucket = 'your-bucket-name'
>>> path = 'directory_name'
>>> root_dir_path = f'bucket/path'
# the first two wild card represents the 1st,2nd column partitions columns of your data & so forth
>>> s3_path = f"root_dir_path/*/*/*.parquet"
>>> all_paths_from_s3 = fs.glob(path=s3_path)
>>> fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen, root=root_dir_path)
>>> df = fp_obj.to_pandas()
Comparativas rápidas
Probablemente esta no sea la mejor manera de compararlo. Por favor, lea la publicación del blog para obtener un punto de referencia.
#pyarrow
>>> import timeit
>>> def test_pq():
... dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
... table = dataset.read()
... df = table.to_pandas()
...
>>> timeit.timeit('test_pq()',number =10,globals=globals())
1.2677053569998407
#fastparquet
>>> def test_fp():
... fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen, root=root_dir_path)
... df = fp_obj.to_pandas()
>>> timeit.timeit('test_fp()',number =10,globals=globals())
2.931876824000028
Más lecturas sobre la velocidad de Pyarrow
Referencia:
- fastparquet
- s3fs
- pyarrow
- código de flecha pyarrow basado en discusión y también documentación
- código de fastparquet basado en discusiones PR-182, PR-182 y también documentación
Para python 3.6+, AWS tiene una biblioteca llamada aws-data-wrangler que ayuda con la integración entre Pandas / S3 / Parquet
para instalar hacer;
pip install awswrangler
para leer parquet particionado de s3 usando awswrangler 1.x.x
y arriba, hazlo;
import awswrangler as wr
df = wr.s3.read_parquet(path="s3://my_bucket/path/to/data_folder/", dataset=True)
Configurando dataset=True
awswrangler espera archivos de parquet particionados. Leerá todos los archivos de parquet individuales de sus particiones debajo del s3 key usted especifica en el path
.
Para aquellos de ustedes que quieran leer solo partes de un archivo de parquet particionado, pyarrow acepta una lista de keys así como solo la ruta del directorio parcial para leer en todas las partes de la partición. Este método es especialmente útil para las organizaciones que han dividido sus conjuntos de datos de parquet de forma significativa, por ejemplo, por año o país, lo que permite a los usuarios especificar qué partes del archivo necesitan. Esto reducirá los costos a largo plazo, ya que AWS cobra por byte al leer conjuntos de datos.
# Read in user specified partitions of a partitioned parquet file
import s3fs
import pyarrow.parquet as pq
s3 = s3fs.S3FileSystem()
keys = ['keyname/blah_blah/part-00000-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet'
,'keyname/blah_blah/part-00001-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet'
,'keyname/blah_blah/part-00002-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet'
,'keyname/blah_blah/part-00003-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet']
bucket = 'bucket_yada_yada_yada'
# Add s3 prefix and bucket name to all keys in list
parq_list=[]
for key in keys:
parq_list.append('s3://'+bucket+'/'+key)
# Create your dataframe
df = pq.ParquetDataset(parq_list, filesystem=s3).read_pandas(columns=['Var1','Var2','Var3']).to_pandas()
Nos puedes añadir valor a nuestro contenido informacional participando con tu veteranía en las explicaciones.