[*]Queremos brindarte la mejor solución que encontramos online. Nosotros deseamos que te resulte de ayuda y si puedes aportar alguna mejora hazlo con libertad.
Solución:
[*]Guía de pruebas de Pyspark Unit
[*]1. Debe descargar la distribución de Spark del sitio y descomprimirla. O si ya tiene una distribución funcional de Spark y Python, simplemente instálela chispa: pip install pyspark
[*]2. Establezca variables del sistema como esta si es necesario:
export SPARK_HOME="/home/eugene/spark-1.6.0-bin-hadoop2.6"
export PYTHONPATH="$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
export PATH="SPARK_HOME/bin:$PATH"
[*]Agregué esto en .profile en mi directorio de inicio. Si ya tiene una distribución funcional de Spark, se pueden configurar estas variables.
[*]3. Además, es posible que deba configurar:
PYSPARK_SUBMIT_ARGS="--jars path/to/hive/jars/jar.jar,path/to/other/jars/jar.jar --conf spark.driver.userClassPathFirst=true --master local[*] pyspark-shell"
PYSPARK_PYTHON="/home/eugene/anaconda3/envs/ste/bin/python3"
[*]Python y tarros? Si. Pyspark usa py4j para comunicarse con la parte Java de Spark. Y si desea resolver una situación más complicada, como ejecutar el servidor Kafka con pruebas en Python o usar TestHiveContext de Scala como en el ejemplo, debe especificar jars. Lo hice a través de las variables de entorno de configuración de ejecución de Idea.
[*]4. Y podrías usar pyspark/tests.py
, pyspark/streaming/tests.py
, pyspark/sql/tests.py
, pyspark/ml/tests.py
, pyspark/mllib/tests.py
scripts que contienen varias clases TestCase y ejemplos para probar aplicaciones pyspark. En su caso, podría hacerlo (ejemplo de pyspark/sql/tests.py):
class HiveContextSQLTests(ReusedPySparkTestCase):
@classmethod
def setUpClass(cls):
ReusedPySparkTestCase.setUpClass()
cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
try:
cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
except py4j.protocol.Py4JError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
except TypeError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
os.unlink(cls.tempdir.name)
_scala_HiveContext =
cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc())
cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext)
cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
cls.df = cls.sc.parallelize(cls.testData).toDF()
@classmethod
def tearDownClass(cls):
ReusedPySparkTestCase.tearDownClass()
shutil.rmtree(cls.tempdir.name, ignore_errors=True)
[*]pero debe especificar –jars con Hive libs en PYSPARK_SUBMIT_ARGS como se describe anteriormente
[*]o sin colmena:
class SQLContextTests(ReusedPySparkTestCase):
def test_get_or_create(self):
sqlCtx = SQLContext.getOrCreate(self.sc)
self.assertTrue(SQLContext.getOrCreate(self.sc) is sqlCtx)
[*]Como sé si pyspark se ha instalado a través de pip
no ha descrito tests.py en el ejemplo. En este caso, simplemente descargue la distribución del sitio de Spark y copie los ejemplos de código.
[*]Ahora puede ejecutar su TestCase como de costumbre: python -m unittest test.py
[*]actualizar:
Dado que Spark 2.3, el uso de HiveContext y SqlContext está en desuso. Podría usar la API SparkSession Hive.
[*]Asumiendo que tienes pyspark
instalado (por ejemplo pip install pyspark
en un venv), puede usar la clase a continuación para realizar pruebas unitarias en unittest
:
import unittest
import pyspark
class PySparkTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
conf = pyspark.SparkConf().setMaster("local[*]").setAppName("testing")
cls.sc = pyspark.SparkContext(conf=conf)
cls.spark = pyspark.SQLContext(cls.sc)
@classmethod
def tearDownClass(cls):
cls.sc.stop()
[*]Ejemplo:
class SimpleTestCase(PySparkTestCase):
def test_with_rdd(self):
test_input = [
' hello spark ',
' hello again spark spark'
]
input_rdd = self.sc.parallelize(test_input, 1)
from operator import add
results = input_rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(add).collect()
self.assertEqual(results, [('hello', 2), ('spark', 3), ('again', 1)])
def test_with_df(self):
df = self.spark.createDataFrame(data=[[1, 'a'], [2, 'b']],
schema=['c1', 'c2'])
self.assertEqual(df.count(), 2)
[*]Tenga en cuenta que esto crea un contexto por clase. Utilizar setUp
en lugar de setUpClass
para obtener un contexto por prueba. Por lo general, esto agrega una gran cantidad de tiempo de sobrecarga en la ejecución de las pruebas, ya que la creación de un nuevo contexto Spark es costosa actualmente.
[*]Esta es una forma de hacerlo. En la llamada CLI:
python -m unittest my_unit_test_script.py
[*]Código
import functools
import unittest
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
def rename_chars(column_name):
chars = ((' ', '_&'), ('.', '_$'))
new_cols = functools.reduce(lambda a, kv: a.replace(*kv), chars, column_name)
return new_cols
def column_names(df):
changed_col_names = df.schema.names
for cols in changed_col_names:
df = df.withColumnRenamed(cols, rename_chars(cols))
return df
class RenameColumnNames(unittest.TestCase):
def setUp(self):
conf = SparkConf()
sc = SparkContext(conf=conf)
self.sqlContext = HiveContext(sc)
def test_column_names(self):
cols = ['ID', 'NAME', 'last.name', 'abc test']
val = [(1, 'Sam', 'SMITH', 'eng'), (2, 'RAM', 'Reddy', 'turbine')]
df = self.sqlContext.createDataFrame(val, cols)
result = df.schema.names
expected = ['ID', 'NAME', 'last_$name', 'abc_&test']
self.assertEqual(result, expected)
Puntuaciones y reseñas
[*]Si eres capaz, tienes el poder dejar un enunciado acerca de qué te ha gustado de este escrito.