S3 allows you to upload pieces of large files in parallel. Unfortunately, most/all of the examples that I’ve seen online are inefficient or inconvenient. For example:
- Physical file splits of the original file: If you couldn't guess that S3 would have a way to work off a single copy of the source file, then you probably shouldn't be using this functionality.
- Threading: Threads don't truly run in parallel in Python (the GIL only lets one execute at a time).
- Function-based designs (as opposed to class-based): I’ve never been a fan of this in Python. Too much context info has to be curried.
- Using multiprocessing: Every upload spawns a handful of extra processes, and all of them still compete for the same network device.
None of these strategies holds a candle to greenlets (each running off its own file-pointer into the same physical copy of the file). A minimal sketch of the idea follows.
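To make the single-copy claim concrete, here is a minimal sketch, separate from the real uploader below. The file path and chunk size are placeholders: each worker opens its own handle on the same file and seeks to its own offset, so nothing ever has to be physically split. (Plain disk reads won't actually overlap under gevent; the point here is only the independent file pointers, which the network-bound S3 version exploits.)

import os

import gevent


FILEPATH = '/tmp/bigfile.bin'   # placeholder input file
CHUNK_SIZE_B = 1024 * 1024      # placeholder chunk size (1 MiB)

def read_chunk(offset):
    # A separate handle per greenlet means there is no shared seek position.
    with open(FILEPATH, 'rb') as f:
        f.seek(offset)
        data = f.read(CHUNK_SIZE_B)

    return (offset, len(data))

filesize_b = os.path.getsize(FILEPATH)
offsets = range(0, filesize_b, CHUNK_SIZE_B)

greenlets = [gevent.spawn(read_chunk, offset) for offset in offsets]
gevent.joinall(greenlets)

for g in greenlets:
    print(g.value)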
This example is located at RandomUtility: s3_parallel.
This is the principal class. Go to the original source for the imports and the couple of module-level constants.
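If you'd rather not click through, the preamble is roughly the following. The values are my own assumptions (the 10-second interval just matches the log cadence further down, and S3 itself refuses non-final multipart parts smaller than 5 MiB); gevent's monkey-patching is what lets boto's blocking socket calls yield between greenlets, so the original presumably does something equivalent.

# Assumed module preamble; see the linked source for the real constants.
import os
import sys
import math
import logging
import functools

import gevent
import gevent.monkey

import boto.s3.connection
import boto.s3.multipart

# Make boto's blocking socket I/O cooperative so the parts actually overlap.
gevent.monkey.patch_all()

_MIN_MULTIPART_SIZE_B = 5 * 1024 * 1024    # assumed: S3's minimum part size
_DEFAULT_CHUNK_SIZE_B = 50 * 1024 * 1024   # assumed default part size
_DEFAULT_MONITOR_INTERVAL_S = 10           # assumed: matches the 10s log cadence below

_logger = logging.getLogger(__name__)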
class ParallelUpload(object):
    def __init__(self, ak, sk, bucket_name, filepath,
                 chunk_size_b=_DEFAULT_CHUNK_SIZE_B,
                 monitor_interval_s=_DEFAULT_MONITOR_INTERVAL_S):
        self.__ak = ak
        self.__sk = sk
        self.__bucket_name = bucket_name
        self.__filepath = filepath
        self.__s3_key_name = os.path.basename(filepath)
        self.__chunk_size_b = chunk_size_b
        self.__coverage = 0.0
        self.__monitor_interval_s = monitor_interval_s

        self.__filesize_b = os.path.getsize(self.__filepath)
        self.__chunks = int(math.ceil(float(self.__filesize_b) /
                                      float(self.__chunk_size_b)))
        self.__progress = [0.0] * self.__chunks

    def __get_bucket(self, bucket_name):
        conn = boto.s3.connection.S3Connection(self.__ak, self.__sk)
        return conn.lookup(bucket_name)

    def __standard_upload(self):
        # Simple, single-request upload (used for small files).
        bucket = self.__get_bucket(self.__bucket_name)
        new_s3_item = bucket.new_key(self.__s3_key_name)
        new_s3_item.set_contents_from_filename(
            self.__filepath,
            cb=self.__standard_cb,
            num_cb=20)

    def __standard_cb(self, current, total):
        _logger.debug("Status: %.2f%%", float(current) / float(total) * 100.0)

    def __multipart_cb(self, i, current, total):
        self.__progress[i] = float(current) / float(total) * 100.0

    def __transfer_part(self, (mp_info, i, offset)):
        (mp_id, mp_key_name, mp_bucket_name) = mp_info

        # Each greenlet reconstructs its own handle on the shared multipart
        # upload.
        bucket = self.__get_bucket(mp_bucket_name)

        mp = boto.s3.multipart.MultiPartUpload(bucket)
        mp.key_name = mp_key_name
        mp.id = mp_id

        # At any given time, this will describe the farthest percentage into
        # the file that we're actively working on.
        self.__coverage = max(
            (float(offset) / float(self.__filesize_b) * 100.0),
            self.__coverage)

        # The last chunk might be shorter than the rest.
        eff_chunk_size = min(offset + self.__chunk_size_b,
                             self.__filesize_b) - offset

        # A private file-pointer into the single physical copy of the file.
        with open(self.__filepath, 'rb') as f:
            f.seek(offset)

            mp.upload_part_from_file(
                f,
                i + 1,
                size=eff_chunk_size,
                cb=functools.partial(self.__multipart_cb, i),
                num_cb=100)

    def __mp_show_progress(self):
        # Periodically log one percentage column per part.
        while 1:
            columns = [("%3d%% " % self.__progress[i])
                       for i in range(self.__chunks)]

            pline = ' '.join(columns)
            _logger.debug(pline)

            gevent.sleep(self.__monitor_interval_s)

    def __multipart_upload(self):
        bucket = self.__get_bucket(self.__bucket_name)

        mp = bucket.initiate_multipart_upload(self.__s3_key_name)
        mp_info = (mp.id, mp.key_name, mp.bucket_name)

        chunk_list = range(0, self.__filesize_b, self.__chunk_size_b)

        try:
            gen = ((mp_info, i, offset)
                   for (i, offset)
                   in enumerate(chunk_list))

            f = functools.partial(gevent.spawn, self.__transfer_part)

            if self.__monitor_interval_s > 0:
                p = gevent.spawn(self.__mp_show_progress)

            # One greenlet per part, all within a single process.
            g_list = map(f, gen)

            gevent.joinall(g_list)

            if self.__monitor_interval_s > 0:
                p.kill()
                p.join()
        except:
            mp.cancel_upload()
            raise
        else:
            mp.complete_upload()

    def start(self):
        if self.__filesize_b < _MIN_MULTIPART_SIZE_B:
            self.__standard_upload()
        else:
            self.__multipart_upload()
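The command-line entry point isn't reproduced above, either. Judging from the invocation below, it presumably amounts to something like this (the argument handling and log format are my reconstruction, leaning on the assumed preamble above):

def _main():
    # This format reproduces the timestamped lines in the sample output.
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    (ak, sk, bucket_name, filepath) = sys.argv[1:5]

    ParallelUpload(ak, sk, bucket_name, filepath).start()

if __name__ == '__main__':
    _main()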
The output when called as a command will look like this:
$ python s3_parallel.py (access key) (secret key) (bucket name) (file-path)
2014-06-17 10:16:48,458 - __main__ - DEBUG - 0% 0% 0% 0% 0% 0% 0%
2014-06-17 10:16:58,459 - __main__ - DEBUG - 3% 3% 2% 2% 2% 1% 7%
2014-06-17 10:17:08,460 - __main__ - DEBUG - 6% 5% 5% 4% 5% 4% 14%
2014-06-17 10:17:18,461 - __main__ - DEBUG - 10% 7% 8% 8% 7% 6% 18%
2014-06-17 10:17:28,461 - __main__ - DEBUG - 16% 10% 13% 11% 10% 8% 26%
2014-06-17 10:17:38,462 - __main__ - DEBUG - 21% 14% 20% 15% 14% 12% 35%
2014-06-17 10:17:48,462 - __main__ - DEBUG - 26% 17% 27% 19% 19% 15% 48%
2014-06-17 10:17:58,463 - __main__ - DEBUG - 32% 20% 33% 24% 24% 18% 59%
2014-06-17 10:18:08,463 - __main__ - DEBUG - 37% 24% 39% 29% 28% 22% 70%
2014-06-17 10:18:18,464 - __main__ - DEBUG - 43% 28% 44% 34% 32% 26% 82%
2014-06-17 10:18:28,464 - __main__ - DEBUG - 48% 31% 50% 39% 36% 31% 91%
2014-06-17 10:18:38,465 - __main__ - DEBUG - 52% 35% 55% 44% 43% 36% 100%
2014-06-17 10:18:48,465 - __main__ - DEBUG - 60% 39% 63% 47% 47% 40% 100%
2014-06-17 10:18:58,466 - __main__ - DEBUG - 68% 44% 69% 53% 53% 45% 100%
2014-06-17 10:19:08,466 - __main__ - DEBUG - 77% 49% 75% 58% 57% 49% 100%
2014-06-17 10:19:18,467 - __main__ - DEBUG - 83% 54% 84% 65% 62% 52% 100%
2014-06-17 10:19:28,467 - __main__ - DEBUG - 88% 58% 90% 71% 69% 58% 100%
2014-06-17 10:19:38,468 - __main__ - DEBUG - 96% 61% 96% 77% 74% 63% 100%
2014-06-17 10:19:48,468 - __main__ - DEBUG - 100% 67% 100% 83% 83% 70% 100%
2014-06-17 10:19:58,469 - __main__ - DEBUG - 100% 73% 100% 93% 93% 76% 100%
2014-06-17 10:20:08,469 - __main__ - DEBUG - 100% 83% 100% 100% 100% 86% 100%
2014-06-17 10:20:18,470 - __main__ - DEBUG - 100% 95% 100% 100% 100% 100% 100%