Saltar al contenido

Soltar la columna anidada de Dataframe con PySpark

Nuestros mejores desarrolladores han agotado sus reservas de café, investigando todo el tiempo por la solución, hasta que Iris encontró el resultado en GitLab y ahora la comparte con nosotros.

Solución:

Ejemplo para pyspark:

def drop_col(df, struct_nm, delete_struct_child_col_nm):
    fields_to_keep = filter(lambda x:  x != delete_struct_child_col_nm, df.select(".*".format(struct_nm)).columns)
    fields_to_keep = list(map(lambda x:  ".".format(struct_nm, x), fields_to_keep))
    return df.withColumn(struct_nm, struct(fields_to_keep))

Un método que encontré usando pyspark es convertir primero la columna anidada en json y luego analizar el json convertido con un nuevo esquema anidado con las columnas no deseadas filtradas.

Supongamos que tengo el siguiente esquema y quiero eliminar d, e y j (a.b.d, a.e, a.h.j) del marco de datos:

root
 |-- a: struct (nullable = true)
 |    |-- b: struct (nullable = true)
 |    |    |-- c: long (nullable = true)
 |    |    |-- d: string (nullable = true)
 |    |-- e: struct (nullable = true)
 |    |    |-- f: long (nullable = true)
 |    |    |-- g: string (nullable = true)
 |    |-- h: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- i: string (nullable = true)
 |    |    |    |-- j: string (nullable = true)
 |-- k: string (nullable = true)

Utilicé el siguiente enfoque:

  1. Crear nuevo esquema para a al excluir d, e y j. Una forma rápida de hacerlo es seleccionando manualmente los campos que desea df.select("a").schema y cree un nuevo esquema a partir de los campos seleccionados usando StructType. O bien, puede hacer esto mediante programación recorriendo el árbol del esquema y excluyendo los campos no deseados, algo como:

    def exclude_nested_field(schema, unwanted_fields, parent=""):
        new_schema = []
    
        for field in schema:
            full_field_name = field.name
            if parent:
                full_field_name = parent + "." + full_field_name
    
            if full_field_name not in unwanted_fields:
                if isinstance(field.dataType, StructType):
                    inner_schema = exclude_nested_field(field.dataType, unwanted_fields, full_field_name)
                    new_schema.append(StructField(field.name, inner_schema))
                elif isinstance(field.dataType, ArrayType):
                    inner_schema = exclude_nested_field(field.dataType.elementType, unwanted_fields, full_field_name)
                    new_schema.append(StructField(field.name, ArrayType(inner_schema)))
                else:
                    new_schema.append(StructField(field.name, field.dataType))
    
        return StructType(new_schema)
    
    new_schema = exclude_nested_field(df.schema["a"].dataType, ["b.d", "e", "h.j"])
    
  2. Convertir a columna a json: .withColumn("json", F.to_json("a")).drop("a")

  3. Analizar el json convertido a columna del paso 2 con el nuevo esquema encontrado en el paso 1: .withColumn("a", F.from_json("json", new_schema)).drop("json")

Sección de Reseñas y Valoraciones

Si te animas, tienes la habilidad dejar un tutorial acerca de qué le añadirías a esta división.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *