Fast, Multipart Downloads from S3

Using the “Range” header and greenlets, you can do very fast downloads from S3. The speed improvement is considerably larger than with uploads: with a 130M file, the parallelized download was more than three times faster than a straight download.

I’ve put the code in RandomUtility.

Usage:

$ python s3_parallel_download.py (access key) (secret key) (bucket name) (key name)
...
2014-06-23 11:45:06,896 - __main__ - DEBUG -  19%   6%  13%   6%   6%  13%  27%
2014-06-23 11:45:16,896 - __main__ - DEBUG -  52%  26%  26%  26%  39%  26%  68%
2014-06-23 11:45:26,897 - __main__ - DEBUG -  85%  32%  52%  39%  52%  45% 100%
2014-06-23 11:45:36,897 - __main__ - DEBUG - 100%  78%  78%  59%  65%  65% 100%
2014-06-23 11:45:46,897 - __main__ - DEBUG - 100% 100% 100%  78%  91%  91% 100%
Downloaded: /var/folders/qk/t5991kt11cb2y6qgmzrzm_g00000gp/T/tmpU7pL8I
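
Roughly, the approach is to stat the key for its size, carve the byte-range into chunks, and give each chunk to its own greenlet, which asks S3 for just that slice via the “Range” header. A minimal sketch of the idea follows (this is not the actual RandomUtility code; the function names and chunk size are illustrative, and it assumes boto plus gevent’s monkey-patching):

import os
import tempfile

import gevent
import gevent.monkey

# Patch the standard library so that boto's blocking socket calls yield to
# other greenlets instead of stalling the whole process.
gevent.monkey.patch_all()

import boto.s3.connection

_CHUNK_SIZE_B = 10 * 1024 * 1024

def _download_range(ak, sk, bucket_name, key_name, offset, length, filepath):
    # Each greenlet uses its own connection/key so that concurrent requests
    # don't share response state.
    conn = boto.s3.connection.S3Connection(ak, sk)
    bucket = conn.lookup(bucket_name)
    key = bucket.get_key(key_name)

    # Ask S3 for just this slice of the object.
    headers = {'Range': 'bytes=%d-%d' % (offset, offset + length - 1)}
    data = key.get_contents_as_string(headers=headers)

    # Write the slice into its position in the shared output file.
    with open(filepath, 'r+b') as f:
        f.seek(offset)
        f.write(data)

def parallel_download(ak, sk, bucket_name, key_name):
    conn = boto.s3.connection.S3Connection(ak, sk)
    key = conn.lookup(bucket_name).get_key(key_name)
    filesize_b = key.size

    # Preallocate the output file so that every greenlet can seek into it.
    (fd, filepath) = tempfile.mkstemp()
    os.ftruncate(fd, filesize_b)
    os.close(fd)

    g_list = [gevent.spawn(_download_range,
                           ak, sk, bucket_name, key_name,
                           offset,
                           min(_CHUNK_SIZE_B, filesize_b - offset),
                           filepath)
              for offset
              in range(0, filesize_b, _CHUNK_SIZE_B)]

    gevent.joinall(g_list)
    return filepath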

Doing Fast Multipart Uploads to S3 Using Greenlets

S3 allows you to upload pieces of large files in parallel. Unfortunately, most/all of the examples that I’ve seen online are inefficient or inconvenient. For example:

  • Physical file splits of the original file: If you couldn’t guess that S3 would have a way to work off a single copy of the source file, then you probably shouldn’t be using this functionality.
  • Threading: Due to the GIL, threads don’t truly run in parallel in Python.
  • Function-based designs (as opposed to class-based): I’ve never been a fan of this in Python. Too much context info has to be curried.
  • Using multiprocessing: For every upload, you’ll have a number of processes, and all will still be in competition for the network device.

None of these strategies hold a candle to greenlets (running off different file-pointers to the same physical copy of the file).

This example is located at RandomUtility: s3_parallel.

This is the principal class. Go to the original source for the imports and the couple of module-level constants.

class ParallelUpload(object):
    def __init__(self, ak, sk, bucket_name, filepath, 
                 chunk_size_b=_DEFAULT_CHUNK_SIZE_B,
                 monitor_interval_s=_DEFAULT_MONITOR_INTERVAL_S):
        self.__ak = ak
        self.__sk = sk
        self.__bucket_name = bucket_name
        self.__filepath = filepath
        self.__s3_key_name = os.path.basename(filepath)
        self.__chunk_size_b = chunk_size_b
        self.__coverage = 0.0
        self.__monitor_interval_s = monitor_interval_s

        self.__filesize_b = os.path.getsize(self.__filepath)
        self.__chunks = int(math.ceil(float(self.__filesize_b) / 
                                      float(self.__chunk_size_b)))

        self.__progress = [0.0] * self.__chunks

    # Each call establishes a fresh connection, so concurrent greenlets don't
    # share connection state.
    def __get_bucket(self, bucket_name):
        conn = boto.s3.connection.S3Connection(self.__ak, self.__sk)
        return conn.lookup(bucket_name)

    def __standard_upload(self):
        bucket = self.__get_bucket(self.__bucket_name)
        new_s3_item = bucket.new_key(self.__s3_key_name)
        new_s3_item.set_contents_from_filename(
            self.__filepath, 
            cb=self.__standard_cb, 
            num_cb=20)

    def __standard_cb(self, current, total):
        _logger.debug("Status: %.2f%%", float(current) / float(total) * 100.0)

    def __multipart_cb(self, i, current, total):
        self.__progress[i] = float(current) / float(total) * 100.0

    def __transfer_part(self, (mp_info, i, offset)):
        (mp_id, mp_key_name, mp_bucket_name) = mp_info

        bucket = self.__get_bucket(mp_bucket_name)
        mp = boto.s3.multipart.MultiPartUpload(bucket)
        mp.key_name = mp_key_name
        mp.id = mp_id

        # At any given time, this will describe the farthest point (as a
        # percentage) into the file that we're actively working on.
        self.__coverage = max(
                            (float(offset) / float(self.__filesize_b) * 100.0), 
                            self.__coverage)

        # The last chunk might be shorter than the rest.
        eff_chunk_size = min(offset + self.__chunk_size_b, 
                             self.__filesize_b) - \
                         offset

        with open(self.__filepath, 'rb') as f:
            f.seek(offset)
            mp.upload_part_from_file(
                f, 
                i + 1, 
                size=eff_chunk_size, 
                cb=functools.partial(self.__multipart_cb, i), 
                num_cb=100)

    def __mp_show_progress(self):
        while 1:
            columns = [("%3d%% " % self.__progress[i]) 
                       for i 
                       in range(self.__chunks)]

            pline = ' '.join(columns)
            _logger.debug(pline)

            gevent.sleep(self.__monitor_interval_s)

    def __multipart_upload(self):
        bucket = self.__get_bucket(self.__bucket_name)

        mp = bucket.initiate_multipart_upload(self.__s3_key_name)
        mp_info = (mp.id, mp.key_name, mp.bucket_name)
        chunk_list = range(0, self.__filesize_b, self.__chunk_size_b)

        try:
            gen = ((mp_info, i, offset) 
                   for (i, offset) 
                   in enumerate(chunk_list))

            f = functools.partial(gevent.spawn, self.__transfer_part)

            if self.__monitor_interval_s > 0:
                p = gevent.spawn(self.__mp_show_progress)

            g_list = map(f, gen)

            gevent.joinall(g_list)

            if self.__monitor_interval_s > 0:
                p.kill()
                p.join()
        except:
            mp.cancel_upload()
            raise
        else:
            mp.complete_upload()

    def start(self):
        if self.__filesize_b < _MIN_MULTIPART_SIZE_B:
            self.__standard_upload()
        else:
            self.__multipart_upload()
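
A minimal driver for the class might look something like this (the argument handling here is illustrative only; see the original source for the real command-line wrapper):

if __name__ == '__main__':
    import sys

    # Expect: access key, secret key, bucket name, file-path.
    (ak, sk, bucket_name, filepath) = sys.argv[1:5]

    p = ParallelUpload(ak, sk, bucket_name, filepath)
    p.start()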

When called as a command, the output will look like this:

$ python s3_parallel.py (access key) (secret key) (bucket name) (file-path)
2014-06-17 10:16:48,458 - __main__ - DEBUG -   0%    0%    0%    0%    0%    0%    0% 
2014-06-17 10:16:58,459 - __main__ - DEBUG -   3%    3%    2%    2%    2%    1%    7% 
2014-06-17 10:17:08,460 - __main__ - DEBUG -   6%    5%    5%    4%    5%    4%   14% 
2014-06-17 10:17:18,461 - __main__ - DEBUG -  10%    7%    8%    8%    7%    6%   18% 
2014-06-17 10:17:28,461 - __main__ - DEBUG -  16%   10%   13%   11%   10%    8%   26% 
2014-06-17 10:17:38,462 - __main__ - DEBUG -  21%   14%   20%   15%   14%   12%   35% 
2014-06-17 10:17:48,462 - __main__ - DEBUG -  26%   17%   27%   19%   19%   15%   48% 
2014-06-17 10:17:58,463 - __main__ - DEBUG -  32%   20%   33%   24%   24%   18%   59% 
2014-06-17 10:18:08,463 - __main__ - DEBUG -  37%   24%   39%   29%   28%   22%   70% 
2014-06-17 10:18:18,464 - __main__ - DEBUG -  43%   28%   44%   34%   32%   26%   82% 
2014-06-17 10:18:28,464 - __main__ - DEBUG -  48%   31%   50%   39%   36%   31%   91% 
2014-06-17 10:18:38,465 - __main__ - DEBUG -  52%   35%   55%   44%   43%   36%  100% 
2014-06-17 10:18:48,465 - __main__ - DEBUG -  60%   39%   63%   47%   47%   40%  100% 
2014-06-17 10:18:58,466 - __main__ - DEBUG -  68%   44%   69%   53%   53%   45%  100% 
2014-06-17 10:19:08,466 - __main__ - DEBUG -  77%   49%   75%   58%   57%   49%  100% 
2014-06-17 10:19:18,467 - __main__ - DEBUG -  83%   54%   84%   65%   62%   52%  100% 
2014-06-17 10:19:28,467 - __main__ - DEBUG -  88%   58%   90%   71%   69%   58%  100% 
2014-06-17 10:19:38,468 - __main__ - DEBUG -  96%   61%   96%   77%   74%   63%  100% 
2014-06-17 10:19:48,468 - __main__ - DEBUG - 100%   67%  100%   83%   83%   70%  100% 
2014-06-17 10:19:58,469 - __main__ - DEBUG - 100%   73%  100%   93%   93%   76%  100% 
2014-06-17 10:20:08,469 - __main__ - DEBUG - 100%   83%  100%  100%  100%   86%  100% 
2014-06-17 10:20:18,470 - __main__ - DEBUG - 100%   95%  100%  100%  100%  100%  100% 

Using a REST-Based Pipe to Keep Systems Connected

You’ll eventually need a bidirectional pipe/bridge between environments/subnets for which an infrastructure-level pipe/VPN connection would be overkill. You might consider using SSH multiplexing, which allows you to:

  • Track the state of a named SSH connection.
  • Reuse the same connection for subsequent calls into the same server.

However, multiplexing has two fairly large disadvantages:

  • In order to get bidirectional communication, you’ll have to start stacking forward- and reverse-tunnels on top of the connection, and this gets complicated.
  • If you need to access the pipe from an application, then there’s a degree of risk in depending on an elaborately-configured console utility in order for your application to work correctly. There is no API.

To a lesser degree, you might also have to adhere to certain security restrictions. For example, you might only be allowed to connect in one direction, to one port.

Instead of writing your own socket server, forming your own socket protocol, writing your own heartbeat mechanism, and writing adapters for your applications on both the client and server systems, you might consider RestPipe.

RestPipe

RestPipe is a solution that aggressively maintains a bidirectional connection from one or more client machines to a single server. When the client needs to talk to the server, it talks to a local webserver, which translates the request into a message over an SSL-authenticated socket (written using coroutines/greenlets and Protocol Buffers); the server passes the request to your event-handler, and the response is forwarded back as the response to the original web-request. The same process also works in reverse when the server wants to talk to the client; in that direction, the client’s hostname is provided as part of the URL.

Setup

The documentation is fairly complete. To get it going quickly on a development system:

  1. Use CaKit to generate a CA identity, server identity, and client identity.
  2. Install the restpipe package using PyPI.
  3. Start the server.
  4. Start the client.
  5. Use cURL to make a request to either the server (which will query the client), or the client (which will query the server).

Example Queries (Available by Default)

  • $ curl http://rpclient.local/server/time && echo
    {"time_from_server": 1402897823.882672}
    
  • $ curl http://rpserver.local/client/localhost/time && echo
    {"time_from_client": 1402897843.879908}
    
  • $ curl http://rpclient.local/server/cat//hello%20/world && echo
    {"result_from_server": "hello world"}
    
  • $ curl http://rpserver.local/client/localhost/cat//hello%20/world && echo
    {"result_from_client": "hello world"}
    

Using ZeroMQ With Coroutines (gevent) Under Python

ZeroMQ (0MQ) is a beautiful library that basically replaces the socket layer with a very thin, pattern-based wrapper. Aside from removing this overhead from your code, 0MQ also usually gives you the guarantee that one read will return one message (or one part of a multipart message).

gevent is a coroutine-based networking library for Python. Coroutines let you take advantage of the time that certain operations, like network requests, spend blocked in order to do other work while you wait (this works best when you’re doing a number of similar operations in parallel). It’s a compromise that allows you to speed up synchronous operations to the point of being comparable to multithreading (at least in the case of network operations).

There was a point at which ZeroMQ didn’t support this (a separate gevent_zmq package had to be used), but that support has since been folded into pyzmq as zmq.green.

For example, a server:

import gevent

import zmq.green as zmq

_BINDING = 'ipc:///tmp/test_server'

context = zmq.Context()

def server():
    server_socket = context.socket(zmq.REP)
    server_socket.bind(_BINDING)

    while 1:
        received = server_socket.recv()
        print("Received:\n[%s]" % (received))
        print('')

        server_socket.send('TestResponse')

server = gevent.spawn(server)
server.join()

The corresponding client:

import gevent

import zmq.green as zmq

_BINDING = 'ipc:///tmp/test_server'

context = zmq.Context()

def client():
    client_socket = context.socket(zmq.REQ)
    client_socket.connect(_BINDING)

    client_socket.send("TestMessage")

    response = client_socket.recv()
    print("Response:\n[%s]" % (response))
    print('')

client = gevent.spawn(client)
client.join()

Displaying the output here would be nearly redundant; the result should be plainly obvious.
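
To actually see the coroutines pay off, you can spawn several clients in a single process against the same server; while one greenlet is blocked waiting for its reply, the others are free to run. A rough variation on the client above:

import gevent

import zmq.green as zmq

_BINDING = 'ipc:///tmp/test_server'

context = zmq.Context()

def client(i):
    # Each greenlet gets its own REQ socket. While one greenlet is blocked in
    # recv(), the others are free to send and receive.
    client_socket = context.socket(zmq.REQ)
    client_socket.connect(_BINDING)

    client_socket.send('TestMessage(%d)' % (i))

    response = client_socket.recv()
    print("Response (%d):\n[%s]" % (i, response))

g_list = [gevent.spawn(client, i) for i in range(5)]
gevent.joinall(g_list)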