Introduction to Hadoop Partitions

In this tutorial we describe the basic principles of Hadoop partitions and their use with pyspark. A Hadoop partition subdivides a directory into virtual sub-directories, each holding the data of one partition of a larger dataset. In set theory, a partition divides a set into non-overlapping subsets whose union recovers the original set; in other words, the partitions must not overlap one another. A typical example is splitting a dataset by year (one year for model training, another for model validation and yet another for model testing). With partitions one can read either a single sub-directory or the entire directory, which brings the following benefits (the resulting directory layout is sketched after the list):

  • a partition (e.g., a year) can be selected already when reading the data (no need to query the full dataset and filter afterwards)
  • a different schema can be used per partition (in case the schema differs from year to year)
  • the entire dataset can be queried at once, without looping over sub-directories
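
For illustration, the partitioned directory built in this tutorial will end up looking roughly like this (the part-file names are only indicative; the names Spark actually generates will differ):

/user/mmacieje/tut_part/
    partition=2018/
        part-00000-<...>.csv
    partition=2019/
        part-00000-<...>.csv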

Before we start, connect to a cluster (in this example the NXCALS cluster), click on the star button at the top and follow the instructions (if you are not using this notebook setup, a sketch of creating the Spark session by hand follows the list below)

  • The star button only appears if you have selected a SPARK cluster in the configuration
  • The star button is active after the notebook kernel is ready
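
If you are working outside this notebook integration, the spark session used in the rest of the tutorial can also be created by hand. A minimal sketch, assuming a plain pyspark installation (the application name is an arbitrary choice):

from pyspark.sql import SparkSession

# Create (or reuse) a Spark session; the code below assumes an object
# named `spark` like this one is already available.
spark = SparkSession.builder.appName("hadoop-partitions-tutorial").getOrCreate()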

Creating and Deleting Hadoop Directories

As a starting point, let's create a directory in my private Hadoop space

!hdfs dfs -mkdir /user/mmacieje/tut_part/
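
To verify that the directory was created, we can list it (an optional check; hdfs dfs -ls is the standard HDFS listing command):

!hdfs dfs -ls /user/mmacieje/tut_part/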

To delete a directory and its contents, use the following command:

  • -R for a recursive delete
  • -skipTrash to delete the data immediately instead of moving it to the trash
!hdfs dfs -rm -R -skipTrash "/user/mmacieje/tut_part/"

To delete a single partition within the directory:

!hdfs dfs -rm -R -skipTrash "/user/mmacieje/tut_part/partition=2018/"

Writing to a Partition

Now that we know how to delete a partition, let's learn how to create one. To this end, we build a pandas DataFrame with a single row and a few columns for each of two years, 2018 and 2019.

  • 2018
import pandas as pd

data_2018 = {'year': [2018],'year_max': [300], 'year_avg': [100]}
data_2018_df = pd.DataFrame.from_dict(data_2018)
data_2018_df
   year  year_max  year_avg
0  2018       300       100

Now we write it to our Hadoop directory under the partition=2018 sub-directory (for more details on write, please consult https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=write#pyspark.sql.DataFrame.write)

df = spark.createDataFrame(data_2018_df)
df.write.csv(path='/user/mmacieje/tut_part/partition=2018/', mode="overwrite", header=True)
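
The write creates the partition sub-directory and fills it with Spark's output files (typically one or more part-*.csv files plus a _SUCCESS marker; the exact file names vary between runs). We can list the result as a quick check:

!hdfs dfs -ls /user/mmacieje/tut_part/partition=2018/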

We can immediately read the partition (for more details on read, see https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=write#pyspark.sql.SparkSession.read)

df = spark.read.csv(path='/user/mmacieje/tut_part/partition=2018/', header=True).toPandas()
df
   year  year_max  year_avg
0  2018       300       100
  • 2019
import pandas as pd

data_2019 = {'year': [2019],'year_max': [400], 'year_avg': [200]}
data_2019_df = pd.DataFrame.from_dict(data_2019)
data_2019_df
   year  year_max  year_avg
0  2019       400       200

Similarly, we write it to our Hadoop directory under the partition=2019 sub-directory

df = spark.createDataFrame(data_2019_df)
df.write.csv(path='/user/mmacieje/tut_part/partition=2019/', mode="overwrite", header=True)

and read it back

df = spark.read.csv(path='/user/mmacieje/tut_part/partition=2019/', header=True).toPandas()
df
   year  year_max  year_avg
0  2019       400       200

Reading the Main Directory

Now, if we omit the partition from the path, we read all partitions at once. Spark discovers the partition=... sub-directories and exposes them as an additional partition column.

df = spark.read.csv(path='/user/mmacieje/tut_part/', header=True).toPandas()
df
   year year_max year_avg partition
0  2018      300      100      2018
1  2019      400      200      2019
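
Since the partition appears as a regular column, a single year can also be selected after reading the whole directory. A minimal sketch (it simply re-selects the 2018 partition created above):

df = spark.read.csv(path='/user/mmacieje/tut_part/', header=True)
# Keep only the rows coming from the partition=2018 sub-directory;
# for partition columns Spark can typically prune the other sub-directories.
df_2018 = df.where(df["partition"] == 2018)
df_2018.toPandas()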