Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
4b3f7c1
Initial commit
klimaj Mar 26, 2026
12eabe1
git pull origin main
klimaj Mar 28, 2026
62031c5
Define run_distributed_cluster_test_cases
klimaj Mar 28, 2026
7b71209
Refactor T900_distributed.py
klimaj Mar 28, 2026
9b8da5c
Clean up nonce cache error messages
klimaj Mar 29, 2026
cc352c8
Update imported module
klimaj Mar 29, 2026
52b1949
Initial commit
klimaj Mar 29, 2026
f50a5c6
Flush print
klimaj Mar 29, 2026
0d959c7
Rename test suites
klimaj Mar 29, 2026
cb7607a
Split unittest modules
klimaj Mar 29, 2026
f5ad4d1
Add assert preventing a scorefunction transition
klimaj Mar 29, 2026
7c23878
Testing scorefunction transition through intermediate PDB format seri…
klimaj Mar 29, 2026
fc5076d
Assert hashes are equal; enable verbose
klimaj Mar 30, 2026
91605dd
Clean up unit test subprocesses
klimaj Mar 30, 2026
a307f28
Adding unit test standard output streaming with early timeout
klimaj Mar 30, 2026
844b713
Clean up process groups
klimaj Mar 31, 2026
794cea1
Terminating process tree
klimaj Mar 31, 2026
1bd5f00
Disabling streaming in T917
klimaj Apr 1, 2026
cafea5f
Split out unit test; update scorefunction and sequences
klimaj Apr 1, 2026
4edae6e
Update sequence
klimaj Apr 1, 2026
f80690b
Tidy up print statements
klimaj Apr 1, 2026
dee811a
Split out unit test conditions
klimaj Apr 1, 2026
1b5154f
Track additional metrics and assert equal
klimaj Apr 1, 2026
80daaca
Serialize double precision floats with 15 digits
klimaj Apr 1, 2026
4adb2bf
git pull origin main
klimaj Apr 1, 2026
27816d4
Generate random bytes with Python-3.8 compatibility
klimaj Apr 1, 2026
25aa4bc
Temporarily disable tests
klimaj Apr 1, 2026
db0e5dd
Disable verbose
klimaj Apr 2, 2026
fdd178a
Revert "Temporarily disable tests"
klimaj Apr 2, 2026
8297550
Move pprint_flush function
klimaj Apr 2, 2026
a0ec063
Update import to pprint_flush
klimaj Apr 2, 2026
27deb8e
Flush print statements
klimaj Apr 2, 2026
2df619f
Flush print statement
klimaj Apr 2, 2026
61dfcab
Flush print statements
klimaj Apr 2, 2026
5e1cb57
Set PYTHONUNBUFFERED=1
klimaj Apr 2, 2026
d8d24c0
Set PYTHONUNBUFFERED=1
klimaj Apr 2, 2026
93a1e55
Set PYTHONUNBUFFERED=1
klimaj Apr 2, 2026
32c514d
Set silent=True
klimaj Apr 2, 2026
bc9064c
Set PYTHONUNBUFFERED=1 for testing framework
klimaj Apr 2, 2026
8fae5be
Flush stdout
klimaj Apr 2, 2026
c8064c9
Add tearDown methods
klimaj Apr 2, 2026
f34ba7b
Improve test case docstrings
klimaj Apr 2, 2026
4f884a5
Set PYTHONUNBUFFERED=1
klimaj Apr 3, 2026
2c23415
Set PYTHONUNBUFFERED=1
klimaj Apr 3, 2026
372c45e
Flush print statement
klimaj Apr 3, 2026
67893bf
Fix ResourceWarning upon exception
klimaj Apr 3, 2026
920e307
Split args; send stderr to stdout; flush
klimaj Apr 3, 2026
c1fb56d
Revert "Set PYTHONUNBUFFERED=1"
klimaj Apr 3, 2026
0fc3dc8
Revert "Set PYTHONUNBUFFERED=1"
klimaj Apr 3, 2026
49348e9
Testing: refactor mfork in self-test.py
klimaj Apr 3, 2026
0b98f18
Clean up print statement
klimaj Apr 3, 2026
a4a3140
Clean up imports
klimaj Apr 3, 2026
0778591
Merge branch 'main' of github.com:klimaj/rosetta into unittests
klimaj Apr 3, 2026
882f39b
Increase default timeout
klimaj Apr 3, 2026
7c12acb
Fix test_name collision
klimaj Apr 3, 2026
d8cdbcd
Flush print statements
klimaj Apr 3, 2026
991f8d1
Testing non-blocking unit test parallelism with streaming output in s…
klimaj Apr 4, 2026
3edce68
Revert "Disabling streaming in T917"
klimaj Apr 4, 2026
17bc82d
Assign defaults globally
klimaj Apr 4, 2026
5b35de4
Split out streaming and non-streaming test cases
klimaj Apr 4, 2026
0acbe1a
Prioritize T900-series of tests
klimaj Apr 4, 2026
99fa168
Tweaking process_jobs to do timeout check first; cleanup
klimaj Apr 4, 2026
48c0285
git checkout d8cdbcdd4df54b950965be45609ad0c739837f5d -- source/src/p…
klimaj Apr 5, 2026
7fae7c7
Prioritize T900 tests
klimaj Apr 5, 2026
9966d8a
Fix file handle
klimaj Apr 6, 2026
1ad5469
git checkout origin/main -- source/src/python/PyRosetta/src/self-test.py
klimaj Apr 7, 2026
348e374
Disable streaming in grandchildren processes
klimaj Apr 7, 2026
219127d
Run unittests with subprocess.getstatusoutput
klimaj Apr 7, 2026
d5651a8
Consolidate distributed.cluster tests into one module for self-test.py
klimaj Apr 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source/src/python/PyRosetta/src/pyrosetta/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def init(options='-ex1 -ex2aro', extra_options='', set_logging_handler=None, not
v.extend(args)

if not silent:
print( version() )
print( version() , flush=True)
logger.info( "\n" + version() )
else:
logger.debug( "\n" + version() )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
"run",
"update_scores",
]
__version__: str = "4.1.0"
__version__: str = "4.1.1"


with warnings.catch_warnings() and suppress(NameError):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,8 @@ def export_init_file(
f.write(init_file_json)
print(
f"Exported PyRosettaCluster decoy output file '{output_file}' to "
+ f"PyRosetta initialization file: '{output_init_file}'"
+ f"PyRosetta initialization file: '{output_init_file}'",
flush=True,
)
else:
raise ValueError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,50 +221,44 @@ def _cache_nonce(self, sealed: bytes) -> None:
"""
package = self.unpack(sealed)
if not isinstance(package, dict) or package.get("v", None) != 1:
_location = "Dask worker" if NonceCache._on_worker() else "head node"
_err_msg = (
"Invalid sealed package or version on {0} nonce cache! "
f"Invalid sealed package or version on {_location} nonce cache! "
+ f"Received: {type(package)!r}"
)
if NonceCache._on_worker():
raise ValueError(_err_msg.format("worker"))
else:
raise ValueError(_err_msg.format("host"))
raise ValueError(_err_msg)

_instance_id = package["a"] # `str`: PyRosettaCluster instance identifier/App
_data = package["d"] # `bytes`: Data bytestring
_mac = package["m"] # `bytes`: MAC (HMAC tag)
_nonce = package["n"] # `bytes`: Nonce
_version = package["v"] # `int`: Version

if _instance_id != self.instance_id:
raise ValueError("PyRosettaCluster instance identifier mismatch in sealed package.")

msg = self.pack([_instance_id, _data, _nonce, _version])
_expected_mac = hmac_digest(bytes(self.prk), msg)
if not compare_digest(_expected_mac, _mac):
_location = "Dask worker" if NonceCache._on_worker() else "head node"
_err_msg = (
"Task HMAC verification failed during nonce cache on {0}!\n"
f"Task HMAC verification failed during nonce cache on {_location}!\n"
+ f"Expected: {_expected_mac!r}\n"
+ f"Value: {_mac!r}\n"
)
if NonceCache._on_worker():
raise SystemExit(_err_msg.format("worker"))
else:
raise SystemExit(_err_msg.format("host"))
raise SystemExit(_err_msg)

if _nonce is not None:
if _nonce in self._seen:
# Replay protection
_location = "Dask worker" if NonceCache._on_worker() else "head node"
_err_msg = (
"PyRosettaCluster detected a repeat nonce on the {0} for the instance identifier "
f"PyRosettaCluster detected a repeat nonce on the {_location} for the instance identifier "
+ f"'{self.instance_id}', which might indicate a replay attack is in progress! "
+ "Exiting process for security. Please ensure that `PyRosettaCluster(security=True)` "
+ f"is enabled in future PyRosettaCluster simulations. Received: '{_nonce}'."
)
if NonceCache._on_worker():
raise SystemExit(_err_msg.format("worker"))
else:
raise SystemExit(_err_msg.format("host"))
raise SystemExit(_err_msg)

self._seen.add(_nonce)
self._order.append(_nonce)
while len(self._seen) > self._order.maxlen:
Expand All @@ -279,9 +273,9 @@ def _cache_nonce(self, sealed: bytes) -> None:
+ f"Example: {sorted(self._seen)[0]}"
)
if NonceCache._on_worker():
print(f"Remote worker ({get_worker().contact_address}) nonce cache: {_msg}")
print(f"Remote Dask worker ({get_worker().contact_address}) nonce cache: {_msg}")
else:
print(f"Local host nonce cache: {_msg}")
print(f"Local head node nonce cache: {_msg}")


@attr.s(kw_only=True, slots=False, frozen=False)
Expand Down
129 changes: 66 additions & 63 deletions source/src/python/PyRosetta/src/pyrosetta/distributed/cluster/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,73 +574,76 @@ def reproduce(
)

_tmp_dir = None
if isinstance(input_file, str):
if input_file.endswith((".init", ".init.bz2")):
_tmp_dir = tempfile.TemporaryDirectory(prefix="PyRosettaCluster_reproduce_")
default_init_from_file_kwargs = dict(
output_dir=os.path.join(_tmp_dir.name, "pyrosetta_init_input_files"),
skip_corrections=skip_corrections,
relative_paths=True,
dry_run=False,
max_decompressed_bytes=pow(2, 30), # 1 GiB
restore_rg_state=True,
database=None,
verbose=True,
set_logging_handler="logging",
notebook=None,
silent=False,
)
input_packed_pose, input_file = parse_init_file(
input_file,
input_packed_pose,
skip_corrections,
toolz.dicttoolz.merge(
default_init_from_file_kwargs,
init_from_file_kwargs if is_dict(init_from_file_kwargs) else {},
),
)
elif input_file.endswith((".pkl_pose", ".pkl_pose.bz2", ".b64_pose", ".b64_pose.bz2")):
if not was_init_called():
raise PyRosettaIsNotInitializedError(
"If providing a '.pkl_pose', '.pkl_pose.bz2', '.b64_pose', or '.b64_pose.bz2' file to the 'input_file' "
+ "keyword argument parameter, please ensure `pyrosetta.init()` or `pyrosetta.init_from_file()` has been "
+ "properly called (with the same residue type set as that used to generate the original '.pkl_pose', "
+ "'.pkl_pose.bz2', '.b64_pose', or '.b64_pose.bz2' file) before running the `reproduce()` function. "
+ "If an output '.init' file from the original simulation is available, it is recommended to run "
+ "`pyrosetta.init_from_file()` with that '.init' file before running the `reproduce()` function."
try:
if isinstance(input_file, str):
if input_file.endswith((".init", ".init.bz2")):
_tmp_dir = tempfile.TemporaryDirectory(prefix="PyRosettaCluster_reproduce_")
default_init_from_file_kwargs = dict(
output_dir=os.path.join(_tmp_dir.name, "pyrosetta_init_input_files"),
skip_corrections=skip_corrections,
relative_paths=True,
dry_run=False,
max_decompressed_bytes=pow(2, 30), # 1 GiB
restore_rg_state=True,
database=None,
verbose=True,
set_logging_handler="logging",
notebook=None,
silent=False,
)
input_packed_pose, input_file = parse_init_file(
input_file,
input_packed_pose,
skip_corrections,
toolz.dicttoolz.merge(
default_init_from_file_kwargs,
init_from_file_kwargs if is_dict(init_from_file_kwargs) else {},
),
)
elif input_file.endswith((".pkl_pose", ".pkl_pose.bz2", ".b64_pose", ".b64_pose.bz2")):
if not was_init_called():
raise PyRosettaIsNotInitializedError(
"If providing a '.pkl_pose', '.pkl_pose.bz2', '.b64_pose', or '.b64_pose.bz2' file to the 'input_file' "
+ "keyword argument parameter, please ensure `pyrosetta.init()` or `pyrosetta.init_from_file()` has been "
+ "properly called (with the same residue type set as that used to generate the original '.pkl_pose', "
+ "'.pkl_pose.bz2', '.b64_pose', or '.b64_pose.bz2' file) before running the `reproduce()` function. "
+ "If an output '.init' file from the original simulation is available, it is recommended to run "
+ "`pyrosetta.init_from_file()` with that '.init' file before running the `reproduce()` function."
)

PyRosettaCluster(
**toolz.dicttoolz.keyfilter(
lambda a: a not in ["client", "clients", "input_packed_pose"],
toolz.dicttoolz.merge(
get_instance_kwargs(
input_file=input_file,
scorefile=scorefile,
decoy_name=decoy_name,
skip_corrections=skip_corrections,
with_metadata_kwargs=False,
PyRosettaCluster(
**toolz.dicttoolz.keyfilter(
lambda a: a not in ["client", "clients", "input_packed_pose"],
toolz.dicttoolz.merge(
get_instance_kwargs(
input_file=input_file,
scorefile=scorefile,
decoy_name=decoy_name,
skip_corrections=skip_corrections,
with_metadata_kwargs=False,
),
parse_instance_kwargs(instance_kwargs),
),
parse_instance_kwargs(instance_kwargs),
),
),
client=parse_client(client),
clients=clients,
input_packed_pose=input_packed_pose,
).distribute(
protocols=get_protocols(
protocols=protocols,
input_file=input_file,
scorefile=scorefile,
decoy_name=decoy_name,
),
clients_indices=clients_indices,
resources=resources,
priorities=None,
retries=retries,
)
if isinstance(_tmp_dir, tempfile.TemporaryDirectory):
_tmp_dir.cleanup()
client=parse_client(client),
clients=clients,
input_packed_pose=input_packed_pose,
).distribute(
protocols=get_protocols(
protocols=protocols,
input_file=input_file,
scorefile=scorefile,
decoy_name=decoy_name,
),
clients_indices=clients_indices,
resources=resources,
priorities=None,
retries=retries,
)

finally:
if _tmp_dir:
_tmp_dir.cleanup()


def produce(**kwargs: Any) -> Optional[NoReturn]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import argparse
import os
import sys
import unittest

def flatten(suite):
Expand All @@ -19,7 +20,8 @@ def flatten(suite):
def main(root):
suite = unittest.defaultTestLoader.discover(root)
test_cases = (test_case.id() for test_case in flatten(suite))
print(*test_cases, sep="\n")
print(*test_cases, sep="\n", flush=True)
sys.stdout.flush()

# if __name__ == "__main__":
# parser = argparse.ArgumentParser()
Expand Down
Loading