Skip to content

Commit b2e9062

Browse files
Support for multipart upload
1 parent b156b9a commit b2e9062

1 file changed

Lines changed: 174 additions & 72 deletions

File tree

solvebio/resource/object.py

Lines changed: 174 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -422,12 +422,13 @@ def create_shortcut(self, shortcut_full_path, **kwargs):
422422

423423
return shortcut
424424

425+
425426
@classmethod
426427
def upload_file(cls, local_path, remote_path, vault_full_path, **kwargs):
427428
from solvebio import Vault
428429
from solvebio import Object
429430

430-
_client = kwargs.pop('client', None) or cls._client or client
431+
_client = kwargs.pop("client", None) or cls._client or client
431432

432433
local_path = os.path.expanduser(local_path)
433434

@@ -442,79 +443,88 @@ def upload_file(cls, local_path, remote_path, vault_full_path, **kwargs):
442443
mimetype = mime_tuple[1] if mime_tuple[1] else mime_tuple[0]
443444
# Get file size
444445
size = os.path.getsize(local_path)
445-
if size == 0:
446-
print('WARNING: skipping empty object: {}'.format(local_path))
447-
return False
448446

449447
# Check if object exists already and compare md5sums
450448
full_path, path_dict = Object.validate_full_path(
451-
os.path.join('{}:{}'.format(vault.full_path, remote_path),
452-
os.path.basename(local_path)), client=_client)
449+
os.path.join(
450+
"{}:{}".format(vault.full_path, remote_path),
451+
os.path.basename(local_path),
452+
),
453+
client=_client,
454+
)
453455
try:
454456
obj = cls.get_by_full_path(full_path, client=_client)
455457
if not obj.is_file:
456-
print('WARNING: A {} currently exists at {}'
457-
.format(obj.object_type, full_path))
458+
print(
459+
"WARNING: A {} currently exists at {}".format(
460+
obj.object_type, full_path
461+
)
462+
)
458463
else:
459464
# Check against md5sum of remote file
460465
if obj.md5 == local_md5:
461-
print('WARNING: File {} (md5sum {}) already exists, '
462-
'not uploading'.format(full_path, local_md5))
466+
print(
467+
"WARNING: File {} (md5sum {}) already exists, "
468+
"not uploading".format(full_path, local_md5)
469+
)
463470
return obj
464471
else:
465-
if kwargs.get('archive_folder'):
466-
obj._archive(kwargs['archive_folder'])
467-
else:
468-
print('WARNING: File {} exists on SolveBio with different '
469-
'md5sum (local: {} vs remote: {}) Uploading anyway, '
470-
'but not overwriting.'
471-
.format(full_path, local_md5, obj.md5))
472+
print(
473+
"WARNING: File {} exists on SolveBio with different "
474+
"md5sum (local: {} vs remote: {}) Uploading anyway, "
475+
"but not overwriting.".format(full_path, local_md5, obj.md5)
476+
)
472477
except NotFoundError:
473-
obj = None
474478
pass
475479

476480
# Lookup parent object
477-
if kwargs.get('follow_shortcuts') and obj and obj.is_file:
478-
vault_id = obj.vault_id
479-
parent_object_id = obj.parent_object_id
480-
filename = obj.filename
481+
if path_dict["parent_path"] == "/":
482+
parent_object_id = None
481483
else:
482-
vault_id = vault.id
483-
filename = os.path.basename(local_path)
484-
if path_dict['parent_path'] == '/':
485-
parent_object_id = None
486-
else:
487-
parent_obj = Object.get_by_full_path(
488-
path_dict['parent_full_path'], assert_type='folder',
489-
client=_client
490-
)
491-
parent_object_id = parent_obj.id
484+
parent_obj = Object.get_by_full_path(
485+
path_dict["parent_full_path"], assert_type="folder", client=_client
486+
)
487+
parent_object_id = parent_obj.id
492488

493-
description = kwargs.get('description')
489+
description = kwargs.get("description")
494490

495491
# Create the file, and upload it to the Upload URL
496492
obj = Object.create(
497-
vault_id=vault_id,
493+
vault_id=vault.id,
498494
parent_object_id=parent_object_id,
499-
object_type='file',
500-
filename=filename,
495+
object_type="file",
496+
filename=os.path.basename(local_path),
501497
md5=local_md5,
502498
mimetype=mimetype,
503499
size=size,
504500
description=description,
505-
tags=kwargs.get('tags', []) or [],
506-
client=_client
501+
tags=kwargs.get("tags", []) or [],
502+
client=_client,
507503
)
508-
print('Notice: File created for {0} at {1}'.format(local_path,
509-
obj.path))
510-
print('Notice: Upload initialized')
511504

505+
print(
506+
"Notice: File created for {0} at {1}".format(local_path, obj.path)
507+
)
508+
print("Notice: Upload initialized")
509+
510+
# Check if multipart upload is needed
511+
if hasattr(obj, "is_multipart") and obj.is_multipart:
512+
return cls._upload_multipart(obj, local_path, local_md5, **kwargs)
513+
else:
514+
return cls._upload_single_file(
515+
obj, local_path, local_md5, mimetype, size, **kwargs
516+
)
517+
518+
@classmethod
519+
def _upload_single_file(
520+
cls, obj, local_path, local_md5, mimetype, size, **kwargs
521+
):
512522
upload_url = obj.upload_url
513523

514524
headers = {
515-
'Content-MD5': base64.b64encode(binascii.unhexlify(local_md5)),
516-
'Content-Type': mimetype,
517-
'Content-Length': str(size),
525+
"Content-MD5": base64.b64encode(binascii.unhexlify(local_md5)),
526+
"Content-Type": mimetype,
527+
"Content-Length": str(size),
518528
}
519529

520530
# Use a session with a retry policy to handle connection errors.
@@ -524,45 +534,137 @@ def upload_file(cls, local_path, remote_path, vault_full_path, **kwargs):
524534
total=max_retries,
525535
read=max_retries,
526536
connect=max_retries,
527-
backoff_factor=2,
528-
status_forcelist=(500, 502, 503, 504, 400),
529-
allowed_methods=["HEAD", "OPTIONS", "GET", "PUT", "POST"]
537+
backoff_factor=0.3,
538+
status_forcelist=(500, 502, 504, 400),
530539
)
531540
session.mount(
532-
'https://', requests.adapters.HTTPAdapter(max_retries=retry))
533-
534-
# Handle retries when upload fails due to an exception such as SSLError
535-
n_retries = 0
536-
while True:
537-
try:
538-
upload_resp = session.put(upload_url,
539-
data=open(local_path, 'rb'),
540-
headers=headers)
541-
except Exception as e:
542-
if n_retries == max_retries:
543-
obj.delete(force=True)
544-
raise FileUploadError(str(e))
545-
546-
n_retries += 1
547-
print('WARNING: Retrying ({}/{}) failed upload for {}: {}'.format(
548-
n_retries, max_retries, local_path, e))
549-
time.sleep(2 * n_retries)
550-
else:
551-
break
541+
"https://", requests.adapters.HTTPAdapter(max_retries=retry)
542+
)
543+
upload_resp = session.put(
544+
upload_url, data=open(local_path, "rb"), headers=headers
545+
)
552546

553547
if upload_resp.status_code != 200:
554-
print('WARNING: Upload status code for {0} was {1}'.format(
555-
local_path, upload_resp.status_code
556-
))
548+
print(
549+
"WARNING: Upload status code for {0} was {1}".format(
550+
local_path, upload_resp.status_code
551+
)
552+
)
557553
# Clean up the failed upload
558554
obj.delete(force=True)
559555
raise FileUploadError(upload_resp.content)
560556
else:
561-
print('Notice: Successfully uploaded {0} to {1}'.format(local_path,
562-
obj.path))
557+
print(
558+
"Notice: Successfully uploaded {0} to {1}".format(
559+
local_path, obj.path
560+
)
561+
)
563562

564563
return obj
565564

565+
@classmethod
def _upload_multipart(cls, obj, local_path, local_md5, **kwargs):
    """Upload ``local_path`` in parts using the object's presigned URLs.

    For every entry of ``obj.presigned_urls`` the matching byte range of
    the local file is PUT to that entry's ``upload_url``. The ETags
    returned for the parts are then posted to ``/v2/complete_multi_part``
    to finalize the upload.

    Args:
        obj: The object whose ``upload_id``, ``upload_key``,
            ``presigned_urls`` and ``path`` attributes are read here.
        local_path: Path of the local file to upload.
        local_md5: Unused by this method.
        **kwargs: Only ``client`` is read.

    Returns:
        ``obj``, once the completion request succeeds.

    Raises:
        FileUploadError: If any part fails to upload, a part response has
            no ETag, or the completion request does not return a
            ``message``. The remote object is deleted before raising.
    """
    # upload_file() pops "client" from kwargs before delegating here, so
    # also fall back to the client attached to the object being uploaded
    # before using the class-level or module-level default.
    _client = (
        kwargs.get("client")
        or getattr(obj, "_client", None)
        or cls._client
        or client
    )
    print(f"Notice: Upload ID {obj.upload_id}")
    try:
        # Get presigned URLs from the object
        presigned_urls = obj.presigned_urls
        total_parts = len(presigned_urls)

        print(
            "Notice: Starting multipart upload with {} parts...".format(
                total_parts
            )
        )

        # The retry policy is identical for every part, so build it once.
        retry = Retry(
            total=3,
            backoff_factor=2,
            status_forcelist=(500, 502, 503, 504),
            allowed_methods=["PUT"],
        )

        # Upload each part using presigned URLs. One session is shared by
        # all parts and closed on exit, so connections are reused and not
        # leaked.
        parts = []
        with open(local_path, "rb") as f, requests.Session() as session:
            session.mount(
                "https://", requests.adapters.HTTPAdapter(max_retries=retry)
            )

            for part_info in presigned_urls:
                part_number = part_info.part_number
                start_byte = part_info.start_byte
                end_byte = part_info.end_byte
                part_size = part_info.size
                upload_url = part_info.upload_url

                print(
                    "Notice: Uploading part {}/{}... (bytes {}-{})".format(
                        part_number, total_parts, start_byte, end_byte
                    )
                )

                # Seek to start position and read the exact part size
                f.seek(start_byte)
                chunk_data = f.read(part_size)
                if not chunk_data:
                    # NOTE(review): stops at the first empty read and still
                    # completes the upload with the parts sent so far;
                    # confirm this is intended when the local file is
                    # shorter than the presigned ranges.
                    break

                headers = {
                    "Content-Length": str(len(chunk_data)),
                }

                upload_resp = session.put(
                    upload_url, data=chunk_data, headers=headers
                )

                if upload_resp.status_code != 200:
                    raise FileUploadError(
                        "Failed to upload part {}: {}".format(
                            part_number, upload_resp.content
                        )
                    )

                # The completion request needs each part's ETag; fail now
                # rather than submit an empty value.
                etag = upload_resp.headers.get("ETag", "").strip('"')
                if not etag:
                    raise FileUploadError(
                        "Missing ETag in response for part {}".format(
                            part_number
                        )
                    )
                parts.append({"part_number": part_number, "etag": etag})

        # Complete multipart upload
        print("Notice: Completing multipart upload....")
        complete_data = {
            "upload_id": obj.upload_id,
            "physical_object_id": obj.upload_key,
            "parts": parts,
        }

        print(f"Notice: {complete_data}")

        complete_resp = _client.post("/v2/complete_multi_part", complete_data)

        if "message" not in complete_resp:
            raise Exception(complete_resp)

        print(
            "Notice: Successfully uploaded {0} to {1} with multipart upload.".format(
                local_path, obj.path
            )
        )
        return obj

    except Exception as e:
        # Clean up failed upload - best effort cleanup
        try:
            _client.delete(
                obj.instance_url() + "/multipart-upload",
                {},
            )
        except Exception:
            pass  # Best effort cleanup

        obj.delete(force=True)
        # Chain the original exception so its traceback is preserved.
        raise FileUploadError(
            "Multipart upload failed: {}".format(str(e))
        ) from e
667+
566668
def _object_list_helper(self, **params):
567669
"""Helper method to get objects within"""
568670

0 commit comments

Comments
 (0)