Data Processing - (Py)Spark

Spark's core data abstraction is the Resilient Distributed Dataset (RDD).



In a cluster, the driver is responsible for managing tasks, consolidating results, and providing shared data access¹.


PySpark provides

  • SparkContext
  • SparkSession

A Spark DataFrame is immutable, which makes updating one less direct: every "update" actually returns a new dataframe.

Useful Commands

Get Session

In PySpark, we can get the current session, or create one if none exists, using SparkSession.builder.getOrCreate().


List all Tables

A pyspark.sql.SparkSession has a catalog property, whose listTables() method lists the tables known to the session.


Run SQL Query

Given a SQL query string query, we can run it against the cataloged tables using the session's .sql() method.

query = ...

Convert Dataframe to Pandas Dataframe


Convert Pandas Dataframe to Spark Dataframe

To create a spark dataframe from a pandas dataframe,


Note that the resulting dataframe is not yet in the catalog of the session.

  • It is only stored locally.
  • SQL queries can not reference it.

Use Query on local dataframe

To be able to query the dataframe, it has to be in the catalog. A dataframe has the method .createOrReplaceTempView("a_table_name"). This will create a table that can be used as long as the session is alive.

Once a table is in the catalog, we can create a dataframe using



sdf.explain() can be used to show the execution plan.

Improve performance by copying data to workers

pyspark.sql.functions.broadcast copies a small dataframe to every worker, which can speed up tasks such as joins and subsequent counts.

import pyspark.sql.functions as F

sdf_1.join(F.broadcast(sdf_2), sdf_1.col_1 == sdf_2.col_1)

Updating Dataframe

Create and Add

spark_df = spark_df.withColumn("a_new_col", spark_df.existing_col * 3.14)

Contrary to what the name might suggest, this creates a new dataframe containing all the original columns plus the new column "a_new_col".

Compare this with the .select() method of a dataframe, which returns only the selected columns.

Update the Type of a Column


PySpark can convert string representations of integers to integers: the cast above works even if df.col_1 contains integers stored as strings.


SQL WHERE Clause

.filter() accepts a SQL WHERE clause as a string

.filter("col_1 < 0")

or a Python column expression

.filter(df.col_1 < 0)

In chained methods, it is important to make sure each step still has the columns required for the next method to run. For example, if a dataframe has columns "Name", "Age", "Skills", the following won’t work because "Age" is not among the selected columns: df.select("Name", "Skills").filter("Age > 30")

Select a Newly Created Column

.selectExpr("col_1 * 3.14 as col_1_mul_pi")

.selectExpr works similarly to .select; we can select other columns alongside the expression, just as in a SQL SELECT clause

.selectExpr("col_1", "col_2", "col_1 * 3.14 as col_1_mul_pi")


Aggregation

Use .groupBy().

  • pyspark.sql.GroupedData has some useful methods, e.g., .avg() and .count().
  • Use .agg(pyspark.sql.functions.x_y_z_function) to aggregate.

Joining Tables



ML on Spark

  • Transformers: input DataFrame, output DataFrame.
  • Estimators: input DataFrame, output model object.
  • Stages are combined using Pipeline(stages=[...]).
  • Once a pipeline is formed, we can fit it to data with .fit().
  • A pyspark dataframe has the method .randomSplit() to split the data, e.g., into train and test sets.

  1. dc-clean-spark-perform Cluster configurations - Cleaning Data with PySpark  ↩︎


L Ma (2022). 'Data Processing - (Py)Spark', Datumorphism, 01 April. Available at: