Solución:
[UPDATE: From Spark 2.2 onwards, PCA and SVD are both available in PySpark – see JIRA ticket SPARK-6227 and PCA & PCAModel for Spark ML 2.2; original answer below is still applicable for older Spark versions.]
Bueno, parece increíble, pero de hecho no hay forma de extraer dicha información de una descomposición de PCA (al menos a partir de Spark 1.5). Pero, de nuevo, ha habido muchas “quejas” similares; consulte aquí, por ejemplo, por no poder extraer los mejores parámetros de un CrossValidatorModel
.
Afortunadamente, hace algunos meses, asistí al MOOC ‘Scalable Machine Learning’ de AMPLab (Berkeley) & Databricks, es decir, los creadores de Spark, donde implementamos un pipeline completo de PCA ‘a mano’ como parte de las tareas escolares. He modificado mis funciones desde entonces (tenga la seguridad, obtuve todo el crédito :-), para trabajar con marcos de datos como entradas (en lugar de RDD), del mismo formato que el suyo (es decir, filas de DenseVectors
que contiene las características numéricas).
Primero necesitamos definir una función intermedia, estimatedCovariance
, como sigue:
import numpy as np
def estimateCovariance(df):
"""Compute the covariance matrix for a given dataframe.
Note:
The multi-dimensional covariance array should be calculated using outer products. Don't
forget to normalize the data by first subtracting the mean.
Args:
df: A Spark dataframe with a column named 'features', which (column) consists of DenseVectors.
Returns:
np.ndarray: A multi-dimensional array where the number of rows and columns both equal the
length of the arrays in the input dataframe.
"""
m = df.select(df['features']).map(lambda x: x[0]).mean()
dfZeroMean = df.select(df['features']).map(lambda x: x[0]).map(lambda x: x-m) # subtract the mean
return dfZeroMean.map(lambda x: np.outer(x,x)).sum()/df.count()
Entonces, podemos escribir un main pca
funcionan de la siguiente manera:
from numpy.linalg import eigh
def pca(df, k=2):
"""Computes the top `k` principal components, corresponding scores, and all eigenvalues.
Note:
All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns
each eigenvectors as a column. This function should also return eigenvectors as columns.
Args:
df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
k (int): The number of principal components to return.
Returns:
tuple of (np.ndarray, RDD of np.ndarray, np.ndarray): A tuple of (eigenvectors, `RDD` of
scores, eigenvalues). Eigenvectors is a multi-dimensional array where the number of
rows equals the length of the arrays in the input `RDD` and the number of columns equals
`k`. The `RDD` of scores has the same number of rows as `data` and consists of arrays
of length `k`. Eigenvalues is an array of length d (the number of features).
"""
cov = estimateCovariance(df)
col = cov.shape[1]
eigVals, eigVecs = eigh(cov)
inds = np.argsort(eigVals)
eigVecs = eigVecs.T[inds[-1:-(col+1):-1]]
components = eigVecs[0:k]
eigVals = eigVals[inds[-1:-(col+1):-1]] # sort eigenvals
score = df.select(df['features']).map(lambda x: x[0]).map(lambda x: np.dot(x, components.T) )
# Return the `k` principal components, `k` scores, and all eigenvalues
return components.T, score, eigVals
Prueba
Veamos primero los resultados con el método existente, usando los datos de ejemplo de la documentación de Spark ML PCA (modificándolos para que sean todos DenseVectors
):
from pyspark.ml.feature import *
from pyspark.mllib.linalg import Vectors
data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = sqlContext.createDataFrame(data,["features"])
pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca_extracted.fit(df)
model.transform(df).collect()
[Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]), pca_features=DenseVector([1.6486, -4.0133])),
Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), pca_features=DenseVector([-4.6451, -1.1168])),
Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]), pca_features=DenseVector([-6.4289, -5.338]))]
Entonces, con nuestro método:
comp, score, eigVals = pca(df)
score.collect()
[array([ 1.64857282, 4.0132827 ]),
array([-4.64510433, 1.11679727]),
array([-6.42888054, 5.33795143])]
Déjame enfatizar que nosotros no usar cualquier collect()
métodos en las funciones que hemos definido – score
es un RDD
, como debería ser.
Observe que los signos de nuestra segunda columna son todos opuestos a los derivados del método existente; pero esto no es un problema: de acuerdo con An Introduction to Statistical Learning (de descarga gratuita), coautor de Hastie & Tibshirani, p. 382
Cada vector de carga de componente principal es único, hasta un cambio de signo. Esto significa que dos paquetes de software diferentes producirán los mismos vectores de carga de componentes principales, aunque los signos de esos vectores de carga pueden diferir. Los signos pueden diferir porque cada vector de carga de componente principal especifica una dirección en el espacio p-dimensional: voltear el signo no tiene ningún efecto ya que la dirección no cambia. […] De manera similar, los vectores de puntuación son únicos hasta un cambio de signo, ya que la varianza de Z es la misma que la varianza de −Z.
Finalmente, ahora que tenemos los valores propios disponibles, es trivial escribir una función para el porcentaje de la varianza explicada:
def varianceExplained(df, k=1):
"""Calculate the fraction of variance explained by the top `k` eigenvectors.
Args:
df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
k: The number of principal components to consider.
Returns:
float: A number between 0 and 1 representing the percentage of variance explained
by the top `k` eigenvectors.
"""
components, scores, eigenvalues = pca(df, k)
return sum(eigenvalues[0:k])/sum(eigenvalues)
varianceExplained(df,1)
# 0.79439325322305299
Como prueba, también verificamos si la varianza explicada en nuestros datos de ejemplo es 1.0, para k = 5 (ya que los datos originales son de 5 dimensiones):
varianceExplained(df,5)
# 1.0
Esto debería hacer tu trabajo eficientemente; no dude en aportar las aclaraciones que necesite.
[Developed & tested with Spark 1.5.0 & 1.5.1]
EDITAR:
PCA
y SVD
finalmente ambos están disponibles en pyspark a partir de chispa 2.2.0 de acuerdo a este resuelto ticket JIRA SPARK-6227.
Respuesta original:
La respuesta dada por @desertnaut es realmente excelente desde una perspectiva teórica, pero quería presentar otro enfoque sobre cómo calcular la SVD y extraer los vectores propios.
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg.distributed import RowMatrix
class SVD(JavaModelWrapper):
"""Wrapper around the SVD scala case class"""
@property
def U(self):
""" Returns a RowMatrix whose columns are the left singular vectors of the SVD if computeU was set to be True."""
u = self.call("U")
if u is not None:
return RowMatrix(u)
@property
def s(self):
"""Returns a DenseVector with singular values in descending order."""
return self.call("s")
@property
def V(self):
""" Returns a DenseMatrix whose columns are the right singular vectors of the SVD."""
return self.call("V")
Esto define nuestro objeto SVD. Ahora podemos definir nuestro método computeSVD usando Java Wrapper.
def computeSVD(row_matrix, k, computeU=False, rCond=1e-9):
"""
Computes the singular value decomposition of the RowMatrix.
The given row matrix A of dimension (m X n) is decomposed into U * s * V'T where
* s: DenseVector consisting of square root of the eigenvalues (singular values) in descending order.
* U: (m X k) (left singular vectors) is a RowMatrix whose columns are the eigenvectors of (A X A')
* v: (n X k) (right singular vectors) is a Matrix whose columns are the eigenvectors of (A' X A)
:param k: number of singular values to keep. We might return less than k if there are numerically zero singular values.
:param computeU: Whether of not to compute U. If set to be True, then U is computed by A * V * sigma^-1
:param rCond: the reciprocal condition number. All singular values smaller than rCond * sigma(0) are treated as zero, where sigma(0) is the largest singular value.
:returns: SVD object
"""
java_model = row_matrix._java_matrix_wrapper.call("computeSVD", int(k), computeU, float(rCond))
return SVD(java_model)
Ahora, apliquemos eso a un ejemplo:
from pyspark.ml.feature import *
from pyspark.mllib.linalg import Vectors
data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),), (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),), (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = sqlContext.createDataFrame(data,["features"])
pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca_extracted.fit(df)
features = model.transform(df) # this create a DataFrame with the regular features and pca_features
# We can now extract the pca_features to prepare our RowMatrix.
pca_features = features.select("pca_features").rdd.map(lambda row : row[0])
mat = RowMatrix(pca_features)
# Once the RowMatrix is ready we can compute our Singular Value Decomposition
svd = computeSVD(mat,2,True)
svd.s
# DenseVector([9.491, 4.6253])
svd.U.rows.collect()
# [DenseVector([0.1129, -0.909]), DenseVector([0.463, 0.4055]), DenseVector([0.8792, -0.0968])]
svd.V
# DenseMatrix(2, 2, [-0.8025, -0.5967, -0.5967, 0.8025], 0)
En Spark 2.2+ ahora puede obtener fácilmente la variación explicada como:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=<columns of your original dataframe>, outputCol="features")
df = assembler.transform(<your original dataframe>).select("features")
from pyspark.ml.feature import PCA
pca = PCA(k=10, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
sum(model.explainedVariance)