Fast, Multipart Downloads from S3

Using the “Range” header and greenlets, you can do very fast downloads from S3. The speed improvement is even larger than with uploads: with a 130M file, the parallelized download was more than three times faster than a serial one.

I’ve put the code in RandomUtility.
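
The gist of the approach is to preallocate the output file and have each greenlet fetch one byte range with a ranged GET, writing it into place. This is a simplified sketch of the idea rather than the code from the repository, and the helper names (download_part, parallel_download) are just for illustration:

# Monkey-patch first so the S3 requests inside the greenlets actually overlap.
from gevent import monkey
monkey.patch_all()

import gevent
import boto.s3.connection

_CHUNK_SIZE_B = 10 * 1024 * 1024    # Arbitrary for this sketch.

def download_part(ak, sk, bucket_name, key_name, filepath, offset, length):
    conn = boto.s3.connection.S3Connection(ak, sk)
    key = conn.lookup(bucket_name).get_key(key_name)

    # Ask S3 for just this slice of the object.
    headers = {'Range': 'bytes=%d-%d' % (offset, offset + length - 1)}
    data = key.get_contents_as_string(headers=headers)

    # Drop the slice into its position in the output file.
    with open(filepath, 'r+b') as f:
        f.seek(offset)
        f.write(data)

def parallel_download(ak, sk, bucket_name, key_name, filepath):
    conn = boto.s3.connection.S3Connection(ak, sk)
    key = conn.lookup(bucket_name).get_key(key_name)

    # Preallocate the file so every greenlet can seek and write independently.
    with open(filepath, 'wb') as f:
        f.truncate(key.size)

    g_list = [gevent.spawn(download_part,
                           ak, sk, bucket_name, key_name, filepath,
                           offset, min(_CHUNK_SIZE_B, key.size - offset))
              for offset
              in range(0, key.size, _CHUNK_SIZE_B)]

    gevent.joinall(g_list)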

Usage:

$ python s3_parallel_download.py (access key) (secret key) (bucket name) (key name)
...
2014-06-23 11:45:06,896 - __main__ - DEBUG -  19%   6%  13%   6%   6%  13%  27%
2014-06-23 11:45:16,896 - __main__ - DEBUG -  52%  26%  26%  26%  39%  26%  68%
2014-06-23 11:45:26,897 - __main__ - DEBUG -  85%  32%  52%  39%  52%  45% 100%
2014-06-23 11:45:36,897 - __main__ - DEBUG - 100%  78%  78%  59%  65%  65% 100%
2014-06-23 11:45:46,897 - __main__ - DEBUG - 100% 100% 100%  78%  91%  91% 100%
Downloaded: /var/folders/qk/t5991kt11cb2y6qgmzrzm_g00000gp/T/tmpU7pL8I

Doing Fast Multipart Uploads to S3 Using Greenlets

S3 allows you to upload pieces of large files in parallel. Unfortunately, most/all of the examples that I’ve seen online are inefficient or inconvenient. For example:

  • Physical file splits of the original file: If you couldn’t guess that S3 has a way to work off a single copy of the source file, then you probably shouldn’t be using this functionality.
  • Threading: Python threads don’t truly run in parallel, thanks to the GIL.
  • Function-based designs (as opposed to class-based): I’ve never been a fan of this in Python. Too much context info has to be curried.
  • Using multiprocessing: For every upload, you’ll spawn a number of processes, all of which still compete for the same network device.

None of these strategies holds a candle to Greenlets, each running off its own file-pointer into the same physical copy of the file.
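
One caveat that applies to any gevent-based variation of this: greenlets only overlap network I/O if the sockets are cooperative, which normally means monkey-patching the standard library before boto (which sits on top of httplib) is imported:

from gevent import monkey
monkey.patch_all()

# Import boto only after patching, so its HTTP calls yield to other greenlets.
import boto.s3.connection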

This example is located at RandomUtility: s3_parallel.

This is the principal class. Go to the original source for the imports and the couple of module-level constants; a rough guess at those is below, followed by the class itself.
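
To be clear, the constant values here are my assumptions rather than the originals; only the names are dictated by the class, and the monkey-patching from the note above would come before any of it:

import os
import math
import logging
import functools

import gevent
import boto.s3.connection
import boto.s3.multipart

# Assumed values; the actual ones live in the RandomUtility source.
_MIN_MULTIPART_SIZE_B = 5 * 1024 * 1024       # S3 requires parts of at least 5M.
_DEFAULT_CHUNK_SIZE_B = 10 * 1024 * 1024
_DEFAULT_MONITOR_INTERVAL_S = 10              # Matches the ten-second cadence below.

_logger = logging.getLogger(__name__)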

class ParallelUpload(object):
    def __init__(self, ak, sk, bucket_name, filepath, 
                 chunk_size_b=_DEFAULT_CHUNK_SIZE_B,
                 monitor_interval_s=_DEFAULT_MONITOR_INTERVAL_S):
        self.__ak = ak
        self.__sk = sk
        self.__bucket_name = bucket_name
        self.__filepath = filepath
        self.__s3_key_name = os.path.basename(filepath)
        self.__chunk_size_b = chunk_size_b
        self.__coverage = 0.0
        self.__monitor_interval_s = monitor_interval_s

        self.__filesize_b = os.path.getsize(self.__filepath)
        self.__chunks = int(math.ceil(float(self.__filesize_b) / 
                                      float(self.__chunk_size_b)))

        self.__progress = [0.0] * self.__chunks

    def __get_bucket(self, bucket_name):
        conn = boto.s3.connection.S3Connection(self.__ak, self.__sk)
        return conn.lookup(bucket_name)

    # Fall-back path: a plain, single-request upload for files below the
    # multipart threshold.
    def __standard_upload(self):
        bucket = self.__get_bucket(self.__bucket_name)
        new_s3_item = bucket.new_key(self.__s3_key_name)
        new_s3_item.set_contents_from_filename(
            self.__filepath, 
            cb=self.__standard_cb, 
            num_cb=20)

    def __standard_cb(self, current, total):
        _logger.debug("Status: %.2f%%", float(current) / float(total) * 100.0)

    def __multipart_cb(self, i, current, total):
        self.__progress[i] = float(current) / float(total) * 100.0

    def __transfer_part(self, (mp_info, i, offset)):
        (mp_id, mp_key_name, mp_bucket_name) = mp_info

        bucket = self.__get_bucket(mp_bucket_name)
        mp = boto.s3.multipart.MultiPartUpload(bucket)
        mp.key_name = mp_key_name
        mp.id = mp_id

        # At any given time, this describes the farthest percentage into the
        # file that we're actively working on.
        self.__coverage = max(
                            (float(offset) / float(self.__filesize_b) * 100.0), 
                            self.__coverage)

        # The last chunk might be shorter than the rest.
        eff_chunk_size = min(offset + self.__chunk_size_b, 
                             self.__filesize_b) - \
                         offset

        with open(self.__filepath, 'rb') as f:
            f.seek(offset)
            mp.upload_part_from_file(
                f, 
                i + 1, 
                size=eff_chunk_size, 
                cb=functools.partial(self.__multipart_cb, i), 
                num_cb=100)

    # Runs in its own greenlet: once per monitoring interval, log one
    # percentage column per part.
    def __mp_show_progress(self):
        while 1:
            columns = [("%3d%% " % self.__progress[i]) 
                       for i 
                       in range(self.__chunks)]

            pline = ' '.join(columns)
            _logger.debug(pline)

            gevent.sleep(self.__monitor_interval_s)

    # Spawn one greenlet per chunk; each one re-attaches to the same multipart
    # upload by ID and pushes its own slice of the file.
    def __multipart_upload(self):
        bucket = self.__get_bucket(self.__bucket_name)

        mp = bucket.initiate_multipart_upload(self.__s3_key_name)
        mp_info = (mp.id, mp.key_name, mp.bucket_name)
        chunk_list = range(0, self.__filesize_b, self.__chunk_size_b)

        try:
            gen = ((mp_info, i, offset) 
                   for (i, offset) 
                   in enumerate(chunk_list))

            f = functools.partial(gevent.spawn, self.__transfer_part)

            if self.__monitor_interval_s > 0:
                p = gevent.spawn(self.__mp_show_progress)

            g_list = map(f, gen)

            gevent.joinall(g_list)

            if self.__monitor_interval_s > 0:
                p.kill()
                p.join()
        except:
            mp.cancel_upload()
            raise
        else:
            mp.complete_upload()

    def start(self):
        if self.__filesize_b < _MIN_MULTIPART_SIZE_B:
            self.__standard_upload()
        else:
            self.__multipart_upload()
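
The command-line wiring isn’t shown above; it amounts to reading the four arguments and calling start(). A sketch (the original’s argument handling and log format may differ):

if __name__ == '__main__':
    import sys

    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    (ak, sk, bucket_name, filepath) = sys.argv[1:5]

    ParallelUpload(ak, sk, bucket_name, filepath).start()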

The output when called as a command will look like this:

$ python s3_parallel.py (access key) (secret key) (bucket name) (file-path)
2014-06-17 10:16:48,458 - __main__ - DEBUG -   0%    0%    0%    0%    0%    0%    0% 
2014-06-17 10:16:58,459 - __main__ - DEBUG -   3%    3%    2%    2%    2%    1%    7% 
2014-06-17 10:17:08,460 - __main__ - DEBUG -   6%    5%    5%    4%    5%    4%   14% 
2014-06-17 10:17:18,461 - __main__ - DEBUG -  10%    7%    8%    8%    7%    6%   18% 
2014-06-17 10:17:28,461 - __main__ - DEBUG -  16%   10%   13%   11%   10%    8%   26% 
2014-06-17 10:17:38,462 - __main__ - DEBUG -  21%   14%   20%   15%   14%   12%   35% 
2014-06-17 10:17:48,462 - __main__ - DEBUG -  26%   17%   27%   19%   19%   15%   48% 
2014-06-17 10:17:58,463 - __main__ - DEBUG -  32%   20%   33%   24%   24%   18%   59% 
2014-06-17 10:18:08,463 - __main__ - DEBUG -  37%   24%   39%   29%   28%   22%   70% 
2014-06-17 10:18:18,464 - __main__ - DEBUG -  43%   28%   44%   34%   32%   26%   82% 
2014-06-17 10:18:28,464 - __main__ - DEBUG -  48%   31%   50%   39%   36%   31%   91% 
2014-06-17 10:18:38,465 - __main__ - DEBUG -  52%   35%   55%   44%   43%   36%  100% 
2014-06-17 10:18:48,465 - __main__ - DEBUG -  60%   39%   63%   47%   47%   40%  100% 
2014-06-17 10:18:58,466 - __main__ - DEBUG -  68%   44%   69%   53%   53%   45%  100% 
2014-06-17 10:19:08,466 - __main__ - DEBUG -  77%   49%   75%   58%   57%   49%  100% 
2014-06-17 10:19:18,467 - __main__ - DEBUG -  83%   54%   84%   65%   62%   52%  100% 
2014-06-17 10:19:28,467 - __main__ - DEBUG -  88%   58%   90%   71%   69%   58%  100% 
2014-06-17 10:19:38,468 - __main__ - DEBUG -  96%   61%   96%   77%   74%   63%  100% 
2014-06-17 10:19:48,468 - __main__ - DEBUG - 100%   67%  100%   83%   83%   70%  100% 
2014-06-17 10:19:58,469 - __main__ - DEBUG - 100%   73%  100%   93%   93%   76%  100% 
2014-06-17 10:20:08,469 - __main__ - DEBUG - 100%   83%  100%  100%  100%   86%  100% 
2014-06-17 10:20:18,470 - __main__ - DEBUG - 100%   95%  100%  100%  100%  100%  100%