Optimising Spark Analytics: The PartitionBy Function

Apache Spark is an open-source, distributed computing framework designed for big data processing and analytics. Spark was created to address the shortcomings of MapReduce in terms of ease of use and versatility.

Recently I have been using the Spark framework (through the PySpark API) on Azure Synapse Analytics extensively for analysing large volumes of retail data. Following good optimisation practices makes a huge difference in runtimes when it comes to Spark queries.

Among the several Spark optimisation methods available, I thought of discussing the importance of data partitioning in Spark and the ways we can partition data.

Most enterprises are adopting the data lakehouse architecture today, and it’s common for most of the data to be stored in Parquet format. Parquet is a columnar storage file format designed to optimise performance when dealing with large analytical workloads.

In some experiments I ran with large datasets, I found that the “partitionBy” option when writing Parquet in Spark makes a huge difference in query performance in most use cases. It specifies how the data should be organized within the storage system based on one or more columns in your dataset.

Using “partitionBy” on column values has several advantages.

  • Improved Query Performance: When you partition data by specific column values, Spark can take advantage of partition pruning during query execution. This means that when you run a query that filters or selects data based on the partitioned column, Spark can skip reading irrelevant partitions, significantly reducing the amount of data that needs to be processed. This results in faster query execution.
  • Optimized Joins: If you often join tables based on the partitioned column, partitioning by that column can lead to more efficient join operations. Spark can perform joins on data that is collocated in the same partitions, minimizing data shuffling and reducing the overall execution time of the join operations.
  • Reduced Data Skew: Data skew occurs when certain values in a column are much more frequent than others. By partitioning data, you can distribute the skewed values across multiple partitions, preventing a single partition from becoming a bottleneck during processing.
  • Simplified Data Management: Each partition represents a distinct subset of your data based on the partitioned column’s values. This organization simplifies tasks such as data backup, archiving, and deletion, as you can target specific partitions rather than working with entire datasets.
  • Enhanced Parallelism: Different partitions can be processed concurrently by separate tasks or executors, maximizing resource utilization and speeding up data processing tasks.
  • Facilitates Time-Based Queries: If you have time-series data, partitioning by a timestamp or date column can be especially useful. It allows you to efficiently perform time-based queries, such as retrieving data for a specific date range, by only reading the relevant partitions.
  • Easier Maintenance: Partitioning can simplify data maintenance tasks, like adding new data or updating existing data. You can easily add new partitions or drop old ones without affecting the entire dataset.

Let’s walk through a practical example to understand the advantages of using partitionBy column values in Spark.

Scenario: Imagine you’re working with a large dataset of sales transactions from an e-commerce platform. Each transaction has various attributes, including a timestamp representing the transaction date. You want to analyse the data to find out which products sold the most during a specific date range.

Step 1 – Partitioning by Date: First, partition the sales data by the transaction date using partitionBy in Spark when writing the data to storage. Each partition represents a specific date, making it easy to organize and query the data.

# Specify the partition column
partition_column = "transaction_date"

# Write the DataFrame to Parquet with partitionBy
df.write.partitionBy(partition_column).parquet("/path/to/output/directory")
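
When Spark executes this write, it creates one sub-directory per distinct transaction_date value under the output path. Assuming daily dates, the resulting layout looks roughly like this (file names are illustrative):

/path/to/output/directory/
    transaction_date=2023-07-01/part-00000-<uuid>.snappy.parquet
    transaction_date=2023-07-02/part-00000-<uuid>.snappy.parquet
    ...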

Step 2 – Query for Date Range: Let’s say you want to analyse sales for a specific date range. When you run your PySpark query, Spark automatically prunes irrelevant partitions and only reads the partitions that contain data for the selected date range.

from datetime import date

# Read the partitioned data from Parquet
partitioned_df = spark.read.parquet("/path/to/output/directory")

# Query data for a specific date; Spark prunes the other date partitions
specific_date = date(2023, 7, 1)
result = partitioned_df.filter(partitioned_df[partition_column] == specific_date)

result.show()
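
Since the goal here is a date range, a range filter works the same way. Below is a minimal sketch (the dates are placeholders), with a call to explain() so you can verify the pruning: the PartitionFilters entry in the Parquet scan of the physical plan lists only the matching transaction_date partitions.

# Query data for a date range (placeholder dates)
range_result = partitioned_df.filter(
    (partitioned_df[partition_column] >= "2023-07-01") &
    (partitioned_df[partition_column] <= "2023-07-31")
)

# Inspect the physical plan; PartitionFilters confirms that only the
# matching transaction_date partitions are read
range_result.explain()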

In summary, using partitionBy column values in Spark can significantly enhance query performance, optimize joins, improve data organization and management, and provide advantages in terms of data skew handling and parallelism. It’s a powerful technique for optimizing Spark jobs, particularly when dealing with large and diverse datasets.

Share your experience and approaches you use when it comes to optimising Spark query performance.

Managing Python Libraries on Azure Synapse Apache Spark Pools

Azure Synapse Analytics is Microsoft’s one-stop shop that integrates big data analytics and data warehousing. It features Apache Spark pools, which provide scalable and cost-effective processing power for Spark workloads and enable you to quickly and easily process and analyse large volumes of data at scale.

I have been working with the Spark environment on Azure Synapse for a while and thought of documenting my experience with installing external Python libraries for Spark pools on Azure. This guideline may come in handy if you are performing your big data analytics experiments with specific Python libraries.

Apache Spark pools on Azure Synapse workspaces come with the Anaconda Python distribution and pip as a package manager. Most of the common Python libraries used in the data analytics space are already installed. If you need any additional packages to be installed and used within your scripts, there are three ways to do it on Synapse.

  • Use magic commands in notebooks to install packages at the session level.
  • Upload the Python package as a workspace package and install it on the Spark pool.
  • Install packages using a pip or Conda input file.

01. Use magic commands in notebooks to install packages at the session level.

Install packages at the session level in notebooks

This is the simplest and most straightforward way of installing a Python package at the Spark session level. You just use the magic command followed by the usual pip or conda install syntax. Though this is easy for prototyping and quick experiments, the package has to be reinstalled every time you start a new Spark session, so it’s best avoided in production environments and kept for rapid prototyping.
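
As a minimal sketch, assuming a Synapse notebook cell and sentence-transformers purely as an example package, a session-scoped install looks like this:

# Install a package for the current Spark session only (example package/version)
%pip install sentence-transformers==2.2.2

The package is then available to the notebook for the rest of the session and has to be reinstalled when a new session starts.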

02. Upload the Python package as a workspace package and install it on the Spark pool.

Upload Python libraries as workspace packages

An Azure Synapse workspace allows you to upload workspace packages and install them on Spark pools. It accepts Python wheels (.whl files), jar files, or tar.gz archives as packages.

After uploading the packages, go to the specific Apache Spark pool and select the packages you want to install on it. The initial installation may take a few minutes. (In my case, it took around 20 minutes to install 3 packages.)

From my experience with different Python packages, I would stick with Python wheels from pip or jars from official package distributions. I tried the sentence-transformers tar.gz file from PyPI (https://pypi.org/project/sentence-transformers/), and it gave me an error during the installation process mentioning a package dependency on R (which is confusing).

03. Install packages using a pip or Conda input file.

Upload the requirements file

If you are familiar with building Conda environments or Docker configurations, keeping a package list as a config file should not be new to you. You can specify either a .txt file or a .yml file with the desired package versions to be installed on the Spark pool. If you want to pull libraries from a specific channel, you should use a .yml file.

For example, if you need to install the sentence-transformers and spark-nlp Python packages (both used in NLP) for the Spark environment, you should add these two lines to a .txt file and upload it as the input file for the workspace.

sentence-transformers==2.2.2
spark-nlp==4.4.1
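
If you need libraries from a specific channel instead, a Conda-style environment .yml along these lines should work (the environment name, channel, and versions are only placeholders):

name: example-env
channels:
  - conda-forge
dependencies:
  - numpy=1.24
  - pip:
    - sentence-transformers==2.2.2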

I find this option the most robust way of installing Python packages on a Spark pool, since it gives you the flexibility to select the desired package version and the specific channel to use during installation. It also saves the time of reinstalling the packages each time the session gets refreshed.

Let me know your experience and the lessons learned during experiments with Apache Spark pools on Azure Synapse Analytics.

Here’s the Azure documentation on this for further reference.