In this tutorial we describe the basic principles of Hadoop partitions and their use with PySpark. A Hadoop partition subdivides a directory into virtual sub-directories, each containing the data of one partition of a larger dataset. In set theory, a partition divides a space into non-overlapping subspaces whose union makes up the original space again. A typical example is splitting a dataset by year: one year for model training, another for validation, and yet another for testing. With partitions, one can access either a single sub-directory or the entire directory. This brings the following benefits:
- a partition (e.g., a year) can be selected at read time (no need to query the full dataset and filter out a partition afterwards)
- a different schema can be used per partition (in case the schema differs from year to year)
- the entire dataset can be queried at once, without looping over sub-directories
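Before moving to Hadoop itself, the idea can be illustrated locally. The sketch below uses only pandas and the standard library (no Spark or HDFS required); the temporary directory stands in for a Hadoop path, and mirrors the `partition=<year>` layout used throughout this tutorial. It shows the two access modes listed above: reading a single partition versus reading the union of all of them.

```python
import glob
import os
import tempfile

import pandas as pd

# A hypothetical local stand-in for a partitioned Hadoop directory.
base = tempfile.mkdtemp()

# One sub-directory per partition, mirroring the partition=<year> layout.
for year, row in [(2018, {'year': 2018, 'year_max': 300}),
                  (2019, {'year': 2019, 'year_max': 400})]:
    part_dir = os.path.join(base, f'partition={year}')
    os.makedirs(part_dir, exist_ok=True)
    pd.DataFrame([row]).to_csv(os.path.join(part_dir, 'data.csv'), index=False)

# Read a single partition: only the 2018 data is touched.
df_2018 = pd.read_csv(os.path.join(base, 'partition=2018', 'data.csv'))

# Read the entire dataset: the union of all partitions.
df_all = pd.concat(
    (pd.read_csv(p) for p in glob.glob(os.path.join(base, 'partition=*', 'data.csv'))),
    ignore_index=True,
)
print(len(df_2018), len(df_all))
```

With Spark and HDFS the mechanics differ, but the principle is the same: the path either points at one partition or at the whole directory.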
Before we start, connect to a cluster (in this example the NXCALS cluster): click on the star button at the top and follow the instructions.
- The star button only appears if you have selected a SPARK cluster in the configuration
- The star button is active after the notebook kernel is ready
Creating and Deleting Hadoop Directories
As a starting point, let's create a directory in my private Hadoop space
!hdfs dfs -mkdir /user/mmacieje/tut_part/
In order to delete a directory, use the following command (`-R` for a recursive delete, `-skipTrash` to avoid moving the deleted data into a trash)
!hdfs dfs -rm -R -skipTrash "/user/mmacieje/tut_part/"
To delete a single partition within a directory
!hdfs dfs -rm -R -skipTrash "/user/mmacieje/tut_part/partition=2018/"
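The `!` prefix above is a notebook shell escape; from a plain Python script the same HDFS command can be issued with `subprocess`. A minimal sketch, assuming the tutorial's example path (the helper name `hdfs_rm` is our own, not part of any HDFS API), which only executes the command when the `hdfs` binary is actually available:

```python
import shutil
import subprocess

def hdfs_rm(path):
    """Build the hdfs command for a recursive, trash-skipping delete."""
    return ['hdfs', 'dfs', '-rm', '-R', '-skipTrash', path]

cmd = hdfs_rm('/user/mmacieje/tut_part/partition=2018/')

# Only execute when an HDFS client is installed on this machine.
if shutil.which('hdfs'):
    subprocess.run(cmd, check=True)
else:
    print('hdfs client not found; would run:', ' '.join(cmd))
```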
Writing to a Partition
Now that we know how to delete a partition, let's learn how to create one. To this end, we create a pandas DataFrame with a single row and several columns for each of two years: 2018 and 2019.
- 2018
import pandas as pd
data_2018 = {'year': [2018],'year_max': [300], 'year_avg': [100]}
data_2018_df = pd.DataFrame.from_dict(data_2018)
data_2018_df
Now we write it to our Hadoop directory under a partition (for more details on write, please consult https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=write#pyspark.sql.DataFrame.write)
df = spark.createDataFrame(data_2018_df)
df.write.csv(path='/user/mmacieje/tut_part/partition=2018/', mode="overwrite", header=True)
We can immediately read the partition (for more details on read, see https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=write#pyspark.sql.SparkSession.read)
df = spark.read.csv(path='/user/mmacieje/tut_part/partition=2018/', header=True).toPandas()
df
- 2019
import pandas as pd
data_2019 = {'year': [2019],'year_max': [400], 'year_avg': [200]}
data_2019_df = pd.DataFrame.from_dict(data_2019)
data_2019_df
Similarly, we write it to our Hadoop directory under a partition
df = spark.createDataFrame(data_2019_df)
df.write.csv(path='/user/mmacieje/tut_part/partition=2019/', mode="overwrite", header=True)
and read it back
df = spark.read.csv(path='/user/mmacieje/tut_part/partition=2019/', header=True).toPandas()
df
Reading the Main Directory
Now, if we omit the partition from the path, we read all partitions at once.
df = spark.read.csv(path='/user/mmacieje/tut_part/', header=True).toPandas()
df
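When Spark reads the base directory, its partition discovery recognizes the `partition=<value>` sub-directory names and surfaces them as an extra `partition` column in the result. The same bookkeeping can be sketched without Spark, using pandas and the standard library (the temporary directory below is a hypothetical local stand-in for the HDFS path):

```python
import os
import re
import tempfile

import pandas as pd

# Hypothetical local stand-in for the tutorial's directory, with two partitions.
base = tempfile.mkdtemp()
for year, year_max in [(2018, 300), (2019, 400)]:
    part_dir = os.path.join(base, f'partition={year}')
    os.makedirs(part_dir)
    pd.DataFrame({'year_max': [year_max]}).to_csv(
        os.path.join(part_dir, 'data.csv'), index=False)

frames = []
for name in sorted(os.listdir(base)):
    match = re.fullmatch(r'partition=(\d+)', name)
    if match:
        part = pd.read_csv(os.path.join(base, name, 'data.csv'))
        # Surface the directory name as a column, like Spark's partition discovery.
        part['partition'] = int(match.group(1))
        frames.append(part)

df = pd.concat(frames, ignore_index=True)
print(df)
```

This is why the combined read above is more than a concatenation of files: the partition value itself remains available for filtering and grouping.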