Part 3: Data Storage
In this session we’ll talk about reading and writing large data sets. There are a few interrelated topics that arise here, so it’s helpful to understand that, in terms of the read/write capabilities of the arrow package, we’re focusing almost entirely on the highlighted path in the diagram below: Arrow Datasets, stored as Parquet files, on local disk.
Why this specific pathway?
- Arrow Datasets are the only one of these data objects suitable for larger-than-memory data. Arrow Tables and R data frames both represent the entire data set in memory, which simply won’t work here
- Apache Parquet is a modern data format optimized to make life easier when you’re working with big data (as opposed to CSV and Feather, which both have their uses but aren’t ideal for this situation)
- Local storage because it’s easier. Let’s not complicate the story by talking about S3 buckets!
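Before we dive in, note that the code in this session assumes a few packages have been attached. If you’ve been following along from the earlier sessions you’ve probably already done this, but as a quick setup sketch:
library(arrow)    # read_parquet(), open_dataset(), write_dataset()
library(dplyr)    # filter(), group_by(), summarize(), collect()
library(tictoc)   # tic() and toc(), used for the rough timings below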
Parquet files
If you work with large data sets already, you may have encountered parquet files before, and none of this will be new material for you. But the format isn’t as well known to folks who are new to this world, so we’ll talk a little about the Apache Parquet project. Although they are both associated with the Apache Software Foundation, and the arrow package is (as far as we know!) the easiest way to work with parquet files, Apache Parquet is an entirely separate project from Apache Arrow.
Parquet files structure a tabular data set in a format that is “row-chunked, column-arranged, and paged”. Here’s what we mean by that. First, we take a table and partition it row-wise into a set of distinct “chunks”, as shown below:
Then, within each chunk, every column is split into a collection of “pages”:
When the table is written to disk, we start writing from chunk 1, column 1, writing each page in order. We then repeat this for every column in the chunk, and then move on to the next chunk. This continues until every page has been written:
Importantly for our purposes, each chunk, column, and page is preceded by relevant metadata. In addition, a metadata block is written at the end of the file, containing information about the table as a whole and the locations of the various constituent parts.
There are two key features to this format that make it desirable when working with large data sets:
- Parquet file readers can use metadata to scan the file intelligently: if we know in advance that only some subset of the table is needed for a query, the reader can skip to the relevant pages
- Data are stored in a compressed binary format, which reduces file size relative to an uncompressed text format such as CSV.
That being said, you probably won’t be surprised to hear that we’re glossing over a lot of details here. You wouldn’t be able to code a parquet reader on the basis of this simplified description! However, that’s not our goal: the purpose of this description is to give you enough of a sense of how a parquet file is organized that you can understand why the format is so handy when working with large data. You can find a lot more information about the details in the Apache Parquet documentation.
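If you’d like to peek at some of this structure from R, the arrow package exposes a low-level ParquetFileReader class. The sketch below is purely illustrative: it uses the same file we’re about to load, and the available fields may differ slightly across arrow versions.
# a minimal sketch: inspect the internal structure of one parquet file
reader <- ParquetFileReader$create("~/Datasets/nyc-taxi/year=2019/month=9/part-0.parquet")
reader$num_row_groups  # how many row "chunks" the writer created
reader$num_rows        # total row count, read from the file metadata
reader$GetSchema()     # column names and types, also read from the metadata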
A single parquet file generally contains a quantity of data small enough to load into memory. For example, let’s say I want to load the NYC taxi data for September 2019. This subset of the data can be stored in a 122MB parquet file, and I can load the whole thing into R as a conventional data frame. It’s about 6.5 million rows, but that’s not too much of a challenge:
parquet_file <- "~/Datasets/nyc-taxi/year=2019/month=9/part-0.parquet"
nyc_taxi_2019_09 <- read_parquet(parquet_file)
nyc_taxi_2019_09
# A tibble: 6,567,396 × 22
vendor_name pickup_datetime dropoff_datetime passenger_count
<chr> <dttm> <dttm> <int>
1 VTS 2019-09-01 15:14:09 2019-09-01 15:31:52 2
2 VTS 2019-09-01 15:36:17 2019-09-01 16:12:44 1
3 VTS 2019-09-01 15:29:19 2019-09-01 15:54:13 1
4 CMT 2019-09-01 15:33:09 2019-09-01 15:52:14 2
5 VTS 2019-09-01 15:57:43 2019-09-01 16:26:21 1
6 CMT 2019-09-01 15:59:16 2019-09-01 16:28:12 1
7 CMT 2019-09-01 15:20:06 2019-09-01 15:52:19 1
8 CMT 2019-09-01 15:27:54 2019-09-01 15:32:56 0
9 CMT 2019-09-01 15:35:08 2019-09-01 15:55:51 0
10 CMT 2019-09-01 15:19:37 2019-09-01 15:30:52 1
# … with 6,567,386 more rows, and 18 more variables: trip_distance <dbl>,
# pickup_longitude <dbl>, pickup_latitude <dbl>, rate_code <chr>,
# store_and_fwd <chr>, dropoff_longitude <dbl>, dropoff_latitude <dbl>,
# payment_type <chr>, fare_amount <dbl>, extra <dbl>, mta_tax <dbl>,
# tip_amount <dbl>, tolls_amount <dbl>, total_amount <dbl>,
# improvement_surcharge <dbl>, congestion_surcharge <dbl>,
# pickup_location_id <int>, dropoff_location_id <int>
One thing to highlight here is that the columnar structure of parquet files makes it possible to load only a subset of the columns:
parquet_file |>
  read_parquet(col_select = matches("pickup"))
# A tibble: 6,567,396 × 4
pickup_datetime pickup_longitude pickup_latitude pickup_location_id
<dttm> <dbl> <dbl> <int>
1 2019-09-01 15:14:09 NA NA 186
2 2019-09-01 15:36:17 NA NA 138
3 2019-09-01 15:29:19 NA NA 132
4 2019-09-01 15:33:09 NA NA 79
5 2019-09-01 15:57:43 NA NA 132
6 2019-09-01 15:59:16 NA NA 132
7 2019-09-01 15:20:06 NA NA 132
8 2019-09-01 15:27:54 NA NA 224
9 2019-09-01 15:35:08 NA NA 79
10 2019-09-01 15:19:37 NA NA 97
# … with 6,567,386 more rows
Better yet, the file reader is faster when only a subset of the columns is needed:
tic()
parquet_file |>
  read_parquet() |>
  invisible() # suppress printing
toc()
1.206 sec elapsed
tic()
parquet_file |>
  read_parquet(col_select = matches("pickup")) |>
  invisible()
toc()
0.183 sec elapsed
This property is handy when dealing with larger-than-memory data: because we can’t load the whole thing into memory, we’re going to have to iteratively read small pieces of the data set. In the next section we’ll talk about how large data sets are typically distributed over many parquet files, but the key point right now is that whenever we load one of those pieces from a parquet file, an intelligently designed reader can speed things up by reading only the relevant subset of each parquet file.
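To make that idea concrete, here’s a small sketch: even a single parquet file can be opened as a Dataset and scanned lazily, so a query only reads the columns and rows it actually needs. The file path and column names are the ones from the earlier example.
# a sketch: scan one parquet file lazily instead of loading it whole
single_file <- open_dataset("~/Datasets/nyc-taxi/year=2019/month=9/part-0.parquet")
single_file |>
  select(pickup_datetime, trip_distance) |>  # column pruning: only these pages are read
  filter(trip_distance > 50) |>              # row filtering happens during the scan
  collect()                                  # bring the (small) result into R as a tibble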
Multi-file data sets
In our hands-on exercises we’ve been working with the NYC Taxi data, a single tabular data set that is split across 158 distinct parquet files. It’s time to take a closer look at how this works. Let’s start by opening the data:
nyc_taxi <- open_dataset("~/Datasets/nyc-taxi/")
nyc_taxi
FileSystemDataset with 158 Parquet files
vendor_name: string
pickup_datetime: timestamp[ms]
dropoff_datetime: timestamp[ms]
passenger_count: int64
...
The ... in the output indicates truncation: I’m only showing the first few lines of output because it’s the first line that’s important. The nyc_taxi object is an Arrow Dataset represented by 158 files. We can inspect the files field of this object to find the paths to these files:
nyc_taxi$files
[1] "/home/danielle/Datasets/nyc-taxi/year=2009/month=1/part-0.parquet"
[2] "/home/danielle/Datasets/nyc-taxi/year=2009/month=10/part-0.parquet"
[3] "/home/danielle/Datasets/nyc-taxi/year=2009/month=11/part-0.parquet"
[4] "/home/danielle/Datasets/nyc-taxi/year=2009/month=12/part-0.parquet"
[5] "/home/danielle/Datasets/nyc-taxi/year=2009/month=2/part-0.parquet"
...
Notice that the filenames are structured. They’re organised by year and month and, as you might expect, the data for September 2016 can be found in the year=2016/month=9/ folder. Not only that, the folder names correspond to actual field-value pairings in the data: the nyc_taxi data set has variables named year and month, and those variables can take the values 2016 and 9 respectively. This convention, in which folders are named using the relevant field-value pairs, is referred to as “Hive partitioning”, after the Apache Hive project.
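If you want to see Hive partitioning in action on something small, you can write a toy data set yourself. Here’s a sketch using mtcars and a temporary directory; the exact file listing will vary, but the folder names follow the field-value convention described above.
# a sketch: write a toy data set partitioned Hive-style on cyl and gear
toy_path <- file.path(tempdir(), "mtcars-partitioned")
write_dataset(mtcars, toy_path, partitioning = c("cyl", "gear"))
list.files(toy_path, recursive = TRUE)
# e.g. "cyl=4/gear=3/part-0.parquet", "cyl=4/gear=4/part-0.parquet", ...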
Partitioning the data set in a thoughtful way is one of those tricks used to make large data sets manageable. If I want to compute some quantity based only on the rides that took place in September 2019, I can ask the operating system to open the one file containing that data. My query never has to touch the other 157 files. To give a sense of how much of a difference this makes, let’s compare two different queries. The first one extracts a subset of about 10 million rows, based on the partitioning variables:
nyc_taxi |>
  filter(year == 2016, month == 9) |>
  nrow()
[1] 10116018
The second one extracts a subset of about the same size, again about 10 million rows, based on the pickup location zone:
nyc_taxi |>
  filter(pickup_location_id == 138) |>
  nrow()
[1] 10823589
Neither of these queries does very much with the data: they’re just inspecting the metadata to count the number of rows. However, the first query only needs to look at the metadata in one file, whereas the second one has to extract and aggregate data from all 158 files. The difference in compute time is striking:
tic()
nyc_taxi |>
  filter(year == 2016, month == 9) |>
  nrow() |>
  invisible()
toc()
0.014 sec elapsed
tic()
nyc_taxi |>
  filter(pickup_location_id == 138) |>
  nrow() |>
  invisible()
toc()
3.895 sec elapsed
Admittedly, this is a bit of a contrived example, but the core point is still important: partitioning the data set on variables that you’re most likely to query on tends to speed things up.
This leads to a natural question: how many variables should we partition on? The nyc_taxi data set is partitioned on year and month, but there’s nothing stopping us from defining a weekday variable that takes on values of Sunday, Monday, and so on, and using that to define a third level of partitioning. Or we could have chosen to drop month entirely and partition only on year. Which approach is best?
The answer, boringly, is that it depends. As a general rule, if you break the data up into too many small files, the operating system has to do too much work searching for the files you want; too few, and you end up with some very large and unwieldy files that are hard to move around and search. So there’s often a sweet spot where you partition based on a small number of variables (usually those used most often in queries) and end up with a manageable number of files of a manageable size. These rough guidelines can help avoid some known worst cases:
- Avoid files smaller than 20MB and larger than 2GB.
- Avoid partitioning layouts with more than 10,000 distinct partitions.
As an aside, the same guidelines apply when thinking about how to structure row groups within file formats such as parquet, because the same tradeoffs exist at that level.
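The write_dataset() function gives you some control over these sizes. The sketch below shows the relevant arguments; the argument names assume a reasonably recent arrow release, the output path is hypothetical, and rewriting the full taxi data this way would take a while.
# a sketch: nudging file and row-group sizes toward the guidelines above
nyc_taxi |>
  write_dataset(
    "~/Datasets/nyc-taxi-resized",  # hypothetical output location
    max_rows_per_file = 10000000,   # keep any single file from growing too large
    min_rows_per_group = 100000,    # avoid lots of tiny row groups ...
    max_rows_per_group = 1000000    # ... and avoid overly large ones
  )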
An example
Okay, enough theory. Let’s actually do this. We’ve already seen the “read” functionality in action, but I’m going to do it again with some additional arguments specified. This time around, I’m going to open a smaller subset, corresponding only to the 2016 data:
nyc_taxi_2016a <- open_dataset(
  sources = "~/Datasets/nyc-taxi/year=2016/",
  format = "parquet",
  unify_schemas = TRUE
)
In this version of the command I’ve explicitly stated that I’m looking for parquet files, though I didn’t really need to do this because format = "parquet" is the default. I’ve also set unify_schemas to TRUE rather than the default value of FALSE. This argument refers to the way open_dataset() aggregates the data files. When unify_schemas = TRUE, it examines every data file to find the names and data types of each column (i.e., the schema for the data set), and then seeks to aggregate those into a coherent whole. This can be time consuming, and it’s usually unnecessary because the data are written in exactly the same format in every file. As a consequence, when unify_schemas = FALSE (the default), the scanner just looks at the first file and assumes that every data file has the same schema.
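Either way, you can check which schema the resulting Dataset ended up with; a quick sketch:
# a sketch: the schema attached to the Dataset object, including any
# partition columns inferred from the folder names
nyc_taxi_2016a$schema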
Okay, so let’s have a look at the data:
nyc_taxi_2016a
FileSystemDataset with 12 Parquet files
vendor_name: string
pickup_datetime: timestamp[ms]
dropoff_datetime: timestamp[ms]
passenger_count: int64
...
As expected, this is a multi-file Dataset object constructed from 12 files.
Next, let’s imagine that we’re about to write an application whose primary function is to look at the different vendors who provide the raw data on a monthly basis. That’s a highly specialized use of this data set, and it may be advantageous to partition by month and vendor_name because those are the variables we’re likely to be querying on later. Because the philosophy of the arrow package is to try to preserve dplyr logic to the greatest possible extent, the default behaviour of write_dataset() is to inspect the grouping variables for the data and use those to construct a Hive-style partition. So if I want to write this Dataset to file using month and vendor as my partitioning variables I would do this:
tic()
nyc_taxi_2016a |>
  group_by(month, vendor_name) |>
  write_dataset("~/Datasets/nyc-taxi_2016")
toc()
39.782 sec elapsed
As you can see, this write operation does take a little while to finish, but half a minute isn’t too bad.
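Incidentally, if you’d rather not group the data first, write_dataset() also accepts an explicit partitioning argument; the following sketch should be equivalent to the grouped version above:
# a sketch: the same write, with the partition columns named explicitly
nyc_taxi_2016a |>
  write_dataset(
    "~/Datasets/nyc-taxi_2016",
    partitioning = c("month", "vendor_name")
  )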
In any case, I can open the new data set the same way as before:
nyc_taxi_2016b <- open_dataset("~/Datasets/nyc-taxi_2016")
nyc_taxi_2016b
FileSystemDataset with 27 Parquet files
pickup_datetime: timestamp[ms]
dropoff_datetime: timestamp[ms]
passenger_count: int64
trip_distance: double
...
Notice the difference between nyc_taxi_2016a and nyc_taxi_2016b. They both refer to the same data conceptually (i.e., all the taxi rides from 2016), but they’re linked to different files and they carve up the data set in different ways:
nyc_taxi_2016a$files
[1] "/home/danielle/Datasets/nyc-taxi/year=2016/month=1/part-0.parquet"
[2] "/home/danielle/Datasets/nyc-taxi/year=2016/month=10/part-0.parquet"
[3] "/home/danielle/Datasets/nyc-taxi/year=2016/month=11/part-0.parquet"
[4] "/home/danielle/Datasets/nyc-taxi/year=2016/month=12/part-0.parquet"
[5] "/home/danielle/Datasets/nyc-taxi/year=2016/month=2/part-0.parquet"
...
nyc_taxi_2016b$files
[1] "/home/danielle/Datasets/nyc-taxi_2016/month=1/vendor_name=CMT/part-0.parquet"
[2] "/home/danielle/Datasets/nyc-taxi_2016/month=1/vendor_name=VTS/part-0.parquet"
[3] "/home/danielle/Datasets/nyc-taxi_2016/month=10/vendor_name=CMT/part-0.parquet"
[4] "/home/danielle/Datasets/nyc-taxi_2016/month=10/vendor_name=VTS/part-0.parquet"
[5] "/home/danielle/Datasets/nyc-taxi_2016/month=11/vendor_name=CMT/part-0.parquet"
...
To give you a sense of the difference between the two, here’s an example of a (somewhat) realistic query, computed on the nyc_taxi_2016b data:
nyc_taxi_2016b |>
  filter(vendor_name == "CMT") |>
  group_by(month) |>
  summarize(distance = sum(trip_distance, na.rm = TRUE)) |>
  collect()
# A tibble: 12 × 2
month distance
<int> <dbl>
1 1 33436823.
2 12 13642500.
3 10 41799496.
4 11 13826243.
5 2 40006137.
6 3 55384892.
7 4 27440575.
8 5 52798627.
9 6 15617981.
10 7 19210103.
11 8 22581320.
12 9 76139235.
Here’s the time taken for this query:
0.556 sec elapsed
and for the same query performed on the nyc_taxi_2016a data:
4.829 sec elapsed
The difference is not quite as extreme as the contrived example earlier, but it’s still quite substantial: using your domain expertise to choose relevant variables to partition on can make a real difference in how your queries perform!