Lessons Learned from Scaling to Multi-Terabyte Datasets

This post is meant to guide you through some of the lessons I’ve learned while working with multi-terabyte datasets. The lessons shared are focused on what someone may face as the size of their dataset scales up and some of the things I’ve done to overcome them. I hope you’re waiting for something to finish running while reading this!

Remember, this is not a rigid guide. It’s about introducing concepts and explaining why you should start applying them. Numerous other tools can surpass the ones I’ve used, and I strongly encourage you to take the initiative and explore them independently. Your active exploration is key to your professional growth.

I’ve divided this post into two sections: scaling on single machines and multi-machine scaling. The goal is to maximize your available resources and reach your goals as quickly as possible.

Lastly, I want to emphasize that no optimization or scaling can compensate for a flawed algorithm. Before scaling up, it’s crucial to evaluate your algorithm. This should always be your first step, providing a confident guide for your work.

Scaling on a Single Machine

Joblib

Compute is the first bottleneck that comes to mind when scaling. Scaling up computations can be done in several different practical ways. If you’re a data scientist or a machine learning engineer, you might already be familiar with Joblib, a library used to run code in parallel (among other things). It is often used in other libraries, such as scikit-learn or XGBoost.

The process of parallelizing something using Joblib is simple, as follows (modified for clarity from the Joblib docs):

>>> from joblib import Parallel, delayed
>>> from math import sqrt

>>> parallel_mapper = Parallel(n_jobs=-1)
>>> delayed_func = delayed(sqrt)
>>> jobs = [
    delayed_func(x**2)
    for x in range(10)
]
>>> parallel_mapper(jobs)
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

Joblib is a great way to scale up your parallel workloads. It’s used in scikit-learn and other tools, proving reliable for many workloads. This isn’t even considering its other excellent features regarding memoization or Fast Compressed Persistence. Joblib is helpful for just making a function parallelizable across all your CPU cores.

GNU Parallel

GNU Parallel is a powerful tool for preprocessing or extracting data in the CLI. It differs from Joblib as it can be used outside a script and is versatile. You can even run other Python scripts in parallel. One of the most common use cases is decompressing many files simultaneously. Here’s how I would do it:

> ls
random_0.zip  random_2.zip  random_4.zip  random_6.zip  random_8.zip
random_1.zip  random_3.zip  random_5.zip  random_7.zip  random_9.zip
...

> mkdir output
> ls | parallel --eta --bar "unzip -q {} -d output/"
100% 10:0=0s random_9.zip

> ls output/
random_0.txt  random_2.txt  random_4.txt  random_6.txt  random_8.txt
random_1.txt  random_3.txt  random_5.txt  random_7.txt  random_9.txt
...

These commands are pretty straightforward if you have used a Linux terminal before. The main part to focus on is piping the file names to parallel so that unzip can decompress them.

For any task, once you have a bash command set to run on a single file, you can parallelize it by modifying your command slightly. By default, parallel uses all available CPU cores and can execute commands on multiple machines using ssh, meaning that it could be used as an ad-hock computing cluster.

Another use case is downloading a large number of files. With wget and parallel and a list of files to download, writing a quick one-liner to download all the files in parallel is easy. Other tools, such as axel and aria2c, can do this just as well, but I’ve found this to work better when I need to download many smaller files.

A quick note: While you can use this to download many files, be aware that this can cause strain on servers by creating multiple connections, leading to network congestion and reduced performance for other users or even being seen as a DOS attack. This increased server load can be particularly problematic for smaller websites or servers with limited bandwidth. Famously, aria2c has rejected proposals to increase the maximum number of connections from 16, even though computers have gotten faster, and network bandwidth has increased dramatically. Given their position, I agree with their decisions and ask you to act responsibly when downloading.

Another point I’d like to mention is that while you can get things working quicker with Parallel, it may be challenging to manage bash commands, especially for a beginner where the rest of the team might be more Python/traditional programming language focused. Due to this, I generally recommend keeping Parallel for one-off tasks rather than writing complex ETL pipelines in bash. Maintainable code is second to only no code at all.

Scaling to Multiple Machines

When to Start Using Multiple Machines

One key identifier for when it makes sense to switch to using multiple machines (think Spark or, my favourite, Dask) is when computing is taking too long for your use cases. This could be experiments, data processing, or whatever. The worst timeframe I’ve estimated is some jobs taking months or a year to finish computing if I were to stick to a single instance, even on AWS’s u-24tb1.112xlarge (a beast of a machine). I’m against the waste of any kind, and the better you can utilize the resources available, the better, in my opinion.

By switching to multiple smaller machines, you leverage several performance benefits over a more prominent instance. Depending on your scaling solution, horizontal scaling allows for almost linear scaling across your CPU, memory, and networking with the number of instances you use.

Most reasonably large EC2 instances offer up to 10 GBit internet speeds, which can help alleviate IO bottlenecks, especially if you’re rapidly streaming data to or from S3. If your workload requires data coming in at 50 Gbit/s, you’ve got the option to either use a m7i.48xlarge instance, which costs $9.6768 hourly and runs at 50 GBit, or four m7i.8xlarge instances, which costs $1.6128 hourly per instance or $6.4512 hourly for the same network bandwidth.

I selected networking speeds and cost as the two metrics to focus on here, but if you’re looking to maximize your memory and CPU usage, we can compare the previously mentioned u-24tb1.112xlarge. For the exact cost, you can rent out 135 m7i.8xlarge instances. That gives you 4320 CPUs (10x the instance), 17.28TB of RAM, and 1687.5 Gigabit internet speed (~17x the instance)! While RAM is less, I’ve used a general-purpose instance here to scale, not a memory-optimized one. Using the memory-optimized equivalent, we can get 34.56 TB of RAM, with all the other benefits of using multiple machines (redundancy, finer control for the instance size, etc).

Moreover, with the correct backend, I can scale to as many instances as my use case, orchestration tool, or accounting department will allow. This level of scalability is a crucial advantage, enabling you to meet the demands of your workload without being limited by the capabilities of a single instance.

As with everything, there are benefits to your different approaches. It’s your job to evaluate the pros and cons of each solution and determine what works best for your use case. Minimizing cost while maximizing performance is a good exercise in building intuition for these tasks.

However, given these incredible benefits, I only recommend using multiple instances once you’ve understood the bottlenecks you face. I have seen teams start to scale and over-engineer their approach to computing before understanding their use case. I may have even been a part of those teams before learning my lesson. In some instances, well-written cli tools could process data faster than an entire spark cluster.

Different Computing Models

For Embarrassingly Parallel Workloads

Embarrassingly Parallel Workloads are generally the easiest to scale compared to other types of workloads. We’ve already touched on how to scale up computing using Joblib or Parallel, but what about scaling to multiple machines? There are quite a few tools to scale up computation. I would recommend using AWS Batch or AWS Lambda for embarrassingly parallel workloads that are one-off. Batch is scalable, and with spot pricing, you can get most of your tasks done at a fraction of the cost of using on-demand instances in a fraction of the time it would take to run them in parallel on a single machine. There are other tools you can use (GCP’s Cloud Run, for example), but I can only recommend AWS Batch for longer-running tasks since that’s what I’ve used in the past.

Since setting up the cluster can be time-consuming and is out of the scope of this post, I’ve included a link here incase you’re interested in exploring this yourself.

One caveat worth mentioning is that the general throughput of your job will be limited by your read and write speeds more so than the compute speed. If you’re reading from/writing to a database, then the database is likely to be a bottleneck (or even crash). S3 is a viable option for reading and writing, given it’s designed to scale better, but it still has its limits. 3,500 writes and 5,500 reads per second per partitioned prefix. S3 is designed to be invisible when scaling to the user, so you may have little control over how it adapts to the increased throughput.

Once the data is in S3 (or whatever service you use), you can transfer it wherever needed.

This setup is quite tedious but scales well for one-off tasks. With a few iterations, you can reduce the setup time to a few minutes, depending on how well you’ve automated the process and your team’s needs. Generally, I’ve found that the setup time is worth it for the computing and engineering time saved, but you can understand my hesitation in using this for every task.

Analytical Workloads

Analytical workloads are a bit more challenging to scale. This is because you’re generally working with a single dataset and trying to do a lot of operations on that dataset. You may also have an element of interactivity, such as things running in a Jupyter Notebook. My go-to tool for scaling up analytical workloads is Dask, with an alternative being Spark. Dask and Spark are open-source tools that allow you to scale up your workloads to multiple machines, with their pros and cons. Both these tools can also be used locally, and their implementations of DataFrames (Dask DataFrame and Spark Dataframe) can be used to scale up existing workloads.

Dask is much easier to set up and install. I can get Dask running locally in a few minutes with a single command (pip install "dask[complete]" by the way). On the other hand, Spark requires a bit more setup, and I’ve found that running on my local machine is a bit more challenging. Dask also comes with the benefit that any data scientist using Pandas or Numpy can get used to it quickly while knowing Spark is an entirely different skill set. Dask is also better integrated with several PyData tools, meaning you can take advantage of them immediately. However, given all this, Spark and the Spark ecosystem are much more mature by comparison, and it’s likely that your team already has invested time into getting a Spark cluster up and running. I run into the occasional bug or performance issue with Dask, while Spark is known to be much more stable due to its maturity. Dask is also not suited for longer-running computations.

Given this, my general recommendation is:

  • If you’re a small team or startup with no infrastructure for big data or distributed computing. In that case, I recommend at least experimenting with Dask, regardless of the team’s experience with Spark. In the time you take to get Spark running locally, you could’ve validated your use case with Dask, and your team will be able to leverage other tools in the PyData space.
  • If you’re already part of a larger organization that uses Spark or some other significant data infrastructure. In that case, it makes sense to stick with it unless you have a compelling reason not to. I recommend watching Eric Dill’s talk on Is Spark Still Relevant? for why larger organizations prefer to use Spark over more modern tools. It is five years old, so some talking points may be outdated. That said, you should still try Dask since you can use both.

Conclusion

In conclusion, managing and scaling multi-terabyte datasets requires a deep understanding of both your data and the tools at your disposal. By leveraging Joblib and GNU Parallel for single-machine scaling, you can maximize the efficiency of your computational resources. When scaling beyond a single machine is necessary, AWS Batch, Dask, and Spark provide robust solutions for various workloads, from embarrassingly parallel tasks to complex analytical operations.

The key takeaway is to start by optimizing your algorithms before scaling, ensuring you’re not merely amplifying inefficiencies. Actively exploring and adapting new tools can significantly enhance your performance and cost-effectiveness. Successful scaling is as much about strategic planning and resource management as raw computational power. Embrace the learning curve; you’ll be well-equipped to handle even the largest datasets confidently and skillfully.

7 thoughts on “Lessons Learned from Scaling to Multi-Terabyte Datasets

    1. That’s a good one to add, but I’ve never used K8 in that capacity. I’ll make a note of it in my future work. Thanks!

      Like

  1. Great post! I’m curious, how did you deploy Dask on the cloud for the multi-machine scaling? I’m guessing Dask Cloud Provider or something like Coiled?

    Like

    1. Yes! There are a lot of ways in which you can deploy dask tbh. Someone else mentioned using K8, which could also be a way to do so. I’ve played with Coiled, Dask on Docker Swarm, and Dask Cloud Provider (my primary way atm). There are quite a few ways you can spin up a cluster, and it depends on what you have at hand more than anything.

      If I had to pick today, I would probably pick Coiled. It has a generous free tier. Then again, I am pretty cheap, so maybe I’ll prefer to run something locally instead 🤔

      If I were part of a company with a dedicated DevOps person, I would 100% choose Dask Cloud Provider. If I had to set it up independently, I’d choose Coiled.

      Like

  2. We ran into serious I/O bottlenecks when scaling our analysis of multi-terabyte datasets in AWS, especially when pushing data to hundreds of powerful workers. After evaluating several options, we decided to use an S3-backed file system called flexFS (https://flexfs.io) so we could skip the whole logistical pain of using S3 (error recovery, provisioning local storage, caching, etc.). It’s made our lives a lot easier and saved us a ton in EC2 costs.

    Like

    1. Oh man, I’d love to hear more about what issues you ran and see if we ran into similar problems. We were able to solve them without a separate file system

      I’ll have a look at flexFS. It seems interesting

      Like

      1. Most of our analysis tooling needed to operate on large files (hundreds of GBs each) via POSIX, so we needed to download to local EBS volumes (which have quite low throughput). We would also usually over download because our analysis tools really only needed to read parts of the files. And there was large variations in the file sizes, and combinations of files we needed for a given task, so we often needed to (over) provision EBS volumes for the worst case, leading to higher EBS charges. An alternative might have been to use instances with ephemeral NVMe drives to download to, but they get pricey and limit our instance type options. And even those don’t solve the over download problem. They just increase throughput to around 1000 MB/s. POSIX network file systems are more efficient as they eliminate the intermediate download/upload issues (including over downloading and over provisioning local storage) by streaming data back and forth as needed, but the big ones (eg EFS, FSx for Lustre) were either poor in terms of throughput or really expensive. Our goal was to feed the CPUs as quickly as possible and simply our workflows to minimize operational and development costs.

        Like

Leave a comment