Using PySpark’s partitionBy Function with a String Column for Data Partitioning

In the previous blog post, we looked into data partitioning in Spark applications and the benefits it brings to the table. An essential consideration for data partitioning is deciding which data field or column to employ as the basis for partitioning your data.

When your data includes categorical or temporal columns, the choice of a partitioning column is relatively straightforward. However, what if you find yourself with a string column that needs to serve as the partitioning factor? Partitioning on a multitude of distinct string values creates a large number of small partitions, and that rarely leads to optimal Spark performance.

There’s a clever technique that can be employed to partition Spark DataFrames based on the values within a string column. It involves using the first letter of each string value as the partitioning criterion. This method not only simplifies the partitioning process but can also enhance the efficiency of your Spark operations.

Here’s a simple example of partitioning data based on the first letter of a string column.

# Import the function used to clean the partitioning column
from pyspark.sql.functions import regexp_replace

# Create a list of tuples containing data
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35),
        ("Dave", 40), ("Eliza", 23), ("'Owen", 21)]

# Create a PySpark DataFrame from the list of tuples
# (assumes an active SparkSession named `spark`, as in a notebook)
df = spark.createDataFrame(data, ["Name", "Age"])

# Create the partitioning column and add it to the DataFrame
df = df.withColumn("Name_first_letter", df.Name.substr(1, 1))

# Remove non-alphanumeric characters from the Name_first_letter column
# This ensures that the partitioning column is a valid directory name
df = df.withColumn("Name_first_letter",
                   regexp_replace("Name_first_letter", "[^a-zA-Z0-9]", "_"))

# Write the PySpark DataFrame to a Parquet file 
# partitioned by Name_first_letter
df.write.partitionBy("Name_first_letter").parquet("data/output/df_parquet")
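Once written, each distinct value of Name_first_letter becomes its own sub-directory under the output path, and Spark can prune partitions when you filter on that column. Here’s a minimal sketch of reading the data back, with paths and column names following the example above:

# Read the partitioned output back and filter on the partition column;
# Spark only needs to scan the Name_first_letter=A directory here
names_a = spark.read.parquet("data/output/df_parquet") \
               .filter("Name_first_letter = 'A'")
names_a.show()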

Partitioning a PySpark DataFrame by the first letter of the values in a string column can have several advantages, depending on your specific use case and data distribution. Here are some potential advantages:

  • Improved Query Performance: Partitioning can significantly improve query performance when you often filter or aggregate data based on the first letter of the string column. By partitioning the data by the first letter, you avoid scanning the entire DataFrame for each query; instead, the query engine can efficiently skip irrelevant partitions.
  • Faster Data Retrieval: When you’re looking for specific records that start with a particular letter, partitioning makes it faster to retrieve those records. It reduces the amount of data that needs to be read and processed.
  • Parallelism: Partitioning allows for better parallelism in data processing. Different tasks can work on different partitions concurrently, which can lead to improved overall performance.
  • Reduced Shuffling: Partitioning can reduce data shuffling during operations like joins and aggregations.
  • Easier Data Maintenance: If your data frequently changes, partitioning can make it easier to update or append data to specific partitions without affecting the entire dataset. This can be particularly useful for scenarios where you have a large historical dataset and regularly receive new data.

It’s worth noting that while partitioning can offer these advantages, it also comes with some trade-offs. Partitioning can increase storage overhead because it creates multiple directories/files on the storage system. Additionally, it requires careful planning to choose the right partitioning strategy based on your query patterns and data characteristics.  

Optimising Spark Analytics: The PartitionBy Function

Apache Spark is an open-source, distributed computing framework designed for big data processing and analytics. Spark was created to address the shortcomings of MapReduce in terms of ease of use and versatility.

Recently I have been using the Spark framework (through the PySpark API) on Azure Synapse Analytics extensively for analysing large volumes of retail data. Following good optimisation practices makes a huge difference in runtimes when it comes to Spark queries.

Among several Spark optimisation methods, I thought of discussing the importance of data partitioning in Spark and ways we can partition data.

Most enterprises are adopting a data lakehouse architecture today, and it’s common for most of the data to be stored in Parquet format. Parquet is a columnar storage file format designed to optimise performance for large analytical workloads.

In some experiments I ran with large datasets, I found that the “partitionBy” option with Parquet in Spark makes a huge difference in query performance for most use cases. It specifies how the data should be organised within the storage system based on one or more columns in your dataset.

Partitioning by column values with “partitionBy” has several advantages.

  • Improved Query Performance: When you partition data by specific column values, Spark can take advantage of partition pruning during query execution. This means that when you run a query that filters or selects data based on the partitioned column, Spark can skip reading irrelevant partitions, significantly reducing the amount of data that needs to be processed. This results in faster query execution.
  • Optimized Joins: If you often join tables based on the partitioned column, partitioning by that column can lead to more efficient join operations. Spark can perform joins on data that is collocated in the same partitions, minimizing data shuffling and reducing the overall execution time of the join operations.
  • Reduced Data Skew: Data skew occurs when certain values in a column are much more frequent than others. By partitioning data, you can distribute the skewed values across multiple partitions, preventing a single partition from becoming a bottleneck during processing.
  • Simplified Data Management: Each partition represents a distinct subset of your data based on the partitioned column’s values. This organization simplifies tasks such as data backup, archiving, and deletion, as you can target specific partitions rather than working with entire datasets.
  • Enhanced Parallelism: Different partitions can be processed concurrently by separate tasks or executors, maximizing resource utilization and speeding up data processing tasks.
  • Facilitates Time-Based Queries: If you have time-series data, partitioning by a timestamp or date column can be especially useful. It allows you to efficiently perform time-based queries, such as retrieving data for a specific date range, by only reading the relevant partitions.
  • Easier Maintenance: Partitioning can simplify data maintenance tasks, like adding new data or updating existing data. You can easily add new partitions or drop old ones without affecting the entire dataset.

Let’s walk through a practical experience to understand the advantages of using partitionBy column values in Spark.

Scenario: Imagine you’re working with a large dataset of sales transactions from an e-commerce platform. Each transaction has various attributes, including a timestamp representing the transaction date. You want to analyse the data to find out which products sold the most during a specific date range.

Step 1 – Partitioning by Date: First, partition the sales data by the transaction date using the partitionBy command in Spark when writing the data to the storage. Each partition represents a specific date, making it easy to organize and query the data.

# Specify the partition column
partition_column = "transaction_date"

# Write the DataFrame to Parquet with partitionBy
df.write.partitionBy(partition_column).parquet("/path/to/output/directory")
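For reference, partitionBy writes one sub-directory per distinct value of the partition column, so the output on storage looks roughly like this (directory and file names below are illustrative):

/path/to/output/directory/
    transaction_date=2023-07-01/
        part-00000-xxxx.snappy.parquet
    transaction_date=2023-07-02/
        part-00000-xxxx.snappy.parquet
    ...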

Step 2 – Query for Date Range: Let’s say you want to analyse sales for a specific date range. When you run your PySpark query, it automatically prunes irrelevant partitions and only reads the partitions that contain data for the selected date range.

from datetime import date

# Read the partitioned data from Parquet
partitioned_df = spark.read.parquet("/path/to/output/directory")

# Query data for a specific date; Spark prunes all other partitions
specific_date = date(2023, 7, 1)
result = partitioned_df.filter(partitioned_df[partition_column] == specific_date)

result.show()
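If you actually need a date range rather than a single day, the same pruning applies to a range filter. A minimal sketch, assuming the partition values are ISO-formatted dates as written above:

from pyspark.sql import functions as F

# Query data for July 2023; only the matching date partitions are read
july_sales = partitioned_df.filter(
    F.col(partition_column).between("2023-07-01", "2023-07-31")
)
july_sales.show()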

In summary, using partitionBy column values in Spark can significantly enhance query performance, optimize joins, improve data organization and management, and provide advantages in terms of data skew handling and parallelism. It’s a powerful technique for optimizing Spark jobs, particularly when dealing with large and diverse datasets.

Share your experience and approaches you use when it comes to optimising Spark query performance.

Managing Python Libraries on Azure Synapse Apache Spark Pools

Azure Synapse Analytics is Microsoft’s one-stop shop that integrates big data analytics and data warehousing. It features Apache Spark pools to provide scalable and cost-effective processing power for Spark workloads which enables you to quickly and easily process and analyse large volumes of data at scale.

I have been working with the Spark environment on Azure Synapse for a while and thought of documenting my experience with installing external Python libraries for Spark pools on Azure. This guideline may come in handy if you are running your big data analytics experiments with specific Python libraries.

Apache Spark pools in Azure Synapse workspaces come with the Anaconda Python distribution and with pip as a package manager. Most of the Python libraries commonly used in the data analytics space are already installed. If you need any additional packages installed and available within your scripts, there are three ways you can do it on Synapse.

  • Use magic commands in notebooks to install packages at the session level.
  • Upload the Python package as a workspace package and install it on the Spark pool.
  • Install packages using a pip or conda input file.

01. Use magic commands in notebooks to install packages at the session level.

Install packages for session level on Notebooks

This is the simplest and most straightforward way of installing a Python package at the Spark session level. You just use the magic command followed by the usual pip or conda install command. Though this is fine for quick experiments and rapid prototyping, it becomes wasteful because the installation runs again every time you start a new Spark session, so it’s better to avoid this method in production environments.
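For example, in a Synapse notebook cell a session-scoped install looks like this (the package and version here are just an illustration, borrowed from later in this post):

# Session-scoped install from a notebook cell; applies only to the current Spark session
%pip install sentence-transformers==2.2.2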

02. Upload the Python package as a workspace package and install it on the Spark pool.

Upload python libraries as workspace packages

An Azure Synapse workspace allows you to upload workspace packages and install them on the Spark pools. It accepts Python wheels (.whl files), jar files, or tar.gz files as packages.

After uploading the packages, go to the specific Apache Spark pool and select the packages you want to install on it. The initial installation may take a few minutes (in my case, it took around 20 minutes to install 3 packages).

From the experience I’ve had with different Python packages, I would stick with Python wheels from PyPI or jars from official package distributions. I tried the sentence-transformers tar.gz file from PyPI (https://pypi.org/project/sentence-transformers/). It gave me an error during the installation process mentioning a package dependency on R (which is confusing).

03. Install packages using a pip or conda input file.

Upload the requirement files

If you are familiar with building conda environments or Docker configurations, having a package list as a config file should not be new to you. You can specify either a .txt file or a .yml file with the desired package versions to be installed on the Spark cluster. If you want to specify a particular channel to get libraries from, you should use a .yml file.

For example, if you need to install the sentence-transformers and spark-nlp Python packages (both used for NLP) on the Spark environment, add these two lines to a .txt file and upload it as the input file for the workspace.

sentence-transformers==2.2.2
spark-nlp==4.4.1
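If you also want to pin a channel, the equivalent can be expressed as a conda .yml file. A minimal sketch (the environment name and channel here are just examples; the package pins mirror the .txt file above):

name: nlp-env
channels:
  - conda-forge
dependencies:
  - pip
  - pip:
    - sentence-transformers==2.2.2
    - spark-nlp==4.4.1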

I find this option the most robust way of installing Python packages on a Spark pool, since it gives you the flexibility to select the desired package version and the specific channel to use during installation. It also saves the time of reinstalling the packages each time the session gets refreshed.

Let me know your experience and the lessons learned during experiments with Apache Spark pools on Azure Synapse Analytics.

Here’s the Azure documentation on this for further reference.