S3 allows you to upload the pieces of a large file in parallel. Unfortunately, most of the examples that I've seen online are either inefficient or inconvenient. For example:
- Physical file splits of the original file: If you couldn't guess that S3 would have a way to work off a single copy of the source file, then you probably shouldn't be using this functionality.
- Threading: Python threads don't truly run in parallel (thanks to the GIL).
- Function-based designs (as opposed to class-based): I've never been a fan of this in Python. Too much context has to be curried into every call.
- Using multiprocessing: Every upload spawns a handful of processes, and all of them still compete for the same network device.
None of these strategies holds a candle to greenlets, each working off its own file-pointer into the same physical copy of the file. A minimal sketch of the idea follows; the real uploader is further down.
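This is just an illustration of the pattern, not the uploader itself: a few greenlets each open their own handle to the same file and read a different slice. The filename, chunk size, and worker count are placeholders. (In the uploader proper, the blocking work is boto's network I/O, which gevent's monkey-patching makes cooperative; plain local file reads don't yield by themselves.)

import gevent
import gevent.monkey
gevent.monkey.patch_all()  # make blocking socket I/O cooperative

def read_slice(filepath, offset, length):
    # A separate file-pointer per greenlet; no physical split of the file.
    with open(filepath, 'rb') as f:
        f.seek(offset)
        return f.read(length)

chunk_size_b = 5 * 1024 * 1024
jobs = [gevent.spawn(read_slice, 'some_large_file.bin', i * chunk_size_b, chunk_size_b)
        for i in range(4)]
gevent.joinall(jobs)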
This example is located at RandomUtility: s3_parallel.
Below is the principal class. Go to the original source for the imports and the couple of module-level constants; for orientation, a rough sketch of that preamble follows.
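The constant values here are my own guesses, not the originals; the monkey-patching is what lets boto's socket I/O run cooperatively under gevent.

import os
import math
import logging
import functools

import gevent
import gevent.monkey
import boto.s3.connection
import boto.s3.multipart

gevent.monkey.patch_all()

_logger = logging.getLogger(__name__)

# Assumed values; use the ones in the original source.
_DEFAULT_CHUNK_SIZE_B = 20 * 1024 * 1024    # size of each uploaded part
_MIN_MULTIPART_SIZE_B = 5 * 1024 * 1024     # below this, just do a standard upload
_DEFAULT_MONITOR_INTERVAL_S = 10            # seconds between progress lines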
class ParallelUpload(object):
    def __init__(self, ak, sk, bucket_name, filepath,
                 chunk_size_b=_DEFAULT_CHUNK_SIZE_B,
                 monitor_interval_s=_DEFAULT_MONITOR_INTERVAL_S):
        self.__ak = ak
        self.__sk = sk
        self.__bucket_name = bucket_name
        self.__filepath = filepath
        self.__s3_key_name = os.path.basename(filepath)
        self.__chunk_size_b = chunk_size_b
        self.__coverage = 0.0
        self.__monitor_interval_s = monitor_interval_s
        self.__filesize_b = os.path.getsize(self.__filepath)
        self.__chunks = int(math.ceil(float(self.__filesize_b) /
                                      float(self.__chunk_size_b)))
        self.__progress = [0.0] * self.__chunks
    def __get_bucket(self, bucket_name):
        conn = boto.s3.connection.S3Connection(self.__ak, self.__sk)
        return conn.lookup(bucket_name)

    def __standard_upload(self):
        bucket = self.__get_bucket(self.__bucket_name)

        new_s3_item = bucket.new_key(self.__s3_key_name)
        new_s3_item.set_contents_from_filename(
            self.__filepath,
            cb=self.__standard_cb,
            num_cb=20)

    def __standard_cb(self, current, total):
        _logger.debug("Status: %.2f%%", float(current) / float(total) * 100.0)
    def __multipart_cb(self, i, current, total):
        self.__progress[i] = float(current) / float(total) * 100.0

    def __transfer_part(self, (mp_info, i, offset)):
        (mp_id, mp_key_name, mp_bucket_name) = mp_info

        bucket = self.__get_bucket(mp_bucket_name)

        # Rehydrate the multipart-upload handle from its ID inside this greenlet.
        mp = boto.s3.multipart.MultiPartUpload(bucket)
        mp.key_name = mp_key_name
        mp.id = mp_id

        # At any given time, this will describe the furthest percentage into
        # the file that we're actively working on.
        self.__coverage = max(
            (float(offset) / float(self.__filesize_b) * 100.0),
            self.__coverage)

        # The last chunk might be shorter than the rest.
        eff_chunk_size = min(offset + self.__chunk_size_b,
                             self.__filesize_b) - \
                         offset

        # Each greenlet gets its own file-pointer into the same physical file.
        with open(self.__filepath, 'rb') as f:
            f.seek(offset)

            mp.upload_part_from_file(
                f,
                i + 1,
                size=eff_chunk_size,
                cb=functools.partial(self.__multipart_cb, i),
                num_cb=100)
    def __mp_show_progress(self):
        while True:
            columns = [("%3d%% " % self.__progress[i])
                       for i
                       in range(self.__chunks)]

            pline = ' '.join(columns)
            _logger.debug(pline)

            gevent.sleep(self.__monitor_interval_s)
    def __multipart_upload(self):
        bucket = self.__get_bucket(self.__bucket_name)

        mp = bucket.initiate_multipart_upload(self.__s3_key_name)
        mp_info = (mp.id, mp.key_name, mp.bucket_name)

        chunk_list = range(0, self.__filesize_b, self.__chunk_size_b)

        try:
            gen = ((mp_info, i, offset)
                   for (i, offset)
                   in enumerate(chunk_list))

            f = functools.partial(gevent.spawn, self.__transfer_part)

            if self.__monitor_interval_s > 0:
                p = gevent.spawn(self.__mp_show_progress)

            g_list = map(f, gen)
            gevent.joinall(g_list)

            if self.__monitor_interval_s > 0:
                p.kill()
                p.join()
        except:
            mp.cancel_upload()
            raise
        else:
            mp.complete_upload()
    def start(self):
        if self.__filesize_b < _MIN_MULTIPART_SIZE_B:
            self.__standard_upload()
        else:
            self.__multipart_upload()
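When run as a command, the module presumably wires the positional arguments straight into the class, along these lines (a sketch based on the invocation below, not the original code):

if __name__ == '__main__':
    import sys

    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    (ak, sk, bucket_name, filepath) = sys.argv[1:5]
    ParallelUpload(ak, sk, bucket_name, filepath).start()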
The output when called as a command will look like this:
$ python s3_parallel.py (access key) (secret key) (bucket name) (file-path)
2014-06-17 10:16:48,458 - __main__ - DEBUG - 0% 0% 0% 0% 0% 0% 0%
2014-06-17 10:16:58,459 - __main__ - DEBUG - 3% 3% 2% 2% 2% 1% 7%
2014-06-17 10:17:08,460 - __main__ - DEBUG - 6% 5% 5% 4% 5% 4% 14%
2014-06-17 10:17:18,461 - __main__ - DEBUG - 10% 7% 8% 8% 7% 6% 18%
2014-06-17 10:17:28,461 - __main__ - DEBUG - 16% 10% 13% 11% 10% 8% 26%
2014-06-17 10:17:38,462 - __main__ - DEBUG - 21% 14% 20% 15% 14% 12% 35%
2014-06-17 10:17:48,462 - __main__ - DEBUG - 26% 17% 27% 19% 19% 15% 48%
2014-06-17 10:17:58,463 - __main__ - DEBUG - 32% 20% 33% 24% 24% 18% 59%
2014-06-17 10:18:08,463 - __main__ - DEBUG - 37% 24% 39% 29% 28% 22% 70%
2014-06-17 10:18:18,464 - __main__ - DEBUG - 43% 28% 44% 34% 32% 26% 82%
2014-06-17 10:18:28,464 - __main__ - DEBUG - 48% 31% 50% 39% 36% 31% 91%
2014-06-17 10:18:38,465 - __main__ - DEBUG - 52% 35% 55% 44% 43% 36% 100%
2014-06-17 10:18:48,465 - __main__ - DEBUG - 60% 39% 63% 47% 47% 40% 100%
2014-06-17 10:18:58,466 - __main__ - DEBUG - 68% 44% 69% 53% 53% 45% 100%
2014-06-17 10:19:08,466 - __main__ - DEBUG - 77% 49% 75% 58% 57% 49% 100%
2014-06-17 10:19:18,467 - __main__ - DEBUG - 83% 54% 84% 65% 62% 52% 100%
2014-06-17 10:19:28,467 - __main__ - DEBUG - 88% 58% 90% 71% 69% 58% 100%
2014-06-17 10:19:38,468 - __main__ - DEBUG - 96% 61% 96% 77% 74% 63% 100%
2014-06-17 10:19:48,468 - __main__ - DEBUG - 100% 67% 100% 83% 83% 70% 100%
2014-06-17 10:19:58,469 - __main__ - DEBUG - 100% 73% 100% 93% 93% 76% 100%
2014-06-17 10:20:08,469 - __main__ - DEBUG - 100% 83% 100% 100% 100% 86% 100%
2014-06-17 10:20:18,470 - __main__ - DEBUG - 100% 95% 100% 100% 100% 100% 100%