Saltar al contenido

Código pyspark de prueba unitaria usando python

[*]Queremos brindarte la mejor solución que encontramos online. Nosotros deseamos que te resulte de ayuda y si puedes aportar alguna mejora hazlo con libertad.

Solución:

[*]Guía de pruebas de Pyspark Unit

[*]1. Debe descargar la distribución de Spark del sitio y descomprimirla. O si ya tiene una distribución funcional de Spark y Python, simplemente instálela chispa: pip install pyspark

[*]2. Establezca variables del sistema como esta si es necesario:

export SPARK_HOME="/home/eugene/spark-1.6.0-bin-hadoop2.6"
export PYTHONPATH="$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
export PATH="SPARK_HOME/bin:$PATH"

[*]Agregué esto en .profile en mi directorio de inicio. Si ya tiene una distribución funcional de Spark, se pueden configurar estas variables.

[*]3. Además, es posible que deba configurar:

PYSPARK_SUBMIT_ARGS="--jars path/to/hive/jars/jar.jar,path/to/other/jars/jar.jar --conf spark.driver.userClassPathFirst=true --master local[*] pyspark-shell"
PYSPARK_PYTHON="/home/eugene/anaconda3/envs/ste/bin/python3"

[*]Python y tarros? Si. Pyspark usa py4j para comunicarse con la parte Java de Spark. Y si desea resolver una situación más complicada, como ejecutar el servidor Kafka con pruebas en Python o usar TestHiveContext de Scala como en el ejemplo, debe especificar jars. Lo hice a través de las variables de entorno de configuración de ejecución de Idea.

[*]4. Y podrías usar pyspark/tests.py, pyspark/streaming/tests.py, pyspark/sql/tests.py, pyspark/ml/tests.py, pyspark/mllib/tests.pyscripts que contienen varias clases TestCase y ejemplos para probar aplicaciones pyspark. En su caso, podría hacerlo (ejemplo de pyspark/sql/tests.py):

class HiveContextSQLTests(ReusedPySparkTestCase):

    @classmethod
    def setUpClass(cls):
        ReusedPySparkTestCase.setUpClass()
        cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
        try:
            cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
        except py4j.protocol.Py4JError:
            cls.tearDownClass()
            raise unittest.SkipTest("Hive is not available")
        except TypeError:
            cls.tearDownClass()
            raise unittest.SkipTest("Hive is not available")
        os.unlink(cls.tempdir.name)
        _scala_HiveContext =
            cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc())
        cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext)
        cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
        cls.df = cls.sc.parallelize(cls.testData).toDF()

    @classmethod
    def tearDownClass(cls):
        ReusedPySparkTestCase.tearDownClass()
        shutil.rmtree(cls.tempdir.name, ignore_errors=True)

[*]pero debe especificar –jars con Hive libs en PYSPARK_SUBMIT_ARGS como se describe anteriormente

[*]o sin colmena:

class SQLContextTests(ReusedPySparkTestCase):
    def test_get_or_create(self):
        sqlCtx = SQLContext.getOrCreate(self.sc)
        self.assertTrue(SQLContext.getOrCreate(self.sc) is sqlCtx)

[*]Como sé si pyspark se ha instalado a través de pipno ha descrito tests.py en el ejemplo. En este caso, simplemente descargue la distribución del sitio de Spark y copie los ejemplos de código.

[*]Ahora puede ejecutar su TestCase como de costumbre: python -m unittest test.py

[*]actualizar:
Dado que Spark 2.3, el uso de HiveContext y SqlContext está en desuso. Podría usar la API SparkSession Hive.

[*]Asumiendo que tienes pyspark instalado (por ejemplo pip install pyspark en un venv), puede usar la clase a continuación para realizar pruebas unitarias en unittest:

import unittest
import pyspark


class PySparkTestCase(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        conf = pyspark.SparkConf().setMaster("local[*]").setAppName("testing")
        cls.sc = pyspark.SparkContext(conf=conf)
        cls.spark = pyspark.SQLContext(cls.sc)

    @classmethod
    def tearDownClass(cls):
        cls.sc.stop()

[*]Ejemplo:

class SimpleTestCase(PySparkTestCase):

    def test_with_rdd(self):
        test_input = [
            ' hello spark ',
            ' hello again spark spark'
        ]

        input_rdd = self.sc.parallelize(test_input, 1)

        from operator import add

        results = input_rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(add).collect()
        self.assertEqual(results, [('hello', 2), ('spark', 3), ('again', 1)])

    def test_with_df(self):
        df = self.spark.createDataFrame(data=[[1, 'a'], [2, 'b']], 
                                        schema=['c1', 'c2'])
        self.assertEqual(df.count(), 2)

[*]Tenga en cuenta que esto crea un contexto por clase. Utilizar setUp en lugar de setUpClass para obtener un contexto por prueba. Por lo general, esto agrega una gran cantidad de tiempo de sobrecarga en la ejecución de las pruebas, ya que la creación de un nuevo contexto Spark es costosa actualmente.

[*]Esta es una forma de hacerlo. En la llamada CLI:

python -m unittest my_unit_test_script.py

[*]Código

import functools
import unittest

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext


def rename_chars(column_name):
    chars = ((' ', '_&'), ('.', '_$'))
    new_cols = functools.reduce(lambda a, kv: a.replace(*kv), chars, column_name)
    return new_cols


def column_names(df):
    changed_col_names = df.schema.names
    for cols in changed_col_names:
        df = df.withColumnRenamed(cols, rename_chars(cols))
    return df


class RenameColumnNames(unittest.TestCase):
    def setUp(self):
        conf = SparkConf()
        sc = SparkContext(conf=conf)
        self.sqlContext = HiveContext(sc)

    def test_column_names(self):
        cols = ['ID', 'NAME', 'last.name', 'abc test']
        val = [(1, 'Sam', 'SMITH', 'eng'), (2, 'RAM', 'Reddy', 'turbine')]
        df = self.sqlContext.createDataFrame(val, cols)
        result = df.schema.names
        expected = ['ID', 'NAME', 'last_$name', 'abc_&test']
        self.assertEqual(result, expected)

Puntuaciones y reseñas

[*]Si eres capaz, tienes el poder dejar un enunciado acerca de qué te ha gustado de este escrito.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *