Data Processing - (Py)Spark

Spark is built around the Resilient Distributed Dataset (RDD), an immutable collection of records partitioned across the nodes of a cluster.

Spark

Clusters

In a cluster, a driver is responsible for managing the tasks, consolidating the results, and coordinating shared data access [1].

PySpark

PySpark provides

  • SparkContext
    • SparkSession

A spark dataframe is immutable. "Updating" a dataframe therefore always means creating a new dataframe from the old one.

Useful Commands

Get Session

In pyspark, we can get the existing session, or create one if none exists, using the following method.

pyspark.sql.SparkSession.builder.getOrCreate()

List all Tables

A pyspark.sql.SparkSession has a catalog property that can be used to list the tables. With spark being a session instance,

spark.catalog.listTables()

Run SQL Query

Given a SQL query string query, we can query the tables using the .sql() method of a session instance spark. The result is a spark dataframe.

query = ...
spark.sql(query)

Convert Dataframe to Pandas Dataframe

.toPandas()

Convert Pandas Dataframe to Spark Dataframe

To create a spark dataframe from a pandas dataframe, use the .createDataFrame() method of the session,

.createDataFrame(a_pandas_dataframe)

Note that the dataframe is not yet in the catalog of the session.

  • It is only stored locally.
  • SQL queries on it don’t work yet.

Use Query on local dataframe

To be able to query the dataframe, it has to be in the catalog. A dataframe has the method .createOrReplaceTempView("a_table_name"). This will create a temporary view (table) that can be used as long as the session is alive.

Once a table is in the catalog, we can create a dataframe from it using the .table() method of a session instance spark,

spark.table("a_table_name")

explain

sdf.explain() prints the execution plan of the dataframe sdf.

Improve performance by copying data to workers

pyspark.sql.functions.broadcast marks a dataframe as small enough to be copied to every worker, which can speed up some tasks such as join and count.

import pyspark.sql.functions as F

F.broadcast(sdf).count()

sdf_1.join(F.broadcast(sdf_2), sdf_1.col_1 == sdf_2.col_1)

Updating Dataframe

Create and Add

spark_df = spark_df.withColumn("a_new_col", spark_df.existing_col * 3.14)

Contrary to our intuition, this does not modify the dataframe in place. It creates a new dataframe with all the original columns plus the new column "a_new_col", which is why the result is assigned back to spark_df.

In comparison, the .select() method of a dataframe returns a new dataframe with only the columns we list.

Update the Type of a Column

df.col_1.cast("integer")

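PySpark can convert string representations of integers to integers. In the above example, it works even if df.col_1 contains string representations of integers. Note that .cast() returns a new column rather than changing df; pass it to .withColumn() to get a dataframe with the converted column.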

filter

Use a SQL where clause,

.filter("sql query where clause")

or a Python column expression,

.filter(df.col_1 < 0)

In chained methods, it is important to make sure each step has the columns required for the method to run. For example, if a dataframe has columns "Name", "Age", "Skills", the following won’t work as "Age" is not among the selected columns,

sdf.select("Name", "Skills").filter("Age > 30")

Select a Newly Created Column

.selectExpr("col_1 * 3.14 as col_1_mul_pi")

.selectExpr works similarly to .select. We can select other columns alongside the new one, just as we do in a SQL select clause:

.selectExpr("col_1", "col_2", "col_1 * 3.14 as col_1_mul_pi")

Grouping

Use .groupBy.

  • pyspark.sql.GroupedData has some useful methods, e.g., .avg(), .count().
  • Use .agg(pyspark.sql.functions.x_y_z_function) to aggregate.

Joining Tables

.join

Pipelines

pyspark.ml.Pipeline

ML on Spark

  • pyspark.ml
    • pyspark.ml.Transformer
      • pyspark.ml.Transformer.transform(): input DataFrame, output DataFrame
    • pyspark.ml.Estimator
      • pyspark.ml.Estimator.fit(): input DataFrame, output model object.
    • pyspark.ml.feature
    • pyspark.ml.Pipeline
      • Pipeline(stages=[...])
      • Once a pipeline is formed, we can run a_pipeline.fit(my_spark_df).transform(my_spark_df)
    • pyspark.ml.evaluation
      • pyspark.ml.evaluation.BinaryClassificationEvaluator()
    • pyspark.ml.tuning
      • pyspark.ml.tuning.ParamGridBuilder
      • pyspark.ml.tuning.CrossValidator
  • A pyspark dataframe has the method .randomSplit(), which is handy for splitting data into training and test sets.

  1. dc-clean-spark-perform: Cluster configurations - Cleaning Data with PySpark


L Ma (2022). 'Data Processing - (Py)Spark', Datumorphism, 01 April. Available at: https://datumorphism.leima.is/wiki/tools/data-processing-spark/.