Data Processing - (Py)Spark
Spark uses the Resilient Distributed Dataset (RDD) as its core data abstraction.
Spark
Clusters
In a cluster, the driver is responsible for managing tasks, consolidating results, and providing access to shared data.
PySpark
PySpark provides
- SparkContext
- SparkSession
A Spark dataframe is immutable, which makes updating a dataframe in place tricky.
Useful Commands
Get Session
In PySpark, we can get or create a session using the following method.
pyspark.sql.SparkSession.builder.getOrCreate()
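A minimal sketch of obtaining a session; the variable name spark is our own choice and is reused in the examples below.

```python
from pyspark.sql import SparkSession

# Get the active session, or create a new one if none exists
spark = SparkSession.builder.getOrCreate()
print(spark.version)
```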
List all Tables
A pyspark.sql.SparkSession has the property catalog, which can be used to list the tables registered in the session. Given a session spark,
spark.catalog.listTables()
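A small sketch of inspecting the catalog, assuming the session spark from above:

```python
# Each entry describes a table or temporary view visible to this session
for table in spark.catalog.listTables():
    print(table.name, table.isTemporary)
```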
Run SQL Query
Given a SQL query query, we can query a table in the catalog using the .sql() method of the session.
query = ...
spark.sql(query)
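A minimal sketch, assuming a table or temporary view named a_table_name is already registered in the catalog:

```python
# The result of a SQL query is itself a Spark DataFrame
result_sdf = spark.sql("SELECT * FROM a_table_name LIMIT 10")
result_sdf.show()
```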
Convert Dataframe to Pandas Dataframe
.toPandas()
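For example (sdf and pdf are placeholder names):

```python
# Collect the Spark DataFrame to the driver as a pandas DataFrame;
# all rows are loaded into driver memory, so use this on small results only
pdf = sdf.toPandas()
```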
Convert Pandas Dataframe to Spark Dataframe
To create a Spark dataframe from a pandas dataframe, use the .createDataFrame() method of the session.
.createDataFrame(a_pandas_dataframe)
Note that the resulting dataframe is not yet in the catalog of the session.
- It is only stored locally.
- SQL queries on it do not work yet.
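A minimal sketch, assuming a session spark:

```python
import pandas as pd

a_pandas_dataframe = pd.DataFrame({"col_1": [1, 2, 3], "col_2": ["a", "b", "c"]})

# Create a Spark DataFrame from the pandas DataFrame;
# it is local to this session and not registered in the catalog yet
sdf = spark.createDataFrame(a_pandas_dataframe)
sdf.show()
```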
Run SQL Queries on a Local Dataframe
To be able to query the dataframe with SQL, it has to be in the catalog. A dataframe has the method .createOrReplaceTempView("a_table_name"), which creates a temporary table that lives as long as the session is alive.
Once a table is in the catalog, we can create a dataframe using
spark.table("a_table_name")
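A sketch tying these together, continuing with the sdf and spark names from above:

```python
# Register the local dataframe as a temporary view in the catalog
sdf.createOrReplaceTempView("a_table_name")

# SQL now works against it, and we can get a dataframe back from the catalog
spark.sql("SELECT COUNT(*) AS n FROM a_table_name").show()
sdf_from_catalog = spark.table("a_table_name")
```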
explain
sdf.explain()
can be used to show the execution plan.
Improve performance by copying data to workers
pyspark.sql.functions.broadcast
can speed up some tasks such as join and count.
import pyspark.sql.functions as F
F.broadcast(sdf).count()
sdf_1.join(F.broadcast(sdf_2), sdf_1.col_1 == sdf_2.col_1)
Updating Dataframe
Create and Add
spark_df = spark_df.withColumn("a_new_col", spark_df.existing_col * 3.14)
Contrary to our intuition, this creates a new dataframe with all the original columns plus the new column "a_new_col". A comparison would be the .select() method of a dataframe, which returns only the selected columns.
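A minimal sketch contrasting the two; the column names are placeholders:

```python
# withColumn keeps all existing columns and appends the new one
sdf_with_new = sdf.withColumn("a_new_col", sdf.existing_col * 3.14)

# select returns only the listed columns
sdf_two_cols = sdf.select("existing_col", (sdf.existing_col * 3.14).alias("a_new_col"))
```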
Update the Type of a Column
df = df.withColumn("col_1", df.col_1.cast("integer"))
PySpark can convert string representations of integers to integers. In the above example, the cast works even if df.col_1 contains string representations of some integers.
filter
The .filter() method accepts a SQL WHERE clause as a string,
.filter("sql where clause")
or a Python column expression,
.filter(df.col_1 < 0)
In chained methods, it is important to make sure each step still has the columns required for the next method to run. For example, if a dataframe has columns "Name", "Age", "Skills", the following won't work as "Age" is not among the selected columns,
sdf.select("Name", "Skills").filter("Age > 30")
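Two working variants, as a sketch:

```python
# Option 1: keep "Age" available at the filtering step
sdf.select("Name", "Age", "Skills").filter("Age > 30")

# Option 2: filter first, then select only the columns we want
sdf.filter("Age > 30").select("Name", "Skills")
```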
Select a Newly Created Column
.selectExpr("col_1 * 3.14 as col_1_mul_pi")
.selectExpr works similarly to .select; we can also select other columns, just like in a SQL SELECT clause,
.selectExpr("col_1", "col_2", "col_1 * 3.14 as col_1_mul_pi")
Grouping
- Use .groupBy(), which returns a pyspark.sql.GroupedData object.
- pyspark.sql.GroupedData has some useful methods, e.g., .avg(), .count().
- Use .agg(pyspark.sql.functions.x_y_z_function) to aggregate, as in the sketch below.
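A minimal grouping sketch; the column names are placeholders:

```python
import pyspark.sql.functions as F

# Built-in shortcuts on GroupedData
sdf.groupBy("col_1").count().show()
sdf.groupBy("col_1").avg("col_2").show()

# General aggregation via .agg()
sdf.groupBy("col_1").agg(
    F.mean("col_2").alias("mean_col_2"),
    F.stddev("col_2").alias("std_col_2"),
).show()
```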
Joining Tables
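A minimal join sketch; the dataframe names, the key column, and the join type are assumptions for illustration:

```python
# Join two dataframes on a shared key column; how can be "inner", "left", "outer", etc.
joined = sdf_1.join(sdf_2, on="key_col", how="left")
```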
Pipelines
ML on Spark
- pyspark.ml
- pyspark.ml.Transformer
  - .transform(): input DataFrame, output DataFrame
- pyspark.ml.Estimator
  - .fit(): input DataFrame, output model object
- pyspark.ml.feature
- pyspark.ml.Pipeline
  - Pipeline(stages=[...])
  - Once a pipeline is formed, we can run a_pipeline.fit(my_spark_df).transform(my_spark_df); see the sketch below.
- pyspark.ml.evaluation
  - pyspark.ml.evaluation.BinaryClassificationEvaluator()
- pyspark.ml.tuning
  - pyspark.ml.tuning.ParamGridBuilder
  - pyspark.ml.tuning.CrossValidator
- A PySpark dataframe has the method .randomSplit() for splitting into train and test sets; see the sketch below.
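A hedged end-to-end sketch tying these pieces together. The column names ("category", "num_col", "label"), the StringIndexer / VectorAssembler / LogisticRegression stages, the grid values, and the 80/20 split are illustrative assumptions, not from the original note:

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hold out a test set
train_sdf, test_sdf = my_spark_df.randomSplit([0.8, 0.2], seed=42)

# Pipeline stages: index a categorical column, assemble a feature vector, fit a classifier
indexer = StringIndexer(inputCol="category", outputCol="category_idx")
assembler = VectorAssembler(inputCols=["category_idx", "num_col"], outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label")

a_pipeline = Pipeline(stages=[indexer, assembler, lr])

# .fit() returns a PipelineModel; .transform() appends prediction columns
predictions = a_pipeline.fit(train_sdf).transform(test_sdf)

# Evaluate with area under the ROC curve (the default metric)
evaluator = BinaryClassificationEvaluator(labelCol="label")
print(evaluator.evaluate(predictions))

# Hyperparameter tuning with a parameter grid and cross-validation
grid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1, 1.0]).build()
cv = CrossValidator(
    estimator=a_pipeline,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=3,
)
cv_model = cv.fit(train_sdf)
print(evaluator.evaluate(cv_model.transform(test_sdf)))
```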