diff --git a/CodeEntropy/config/argparse.py b/CodeEntropy/config/argparse.py index 98ff4432..72670bc6 100644 --- a/CodeEntropy/config/argparse.py +++ b/CodeEntropy/config/argparse.py @@ -25,7 +25,7 @@ import logging import os from dataclasses import dataclass -from typing import Any, Dict, Optional, Set +from typing import Any import yaml @@ -48,11 +48,11 @@ class ArgSpec: help: str default: Any = None type: Any = None - action: Optional[str] = None - nargs: Optional[str] = None + action: str | None = None + nargs: str | None = None -ARG_SPECS: Dict[str, ArgSpec] = { +ARG_SPECS: dict[str, ArgSpec] = { "top_traj_file": ArgSpec( type=str, nargs="+", @@ -145,6 +145,12 @@ class ArgSpec: default=True, help="Use bonded axes to rotate forces for UA level vibrational entropies", ), + "search_type": ArgSpec( + type=str, + default="RAD", + help="Type of neighbor search to use." + "Default RAD; grid search is also available", + ), } @@ -159,7 +165,7 @@ class ConfigResolver: - validating trajectory-related numeric parameters """ - def __init__(self, arg_specs: Optional[Dict[str, ArgSpec]] = None) -> None: + def __init__(self, arg_specs: dict[str, ArgSpec] | None = None) -> None: """Initialize the manager. Args: @@ -168,7 +174,7 @@ def __init__(self, arg_specs: Optional[Dict[str, ArgSpec]] = None) -> None: """ self._arg_specs = dict(arg_specs or ARG_SPECS) - def load_config(self, directory_path: str) -> Dict[str, Any]: + def load_config(self, directory_path: str) -> dict[str, Any]: """Load the first YAML config file found in a directory. 
The current behavior matches your existing workflow: @@ -188,7 +194,7 @@ def load_config(self, directory_path: str) -> Dict[str, Any]: config_path = yaml_files[0] try: - with open(config_path, "r", encoding="utf-8") as file: + with open(config_path, encoding="utf-8") as file: config = yaml.safe_load(file) or {"run1": {}} logger.info("Loaded configuration from: %s", config_path) return config @@ -253,7 +259,7 @@ def build_parser(self) -> argparse.ArgumentParser: ) continue - kwargs: Dict[str, Any] = {} + kwargs: dict[str, Any] = {} if spec.type is not None: kwargs["type"] = spec.type if spec.default is not None: @@ -266,7 +272,7 @@ def build_parser(self) -> argparse.ArgumentParser: return parser def resolve( - self, args: argparse.Namespace, run_config: Optional[Dict[str, Any]] + self, args: argparse.Namespace, run_config: dict[str, Any] | None ) -> argparse.Namespace: """Merge CLI arguments with YAML configuration and adjust logging level. @@ -306,8 +312,8 @@ def resolve( @staticmethod def _detect_cli_overrides( - args_dict: Dict[str, Any], default_dict: Dict[str, Any] - ) -> Set[str]: + args_dict: dict[str, Any], default_dict: dict[str, Any] + ) -> set[str]: """Detect which args were explicitly overridden in the CLI. Args: @@ -322,8 +328,8 @@ def _detect_cli_overrides( def _apply_yaml_defaults( self, args: argparse.Namespace, - run_config: Dict[str, Any], - cli_provided: Set[str], + run_config: dict[str, Any], + cli_provided: set[str], ) -> None: """Apply YAML values onto args for keys not provided by CLI. 
@@ -336,7 +342,7 @@ def _apply_yaml_defaults( if yaml_value is None or key in cli_provided: continue if key in self._arg_specs: - logger.debug("Using YAML value for %s: %s", key, yaml_value) + logger.debug(f"Using YAML value for {key}: {yaml_value}") setattr(args, key, yaml_value) def _ensure_defaults(self, args: argparse.Namespace) -> None: diff --git a/CodeEntropy/config/runtime.py b/CodeEntropy/config/runtime.py index f807261e..b55bcbe9 100644 --- a/CodeEntropy/config/runtime.py +++ b/CodeEntropy/config/runtime.py @@ -18,7 +18,7 @@ import logging import os import pickle -from typing import Any, Dict, Optional +from typing import Any import MDAnalysis as mda import requests @@ -112,7 +112,7 @@ def create_job_folder() -> str: return new_folder_path - def load_citation_data(self) -> Optional[Dict[str, Any]]: + def load_citation_data(self) -> dict[str, Any] | None: """Load CITATION.cff from GitHub. If the request fails (offline, blocked, etc.), returns None. @@ -335,7 +335,7 @@ def _build_universe( kcal_units = args.kcal_force_units if forcefile is None: - logger.debug("Loading Universe with %s and %s", tprfile, trrfile) + logger.debug(f"Loading Universe with {tprfile} and {trrfile}") return mda.Universe(tprfile, trrfile, format=fileformat) return universe_operations.merge_forces( diff --git a/CodeEntropy/core/logging.py b/CodeEntropy/core/logging.py index 4a0bfac4..740a53f2 100644 --- a/CodeEntropy/core/logging.py +++ b/CodeEntropy/core/logging.py @@ -16,7 +16,6 @@ import logging import os -from typing import Dict, Optional from rich.console import Console from rich.logging import RichHandler @@ -55,7 +54,7 @@ class LoggingConfig: handlers: Mapping of handler name to handler instance. 
""" - _console: Optional[Console] = None + _console: Console | None = None @classmethod def get_console(cls) -> Console: @@ -80,7 +79,7 @@ def __init__(self, folder: str, level: int = logging.INFO) -> None: self.level = level self.console = self.get_console() - self.handlers: Dict[str, logging.Handler] = {} + self.handlers: dict[str, logging.Handler] = {} self._setup_handlers() diff --git a/CodeEntropy/entropy/configurational.py b/CodeEntropy/entropy/configurational.py index c3b908c8..804a718b 100644 --- a/CodeEntropy/entropy/configurational.py +++ b/CodeEntropy/entropy/configurational.py @@ -10,46 +10,15 @@ from __future__ import annotations import logging -from dataclasses import dataclass -from typing import Any, Optional +from typing import Any import numpy as np logger = logging.getLogger(__name__) -@dataclass(frozen=True) -class ConformationConfig: - """Configuration for assigning conformational states from a dihedral. - - Attributes: - bin_width: Histogram bin width in degrees for peak detection. - start: Inclusive start frame index for trajectory slicing. - end: Exclusive end frame index for trajectory slicing. - step: Stride for trajectory slicing (must be positive). - """ - - bin_width: int - start: int - end: int - step: int - - class ConformationalEntropy: - """Assign dihedral conformational states and compute conformational entropy. - - This class contains two independent responsibilities: - 1) `assign_conformation`: Map a single dihedral angle time series to discrete - state labels by detecting histogram peaks and assigning the nearest peak. - 2) `conformational_entropy_calculation`: Compute Shannon entropy of the - state distribution (in J/mol/K). - - Notes: - `number_frames` is accepted by `conformational_entropy_calculation` for - compatibility with calling sites that track frame counts, but the entropy - is computed from the observed state counts (i.e., `len(states)`), which is - the correct normalization for the sampled distribution. 
- """ + """Compute conformational entropy from states information.""" _GAS_CONST: float = 8.3144598484848 @@ -61,69 +30,7 @@ def __init__(self) -> None: """ pass - def assign_conformation( - self, - data_container: Any, - dihedral: Any, - number_frames: int, - bin_width: int, - start: int, - end: int, - step: int, - ) -> np.ndarray: - """Assign discrete conformational states for a single dihedral. - - The dihedral angle time series is: - 1) Collected across the trajectory slice [start:end:step]. - 2) Converted to [0, 360) degrees. - 3) Histogrammed using `bin_width`. - 4) Peaks are identified as bins with locally maximal population. - 5) Each frame is assigned the index of the nearest peak. - - Args: - data_container: MDAnalysis Universe/AtomGroup with a trajectory. - dihedral: Object providing `value()` for the current frame dihedral. - number_frames: Provided for call-site compatibility; not used for sizing. - bin_width: Histogram bin width in degrees. - start: Inclusive start frame index. - end: Exclusive end frame index. - step: Stride for trajectory slicing. - - Returns: - Array of integer state labels of length equal to the trajectory slice. - Returns an empty array if the slice is empty. - - Raises: - ValueError: If `bin_width` or `step` are invalid. 
- """ - _ = number_frames - - config = ConformationConfig( - bin_width=int(bin_width), - start=int(start), - end=int(end), - step=int(step), - ) - self._validate_assignment_config(config) - - traj_slice = data_container.trajectory[config.start : config.end : config.step] - n_slice = len(traj_slice) - if n_slice <= 0: - return np.array([], dtype=int) - - phi = self._collect_dihedral_angles(traj_slice, dihedral) - peak_values = self._find_histogram_peaks(phi, config.bin_width) - - if peak_values.size == 0: - return np.zeros(n_slice, dtype=int) - - states = self._assign_nearest_peaks(phi, peak_values) - logger.debug("Final conformations: %s", states) - return states - - def conformational_entropy_calculation( - self, states: Any, number_frames: int - ) -> float: + def conformational_entropy_calculation(self, states: Any) -> float: """Compute conformational entropy for a sequence of state labels. Entropy is computed as: @@ -139,8 +46,6 @@ def conformational_entropy_calculation( Returns: float: Conformational entropy in J/mol/K. """ - _ = number_frames - arr = self._to_1d_array(states) if arr is None or arr.size == 0: return 0.0 @@ -154,96 +59,11 @@ def conformational_entropy_calculation( probs = probs[probs > 0.0] s_conf = -self._GAS_CONST * float(np.sum(probs * np.log(probs))) - logger.debug("Total conformational entropy: %s", s_conf) + logger.debug(f"Total conformational entropy: {s_conf}") return s_conf @staticmethod - def _validate_assignment_config(config: ConformationConfig) -> None: - """Validate conformation assignment configuration. - - Args: - config: Assignment configuration. - - Raises: - ValueError: If configuration values are invalid. 
- """ - if config.step <= 0: - raise ValueError("step must be a positive integer") - if config.bin_width <= 0 or config.bin_width > 360: - raise ValueError("bin_width must be in the range (0, 360]") - if 360 % config.bin_width != 0: - logger.warning( - "bin_width=%s does not evenly divide 360; histogram bins will be " - "uneven.", - config.bin_width, - ) - - @staticmethod - def _collect_dihedral_angles(traj_slice: Any, dihedral: Any) -> np.ndarray: - """Collect dihedral angles for each frame in the trajectory slice. - - Args: - traj_slice: Slice of a trajectory iterable where iterating advances frames. - dihedral: Object with `value()` returning the dihedral in degrees. - - Returns: - Array of dihedral values mapped into [0, 360). - """ - phi = np.zeros(len(traj_slice), dtype=float) - for i, _ts in enumerate(traj_slice): - value = float(dihedral.value()) - if value < 0.0: - value += 360.0 - phi[i] = value - return phi - - @staticmethod - def _find_histogram_peaks(phi: np.ndarray, bin_width: int) -> np.ndarray: - """Identify peak bin centers from a histogram of dihedral angles. - - A peak is defined as a bin whose population is greater than or equal to - its immediate neighbors (with circular handling at the final bin). - - Args: - phi: Dihedral angles in degrees, in [0, 360). - bin_width: Histogram bin width in degrees. - - Returns: - 1D array of peak bin center values (degrees). Empty if no peaks found. 
- """ - number_bins = int(360 / bin_width) - popul, bin_edges = np.histogram(phi, bins=number_bins, range=(0.0, 360.0)) - bin_centers = 0.5 * (bin_edges[:-1] + bin_edges[1:]) - - peaks: list[float] = [] - for idx in range(number_bins): - if popul[idx] == 0: - continue - - left = popul[idx - 1] if idx > 0 else popul[number_bins - 1] - right = popul[idx + 1] if idx < number_bins - 1 else popul[0] - - if popul[idx] >= left and popul[idx] >= right: - peaks.append(float(bin_centers[idx])) - - return np.asarray(peaks, dtype=float) - - @staticmethod - def _assign_nearest_peaks(phi: np.ndarray, peak_values: np.ndarray) -> np.ndarray: - """Assign each phi value to the index of its nearest peak. - - Args: - phi: Dihedral angles in degrees. - peak_values: Peak centers (degrees). - - Returns: - Integer state labels aligned with `phi`. - """ - distances = np.abs(phi[:, None] - peak_values[None, :]) - return np.argmin(distances, axis=1).astype(int) - - @staticmethod - def _to_1d_array(states: Any) -> Optional[np.ndarray]: + def _to_1d_array(states: Any) -> np.ndarray | None: """Convert a state sequence into a 1D numpy array. 
Args: diff --git a/CodeEntropy/entropy/graph.py b/CodeEntropy/entropy/graph.py index d243907d..be5ca9e3 100644 --- a/CodeEntropy/entropy/graph.py +++ b/CodeEntropy/entropy/graph.py @@ -15,18 +15,19 @@ import logging from dataclasses import dataclass -from typing import Any, Dict +from typing import Any import networkx as nx from CodeEntropy.entropy.nodes.aggregate import AggregateEntropyNode from CodeEntropy.entropy.nodes.configurational import ConfigurationalEntropyNode +from CodeEntropy.entropy.nodes.orientational import OrientationalEntropyNode from CodeEntropy.entropy.nodes.vibrational import VibrationalEntropyNode logger = logging.getLogger(__name__) -SharedData = Dict[str, Any] +SharedData = dict[str, Any] @dataclass(frozen=True) @@ -57,9 +58,9 @@ class EntropyGraph: def __init__(self) -> None: """Initialize an empty entropy graph.""" self._graph: nx.DiGraph = nx.DiGraph() - self._nodes: Dict[str, Any] = {} + self._nodes: dict[str, Any] = {} - def build(self) -> "EntropyGraph": + def build(self) -> EntropyGraph: """Populate the graph with the standard entropy workflow. Returns: @@ -68,10 +69,15 @@ def build(self) -> "EntropyGraph": specs = ( NodeSpec("vibrational_entropy", VibrationalEntropyNode()), NodeSpec("configurational_entropy", ConfigurationalEntropyNode()), + NodeSpec("orientational_entropy", OrientationalEntropyNode()), NodeSpec( "aggregate_entropy", AggregateEntropyNode(), - deps=("vibrational_entropy", "configurational_entropy"), + deps=( + "vibrational_entropy", + "configurational_entropy", + "orientational_entropy", + ), ), ) @@ -82,7 +88,7 @@ def build(self) -> "EntropyGraph": def execute( self, shared_data: SharedData, *, progress: object | None = None - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """Execute the entropy graph in topological order. Nodes are executed in dependency order (topological sort). Each node reads @@ -105,7 +111,7 @@ def execute( Raises: KeyError: If a node name is missing from the internal node registry. 
""" - results: Dict[str, Any] = {} + results: dict[str, Any] = {} for node_name in nx.topological_sort(self._graph): node = self._nodes[node_name] diff --git a/CodeEntropy/entropy/nodes/aggregate.py b/CodeEntropy/entropy/nodes/aggregate.py index 6855ab12..bcf468ff 100644 --- a/CodeEntropy/entropy/nodes/aggregate.py +++ b/CodeEntropy/entropy/nodes/aggregate.py @@ -2,10 +2,11 @@ from __future__ import annotations +from collections.abc import Mapping, MutableMapping from dataclasses import dataclass -from typing import Any, Dict, Mapping, MutableMapping, Optional +from typing import Any -EntropyResults = Dict[str, Any] +EntropyResults = dict[str, Any] @dataclass(frozen=True, slots=True) @@ -25,11 +26,12 @@ class AggregateEntropyNode: vibrational_key: str = "vibrational_entropy" configurational_key: str = "configurational_entropy" + orientational_key: str = "orientational_entropy" output_key: str = "entropy_results" def run( self, shared_data: MutableMapping[str, Any], **_: Any - ) -> Dict[str, EntropyResults]: + ) -> dict[str, EntropyResults]: """Run the aggregation step. Args: @@ -69,10 +71,13 @@ def _collect_entropy_results( "configurational_entropy": self._get_optional( shared_data, self.configurational_key ), + "orientational_entropy": self._get_optional( + shared_data, self.orientational_key + ), } @staticmethod - def _get_optional(shared_data: Mapping[str, Any], key: str) -> Optional[Any]: + def _get_optional(shared_data: Mapping[str, Any], key: str) -> Any | None: """Fetch an optional value from shared data. 
Args: diff --git a/CodeEntropy/entropy/nodes/configurational.py b/CodeEntropy/entropy/nodes/configurational.py index a6a8c483..720db9c9 100644 --- a/CodeEntropy/entropy/nodes/configurational.py +++ b/CodeEntropy/entropy/nodes/configurational.py @@ -3,16 +3,9 @@ from __future__ import annotations import logging +from collections.abc import Iterable, Mapping, MutableMapping, Sequence from typing import ( Any, - Dict, - Iterable, - Mapping, - MutableMapping, - Optional, - Sequence, - Tuple, - Union, ) import numpy as np @@ -23,8 +16,8 @@ GroupId = int ResidueId = int -StateKey = Tuple[GroupId, ResidueId] -StateSequence = Union[Sequence[Any], np.ndarray] +StateKey = tuple[GroupId, ResidueId] +StateSequence = Sequence[Any] | np.ndarray class ConfigurationalEntropyNode: @@ -36,7 +29,7 @@ class ConfigurationalEntropyNode: Results are written back into ``shared_data["configurational_entropy"]``. """ - def run(self, shared_data: MutableMapping[str, Any], **_: Any) -> Dict[str, Any]: + def run(self, shared_data: MutableMapping[str, Any], **_: Any) -> dict[str, Any]: """Execute configurational entropy calculation. Args: @@ -58,7 +51,7 @@ def run(self, shared_data: MutableMapping[str, Any], **_: Any) -> Dict[str, Any] ce = self._build_entropy_engine() fragments = universe.atoms.fragments - results: Dict[int, Dict[str, float]] = {} + results: dict[int, dict[str, float]] = {} for group_id, mol_ids in groups.items(): results[group_id] = {"ua": 0.0, "res": 0.0, "poly": 0.0} @@ -104,9 +97,9 @@ def _build_entropy_engine(self) -> ConformationalEntropy: def _get_state_containers( self, shared_data: Mapping[str, Any] - ) -> Tuple[ - Dict[StateKey, StateSequence], - Union[Dict[GroupId, StateSequence], Sequence[Optional[StateSequence]]], + ) -> tuple[ + dict[StateKey, StateSequence], + dict[GroupId, StateSequence] | Sequence[StateSequence | None], ]: """Retrieve conformational state containers. 
@@ -144,7 +137,7 @@ def _compute_ua_entropy_for_group( residues: Iterable[Any], states_ua: Mapping[StateKey, StateSequence], n_frames: int, - reporter: Optional[Any], + reporter: Any | None, ) -> float: """Compute united atom entropy for a group. @@ -163,7 +156,7 @@ def _compute_ua_entropy_for_group( for res_id, res in enumerate(residues): states = states_ua.get((group_id, res_id)) - val = self._entropy_or_zero(ce, states, n_frames) + val = self._entropy_or_zero(ce, states) total += val if reporter is not None: @@ -186,29 +179,28 @@ def _compute_residue_entropy_for_group( *, ce: ConformationalEntropy, group_id: int, - states_res: Union[Dict[int, StateSequence], Sequence[Optional[StateSequence]]], + states_res: dict[int, StateSequence] | Sequence[StateSequence | None], n_frames: int, ) -> float: """Compute residue-level entropy for a group.""" group_states = self._get_group_states(states_res, group_id) - return self._entropy_or_zero(ce, group_states, n_frames) + return self._entropy_or_zero(ce, group_states) def _entropy_or_zero( self, ce: ConformationalEntropy, - states: Optional[StateSequence], - n_frames: int, + states: StateSequence | None, ) -> float: """Return entropy value or zero if no state data exists.""" if not self._has_state_data(states): return 0.0 - return float(ce.conformational_entropy_calculation(states, n_frames)) + return float(ce.conformational_entropy_calculation(states)) @staticmethod def _get_group_states( - states_res: Union[Dict[int, StateSequence], Sequence[Optional[StateSequence]]], + states_res: dict[int, StateSequence] | Sequence[StateSequence | None], group_id: int, - ) -> Optional[StateSequence]: + ) -> StateSequence | None: """Fetch group states from container.""" if isinstance(states_res, dict): return states_res.get(group_id) @@ -217,7 +209,7 @@ def _get_group_states( return None @staticmethod - def _has_state_data(states: Optional[StateSequence]) -> bool: + def _has_state_data(states: StateSequence | None) -> bool: """Check if 
state container has usable data.""" if states is None: return False diff --git a/CodeEntropy/entropy/nodes/orientational.py b/CodeEntropy/entropy/nodes/orientational.py new file mode 100644 index 00000000..b3e387e5 --- /dev/null +++ b/CodeEntropy/entropy/nodes/orientational.py @@ -0,0 +1,84 @@ +"""Node for computing orientational entropy from neighbors.""" + +from __future__ import annotations + +import logging +from collections.abc import MutableMapping, Sequence +from typing import ( + Any, +) + +import numpy as np + +from CodeEntropy.entropy.orientational import OrientationalEntropy + +logger = logging.getLogger(__name__) + +GroupId = int +ResidueId = int +StateKey = tuple[GroupId, ResidueId] +StateSequence = Sequence[Any] | np.ndarray + + +class OrientationalEntropyNode: + """Compute orientational entropy using precomputed neighbors and symmetry. + + This node reads number of neighbors and symmetry from ``shared_data`` and + computes entropy contributions at the molecular (highest) level. + + Results are written back into ``shared_data["orientational_entropy"]``. + """ + + def run(self, shared_data: MutableMapping[str, Any], **_: Any) -> dict[str, Any]: + """Execute orientational entropy calculation. + + Args: + shared_data: Shared workflow state dictionary. + + Returns: + Dictionary containing orientational entropy results. + + Raises: + KeyError: If required keys are missing. 
+ """ + groups = shared_data["groups"] + levels = shared_data["levels"] + neighbors = shared_data["neighbors"] + symmetry_number = shared_data["symmetry_number"] + linear = shared_data["linear"] + reporter = shared_data.get("reporter") + + oe = self._build_entropy_engine() + + results: dict[int, float] = {} + + for group_id, mol_ids in groups.items(): + results[group_id] = 0 + if not mol_ids: + continue + rep_mol_id = mol_ids[0] + highest_level = levels[rep_mol_id][-1] + + neighbor = neighbors[group_id] + symmetry = symmetry_number[group_id] + line = linear[group_id] + + result_value = oe.calculate( + neighbor, + symmetry, + line, + ) + results[group_id] = result_value + + if reporter is not None: + reporter.add_results_data( + group_id, highest_level, "Orientational", result_value + ) + + shared_data["orientational_entropy"] = results + + return {"orientational_entropy": results} + + def _build_entropy_engine(self) -> OrientationalEntropy: + """Create the entropy calculation engine.""" + return OrientationalEntropy() diff --git a/CodeEntropy/entropy/nodes/vibrational.py b/CodeEntropy/entropy/nodes/vibrational.py index 6191e14b..40cabced 100644 --- a/CodeEntropy/entropy/nodes/vibrational.py +++ b/CodeEntropy/entropy/nodes/vibrational.py @@ -3,8 +3,9 @@ from __future__ import annotations import logging +from collections.abc import Mapping, MutableMapping from dataclasses import dataclass -from typing import Any, Dict, Mapping, MutableMapping, Optional, Tuple +from typing import Any import numpy as np @@ -16,7 +17,7 @@ GroupId = int ResidueId = int -CovKey = Tuple[GroupId, ResidueId] +CovKey = tuple[GroupId, ResidueId] @dataclass(frozen=True) @@ -52,7 +53,7 @@ def __init__(self) -> None: self._mat_ops = MatrixUtils() self._zero_atol = 1e-8 - def run(self, shared_data: MutableMapping[str, Any], **_: Any) -> Dict[str, Any]: + def run(self, shared_data: MutableMapping[str, Any], **_: Any) -> dict[str, Any]: """Run vibrational entropy calculations and update the shared 
data mapping. Args: @@ -86,7 +87,7 @@ def run(self, shared_data: MutableMapping[str, Any], **_: Any) -> Dict[str, Any] ua_frame_counts = self._get_ua_frame_counts(shared_data) reporter = shared_data.get("reporter") - results: Dict[int, Dict[str, Dict[str, float]]] = {} + results: dict[int, dict[str, dict[str, float]]] = {} for group_id, mol_ids in groups.items(): results[group_id] = {} @@ -170,7 +171,7 @@ def _build_entropy_engine( run_manager=shared_data["run_manager"], ) - def _get_group_id_to_index(self, shared_data: Mapping[str, Any]) -> Dict[int, int]: + def _get_group_id_to_index(self, shared_data: Mapping[str, Any]) -> dict[int, int]: """Return a mapping from group_id to contiguous index used by covariance lists. If a precomputed mapping is provided under "group_id_to_index", it is used. @@ -189,7 +190,7 @@ def _get_group_id_to_index(self, shared_data: Mapping[str, Any]) -> Dict[int, in groups = shared_data["groups"] return {gid: i for i, gid in enumerate(groups.keys())} - def _get_ua_frame_counts(self, shared_data: Mapping[str, Any]) -> Dict[CovKey, int]: + def _get_ua_frame_counts(self, shared_data: Mapping[str, Any]) -> dict[CovKey, int]: """Extract per-(group,residue) frame counts for united-atom covariances. 
Args: @@ -217,7 +218,7 @@ def _compute_united_atom_entropy( force_ua: Mapping[CovKey, Any], torque_ua: Mapping[CovKey, Any], ua_frame_counts: Mapping[CovKey, int], - reporter: Optional[Any], + reporter: Any | None, n_frames_default: int, highest: bool, ) -> EntropyPair: @@ -368,7 +369,7 @@ def _compute_ft_entropy( @staticmethod def _store_results( - results: Dict[int, Dict[str, Dict[str, float]]], + results: dict[int, dict[str, dict[str, float]]], group_id: int, level: str, pair: EntropyPair, @@ -385,7 +386,7 @@ def _store_results( @staticmethod def _log_molecule_level_results( - reporter: Optional[Any], + reporter: Any | None, group_id: int, level: str, pair: EntropyPair, diff --git a/CodeEntropy/entropy/orientational.py b/CodeEntropy/entropy/orientational.py index bd4786e4..11f4e6c6 100644 --- a/CodeEntropy/entropy/orientational.py +++ b/CodeEntropy/entropy/orientational.py @@ -1,11 +1,7 @@ """Orientational entropy calculations. This module defines `OrientationalEntropy`, which computes orientational entropy -from a neighbor-count mapping. - -The current implementation supports non-water neighbors. Water-specific behavior -can be implemented later behind an interface so the core calculation remains -stable and testable. +from a neighbor count and symmetry information. """ from __future__ import annotations @@ -13,7 +9,6 @@ import logging import math from dataclasses import dataclass -from typing import Any, Mapping import numpy as np @@ -48,111 +43,75 @@ class OrientationalEntropy: def __init__( self, - run_manager: Any, - args: Any, - universe: Any, - reporter: Any, - group_molecules: Any, gas_constant: float = _GAS_CONST_J_PER_MOL_K, ) -> None: """Initialize the orientational entropy calculator. Args: - run_manager: Run manager (currently unused by this class). - args: User arguments (currently unused by this class). - universe: MDAnalysis Universe (currently unused by this class). - reporter: Data logger (currently unused by this class). 
- group_molecules: Grouping helper (currently unused by this class). gas_constant: Gas constant in J/(mol*K). """ - self._run_manager = run_manager - self._args = args - self._universe = universe - self._reporter = reporter - self._group_molecules = group_molecules self._gas_constant = float(gas_constant) - def calculate(self, neighbours: Mapping[str, int]) -> OrientationalEntropyResult: + def calculate( + self, + neighbor_count: float, + symmetry_number: int, + linear: bool, + ) -> OrientationalEntropyResult: """Calculate orientational entropy from neighbor counts. - For each neighbor species (except water), the number of orientations is - estimated as: - - Ω = sqrt(Nc^3 * π) + The number of orientations is estimated as: + Ω = sqrt(N_av^3 * π)/symmetry_number for non-linear molecules + Ω = N_av / symmetry_number for linear molecules and the entropy contribution is: S = R * ln(Ω) - where Nc is the neighbor count and R is the gas constant. + where N_av is the average number of neighbors and R is the gas constant. Args: - neighbours: Mapping of neighbor species name to count. + neighbors: average number of neighbors + symmetry_number: symmetry number of molecule of interest + linear: True if molecule of interest is linear Returns: OrientationalEntropyResult containing the total entropy in J/mol/K. - """ - total = 0.0 - for species, count in neighbours.items(): - if self._is_water(species): - logger.debug( - "Skipping water species %s in orientational entropy.", species - ) - continue - - contribution = self._entropy_contribution(count) - logger.debug( - "Orientational entropy contribution for %s: %s", species, contribution - ) - total += contribution - - logger.debug("Final orientational entropy total: %s", total) - return OrientationalEntropyResult(total=float(total)) - - @staticmethod - def _is_water(species: str) -> bool: - """Return True if the species should be treated as water. - - Args: - species: Species identifier. 
- - Returns: - True if the species is considered water. - """ - return species in {"H2O", "WAT", "HOH"} - - def _entropy_contribution(self, neighbour_count: int) -> float: - """Compute the entropy contribution for a single neighbor count. - - Args: - neighbour_count: Number of neighbors (Nc). - - Returns: - Entropy contribution in J/mol/K. Raises: - ValueError: If neighbour_count is negative. + ValueError if number of neighbors is negative. """ - if neighbour_count < 0: - raise ValueError(f"neighbour_count must be >= 0, got {neighbour_count}") + if neighbor_count < 0: + raise ValueError(f"neighbor_count must be >= 0, got {neighbor_count}") - if neighbour_count == 0: - return 0.0 + omega = self._omega(neighbor_count, symmetry_number, linear) - omega = self._omega(neighbour_count) - if omega <= 0.0: - return 0.0 + total = self._gas_constant * math.log(omega) + logger.debug(f"Orientational entropy total: {total}") - return self._gas_constant * math.log(omega) + return total - @staticmethod - def _omega(neighbour_count: int) -> float: + def _omega(self, neighbor_count: int, symmetry: int, linear: bool) -> float: """Compute the number of orientations Ω. Args: - neighbour_count: Number of neighbors (Nc). + neighbor_count: average number of neighbors. + symmetry_number: The symmetry number of the molecule. + linear: Is the molecule linear (True or False). Returns: Ω (unitless). 
""" - return float(np.sqrt((neighbour_count**3) * math.pi)) + # symmetry number 0 = spherically symmetric = no orientational entropy + if symmetry == 0: + omega = 1 + else: + if linear: + omega = neighbor_count / symmetry + else: + omega = np.sqrt((neighbor_count**3) * math.pi) / symmetry + + # avoid negative orientational entropy + omega = max(omega, 1) + + return omega diff --git a/CodeEntropy/entropy/vibrational.py b/CodeEntropy/entropy/vibrational.py index f1baf132..17779bc7 100644 --- a/CodeEntropy/entropy/vibrational.py +++ b/CodeEntropy/entropy/vibrational.py @@ -14,7 +14,7 @@ import logging from dataclasses import dataclass -from typing import Any, Literal, Tuple +from typing import Any, Literal import numpy as np from numpy import linalg as la @@ -207,7 +207,7 @@ def _entropy_components_from_frequencies( return components * self._gas_const @staticmethod - def _split_halves(components: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: + def _split_halves(components: np.ndarray) -> tuple[np.ndarray, np.ndarray]: """Split a component array into two equal halves. 
Args: diff --git a/CodeEntropy/entropy/water.py b/CodeEntropy/entropy/water.py index 5be7d13c..23ab2202 100644 --- a/CodeEntropy/entropy/water.py +++ b/CodeEntropy/entropy/water.py @@ -7,8 +7,9 @@ from __future__ import annotations import logging +from collections.abc import Callable, Mapping from dataclasses import dataclass -from typing import Any, Callable, Mapping, Optional, Tuple +from typing import Any import numpy as np import waterEntropy.recipes.interfacial_solvent as GetSolvent @@ -34,7 +35,7 @@ class WaterEntropyInput: end: int step: int temperature: float - group_id: Optional[int] = None + group_id: int | None = None class WaterEntropy: @@ -52,7 +53,7 @@ def __init__( self, args: Any, reporter: Any, - solver: Callable[..., Tuple[dict, Any, Any, Any, Any]] = ( + solver: Callable[..., tuple[dict, Any, Any, Any, Any]] = ( GetSolvent.get_interfacial_water_orient_entropy ), ) -> None: @@ -76,7 +77,7 @@ def calculate_and_log( start: int, end: int, step: int, - group_id: Optional[int] = None, + group_id: int | None = None, ) -> None: """Compute water entropy and write results to the data logger. @@ -133,7 +134,7 @@ def _run_solver(self, inputs: WaterEntropyInput): ) def _log_orientational_entropy( - self, Sorient_dict: Mapping[Any, Mapping[str, Any]], group_id: Optional[int] + self, Sorient_dict: Mapping[Any, Mapping[str, Any]], group_id: int | None ) -> None: """Log orientational entropy entries. @@ -150,7 +151,7 @@ def _log_orientational_entropy( ) def _log_translational_entropy( - self, vibrations: Any, covariances: Any, group_id: Optional[int] + self, vibrations: Any, covariances: Any, group_id: int | None ) -> None: """Log translational vibrational entropy entries. @@ -175,7 +176,7 @@ def _log_translational_entropy( ) def _log_rotational_entropy( - self, vibrations: Any, covariances: Any, group_id: Optional[int] + self, vibrations: Any, covariances: Any, group_id: int | None ) -> None: """Log rotational vibrational entropy entries. 
@@ -203,7 +204,7 @@ def _log_group_label( self, universe: Any, Sorient_dict: Mapping[Any, Mapping[str, Any]], - group_id: Optional[int], + group_id: int | None, ) -> None: """Log a group label summarizing the water entries. diff --git a/CodeEntropy/entropy/workflow.py b/CodeEntropy/entropy/workflow.py index 56e2326d..fe50e516 100644 --- a/CodeEntropy/entropy/workflow.py +++ b/CodeEntropy/entropy/workflow.py @@ -18,8 +18,9 @@ import logging import math from collections import defaultdict +from collections.abc import Mapping from dataclasses import dataclass -from typing import Any, Dict, Mapping, Tuple +from typing import Any import pandas as pd @@ -32,7 +33,7 @@ logger = logging.getLogger(__name__) console = LoggingConfig.get_console() -SharedData = Dict[str, Any] +SharedData = dict[str, Any] @dataclass(frozen=True) @@ -207,7 +208,7 @@ def _build_trajectory_slice(self) -> TrajectorySlice: n_frames = self._get_number_frames(start, end, step) return TrajectorySlice(start=start, end=end, step=step, n_frames=n_frames) - def _get_trajectory_bounds(self) -> Tuple[int, int, int]: + def _get_trajectory_bounds(self) -> tuple[int, int, int]: """Return start, end, and step frame indices from args. Returns: @@ -265,7 +266,7 @@ def _split_water_groups( self, universe: Any, groups: Mapping[int, Any], - ) -> Tuple[Dict[int, Any], Dict[int, Any]]: + ) -> tuple[dict[int, Any], dict[int, Any]]: """Partition molecule groups into water and non-water groups. 
This method identifies which molecule groups correspond to water @@ -333,8 +334,8 @@ def _compute_water_entropy( else "not water" ) - logger.debug("WaterEntropy: molecule_data=%s", self._reporter.molecule_data) - logger.debug("WaterEntropy: residue_data=%s", self._reporter.residue_data) + logger.debug(f"WaterEntropy: molecule_data= {self._reporter.molecule_data}") + logger.debug(f"WaterEntropy: residue_data= {self._reporter.residue_data}") def _finalize_molecule_results(self) -> None: """Aggregate group totals and persist results to JSON. diff --git a/CodeEntropy/levels/axes.py b/CodeEntropy/levels/axes.py index 2d05fcb3..57ef8912 100644 --- a/CodeEntropy/levels/axes.py +++ b/CodeEntropy/levels/axes.py @@ -8,7 +8,7 @@ from __future__ import annotations import logging -from typing import Sequence, Tuple +from collections.abc import Sequence import numpy as np from MDAnalysis.lib.mdamath import make_whole @@ -67,7 +67,7 @@ def get_residue_axes(self, data_container, index: int, residue=None): The translational and rotational axes at the residue level. - Identify the residue (either provided or selected by `resindex index`). - - Determine whether the residue is bonded to neighbouring residues + - Determine whether the residue is bonded to neighboring residues (previous/next in sequence) using MDAnalysis bonded selections. 
- If there are *no* bonds to other residues: * Use a custom principal axes, from a moment-of-inertia (MOI) tensor @@ -209,10 +209,10 @@ def get_UA_axes(self, data_container, index: int): if rot_axes is None or moment_of_inertia is None: raise ValueError("Unable to compute bonded axes for UA bead.") - logger.debug("Translational Axes: %s", trans_axes) - logger.debug("Rotational Axes: %s", rot_axes) - logger.debug("Center: %s", center) - logger.debug("Moment of Inertia: %s", moment_of_inertia) + logger.debug(f"Translational Axes: {trans_axes}") + logger.debug(f"Rotational Axes: {rot_axes}") + logger.debug(f"Center: {center}") + logger.debug(f"Moment of Inertia: {moment_of_inertia}") return trans_axes, rot_axes, center, moment_of_inertia @@ -576,7 +576,7 @@ def get_moment_of_inertia_tensor( def get_custom_principal_axes( self, moment_of_inertia_tensor: np.ndarray - ) -> Tuple[np.ndarray, np.ndarray]: + ) -> tuple[np.ndarray, np.ndarray]: """Compute principal axes and moments from a custom MOI tensor. Principal axes and centre of axes from the ordered eigenvalues and diff --git a/CodeEntropy/levels/dihedrals.py b/CodeEntropy/levels/dihedrals.py index ccdad688..9a8fc5aa 100644 --- a/CodeEntropy/levels/dihedrals.py +++ b/CodeEntropy/levels/dihedrals.py @@ -6,14 +6,13 @@ """ import logging -from typing import Dict, List, Tuple import numpy as np from MDAnalysis.analysis.dihedrals import Dihedral logger = logging.getLogger(__name__) -UAKey = Tuple[int, int] +UAKey = tuple[int, int] class ConformationStateBuilder: @@ -79,8 +78,8 @@ def build_conformational_states( helpers as implemented in this module. 
""" number_groups = len(groups) - states_ua: Dict[UAKey, List[str]] = {} - states_res: List[List[str]] = [None] * number_groups + states_ua: dict[UAKey, list[str]] = {} + states_res: list[list[str]] = [None] * number_groups task = None if progress is not None: @@ -163,8 +162,8 @@ def _collect_dihedrals_for_group(self, mol, level_list): dihedrals_res: List of residue-level dihedral AtomGroups. """ num_residues = len(mol.residues) - dihedrals_ua: List[List] = [[] for _ in range(num_residues)] - dihedrals_res: List = [] + dihedrals_ua: list[list] = [[] for _ in range(num_residues)] + dihedrals_res: list = [] for level in level_list: if level == "united_atom": @@ -230,7 +229,7 @@ def _get_dihedrals(self, data_container, level: str): ) atom_groups.append(atom1 + atom2 + atom3 + atom4) - logger.debug("Level: %s, Dihedrals: %s", level, atom_groups) + logger.debug(f"Level: {level}, Dihedrals: {atom_groups}") return atom_groups def _collect_peaks_for_group( @@ -345,13 +344,25 @@ def _identify_peaks( peaks = self._find_histogram_peaks(popul=popul, bin_value=bin_value) peak_values.append(peaks) - logger.debug("Dihedral: %s, Peak Values: %s", dihedral_index, peak_values) + logger.debug(f"Dihedral: {dihedral_index}, Peak Values: {peak_values}") return peak_values @staticmethod def _find_histogram_peaks(popul, bin_value): - """Return convex turning-point peaks from a histogram.""" + """Return convex turning-point peaks from a histogram. + + The selection of the population of the right adjacent bin takes into + account that the dihedral angles are circular. + + Args: + popul: the array of counts for each bin + bin_value: the array of dihedral angle value at the center of each + bin. + + Returns: + peaks: list of values associated with peaks. 
+ """ number_bins = len(popul) peaks = [] @@ -359,18 +370,11 @@ def _find_histogram_peaks(popul, bin_value): if popul[bin_index] == 0: continue - if bin_index == number_bins - 1: - if ( - popul[bin_index] >= popul[bin_index - 1] - and popul[bin_index] >= popul[0] - ): - peaks.append(bin_value[bin_index]) - else: - if ( - popul[bin_index] >= popul[bin_index - 1] - and popul[bin_index] >= popul[bin_index + 1] - ): - peaks.append(bin_value[bin_index]) + left = popul[bin_index - 1] + right = popul[0] if bin_index == number_bins - 1 else popul[bin_index + 1] + + if popul[bin_index] >= left and popul[bin_index] > right: + peaks.append(bin_value[bin_index]) return peaks @@ -488,5 +492,5 @@ def _assign_states( else: states.extend(mol_states) - logger.debug("States: %s", states) + logger.debug(f"States: {states}") return states diff --git a/CodeEntropy/levels/forces.py b/CodeEntropy/levels/forces.py index 3b382b0c..ea57079b 100644 --- a/CodeEntropy/levels/forces.py +++ b/CodeEntropy/levels/forces.py @@ -8,8 +8,9 @@ from __future__ import annotations import logging +from collections.abc import Sequence from dataclasses import dataclass -from typing import Any, Optional, Sequence, Tuple +from typing import Any import numpy as np @@ -39,8 +40,8 @@ class TorqueInputs: center: Vector3 force_partitioning: float moment_of_inertia: Vector3 - axes_manager: Optional[Any] = None - box: Optional[np.ndarray] = None + axes_manager: Any | None = None + box: np.ndarray | None = None class ForceTorqueCalculator: @@ -90,8 +91,8 @@ def get_weighted_torques( center: Vector3, force_partitioning: float, moment_of_inertia: Vector3, - axes_manager: Optional[Any], - box: Optional[np.ndarray], + axes_manager: Any | None, + box: np.ndarray | None, ) -> Vector3: """Compute a moment-weighted generalized torque. 
@@ -124,7 +125,7 @@ def compute_frame_covariance( self, force_vecs: Sequence[Vector3], torque_vecs: Sequence[Vector3], - ) -> Tuple[Matrix, Matrix]: + ) -> tuple[Matrix, Matrix]: """Compute per-frame second-moment matrices for force/torque vectors. Note: @@ -240,7 +241,7 @@ def _compute_frame_second_moments( self, force_vectors: Sequence[Vector3], torque_vectors: Sequence[Vector3], - ) -> Tuple[Matrix, Matrix]: + ) -> tuple[Matrix, Matrix]: """Build outer-product second-moment matrices for a single frame. Args: @@ -260,8 +261,8 @@ def _displacements_relative_to_center( *, center: Vector3, positions: np.ndarray, - axes_manager: Optional[Any], - box: Optional[np.ndarray], + axes_manager: Any | None, + box: np.ndarray | None, ) -> np.ndarray: """Compute displacement vectors from center to positions. diff --git a/CodeEntropy/levels/frame_dag.py b/CodeEntropy/levels/frame_dag.py index dd27a3b2..f70f672f 100644 --- a/CodeEntropy/levels/frame_dag.py +++ b/CodeEntropy/levels/frame_dag.py @@ -10,7 +10,7 @@ import logging from dataclasses import dataclass -from typing import Any, Dict, Optional +from typing import Any import networkx as nx @@ -30,10 +30,10 @@ class FrameContext: data: Additional frame-local scratch space for nodes, if needed. """ - shared: Dict[str, Any] + shared: dict[str, Any] frame_index: int frame_covariance: Any = None - data: Dict[str, Any] = None + data: dict[str, Any] = None class FrameGraph: @@ -46,7 +46,7 @@ class FrameGraph: - "frame_covariance" """ - def __init__(self, universe_operations: Optional[Any] = None) -> None: + def __init__(self, universe_operations: Any | None = None) -> None: """Initialise a FrameGraph. 
Args: @@ -55,9 +55,9 @@ def __init__(self, universe_operations: Optional[Any] = None) -> None: """ self._universe_operations = universe_operations self._graph = nx.DiGraph() - self._nodes: Dict[str, Any] = {} + self._nodes: dict[str, Any] = {} - def build(self) -> "FrameGraph": + def build(self) -> FrameGraph: """Build the default frame DAG topology. Returns: @@ -66,7 +66,7 @@ def build(self) -> "FrameGraph": self._add("frame_covariance", FrameCovarianceNode()) return self - def execute_frame(self, shared_data: Dict[str, Any], frame_index: int) -> Any: + def execute_frame(self, shared_data: dict[str, Any], frame_index: int) -> Any: """Execute the frame DAG for a single trajectory frame. Args: @@ -79,12 +79,12 @@ def execute_frame(self, shared_data: Dict[str, Any], frame_index: int) -> Any: ctx = self._make_frame_ctx(shared_data=shared_data, frame_index=frame_index) for node_name in nx.topological_sort(self._graph): - logger.debug("[FrameGraph] running %s @ frame=%s", node_name, frame_index) + logger.debug(f"[FrameGraph] running {node_name} @ frame={frame_index}") self._nodes[node_name].run(ctx) return ctx["frame_covariance"] - def _add(self, name: str, node: Any, deps: Optional[list[str]] = None) -> None: + def _add(self, name: str, node: Any, deps: list[str] | None = None) -> None: """Register a node and its dependencies in the DAG.""" self._nodes[name] = node self._graph.add_node(name) @@ -93,8 +93,8 @@ def _add(self, name: str, node: Any, deps: Optional[list[str]] = None) -> None: @staticmethod def _make_frame_ctx( - shared_data: Dict[str, Any], frame_index: int - ) -> Dict[str, Any]: + shared_data: dict[str, Any], frame_index: int + ) -> dict[str, Any]: """Create a frame context dictionary for node execution. 
Notes: diff --git a/CodeEntropy/levels/hierarchy.py b/CodeEntropy/levels/hierarchy.py index 863443fd..1400a370 100644 --- a/CodeEntropy/levels/hierarchy.py +++ b/CodeEntropy/levels/hierarchy.py @@ -16,7 +16,6 @@ from __future__ import annotations import logging -from typing import List, Tuple logger = logging.getLogger(__name__) @@ -33,7 +32,7 @@ class HierarchyBuilder: provides structural information (levels and beads). """ - def select_levels(self, data_container) -> Tuple[int, List[List[str]]]: + def select_levels(self, data_container) -> tuple[int, list[list[str]]]: """Select applicable hierarchy levels for each molecule in the container. A molecule is always assigned the `united_atom` level. @@ -53,10 +52,10 @@ def select_levels(self, data_container) -> Tuple[int, List[List[str]]]: (strings) for that molecule in increasing coarseness. """ number_molecules = len(data_container.atoms.fragments) - logger.debug("The number of molecules is %d.", number_molecules) + logger.debug(f"The number of molecules is {number_molecules}.") fragments = data_container.atoms.fragments - levels: List[List[str]] = [[] for _ in range(number_molecules)] + levels: list[list[str]] = [[] for _ in range(number_molecules)] for mol_id in range(number_molecules): levels[mol_id].append("united_atom") @@ -69,10 +68,10 @@ def select_levels(self, data_container) -> Tuple[int, List[List[str]]]: if number_residues > 1: levels[mol_id].append("polymer") - logger.debug("Selected levels: %s", levels) + logger.debug(f"Selected levels: {levels}") return number_molecules, levels - def get_beads(self, data_container, level: str) -> List: + def get_beads(self, data_container, level: str) -> list: """Build beads for a given container at a given hierarchy level. 
Args: @@ -97,7 +96,7 @@ def get_beads(self, data_container, level: str) -> List: raise ValueError(f"Unknown level: {level}") - def _build_residue_beads(self, data_container) -> List: + def _build_residue_beads(self, data_container) -> list: """Build one bead per residue using the container's residues. Args: @@ -110,7 +109,7 @@ def _build_residue_beads(self, data_container) -> List: logger.debug("Residue beads sizes: %s", [len(b) for b in beads]) return beads - def _build_united_atom_beads(self, data_container) -> List: + def _build_united_atom_beads(self, data_container) -> list: """Build united-atom beads from heavy atoms and their bonded hydrogens. For each heavy atom, a bead is defined as: diff --git a/CodeEntropy/levels/level_dag.py b/CodeEntropy/levels/level_dag.py index 39b4a1da..0be5fb88 100644 --- a/CodeEntropy/levels/level_dag.py +++ b/CodeEntropy/levels/level_dag.py @@ -17,7 +17,7 @@ from __future__ import annotations import logging -from typing import Any, Dict, Optional +from typing import Any import networkx as nx @@ -28,6 +28,7 @@ from CodeEntropy.levels.nodes.conformations import ComputeConformationalStatesNode from CodeEntropy.levels.nodes.detect_levels import DetectLevelsNode from CodeEntropy.levels.nodes.detect_molecules import DetectMoleculesNode +from CodeEntropy.levels.nodes.find_neighbors import ComputeNeighborsNode logger = logging.getLogger(__name__) @@ -44,7 +45,7 @@ class LevelDAG: molecules within a group when frame nodes average within-frame first). """ - def __init__(self, universe_operations: Optional[Any] = None) -> None: + def __init__(self, universe_operations: Any | None = None) -> None: """Initialise a LevelDAG. 
Args: @@ -54,11 +55,11 @@ def __init__(self, universe_operations: Optional[Any] = None) -> None: self._universe_operations = universe_operations self._static_graph = nx.DiGraph() - self._static_nodes: Dict[str, Any] = {} + self._static_nodes: dict[str, Any] = {} self._frame_dag = FrameGraph(universe_operations=universe_operations) - def build(self) -> "LevelDAG": + def build(self) -> LevelDAG: """Build the static and frame DAG topology. This registers all static nodes and their dependencies, and builds the @@ -81,13 +82,16 @@ def build(self) -> "LevelDAG": ComputeConformationalStatesNode(self._universe_operations), deps=["detect_levels"], ) + self._add_static( + "find_neighbors", ComputeNeighborsNode(), deps=["detect_levels"] + ) self._frame_dag.build() return self def execute( - self, shared_data: Dict[str, Any], *, progress: object | None = None - ) -> Dict[str, Any]: + self, shared_data: dict[str, Any], *, progress: object | None = None + ) -> dict[str, Any]: """Execute the full hierarchy workflow and mutate shared_data. This method ensures required shared components exist, runs the static stage @@ -109,7 +113,7 @@ def execute( return shared_data def _run_static_stage( - self, shared_data: Dict[str, Any], *, progress: object | None = None + self, shared_data: dict[str, Any], *, progress: object | None = None ) -> None: """Run all static nodes in dependency order. @@ -130,9 +134,7 @@ def _run_static_stage( pass node.run(shared_data) - def _add_static( - self, name: str, node: Any, deps: Optional[list[str]] = None - ) -> None: + def _add_static(self, name: str, node: Any, deps: list[str] | None = None) -> None: """Register a static node and its dependencies in the static DAG. 
Args: @@ -149,7 +151,7 @@ def _add_static( self._static_graph.add_edge(dep, name) def _run_frame_stage( - self, shared_data: Dict[str, Any], *, progress: object | None = None + self, shared_data: dict[str, Any], *, progress: object | None = None ) -> None: """Execute the per-frame DAG stage and reduce frame outputs. @@ -233,7 +235,7 @@ def _incremental_mean(old: Any, new: Any, n: int) -> Any: return old + (new - old) / float(n) def _reduce_one_frame( - self, shared_data: Dict[str, Any], frame_out: Dict[str, Any] + self, shared_data: dict[str, Any], frame_out: dict[str, Any] ) -> None: """Reduce one frame's covariance outputs into shared running means. @@ -245,7 +247,7 @@ def _reduce_one_frame( self._reduce_forcetorque(shared_data, frame_out) def _reduce_force_and_torque( - self, shared_data: Dict[str, Any], frame_out: Dict[str, Any] + self, shared_data: dict[str, Any], frame_out: dict[str, Any] ) -> None: """Reduce force/torque covariance outputs into shared accumulators. @@ -305,7 +307,7 @@ def _reduce_force_and_torque( t_cov["poly"][gi] = self._incremental_mean(t_cov["poly"][gi], T, n) def _reduce_forcetorque( - self, shared_data: Dict[str, Any], frame_out: Dict[str, Any] + self, shared_data: dict[str, Any], frame_out: dict[str, Any] ) -> None: """Reduce combined force-torque covariance outputs into shared accumulators. 
diff --git a/CodeEntropy/levels/linalg.py b/CodeEntropy/levels/linalg.py index b86defec..4411d2f9 100644 --- a/CodeEntropy/levels/linalg.py +++ b/CodeEntropy/levels/linalg.py @@ -48,7 +48,7 @@ def create_submatrix(self, data_i: np.ndarray, data_j: np.ndarray) -> np.ndarray ) submatrix = np.outer(v_i, v_j) - logger.debug("Submatrix: %s", submatrix) + logger.debug(f"Submatrix: {submatrix}") return submatrix def filter_zero_rows_columns( @@ -86,12 +86,11 @@ def filter_zero_rows_columns( final_shape = mat.shape if init_shape != final_shape: logger.debug( - "Matrix shape changed %s -> %s after removing zero rows/cols.", - init_shape, - final_shape, + f"Matrix shape changed {init_shape}" + f"-> {final_shape} after removing zero rows/cols." ) - logger.debug("Filtered matrix: %s", mat) + logger.debug(f"Filtered matrix: {mat}") return mat @staticmethod diff --git a/CodeEntropy/levels/mda.py b/CodeEntropy/levels/mda.py index 308157bd..f8cdec40 100644 --- a/CodeEntropy/levels/mda.py +++ b/CodeEntropy/levels/mda.py @@ -9,7 +9,6 @@ from __future__ import annotations import logging -from typing import Optional import MDAnalysis as mda from MDAnalysis.analysis.base import AnalysisFromFunction @@ -35,8 +34,8 @@ def __init__(self) -> None: def select_frames( self, u: mda.Universe, - start: Optional[int] = None, - end: Optional[int] = None, + start: int | None = None, + end: int | None = None, step: int = 1, ) -> mda.Universe: """Create a reduced universe by dropping frames according to user selection. 
@@ -75,7 +74,7 @@ def select_frames( dimensions=dimensions, ) - logger.debug("MDAnalysis.Universe - reduced universe (frame-selected): %s", u2) + logger.debug(f"MDAnalysis.Universe - reduced universe (frame-selected): {u2}") return u2 def select_atoms(self, u: mda.Universe, select_string: str = "all") -> mda.Universe: @@ -103,7 +102,7 @@ def select_atoms(self, u: mda.Universe, select_string: str = "all") -> mda.Unive dimensions=dimensions, ) - logger.debug("MDAnalysis.Universe - reduced universe (atom-selected): %s", u2) + logger.debug(f"MDAnalysis.Universe - reduced universe (atom-selected): {u2}") return u2 def extract_fragment( @@ -127,10 +126,10 @@ def merge_forces( tprfile: str, trrfile, forcefile: str, - fileformat: Optional[str] = None, + fileformat: str | None = None, kcal: bool = False, *, - force_format: Optional[str] = None, + force_format: str | None = None, fallback_to_positions_if_no_forces: bool = True, ) -> mda.Universe: """Create a universe by merging coordinates and forces from different files. @@ -168,11 +167,11 @@ def merge_forces( dimensions loaded into memory. """ - logger.debug("Loading coordinate Universe with %s", trrfile) + logger.debug(f"Loading coordinate Universe with {trrfile}") u = mda.Universe(tprfile, trrfile, format=fileformat) ff = force_format if force_format is not None else fileformat - logger.debug("Loading force Universe with %s", forcefile) + logger.debug(f"Loading force Universe with {forcefile}") u_force = mda.Universe(tprfile, forcefile, format=ff) select_atom = u.select_atoms("all") diff --git a/CodeEntropy/levels/neighbors.py b/CodeEntropy/levels/neighbors.py new file mode 100644 index 00000000..32636fec --- /dev/null +++ b/CodeEntropy/levels/neighbors.py @@ -0,0 +1,251 @@ +"""Neighbours info for orientational entropy. + +This module finds the average number of neighbors, symmetry numbers, and +and linearity for each group. +These are used downstream to compute the orientational entropy. 
+""" + +import logging + +import numpy as np +from rdkit import Chem + +from CodeEntropy.levels.search import Search + +logger = logging.getLogger(__name__) + + +class Neighbors: + """ + Class to find the neighbors and any related information needed for the + calculation of orientational entropy. + """ + + def __init__(self): + """ + Initializes the Neighbors class with placeholders for data, + including the system trajectory, groups, and levels. + """ + + self._universe = None + self._groups = None + self._levels = None + self._search = Search() + + def get_neighbors(self, universe, levels, groups, search_type): + """ + Find the neighbors relative to the central molecule. + + The search defaults to using RAD, but an MDAnalysis method based + on grid searches is also available. + The average number of neighbors is calculated. + + Args: + universe: MDAnalysis universe object for the system + groups: list of groups for averaging + levels: list of levels for each molecule + search_type: str, how to find neigbours + + Returns: + average_number_neighbors (dict): average number of neighbors + at each frame for each group + """ + + number_neighbors = {} + average_number_neighbors = {} + + number_frames = len(universe.trajectory) + + for group_id in groups.keys(): + molecules = groups[group_id] + highest_level = levels[molecules[0]][-1] + + for mol_id in molecules: + for timestep in range(number_frames): + # This is to get MDAnalysis to get the information from the + # correct frame of the trajectory + universe.trajectory[timestep] + + if search_type == "RAD": + # Use the relative angular distance method to find neighbors + neighbors = self._search.get_RAD_neighbors( + universe=universe, mol_id=mol_id + ) + + elif search_type == "grid": + # Use MDAnalysis neighbor search. 
+ neighbors = self._search.get_grid_neighbors( + universe=universe, + mol_id=mol_id, + highest_level=highest_level, + ) + else: + # Raise error for unavailale search_type + raise ValueError(f"unknown search_type {search_type}") + + if group_id in number_neighbors: + number_neighbors[group_id].append(len(neighbors)) + else: + number_neighbors[group_id] = [len(neighbors)] + + # Get the average number of neighbors: + # dividing the sum by the number of molecules and number of frames + number = np.sum(number_neighbors[group_id]) + average_number_neighbors[group_id] = number / ( + len(molecules) * number_frames + ) + logger.debug( + "group: {group_id}number neighbors {average_number_neighbors[group_id]}" + ) + + return average_number_neighbors + + def get_symmetry(self, universe, groups): + """ + Calculate symmetry number for the molecule. + + This function converts the MDAnalysis instance of a molecule into + an RDKit object and then calculates the symmetry number and + determines if the molecule is linear. + + Args: + universe: MDAnalysis object + mol_id: index of the molecule of interest + + Returns: + symmetry_number: dict, symmetry number of each group + linear: dict, linear for each group + """ + symmetry_number = {} + linear = {} + + for group_id in groups.keys(): + molecules = groups[group_id] + + rdkit_mol, number_heavy, number_hydrogen = self._get_rdkit_mol( + universe, molecules[0] + ) + + symmetry_number[group_id] = self._get_symmetry_number( + rdkit_mol, number_heavy, number_hydrogen + ) + + linear[group_id] = self._get_linear(rdkit_mol, number_heavy) + + logger.debug( + f"group: {group_id}, symmetry: {symmetry_number}, linear: {linear}" + ) + + return symmetry_number, linear + + def _get_rdkit_mol(self, universe, mol_id): + """ + Convert molecule to rdkit object. + + MDAnalysis convert_to(RDKIT) needs elements. + We are removing dummy atoms and converting to rkdit format. 
+ If there are dummy atoms you need inferrer=None otherwise you + get errors from it getting the valence wrong. + If possible it is better to use the inferrer to get the bonds + and hybridization correct. + The convert_to argument force=True forces it to continue even when + it cannot find hydrogens, this is needed to avoid errors for molecules + like carbon dioxide which do not have hydrogens. + + Args: + universe: MDAnalysis object + mol_id: index of the molecule of interest + + Returns: + rdkit_mol + number_heavy + number_hydrogen + """ + + if not hasattr(universe.atoms, "elements"): + universe.guess_TopologyAttrs(to_guess=["elements"]) + + molecule = universe.atoms.fragments[mol_id] + + dummy = molecule.select_atoms("prop mass < 0.1") + if len(dummy) > 0: + frag = molecule.select_atoms("prop mass > 0.1") + rdkit_mol = frag.convert_to("RDKIT", force=True, inferrer=None) + logger.debug("Warning: Dummy atoms found") + else: + rdkit_mol = molecule.convert_to("RDKIT", force=True) + + number_heavy = rdkit_mol.GetNumHeavyAtoms() + number_hydrogen = rdkit_mol.GetNumAtoms() - number_heavy + + return rdkit_mol, number_heavy, number_hydrogen + + def _get_symmetry_number(self, rdkit_mol, number_heavy, number_hydrogen): + """ + Calculate symmetry number for the molecule. + + For larger molecules, removing the hydrogens reduces the over counting + of symmetry states. When there is only one heavy atom the hydrogens + are important to the symmetry. If there is a single heavy atom with + no hydrogens then the molecule is spherically symmetric. + + Using the RDKit GetSubstructMatches function often works well as + a guess for the symmetry number, but it occasionally returns a + symmetry number 2x the expected value (for example, cyclohexane). 
+ + Args: + rdkit_mol: rdkit object for molecule of interest + number_heavy (int): number of heavy atoms + number_hydrogen (int): number of hydrogen atoms + + Returns: + symmetry_number (int): symmetry number of molecule + """ + + if number_heavy > 1: + rdkit_heavy = Chem.RemoveHs(rdkit_mol) + matches = rdkit_mol.GetSubstructMatches( + rdkit_heavy, uniquify=False, useChirality=True + ) + symmetry_number = len(matches) + elif number_hydrogen > 0: + matches = rdkit_mol.GetSubstructMatches( + rdkit_mol, uniquify=False, useChirality=True + ) + symmetry_number = len(matches) + else: + symmetry_number = 0 + + return symmetry_number + + def _get_linear(self, rdkit_mol, number_heavy): + """ + Determine if the molecule is linear. + + We are not considering the hydrogens, just the united atom beads. + So, molecules like methanol are treated as linear since they have only + two united atoms. + + Args: + rkdit_mol: rdkit object for molecule of interest + number_heavy (int): number of heavy atoms + + Returns: + linear (bool): True if molecule linear + """ + rdkit_heavy = Chem.RemoveHs(rdkit_mol) + + linear = False + if number_heavy == 1: + linear = False + elif number_heavy == 2: + linear = True + else: + sp_count = 0 + for x in rdkit_heavy.GetAtoms(): + if x.GetHybridization() == Chem.HybridizationType.SP: + sp_count += 1 + if sp_count >= (number_heavy - 2): + linear = True + + return linear diff --git a/CodeEntropy/levels/nodes/accumulators.py b/CodeEntropy/levels/nodes/accumulators.py index 8c2cb64c..722cc96b 100644 --- a/CodeEntropy/levels/nodes/accumulators.py +++ b/CodeEntropy/levels/nodes/accumulators.py @@ -16,8 +16,9 @@ from __future__ import annotations import logging +from collections.abc import MutableMapping from dataclasses import dataclass -from typing import Any, Dict, List, MutableMapping +from typing import Any import numpy as np @@ -30,19 +31,19 @@ class GroupIndex: """Bidirectional mapping between group ids and contiguous indices.""" - group_id_to_index: 
Dict[int, int] - index_to_group_id: List[int] + group_id_to_index: dict[int, int] + index_to_group_id: list[int] @dataclass(frozen=True) class CovarianceAccumulators: """Container for covariance mean accumulators and frame counters.""" - force_covariances: Dict[str, Any] - torque_covariances: Dict[str, Any] - frame_counts: Dict[str, Any] - forcetorque_covariances: Dict[str, Any] - forcetorque_counts: Dict[str, Any] + force_covariances: dict[str, Any] + torque_covariances: dict[str, Any] + frame_counts: dict[str, Any] + forcetorque_covariances: dict[str, Any] + forcetorque_counts: dict[str, Any] class InitCovarianceAccumulatorsNode: @@ -68,7 +69,7 @@ class InitCovarianceAccumulatorsNode: - force_torque_counts -> forcetorque_counts """ - def run(self, shared_data: Dict[str, Any]) -> Dict[str, Any]: + def run(self, shared_data: dict[str, Any]) -> dict[str, Any]: """Initialize and attach all accumulator structures into shared_data. Args: @@ -93,7 +94,7 @@ def run(self, shared_data: Dict[str, Any]) -> Dict[str, Any]: return self._build_return_payload(shared_data) @staticmethod - def _build_group_index(groups: Dict[int, Any]) -> GroupIndex: + def _build_group_index(groups: dict[int, Any]) -> GroupIndex: """Build group id <-> index mappings. Args: @@ -171,7 +172,7 @@ def _attach_backwards_compatible_aliases(shared_data: SharedData) -> None: shared_data["force_torque_counts"] = shared_data["forcetorque_counts"] @staticmethod - def _build_return_payload(shared_data: SharedData) -> Dict[str, Any]: + def _build_return_payload(shared_data: SharedData) -> dict[str, Any]: """Build the return payload containing initialized keys. 
Args: diff --git a/CodeEntropy/levels/nodes/beads.py b/CodeEntropy/levels/nodes/beads.py index abd5b122..f81d5e86 100644 --- a/CodeEntropy/levels/nodes/beads.py +++ b/CodeEntropy/levels/nodes/beads.py @@ -12,8 +12,9 @@ import logging from collections import defaultdict +from collections.abc import MutableMapping from dataclasses import dataclass -from typing import Any, DefaultDict, Dict, List, MutableMapping, Tuple +from typing import Any import numpy as np @@ -21,8 +22,8 @@ logger = logging.getLogger(__name__) -BeadKey = Tuple[int, str] | Tuple[int, str, int] -BeadsMap = Dict[BeadKey, List[np.ndarray]] +BeadKey = tuple[int, str] | tuple[int, str, int] +BeadsMap = dict[BeadKey, list[np.ndarray]] @dataclass(frozen=True) @@ -62,7 +63,7 @@ def __init__(self, hierarchy: HierarchyBuilder | None = None) -> None: """ self._hier = hierarchy or HierarchyBuilder() - def run(self, shared_data: Dict[str, Any]) -> Dict[str, Any]: + def run(self, shared_data: dict[str, Any]) -> dict[str, Any]: """Build bead definitions for all molecules and levels. Args: @@ -77,7 +78,7 @@ def run(self, shared_data: Dict[str, Any]) -> Dict[str, Any]: KeyError: If required keys are missing from `shared_data`. """ u = shared_data["reduced_universe"] - levels: List[List[str]] = shared_data["levels"] + levels: list[list[str]] = shared_data["levels"] beads: BeadsMap = {} fragments = u.atoms.fragments @@ -98,7 +99,7 @@ def run(self, shared_data: Dict[str, Any]) -> Dict[str, Any]: return {"beads": beads} def _add_united_atom_beads( - self, beads: MutableMapping[BeadKey, List[np.ndarray]], mol_id: int, mol + self, beads: MutableMapping[BeadKey, list[np.ndarray]], mol_id: int, mol ) -> None: """Compute and store united-atom beads grouped into residue buckets. 
@@ -109,7 +110,7 @@ def _add_united_atom_beads( """ ua_beads = self._hier.get_beads(mol, "united_atom") - buckets: DefaultDict[int, List[np.ndarray]] = defaultdict(list) + buckets: defaultdict[int, list[np.ndarray]] = defaultdict(list) for bead_i, bead in enumerate(ua_beads): atom_indices = self._validate_bead_indices( bead, mol_id=mol_id, level="united_atom", bead_i=bead_i @@ -124,7 +125,7 @@ def _add_united_atom_beads( beads[(mol_id, "united_atom", local_res_id)] = buckets.get(local_res_id, []) def _add_residue_beads( - self, beads: MutableMapping[BeadKey, List[np.ndarray]], mol_id: int, mol + self, beads: MutableMapping[BeadKey, list[np.ndarray]], mol_id: int, mol ) -> None: """Compute and store residue beads. @@ -134,7 +135,7 @@ def _add_residue_beads( mol: MDAnalysis AtomGroup representing the molecule. """ res_beads = self._hier.get_beads(mol, "residue") - kept: List[np.ndarray] = [] + kept: list[np.ndarray] = [] for bead_i, bead in enumerate(res_beads): atom_indices = self._validate_bead_indices( @@ -154,7 +155,7 @@ def _add_residue_beads( ) def _add_polymer_beads( - self, beads: MutableMapping[BeadKey, List[np.ndarray]], mol_id: int, mol + self, beads: MutableMapping[BeadKey, list[np.ndarray]], mol_id: int, mol ) -> None: """Compute and store polymer beads. @@ -164,7 +165,7 @@ def _add_polymer_beads( mol: MDAnalysis AtomGroup representing the molecule. 
""" poly_beads = self._hier.get_beads(mol, "polymer") - kept: List[np.ndarray] = [] + kept: list[np.ndarray] = [] for bead_i, bead in enumerate(poly_beads): atom_indices = self._validate_bead_indices( diff --git a/CodeEntropy/levels/nodes/conformations.py b/CodeEntropy/levels/nodes/conformations.py index 90a088fb..5e379ca3 100644 --- a/CodeEntropy/levels/nodes/conformations.py +++ b/CodeEntropy/levels/nodes/conformations.py @@ -9,12 +9,12 @@ from __future__ import annotations from dataclasses import dataclass -from typing import Any, Dict +from typing import Any from CodeEntropy.levels.dihedrals import ConformationStateBuilder -SharedData = Dict[str, Any] -ConformationalStates = Dict[str, Any] +SharedData = dict[str, Any] +ConformationalStates = dict[str, Any] @dataclass(frozen=True) @@ -58,7 +58,7 @@ def __init__(self, universe_operations: Any) -> None: def run( self, shared_data: SharedData, *, progress: object | None = None - ) -> Dict[str, ConformationalStates]: + ) -> dict[str, ConformationalStates]: """Compute conformational states and store them in shared_data. Args: diff --git a/CodeEntropy/levels/nodes/covariance.py b/CodeEntropy/levels/nodes/covariance.py index dd56d180..27b9edbf 100644 --- a/CodeEntropy/levels/nodes/covariance.py +++ b/CodeEntropy/levels/nodes/covariance.py @@ -19,7 +19,7 @@ from __future__ import annotations import logging -from typing import Any, Dict, List, Optional, Tuple +from typing import Any import numpy as np from MDAnalysis.lib.mdamath import make_whole @@ -28,7 +28,7 @@ logger = logging.getLogger(__name__) -FrameCtx = Dict[str, Any] +FrameCtx = dict[str, Any] Matrix = np.ndarray @@ -52,7 +52,7 @@ def __init__(self) -> None: """Initialise the frame covariance node.""" self._ft = ForceTorqueCalculator() - def run(self, ctx: FrameCtx) -> Dict[str, Any]: + def run(self, ctx: FrameCtx) -> dict[str, Any]: """Compute and store per-frame force/torque (and optional FT) matrices. 
Args: @@ -83,15 +83,15 @@ def run(self, ctx: FrameCtx) -> Dict[str, Any]: box = self._try_get_box(u) fragments = u.atoms.fragments - out_force: Dict[str, Dict[Any, Matrix]] = {"ua": {}, "res": {}, "poly": {}} - out_torque: Dict[str, Dict[Any, Matrix]] = {"ua": {}, "res": {}, "poly": {}} - out_ft: Optional[Dict[str, Dict[Any, Matrix]]] = ( + out_force: dict[str, dict[Any, Matrix]] = {"ua": {}, "res": {}, "poly": {}} + out_torque: dict[str, dict[Any, Matrix]] = {"ua": {}, "res": {}, "poly": {}} + out_ft: dict[str, dict[Any, Matrix]] | None = ( {"ua": {}, "res": {}, "poly": {}} if combined else None ) - ua_molcount: Dict[Tuple[int, int], int] = {} - res_molcount: Dict[int, int] = {} - poly_molcount: Dict[int, int] = {} + ua_molcount: dict[tuple[int, int], int] = {} + res_molcount: dict[int, int] = {} + poly_molcount: dict[int, int] = {} for group_id, mol_ids in groups.items(): for mol_id in mol_ids: @@ -152,7 +152,7 @@ def run(self, ctx: FrameCtx) -> Dict[str, Any]: combined=combined, ) - frame_cov: Dict[str, Any] = {"force": out_force, "torque": out_torque} + frame_cov: dict[str, Any] = {"force": out_force, "torque": out_torque} if combined and out_ft is not None: frame_cov["forcetorque"] = out_ft @@ -166,15 +166,15 @@ def _process_united_atom( mol: Any, mol_id: int, group_id: int, - beads: Dict[Any, List[Any]], + beads: dict[Any, list[Any]], axes_manager: Any, - box: Optional[np.ndarray], + box: np.ndarray | None, force_partitioning: float, customised_axes: bool, is_highest: bool, - out_force: Dict[str, Dict[Any, Matrix]], - out_torque: Dict[str, Dict[Any, Matrix]], - molcount: Dict[Tuple[int, int], int], + out_force: dict[str, dict[Any, Matrix]], + out_torque: dict[str, dict[Any, Matrix]], + molcount: dict[tuple[int, int], int], ) -> None: """Compute UA-level force/torque second moments for one molecule. 
@@ -235,16 +235,16 @@ def _process_residue( mol: Any, mol_id: int, group_id: int, - beads: Dict[Any, List[Any]], + beads: dict[Any, list[Any]], axes_manager: Any, - box: Optional[np.ndarray], + box: np.ndarray | None, customised_axes: bool, force_partitioning: float, is_highest: bool, - out_force: Dict[str, Dict[Any, Matrix]], - out_torque: Dict[str, Dict[Any, Matrix]], - out_ft: Optional[Dict[str, Dict[Any, Matrix]]], - molcount: Dict[int, int], + out_force: dict[str, dict[Any, Matrix]], + out_torque: dict[str, dict[Any, Matrix]], + out_ft: dict[str, dict[Any, Matrix]] | None, + molcount: dict[int, int], combined: bool, ) -> None: """Compute residue-level force/torque (and optional FT) moments for one @@ -317,15 +317,15 @@ def _process_polymer( mol: Any, mol_id: int, group_id: int, - beads: Dict[Any, List[Any]], + beads: dict[Any, list[Any]], axes_manager: Any, - box: Optional[np.ndarray], + box: np.ndarray | None, force_partitioning: float, is_highest: bool, - out_force: Dict[str, Dict[Any, Matrix]], - out_torque: Dict[str, Dict[Any, Matrix]], - out_ft: Optional[Dict[str, Dict[Any, Matrix]]], - molcount: Dict[int, int], + out_force: dict[str, dict[Any, Matrix]], + out_torque: dict[str, dict[Any, Matrix]], + out_ft: dict[str, dict[Any, Matrix]] | None, + molcount: dict[int, int], combined: bool, ) -> None: """Compute polymer-level force/torque (and optional FT) moments for one @@ -412,14 +412,14 @@ def _process_polymer( def _build_ua_vectors( self, *, - bead_groups: List[Any], + bead_groups: list[Any], residue_atoms: Any, axes_manager: Any, - box: Optional[np.ndarray], + box: np.ndarray | None, force_partitioning: float, customised_axes: bool, is_highest: bool, - ) -> Tuple[List[np.ndarray], List[np.ndarray]]: + ) -> tuple[list[np.ndarray], list[np.ndarray]]: """Build force/torque vectors for UA-level beads of one residue. 
Args: @@ -435,8 +435,8 @@ def _build_ua_vectors( A tuple (force_vecs, torque_vecs), each a list of (3,) vectors ordered by UA bead index within the residue. """ - force_vecs: List[np.ndarray] = [] - torque_vecs: List[np.ndarray] = [] + force_vecs: list[np.ndarray] = [] + torque_vecs: list[np.ndarray] = [] for ua_i, bead in enumerate(bead_groups): if customised_axes: @@ -477,13 +477,13 @@ def _build_residue_vectors( self, *, mol: Any, - bead_groups: List[Any], + bead_groups: list[Any], axes_manager: Any, - box: Optional[np.ndarray], + box: np.ndarray | None, customised_axes: bool, force_partitioning: float, is_highest: bool, - ) -> Tuple[List[np.ndarray], List[np.ndarray]]: + ) -> tuple[list[np.ndarray], list[np.ndarray]]: """Build force/torque vectors for residue-level beads of one molecule. Args: @@ -499,8 +499,8 @@ def _build_residue_vectors( A tuple (force_vecs, torque_vecs), each a list of (3,) vectors ordered by residue index within the molecule. """ - force_vecs: List[np.ndarray] = [] - torque_vecs: List[np.ndarray] = [] + force_vecs: list[np.ndarray] = [] + torque_vecs: list[np.ndarray] = [] for local_res_i, bead in enumerate(bead_groups): trans_axes, rot_axes, center, moi = self._get_residue_axes( @@ -541,7 +541,7 @@ def _get_residue_axes( local_res_i: int, axes_manager: Any, customised_axes: bool, - ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: + ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Get translation/rotation axes, center and MOI for a residue bead. Args: @@ -581,7 +581,7 @@ def _get_polymer_axes( mol: Any, bead: Any, axes_manager: Any, - ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: + ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Get translation/rotation axes, center and MOI for a polymer bead. 
Args: @@ -608,7 +608,7 @@ def _get_polymer_axes( ) @staticmethod - def _get_shared(ctx: FrameCtx) -> Dict[str, Any]: + def _get_shared(ctx: FrameCtx) -> dict[str, Any]: """Fetch shared context from a frame context dict. Args: @@ -625,7 +625,7 @@ def _get_shared(ctx: FrameCtx) -> Dict[str, Any]: return ctx["shared"] @staticmethod - def _try_get_box(u: Any) -> Optional[np.ndarray]: + def _try_get_box(u: Any) -> np.ndarray | None: """Extract a (3,) box vector from an MDAnalysis universe when available. Args: @@ -641,7 +641,7 @@ def _try_get_box(u: Any) -> Optional[np.ndarray]: return None @staticmethod - def _inc_mean(old: Optional[np.ndarray], new: np.ndarray, n: int) -> np.ndarray: + def _inc_mean(old: np.ndarray | None, new: np.ndarray, n: int) -> np.ndarray: """Compute an incremental mean (streaming average). Args: @@ -658,7 +658,7 @@ def _inc_mean(old: Optional[np.ndarray], new: np.ndarray, n: int) -> np.ndarray: @staticmethod def _build_ft_block( - force_vecs: List[np.ndarray], torque_vecs: List[np.ndarray] + force_vecs: list[np.ndarray], torque_vecs: list[np.ndarray] ) -> np.ndarray: """Build a combined force-torque block matrix for a frame. 
@@ -683,7 +683,7 @@ def _build_ft_block( if n == 0: raise ValueError("No bead vectors available to build an FT matrix.") - bead_vecs: List[np.ndarray] = [] + bead_vecs: list[np.ndarray] = [] for Fi, Ti in zip(force_vecs, torque_vecs, strict=True): Fi = np.asarray(Fi, dtype=float).reshape(-1) Ti = np.asarray(Ti, dtype=float).reshape(-1) @@ -691,7 +691,7 @@ def _build_ft_block( raise ValueError("Each force/torque vector must be length 3.") bead_vecs.append(np.concatenate([Fi, Ti], axis=0)) - blocks: List[List[np.ndarray]] = [[None] * n for _ in range(n)] + blocks: list[list[np.ndarray]] = [[None] * n for _ in range(n)] for i in range(n): for j in range(i, n): sub = np.outer(bead_vecs[i], bead_vecs[j]) diff --git a/CodeEntropy/levels/nodes/detect_levels.py b/CodeEntropy/levels/nodes/detect_levels.py index 6faecaaa..292d6034 100644 --- a/CodeEntropy/levels/nodes/detect_levels.py +++ b/CodeEntropy/levels/nodes/detect_levels.py @@ -6,12 +6,12 @@ from __future__ import annotations -from typing import Any, Dict, List, Tuple +from typing import Any from CodeEntropy.levels.hierarchy import HierarchyBuilder -SharedData = Dict[str, Any] -Levels = List[List[str]] +SharedData = dict[str, Any] +Levels = list[list[str]] class DetectLevelsNode: @@ -26,7 +26,7 @@ def __init__(self) -> None: """Initialize the node with a HierarchyBuilder helper.""" self._hierarchy = HierarchyBuilder() - def run(self, shared_data: SharedData) -> Dict[str, Any]: + def run(self, shared_data: SharedData) -> dict[str, Any]: """Detect levels and store results in shared_data. Args: @@ -53,7 +53,7 @@ def run(self, shared_data: SharedData) -> Dict[str, Any]: "number_molecules": number_molecules, } - def _detect_levels(self, universe: Any) -> Tuple[int, Levels]: + def _detect_levels(self, universe: Any) -> tuple[int, Levels]: """Delegate level detection to HierarchyBuilder. 
Args: diff --git a/CodeEntropy/levels/nodes/detect_molecules.py b/CodeEntropy/levels/nodes/detect_molecules.py index bfc188d5..5710dc5b 100644 --- a/CodeEntropy/levels/nodes/detect_molecules.py +++ b/CodeEntropy/levels/nodes/detect_molecules.py @@ -8,13 +8,13 @@ from __future__ import annotations import logging -from typing import Any, Dict +from typing import Any from CodeEntropy.molecules.grouping import MoleculeGrouper logger = logging.getLogger(__name__) -SharedData = Dict[str, Any] +SharedData = dict[str, Any] class DetectMoleculesNode: @@ -30,7 +30,7 @@ def __init__(self) -> None: """Initialize the node with a molecule grouping helper.""" self._grouping = MoleculeGrouper() - def run(self, shared_data: SharedData) -> Dict[str, Any]: + def run(self, shared_data: SharedData) -> dict[str, Any]: """Detect molecules and create grouping definitions. Args: diff --git a/CodeEntropy/levels/nodes/find_neighbors.py b/CodeEntropy/levels/nodes/find_neighbors.py new file mode 100644 index 00000000..3c9bd80f --- /dev/null +++ b/CodeEntropy/levels/nodes/find_neighbors.py @@ -0,0 +1,93 @@ +"""Find neighbor and symmetry info for orientational entropy calculations. + +This module defines a static DAG node that scans the trajectory and +finds neighbors and symmetry numbers. The resulting states are stored +in `shared_data` for later use by configurational entropy calculations. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +from CodeEntropy.levels.neighbors import Neighbors + +SharedData = dict[str, Any] +ConformationalStates = dict[str, Any] + + +@dataclass(frozen=True) +class NeighborConfig: + """Configuration for neighbor finding. + + Attributes: + start: Start frame index (inclusive). + end: End frame index (exclusive). + step: Frame stride. + """ + + start: int + end: int + step: int + + +class ComputeNeighborsNode: + """Static node that finds neighbors from trajectory. 
+
+    Produces:
+        shared_data["neighbors"] = {}
+        shared_data["symmetry_number"] = {}
+        shared_data["linear"] = {}
+
+    Where:
+        - neighbors is the average number of neighbors
+        - symmetry_number is the symmetry number of the molecule, int
+        - linear is a boolean; True for linear, False for non-linear
+    """
+
+    def __init__(self) -> None:
+        """Initialize the node."""
+        self._neighbor_analysis = Neighbors()
+
+    def run(
+        self, shared_data: SharedData, *, progress: object | None = None
+    ) -> SharedData:
+        """Find neighbour and symmetry information and store it in shared_data.
+
+        Args:
+            shared_data: Shared data dictionary. Requires:
+                - "reduced_universe"
+                - "levels"
+                - "groups"
+                - "args" with attribute "search_type"
+                  ("RAD" or "grid" neighbour search)
+            progress: Optional progress sink provided by ResultsReporter.progress().
+
+        Returns:
+            shared_data: SharedData
+        """
+        u = shared_data["reduced_universe"]
+        levels = shared_data["levels"]
+        groups = shared_data["groups"]
+        search_type = shared_data["args"].search_type
+
+        # Get average number of neighbors
+        number_neighbors = self._neighbor_analysis.get_neighbors(
+            universe=u,
+            levels=levels,
+            groups=groups,
+            search_type=search_type,
+        )
+
+        # Get symmetry numbers and linearity
+        symmetry_number, linear = self._neighbor_analysis.get_symmetry(
+            universe=u,
+            groups=groups,
+        )
+
+        # Add information to shared_data
+        shared_data["neighbors"] = number_neighbors
+        shared_data["symmetry_number"] = symmetry_number
+        shared_data["linear"] = linear
+
+        return shared_data
diff --git a/CodeEntropy/levels/search.py b/CodeEntropy/levels/search.py
new file mode 100644
index 00000000..d578a181
--- /dev/null
+++ b/CodeEntropy/levels/search.py
@@ -0,0 +1,240 @@
+"""These functions find neighbors.
+
+There are different functions for different types of neighbor searching.
+Currently RAD is the default with grid as an alternative.
+""" + +import MDAnalysis as mda +import numpy as np + + +class Search: + """ + Class for functions to find neighbors. + """ + + def __init__(self): + """ + Initializes the Search class with a placeholder for the system + trajectory. + """ + + self._universe = None + self._mol_id = None + + def get_RAD_neighbors(self, universe, mol_id): + """ + Find the neighbors of molecule with index mol_id. + + Args: + universe: The MDAnalysis universe of the system. + mol_id (int): the index for the central molecule. + + Returns: + neighbor_indices (list of ints): the list of neighboring molecule + indices. + """ + number_molecules = len(universe.atoms.fragments) + + central_position = universe.atoms.fragments[mol_id].center_of_mass() + + # Find distances between molecule of interest and other molecules in the system + distances = {} + for molecule_index_j in range(number_molecules): + if molecule_index_j != mol_id: + j_position = universe.atoms.fragments[molecule_index_j].center_of_mass() + distances[molecule_index_j] = self.get_distance( + j_position, central_position, universe.dimensions + ) + + # Sort distances smallest to largest + sorted_dist = sorted(distances.items(), key=lambda item: item[1]) + + # Get indices of neighbors + neighbor_indices = self._get_RAD_indices( + central_position, sorted_dist, universe, number_molecules + ) + + return neighbor_indices + + def _get_RAD_indices(self, i_coords, sorted_distances, system, number_molecules): + # pylint: disable=too-many-locals + r""" + For a given set of atom coordinates, find its RAD shell from the distance + sorted list, truncated to the closest 30 molecules. + + This function calculates coordination shells using RAD the relative + angular distance, as defined first in DOI:10.1063/1.4961439 + where molecules are defined as neighbors if + they fulfil the following condition: + + .. 
math::
+        \Bigg(\frac{1}{r_{ij}}\Bigg)^2 >
+        \Bigg(\frac{1}{r_{ik}}\Bigg)^2 \cos \theta_{jik}
+
+        For a given particle :math:`i`, neighbor :math:`j` is in its coordination
+        shell if :math:`k` is not blocking particle :math:`j`. In this implementation
+        of RAD, we enforce symmetry, whereby neighboring particles must be in each
+        other's coordination shells.
+
+        Args:
+            i_coords: xyz centre of mass of molecule :math:`i`
+            sorted_distances: (index, distance) pairs sorted by increasing distance
+            system: mdanalysis instance of atoms in a frame
+
+        Returns:
+            shell: list of indices of particles in the RAD shell of neighbors.
+        """
+        # 1. truncate the neighbor list to the closest 30 molecules and iterate
+        # through neighbors from closest to furthest.
+        shell = []
+        count = -1
+        limit = min(number_molecules - 1, 30)
+        for y in range(limit):
+            count += 1
+            j_idx = sorted_distances[y][0]
+            j_coords = system.atoms.fragments[j_idx].center_of_mass()
+            r_ij = sorted_distances[y][1]
+            blocked = False
+            # 2. iterate through neighbors other than atom j and check if they block
+            # it from molecule i
+            for z in range(count):  # only closer units can block
+                k_idx = sorted_distances[z][0]
+                k_coords = system.atoms.fragments[k_idx].center_of_mass()
+                r_ik = sorted_distances[z][1]
+                # 3. find the angle jik
+                costheta_jik = self.get_angle(
+                    j_coords, i_coords, k_coords, system.dimensions[:3]
+                )
+                if np.isnan(costheta_jik):
+                    break
+                # 4. check if k blocks j from i
+                LHS = (1 / r_ij) ** 2
+                RHS = ((1 / r_ik) ** 2) * costheta_jik
+                if LHS < RHS:
+                    blocked = True
+                    break
+            # 5. if j is not blocked from i by k, then it is in i's shell
+            if blocked is False:
+                shell.append(j_idx)
+
+        return shell
+
+    def get_angle(
+        self, a: np.ndarray, b: np.ndarray, c: np.ndarray, dimensions: np.ndarray
+    ):
+        """
+        Get the angle between three atoms, taking into account periodic
+        boundary conditions.
+
+        b is the vertex of the angle.
+ + Pairwise differences between the coordinates are used with the + distances calculated as the square root of the sum of the squared + x, y, and z coordinates. + + Args: + a: (3,) array of atom cooordinates + b: (3,) array of atom cooordinates + c: (3,) array of atom cooordinates + dimensions: (3,) array of system box dimensions. + + Returns: + cosine_angle: float, cosine of the angle abc. + """ + # Differences in positions + ba = np.abs(a - b) + bc = np.abs(c - b) + ac = np.abs(c - a) + + # Correct for periodic boundary conditions + ba = np.where(ba > 0.5 * dimensions, ba - dimensions, ba) + bc = np.where(bc > 0.5 * dimensions, bc - dimensions, bc) + ac = np.where(ac > 0.5 * dimensions, ac - dimensions, ac) + + # Get distances + dist_ba = np.sqrt((ba**2).sum(axis=-1)) + dist_bc = np.sqrt((bc**2).sum(axis=-1)) + dist_ac = np.sqrt((ac**2).sum(axis=-1)) + + # Trigonometry + cosine_angle = (dist_ac**2 - dist_bc**2 - dist_ba**2) / (-2 * dist_bc * dist_ba) + + return cosine_angle + + def get_distance(self, j_position, i_position, dimensions): + """ + Function to calculate the distance between two points. + Take periodic boundary conditions into account. + + Args: + j_position: the x, y, z coordinates of point 1 + i_position: the x, y, z coordinates of the other point + dimensions: the dimensions of the simulation box + + Returns: + distance: float, the distance between the two points + """ + # Difference in positions + delta = np.abs(j_position - i_position) + + # Account for periodic boundary conditions + delta = np.where(delta > 0.5 * dimensions, delta - dimensions, delta) + + # Get distance value + distance = np.sqrt((delta**2).sum(axis=-1)) + + return distance + + def get_grid_neighbors(self, universe, search_object, mol_id, highest_level): + """ + Use MDAnalysis neighbor search to find neighbors. + + For molecules with just one united atom, use the "A" search level to + find neighboring atoms. 
For larger molecules use the "R" search level + to find neighboring residues. + + The atoms/residues of the molecule of interest are removed from the + neighbor list. + + Args: + universe: MDAnalysis universe object for system. + mol_id: int, the index for the molecule of interest. + highest_level: str, molecule level. + + Returns: + neighbors: MDAnalysis atomgroup of the neighboring particles. + """ + search_object = mda.lib.NeighborSearch.AtomNeighborSearch(universe.atoms) + fragment = universe.atoms.fragments[mol_id] + selection_string = f"index {fragment.indices[0]}:{fragment.indices[-1]}" + molecule_atom_group = universe.select_atoms(selection_string) + + if highest_level == "united_atom": + # For united atom size molecules, use the grid search + # to find neighboring atoms + search_level = "A" + search = mda.lib.NeighborSearch.AtomNeighborSearch.search( + search_object, + molecule_atom_group, + radius=3.0, + level=search_level, + ) + # Make sure that the neighbors list does not include + # atoms from the central molecule + # neighbors = search - fragment.residues + neighbors = search - molecule_atom_group + else: + # For larger molecules, use the grid search to find neighboring residues + search_level = "R" + search = mda.lib.NeighborSearch.AtomNeighborSearch.search( + search_object, + molecule_atom_group, + radius=3.5, + level=search_level, + ) + # Make sure that the neighbors list does not include + # residues from the central molecule + neighbors = search - fragment.residues + + return neighbors diff --git a/CodeEntropy/molecules/grouping.py b/CodeEntropy/molecules/grouping.py index 3d3a5f29..96591843 100644 --- a/CodeEntropy/molecules/grouping.py +++ b/CodeEntropy/molecules/grouping.py @@ -14,15 +14,15 @@ """ import logging +from collections.abc import Callable, Mapping, Sequence from dataclasses import dataclass -from typing import Callable, Dict, List, Mapping, Sequence, Tuple logger = logging.getLogger(__name__) GroupId = int MoleculeId = int 
-MoleculeGroups = Dict[GroupId, List[MoleculeId]] -Signature = Tuple[int, Tuple[str, ...]] +MoleculeGroups = dict[GroupId, list[MoleculeId]] +Signature = tuple[int, tuple[str, ...]] @dataclass(frozen=True) @@ -128,7 +128,7 @@ def _group_by_signature(self, universe) -> MoleculeGroups: """ fragments = self._fragments(universe) - signature_to_rep: Dict[Signature, MoleculeId] = {} + signature_to_rep: dict[Signature, MoleculeId] = {} groups: MoleculeGroups = {} for mol_id, fragment in enumerate(fragments): @@ -174,7 +174,7 @@ def _signature(self, fragment) -> Signature: def _representative_id( self, - signature_to_rep: Dict[Signature, MoleculeId], + signature_to_rep: dict[Signature, MoleculeId], signature: Signature, candidate_id: MoleculeId, ) -> MoleculeId: @@ -200,5 +200,5 @@ def _log_summary(self, groups: MoleculeGroups) -> None: Args: groups: Group mapping to summarize. """ - logger.debug("Number of molecule groups: %d", len(groups)) - logger.debug("Molecule groups: %s", groups) + logger.debug(f"Number of molecule groups: {len(groups)}") + logger.debug(f"Molecule groups: {groups}") diff --git a/CodeEntropy/results/reporter.py b/CodeEntropy/results/reporter.py index c411139e..04ce5e24 100644 --- a/CodeEntropy/results/reporter.py +++ b/CodeEntropy/results/reporter.py @@ -21,7 +21,7 @@ import sys from contextlib import contextmanager from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple +from typing import Any import numpy as np from rich.console import Console @@ -106,7 +106,7 @@ class ResultsReporter: """ - def __init__(self, console: Optional[Console] = None) -> None: + def __init__(self, console: Console | None = None) -> None: """Initialise a ResultsReporter. Args: @@ -114,9 +114,9 @@ def __init__(self, console: Optional[Console] = None) -> None: Console instance is created. 
""" self.console: Console = console or Console() - self.molecule_data: List[Tuple[Any, Any, Any, Any]] = [] - self.residue_data: List[List[Any]] = [] - self.group_labels: Dict[Any, Dict[str, Any]] = {} + self.molecule_data: list[tuple[Any, Any, Any, Any]] = [] + self.residue_data: list[list[Any]] = [] + self.group_labels: dict[Any, dict[str, Any]] = {} @staticmethod def clean_residue_name(resname: Any) -> str: @@ -131,7 +131,7 @@ def clean_residue_name(resname: Any) -> str: return re.sub(r"[-–—]", "", str(resname)) @staticmethod - def _gid_sort_key(x: Any) -> Tuple[int, Any]: + def _gid_sort_key(x: Any) -> tuple[int, Any]: """Stable sort key for group IDs. Group IDs may be numeric strings, ints, or other objects. @@ -153,7 +153,7 @@ def _gid_sort_key(x: Any) -> Tuple[int, Any]: return (1, sx) @staticmethod - def _safe_float(value: Any) -> Optional[float]: + def _safe_float(value: Any) -> float | None: """Convert value to float if possible; otherwise return None. Args: @@ -213,8 +213,8 @@ def add_group_label( self, group_id: Any, label: str, - residue_count: Optional[int] = None, - atom_count: Optional[int] = None, + residue_count: int | None = None, + atom_count: int | None = None, ) -> None: """Store metadata label for a group. 
@@ -244,7 +244,7 @@ def _log_grouped_results_tables(self) -> None: if not self.molecule_data: return - grouped: Dict[Any, List[Tuple[Any, Any, Any, Any]]] = {} + grouped: dict[Any, list[tuple[Any, Any, Any, Any]]] = {} for row in self.molecule_data: gid = row[0] grouped.setdefault(gid, []).append(row) @@ -259,8 +259,8 @@ def _log_grouped_results_tables(self) -> None: table.add_column("Result (J/mol/K)", justify="center", style="yellow") rows = grouped[gid] - non_total: List[Tuple[str, str, Any]] = [] - totals: List[Tuple[str, str, Any]] = [] + non_total: list[tuple[str, str, Any]] = [] + totals: list[tuple[str, str, Any]] = [] for _gid, level, typ, val in rows: level_s = str(level) @@ -286,7 +286,7 @@ def _log_residue_table_grouped(self) -> None: if not self.residue_data: return - grouped: Dict[Any, List[List[Any]]] = {} + grouped: dict[Any, list[list[Any]]] = {} for row in self.residue_data: gid = row[0] grouped.setdefault(gid, []).append(row) @@ -335,7 +335,7 @@ def save_dataframes_as_json( residue_df, output_file: str, *, - args: Optional[Any] = None, + args: Any | None = None, include_raw_tables: bool = False, ) -> None: """Save results to a grouped JSON structure. @@ -368,9 +368,9 @@ def _build_grouped_payload( *, molecule_df, residue_df, - args: Optional[Any], + args: Any | None, include_raw_tables: bool, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """Build a grouped JSON-serializable payload from result dataframes. 
Args: @@ -385,7 +385,7 @@ def _build_grouped_payload( mol_rows = molecule_df.to_dict(orient="records") res_rows = residue_df.to_dict(orient="records") - groups: Dict[str, Dict[str, Any]] = {} + groups: dict[str, dict[str, Any]] = {} for row in mol_rows: gid = str(row.get("Group ID")) @@ -413,7 +413,7 @@ def _build_grouped_payload( comps = g["components"].values() g["total"] = float(sum(comps)) if comps else 0.0 - payload: Dict[str, Any] = { + payload: dict[str, Any] = { "args": self._serialize_args(args), "provenance": self._provenance(), "groups": groups, @@ -426,7 +426,7 @@ def _build_grouped_payload( return payload @staticmethod - def _serialize_args(args: Optional[Any]) -> Dict[str, Any]: + def _serialize_args(args: Any | None) -> dict[str, Any]: """Turn argparse Namespace / dict / object into a JSON-serializable dict. Args: @@ -449,7 +449,7 @@ def _serialize_args(args: Optional[Any]) -> Dict[str, Any]: except Exception: return {} - out: Dict[str, Any] = {} + out: dict[str, Any] = {} for k, v in base.items(): if isinstance(v, np.ndarray): out[k] = v.tolist() @@ -460,14 +460,14 @@ def _serialize_args(args: Optional[Any]) -> Dict[str, Any]: return out @staticmethod - def _provenance() -> Dict[str, Any]: + def _provenance() -> dict[str, Any]: """Build a provenance dictionary for exported results. Returns: Dictionary with python version, platform string, CodeEntropy package version (if available), and git sha (if available). """ - prov: Dict[str, Any] = { + prov: dict[str, Any] = { "python": sys.version.split()[0], "platform": platform.platform(), } @@ -483,7 +483,7 @@ def _provenance() -> Dict[str, Any]: return prov @staticmethod - def _try_get_git_sha() -> Optional[str]: + def _try_get_git_sha() -> str | None: """Try to determine the current git commit SHA. 
The SHA is obtained from:
@@ -513,8 +513,7 @@ def _try_get_git_sha() -> Optional[str]:
             proc = subprocess.run(
                 ["git", "rev-parse", "HEAD"],
                 cwd=str(repo_guess),
-                stdout=subprocess.PIPE,
-                stderr=subprocess.PIPE,
+                capture_output=True,
                 text=True,
             )
             if proc.returncode != 0:
diff --git a/conda-recipe/meta.yaml b/conda-recipe/meta.yaml
index 2caa8065..7104d80f 100644
--- a/conda-recipe/meta.yaml
+++ b/conda-recipe/meta.yaml
@@ -34,6 +34,7 @@ requirements:
     - networkx >=3.6,<3.7
     - matplotlib >=3.10,<3.11
     - requests >=2.32,<3.0
+    - rdkit >=2025.9.5
     - dask >=2026.1.2,<2026.2.0
     - distributed >=2026.1.2,<2026.2.0
     - dask-jobqueue >=0.9,<0.10
diff --git a/docs/conf.py b/docs/conf.py
index de663408..b24b6f32 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 #
 # Configuration file for the Sphinx documentation builder.
 #
diff --git a/docs/config.yaml b/docs/config.yaml
index 3b1240ae..7de274a7 100644
--- a/docs/config.yaml
+++ b/docs/config.yaml
@@ -3,7 +3,7 @@ run1:
   top_traj_file: ["1AKI_prod.tpr", "1AKI_prod.trr"]
   force_file:
-  file_formate:
+  file_format:
   selection_string: 'all'
   start: 0
   end: 500
@@ -14,3 +14,6 @@ run1:
   force_partitioning: 0.5
   water_entropy: False
   grouping: 'molecules'
+  customised_axes: True
+  combined_forcetorque: False
+  search_type: 'RAD'
diff --git a/docs/getting_started.rst b/docs/getting_started.rst
index ff3cdbaf..ff9ef211 100644
--- a/docs/getting_started.rst
+++ b/docs/getting_started.rst
@@ -204,16 +204,20 @@ The ``top_traj_file`` argument is required; other arguments have default values.
- ``str`` * - ``--kcal_force_units`` - Set input units as kcal/mol - - ``bool`` - ``False`` + - ``bool`` * - ``--combined_forcetorque`` - Use the combined force-torque covariance matrix for the highest level to match the 2019 paper - - ``bool`` - ``True`` + - ``bool`` * - ``--customised_axes`` - Use custom bonded axes to get COM, MOI and PA that match the 2019 paper - - ``bool`` - ``True`` + - ``bool`` + * - ``--search_type`` + - Method for finding neighbouring molecules + - ``RAD`` + - ``str`` Averaging --------- @@ -224,6 +228,15 @@ controls how averaging is done. * ``molecules`` (default): molecules are grouped by atom names and counts. * ``each``: each molecule is treated as its own group (no averaging). +Counting Neighbours +------------------- + +The code counts the number of neighbours for the orientational entropy. +There are currently two options, chosen by the ``search_type`` argument. + +* ``RAD`` (default): Uses the relative angular distance method. +* ``grid``: Uses the MDAnalysis NeighborSearch method. + Examples -------- diff --git a/docs/science.rst b/docs/science.rst index 6d89005e..1a98f977 100644 --- a/docs/science.rst +++ b/docs/science.rst @@ -4,7 +4,7 @@ Multiscale Cell Correlation Theory This section is to describe the scientific theory behind the method used in CodeEntropy. The multiscale cell correlation (MCC) method [1-3] has been developed in the group of Richard Henchman to calculate entropy from molecular dynamics (MD) simulations. -It has been applied to liquids [1,3,4], proteins [2,5,6], solutions [6-9], and complexes [6,7]. +It has been applied to liquids [1,3,4], proteins [2,5,6], solutions [6,8-10], and complexes [6,8]. The purpose of this project is to develop and release well written code that enables users from any group to calculate the entropy from their simulations using the MCC. The latest code can be found at github.com/ccpbiosim/codeentropy. @@ -32,13 +32,15 @@ Model. 60 (2020), pp. 5540–5551. [6] Jas Kalayan et al. 
“Total Free Energy Analysis of Fully Hydrated Proteins”. In: Proteins 91 (2023), pp. 74–90. +[7] Jonathan Higham and Richard H. Henchman. "Locally adaptive method to define coordination shell". In: J. Chem. Phys. 145 (2016), pp. 084108 + Additional application examples ------------------------------- -[7] Hafiz Saqib Ali et al."Energy-entropy method using Multiscale Cell Correlation to calculate binding free energies in the SAMPL8 Host-Guest Challenge". In: Journal of Computer Aided Molecular Design 35 (2021), 911-921. +[8] Hafiz Saqib Ali et al."Energy-entropy method using Multiscale Cell Correlation to calculate binding free energies in the SAMPL8 Host-Guest Challenge". In: Journal of Computer Aided Molecular Design 35 (2021), 911-921. -[8] Fabio Falcioni et al. "Energy-entropy prediction of octanol-water logP of SAMPL7 N-acylsulfonamide bioisosters". In Journal of Computer Aided Molecular Design 35 (2021) 831-840. +[9] Fabio Falcioni et al. "Energy-entropy prediction of octanol-water logP of SAMPL7 N-acylsulfonamide bioisosters". In Journal of Computer Aided Molecular Design 35 (2021) 831-840. -[9] Hafiz Saqib Ali et al. "Energy-entropy Multiscale Cell Correlation method to predict toluene–water log P in the SAMPL9 challenge". In Physical Chemistry Chemical Physics 25 (2023), 27524-27531. +[10] Hafiz Saqib Ali et al. "Energy-entropy Multiscale Cell Correlation method to predict toluene–water log P in the SAMPL9 challenge". In Physical Chemistry Chemical Physics 25 (2023), 27524-27531. Hierarchy --------- @@ -98,6 +100,31 @@ Orientational Entropy --------------------- Orientational entropy is the term that comes from the molecule's environment (or the intermolecular configuration). The different environments are the different states for the molecule, and the statistics can be used to calculate the entropy. -The simplest part is counting the number of neighbours, but symmetry should be accounted for in determining the number of orientations. 
-For water, the hydrogen bonds are very important and the number of hydrogen bond donors and acceptors in the shell around the water molecule affects the number of unique orientations. +The number of orientations :math:`\Omega_{\mathrm{orient}}` relates to the number of neighbours. +We are using the relative angular distance (RAD) method for identifying neighbours [7]. +This method considers a molecule j as part of the coordination shell of the central molecule i, if for all other molecules k: + +.. math:: + \frac{1}{r_{ij}^2} > \frac{1}{r_{ik}^2} \cos \theta_{jik} + +where :math:`r_{ij}` is the distance between i and j and :math:`\theta_{jik}` is the angle between j, i, and k (with i at the vertex). +The MDAnalysis NeighborSearch method can be used as an alternative to RAD, but the grid-based search relies on an arbitrary cutoff. + +The number of orientations also depends on the symmetry number of the molecule and on whether it is linear. +Linear molecules have 2 rotational degrees of freedom and non-linear molecules have 3 rotational degrees of freedom. +CodeEntropy uses the united atom beads to determine if a molecule is treated as the linear case (for example, carbon dioxide and methanol are both considered linear). + +.. math:: + \Omega_{\mathrm{orient}} = \max \left\{ 1, N^{3/2} \pi^{1/2} / \sigma \right\} + +.. math:: + \Omega_{\mathrm{orient,linear}} = \max \left\{ 1, N / \sigma \right\} + +where N is the number of neighbours the molecule has, averaged over the number of frames, and :math:`\sigma` is the symmetry number of the molecule. +Taking the maximum of 1 and the neighbour expression prevents the resulting orientational entropy value from being less than zero. + +.. math:: + S_{\mathrm{orient}} = R \ln{ \Omega_{\mathrm{orient}} } + +where R is the gas constant.
diff --git a/pyproject.toml b/pyproject.toml index 4aac93b0..4a3777b9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,6 +49,7 @@ dependencies = [ "matplotlib>=3.10,<3.11", "waterEntropy>=2,<2.1", "requests>=2.32,<3.0", + "rdkit>=2025.9.5", ] [project.urls] @@ -87,7 +88,7 @@ line-length = 88 target-version = "py311" [tool.ruff.lint] -select = ["E", "F", "I", "B"] +select = ["E", "F", "I", "B", "UP"] [tool.ruff.format] quote-style = "double" diff --git a/tests/regression/baselines/dna.json b/tests/regression/baselines/dna.json index f1d58088..ab5a1e04 100644 --- a/tests/regression/baselines/dna.json +++ b/tests/regression/baselines/dna.json @@ -1,8 +1,8 @@ { "args": { "top_traj_file": [ - "/home/tdo96567/BioSim/CodeEntropy/tests/data/dna/md_A4_dna.tpr", - "/home/tdo96567/BioSim/CodeEntropy/tests/data/dna/md_A4_dna_xf.trr" + "/home/ogo12949/CodeEntropy/.testdata/dna/md_A4_dna.tpr", + "/home/ogo12949/CodeEntropy/.testdata/dna/md_A4_dna_xf.trr" ], "force_file": null, "file_format": null, @@ -14,18 +14,19 @@ "bin_width": 30, "temperature": 298.0, "verbose": false, - "output_file": "/tmp/pytest-of-tdo96567/pytest-60/test_regression_matches_baseli3/job001/output_file.json", + "output_file": "/tmp/pytest-of-ogo12949/pytest-5/test_regression_matches_baseli0/job001/output_file.json", "force_partitioning": 0.5, "water_entropy": true, "grouping": "molecules", "combined_forcetorque": true, - "customised_axes": true + "customised_axes": true, + "search_type": "RAD" }, "provenance": { - "python": "3.14.0", - "platform": "Linux-6.6.87.2-microsoft-standard-WSL2-x86_64-with-glibc2.39", - "codeentropy_version": "1.0.7", - "git_sha": "226b37f7b206adba1b60253c41c7a0d467e75a58" + "python": "3.13.5", + "platform": "Linux-6.17.0-1011-oem-x86_64-with-glibc2.39", + "codeentropy_version": "2.0.0", + "git_sha": "44bd758498dfdd07eb391ee86e6ff2d86ea2bcb8" }, "groups": { "0": { @@ -36,10 +37,11 @@ "residue:Rovibrational": 3.376800684085249, "polymer:FTmat-Transvibrational": 
12.341104347192612, "polymer:FTmat-Rovibrational": 0.0, - "united_atom:Conformational": 7.269386795471401, - "residue:Conformational": 0.0 + "united_atom:Conformational": 7.0411434528236345, + "residue:Conformational": 0.0, + "polymer:Orientational": 4.758905336627712 }, - "total": 22.989452505761392 + "total": 27.52011449974134 }, "1": { "components": { @@ -50,9 +52,10 @@ "polymer:FTmat-Transvibrational": 11.11037253388596, "polymer:FTmat-Rovibrational": 0.0, "united_atom:Conformational": 6.410455987098191, - "residue:Conformational": 0.46183561256411515 + "residue:Conformational": 0.46183561256411515, + "polymer:Orientational": 4.758905336627712 }, - "total": 20.387448519462218 + "total": 25.14635385608993 } } } diff --git a/tests/regression/baselines/methane.json b/tests/regression/baselines/methane.json index 482088b7..b509917a 100644 --- a/tests/regression/baselines/methane.json +++ b/tests/regression/baselines/methane.json @@ -1,10 +1,10 @@ { "args": { "top_traj_file": [ - "/home/tdo96567/BioSim/CodeEntropy/tests/data/methane/molecules.top", - "/home/tdo96567/BioSim/CodeEntropy/tests/data/methane/trajectory.crd" + "/home/ogo12949/CodeEntropy/.testdata/methane/molecules.top", + "/home/ogo12949/CodeEntropy/.testdata/methane/trajectory.crd" ], - "force_file": "/home/tdo96567/BioSim/CodeEntropy/tests/data/methane/forces.frc", + "force_file": "/home/ogo12949/CodeEntropy/.testdata/methane/forces.frc", "file_format": "MDCRD", "kcal_force_units": false, "selection_string": "all", @@ -14,27 +14,29 @@ "bin_width": 30, "temperature": 112.0, "verbose": false, - "output_file": "/tmp/pytest-of-tdo96567/pytest-60/test_regression_matches_baseli5/job001/output_file.json", + "output_file": "/tmp/pytest-of-ogo12949/pytest-5/test_regression_matches_baseli1/job001/output_file.json", "force_partitioning": 0.5, "water_entropy": true, "grouping": "molecules", "combined_forcetorque": true, - "customised_axes": true + "customised_axes": true, + "search_type": "RAD" }, "provenance": { 
- "python": "3.14.0", - "platform": "Linux-6.6.87.2-microsoft-standard-WSL2-x86_64-with-glibc2.39", - "codeentropy_version": "1.0.7", - "git_sha": "226b37f7b206adba1b60253c41c7a0d467e75a58" + "python": "3.13.5", + "platform": "Linux-6.17.0-1011-oem-x86_64-with-glibc2.39", + "codeentropy_version": "2.0.0", + "git_sha": "44bd758498dfdd07eb391ee86e6ff2d86ea2bcb8" }, "groups": { "0": { "components": { "united_atom:Transvibrational": 75.73291215434239, "united_atom:Rovibrational": 68.80103728327107, - "united_atom:Conformational": 0.0 + "united_atom:Conformational": 0.0, + "united_atom:Orientational": 5.499485304173264 }, - "total": 144.53394943761344 + "total": 150.03343474178672 } } } diff --git a/tests/regression/baselines/methanol.json b/tests/regression/baselines/methanol.json index c5f1c8f3..62ae94c7 100644 --- a/tests/regression/baselines/methanol.json +++ b/tests/regression/baselines/methanol.json @@ -1,10 +1,10 @@ { "args": { "top_traj_file": [ - "/home/tdo96567/BioSim/CodeEntropy/tests/data/methanol/molecules.top", - "/home/tdo96567/BioSim/CodeEntropy/tests/data/methanol/trajectory.crd" + "/home/ogo12949/CodeEntropy/.testdata/methanol/molecules.top", + "/home/ogo12949/CodeEntropy/.testdata/methanol/trajectory.crd" ], - "force_file": "/home/tdo96567/BioSim/CodeEntropy/tests/data/methanol/forces.frc", + "force_file": "/home/ogo12949/CodeEntropy/.testdata/methanol/forces.frc", "file_format": "MDCRD", "kcal_force_units": false, "selection_string": "all", @@ -14,18 +14,19 @@ "bin_width": 30, "temperature": 298.0, "verbose": false, - "output_file": "/tmp/pytest-of-tdo96567/pytest-60/test_regression_matches_baseli6/job001/output_file.json", + "output_file": "/tmp/pytest-of-ogo12949/pytest-5/test_regression_matches_baseli2/job001/output_file.json", "force_partitioning": 0.5, "water_entropy": true, "grouping": "molecules", "combined_forcetorque": true, - "customised_axes": true + "customised_axes": true, + "search_type": "RAD" }, "provenance": { - "python": "3.14.0", - 
"platform": "Linux-6.6.87.2-microsoft-standard-WSL2-x86_64-with-glibc2.39", - "codeentropy_version": "1.0.7", - "git_sha": "226b37f7b206adba1b60253c41c7a0d467e75a58" + "python": "3.13.5", + "platform": "Linux-6.17.0-1011-oem-x86_64-with-glibc2.39", + "codeentropy_version": "2.0.0", + "git_sha": "44bd758498dfdd07eb391ee86e6ff2d86ea2bcb8" }, "groups": { "0": { @@ -35,9 +36,10 @@ "residue:FTmat-Transvibrational": 93.59616431728384, "residue:FTmat-Rovibrational": 59.61417719536213, "united_atom:Conformational": 0.0, - "residue:Conformational": 0.0 + "residue:Conformational": 0.0, + "residue:Orientational": 16.370329454731248 }, - "total": 238.9590441528269 + "total": 255.32937360755813 } } } diff --git a/tests/regression/helpers.py b/tests/regression/helpers.py index 0404b967..9f27b41b 100644 --- a/tests/regression/helpers.py +++ b/tests/regression/helpers.py @@ -7,7 +7,7 @@ import urllib.request from dataclasses import dataclass from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any import yaml @@ -30,7 +30,7 @@ class RunResult: workdir: Path job_dir: Path output_json: Path - payload: Dict[str, Any] + payload: dict[str, Any] stdout: str stderr: str @@ -217,7 +217,7 @@ def _pick_output_json(job_dir: Path) -> Path: return jsons[0] -def _resolve_path(value: Any, *, base_dir: Path) -> Optional[str]: +def _resolve_path(value: Any, *, base_dir: Path) -> str | None: """Resolve a path-like config value to an absolute path string. Paths beginning with '.testdata/' are resolved relative to the repository root. @@ -271,8 +271,8 @@ def _resolve_path_list(value: Any, *, base_dir: Path) -> list[str]: def _abspathify_config_paths( - config: Dict[str, Any], *, base_dir: Path -) -> Dict[str, Any]: + config: dict[str, Any], *, base_dir: Path +) -> dict[str, Any]: """Convert configured input paths into absolute paths. 
Args: @@ -285,7 +285,7 @@ def _abspathify_config_paths( path_keys = {"force_file"} list_path_keys = {"top_traj_file"} - out: Dict[str, Any] = {} + out: dict[str, Any] = {} for run_name, run_cfg in config.items(): if not isinstance(run_cfg, dict): out[run_name] = run_cfg @@ -303,7 +303,7 @@ def _abspathify_config_paths( return out -def _assert_inputs_exist(cooked: Dict[str, Any]) -> None: +def _assert_inputs_exist(cooked: dict[str, Any]) -> None: """Assert that required input files referenced in cooked config exist.""" run1 = cooked.get("run1") if not isinstance(run1, dict): @@ -369,8 +369,7 @@ def run_codeentropy_with_config(*, workdir: Path, config_src: Path) -> RunResult proc = subprocess.run( ["python", "-m", "CodeEntropy"], cwd=str(workdir), - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + capture_output=True, text=True, env={**os.environ}, ) diff --git a/tests/regression/test_regression.py b/tests/regression/test_regression.py index f877899c..7b5e6da3 100644 --- a/tests/regression/test_regression.py +++ b/tests/regression/test_regression.py @@ -2,7 +2,7 @@ import json from pathlib import Path -from typing import Any, Dict +from typing import Any import numpy as np import pytest @@ -10,7 +10,7 @@ from tests.regression.helpers import run_codeentropy_with_config -def _group_index(payload: Dict[str, Any]) -> Dict[str, Dict[str, Any]]: +def _group_index(payload: dict[str, Any]) -> dict[str, dict[str, Any]]: """Return the groups mapping from a regression payload. 
Args: @@ -30,8 +30,8 @@ def _group_index(payload: Dict[str, Any]) -> Dict[str, Dict[str, Any]]: def _compare_grouped( *, - got_payload: Dict[str, Any], - baseline_payload: Dict[str, Any], + got_payload: dict[str, Any], + baseline_payload: dict[str, Any], rtol: float, atol: float, ) -> None: diff --git a/tests/unit/CodeEntropy/core/logging/test_export_console.py b/tests/unit/CodeEntropy/core/logging/test_export_console.py index 65f5276f..602cc8e9 100644 --- a/tests/unit/CodeEntropy/core/logging/test_export_console.py +++ b/tests/unit/CodeEntropy/core/logging/test_export_console.py @@ -11,7 +11,7 @@ def test_export_console_writes_recorded_output(config): out_path = os.path.join(config.log_dir, "out.txt") assert os.path.exists(out_path) - with open(out_path, "r", encoding="utf-8") as f: + with open(out_path, encoding="utf-8") as f: assert f.read() == "HELLO" config.console.export_text.assert_called_once() diff --git a/tests/unit/CodeEntropy/entropy/nodes/test_orientational_node.py b/tests/unit/CodeEntropy/entropy/nodes/test_orientational_node.py new file mode 100644 index 00000000..8118b8c6 --- /dev/null +++ b/tests/unit/CodeEntropy/entropy/nodes/test_orientational_node.py @@ -0,0 +1,39 @@ +from unittest.mock import MagicMock + +from CodeEntropy.entropy.nodes.orientational import OrientationalEntropyNode + + +def test_config_node_run_writes_results(shared_data): + node = OrientationalEntropyNode() + + shared_data["levels"] = {0: ["united_atom", "residue"]} + shared_data["groups"] = {0: [0]} + shared_data["neighbors"] = {0: 0} + shared_data["symmetry_number"] = {0: 0} + shared_data["linear"] = {0: False} + + out = node.run(shared_data) + + assert "orientational_entropy" in out + assert "orientational_entropy" in shared_data + assert 0 in shared_data["orientational_entropy"] + + +def test_run_skips_empty_mol_ids_group(): + node = OrientationalEntropyNode() + + shared_data = { + "n_frames": 5, + "groups": {0: []}, + "levels": {0: ["united_atom"]}, + "reduced_universe": 
MagicMock(), + "neighbors": {0: 0}, + "symmetry_number": {0: 0}, + "linear": {0: False}, + "reporter": None, + } + + out = node.run(shared_data) + + assert "orientational_entropy" in out + assert out["orientational_entropy"][0] == 0.0 diff --git a/tests/unit/CodeEntropy/entropy/test_configurational_edges.py b/tests/unit/CodeEntropy/entropy/test_configurational_edges.py index 1b8f9792..52da799a 100644 --- a/tests/unit/CodeEntropy/entropy/test_configurational_edges.py +++ b/tests/unit/CodeEntropy/entropy/test_configurational_edges.py @@ -1,90 +1,12 @@ -import logging -from types import SimpleNamespace -from unittest.mock import MagicMock - -import numpy as np -import pytest - from CodeEntropy.entropy.configurational import ConformationalEntropy -def test_validate_assignment_config_step_must_be_positive(): - ce = ConformationalEntropy() - with pytest.raises(ValueError): - ce.assign_conformation( - data_container=SimpleNamespace(trajectory=list(range(5))), - dihedral=MagicMock(value=lambda: 10.0), - number_frames=5, - bin_width=30, - start=0, - end=5, - step=0, - ) - - -def test_validate_assignment_config_bin_width_out_of_range(): - ce = ConformationalEntropy() - with pytest.raises(ValueError): - ce.assign_conformation( - data_container=SimpleNamespace(trajectory=list(range(5))), - dihedral=MagicMock(value=lambda: 10.0), - number_frames=5, - bin_width=0, - start=0, - end=5, - step=1, - ) - - -def test_validate_assignment_config_warns_when_bin_width_not_dividing_360(caplog): - ce = ConformationalEntropy() - caplog.set_level(logging.WARNING) - - data_container = SimpleNamespace(trajectory=list(range(5))) - dihedral = MagicMock() - dihedral.value.return_value = 10.0 - - ce.assign_conformation( - data_container=data_container, - dihedral=dihedral, - number_frames=5, - bin_width=7, - start=0, - end=5, - step=1, - ) - - assert any("does not evenly divide 360" in r.message for r in caplog.records) - - -def test_collect_dihedral_angles_normalizes_negative_values(): - ce = 
ConformationalEntropy() - - traj_slice = list(range(3)) - dihedral = MagicMock() - dihedral.value.side_effect = [-10.0, 0.0, 10.0] - - phi = ce._collect_dihedral_angles(traj_slice, dihedral) - - assert phi[0] == pytest.approx(350.0) - - def test_to_1d_array_returns_none_for_non_iterable_state_input(): ce = ConformationalEntropy() # int is not iterable -> list(states) raises TypeError -> returns None assert ce._to_1d_array(123) is None -def test_find_histogram_peaks_skips_zero_population_bins(): - ce = ConformationalEntropy() - - phi = np.zeros(50, dtype=float) - - peaks = ce._find_histogram_peaks(phi, bin_width=30) - - assert peaks.size >= 1 - - def test_to_1d_array_returns_none_when_states_is_none(): ce = ConformationalEntropy() assert ce._to_1d_array(None) is None diff --git a/tests/unit/CodeEntropy/entropy/test_configurational_entropy_math.py b/tests/unit/CodeEntropy/entropy/test_configurational_entropy_math.py index 7624fbf6..02962316 100644 --- a/tests/unit/CodeEntropy/entropy/test_configurational_entropy_math.py +++ b/tests/unit/CodeEntropy/entropy/test_configurational_entropy_math.py @@ -1,112 +1,17 @@ -from types import SimpleNamespace -from unittest.mock import MagicMock - import numpy as np import pytest from CodeEntropy.entropy.configurational import ConformationalEntropy -def test_find_histogram_peaks_empty_histogram_returns_empty(): - ce = ConformationalEntropy() - phi = np.zeros(100, dtype=float) - - peaks = ce._find_histogram_peaks(phi, bin_width=30) - - assert isinstance(peaks, np.ndarray) - assert peaks.dtype == float - - -def test_find_histogram_peaks_returns_empty_for_empty_phi(): - ce = ConformationalEntropy() - phi = np.array([], dtype=float) - - peaks = ce._find_histogram_peaks(phi, bin_width=30) - - assert isinstance(peaks, np.ndarray) - assert peaks.size == 0 - - -def test_assign_nearest_peaks_with_single_peak_assigns_all_zero(): - ce = ConformationalEntropy() - phi = np.array([0.0, 10.0, 20.0], dtype=float) - peak_values = np.array([15.0], 
dtype=float) - - states = ce._assign_nearest_peaks(phi, peak_values) - - assert np.all(states == 0) - - -def test_assign_conformation_no_peaks_returns_all_zero(): - ce = ConformationalEntropy() - - data_container = SimpleNamespace(trajectory=[]) - dihedral = MagicMock() - - states = ce.assign_conformation( - data_container=data_container, - dihedral=dihedral, - number_frames=0, - bin_width=30, - start=0, - end=0, - step=1, - ) - - assert states.size == 0 - - -def test_assign_conformation_fallback_when_peak_finder_returns_empty(monkeypatch): - ce = ConformationalEntropy() - data_container = SimpleNamespace(trajectory=list(range(5))) - dihedral = MagicMock() - dihedral.value.return_value = 10.0 - - monkeypatch.setattr( - ce, "_find_histogram_peaks", lambda phi, bw: np.array([], dtype=float) - ) - - states = ce.assign_conformation( - data_container=data_container, - dihedral=dihedral, - number_frames=5, - bin_width=30, - start=0, - end=5, - step=1, - ) - assert np.all(states == 0) - - -def test_assign_conformation_detects_multiple_states(): - ce = ConformationalEntropy() - - values = [0.0] * 50 + [180.0] * 50 - data_container = SimpleNamespace(trajectory=list(range(len(values)))) - dihedral = MagicMock() - dihedral.value.side_effect = values - - states = ce.assign_conformation( - data_container=data_container, - dihedral=dihedral, - number_frames=len(values), - bin_width=30, - start=0, - end=len(values), - step=1, - ) - - assert len(np.unique(states)) >= 2 - - def test_conformational_entropy_empty_returns_zero(): ce = ConformationalEntropy() - assert ce.conformational_entropy_calculation([], number_frames=10) == 0.0 + assert ce.conformational_entropy_calculation([]) == 0.0 def test_conformational_entropy_single_state_returns_zero(): ce = ConformationalEntropy() - assert ce.conformational_entropy_calculation([0, 0, 0], number_frames=3) == 0.0 + assert ce.conformational_entropy_calculation([0, 0, 0]) == 0.0 def 
test_conformational_entropy_known_distribution_matches_expected(): @@ -116,5 +21,5 @@ def test_conformational_entropy_known_distribution_matches_expected(): probs = np.array([2 / 6, 3 / 6, 1 / 6], dtype=float) expected = -ce._GAS_CONST * float(np.sum(probs * np.log(probs))) - got = ce.conformational_entropy_calculation(states, number_frames=6) + got = ce.conformational_entropy_calculation(states) assert got == pytest.approx(expected) diff --git a/tests/unit/CodeEntropy/entropy/test_graph.py b/tests/unit/CodeEntropy/entropy/test_graph.py index 92f4e2f3..e80069d5 100644 --- a/tests/unit/CodeEntropy/entropy/test_graph.py +++ b/tests/unit/CodeEntropy/entropy/test_graph.py @@ -11,11 +11,13 @@ def test_build_creates_expected_nodes_and_edges(): assert set(g._nodes.keys()) == { "vibrational_entropy", "configurational_entropy", + "orientational_entropy", "aggregate_entropy", } assert g._graph.has_edge("vibrational_entropy", "aggregate_entropy") assert g._graph.has_edge("configurational_entropy", "aggregate_entropy") + assert g._graph.has_edge("orientational_entropy", "aggregate_entropy") def test_execute_runs_nodes_in_topological_order_and_merges_dict_outputs(shared_data): diff --git a/tests/unit/CodeEntropy/entropy/test_orientational_entropy.py b/tests/unit/CodeEntropy/entropy/test_orientational_entropy.py index 1411d08d..caa52ddf 100644 --- a/tests/unit/CodeEntropy/entropy/test_orientational_entropy.py +++ b/tests/unit/CodeEntropy/entropy/test_orientational_entropy.py @@ -2,53 +2,33 @@ from CodeEntropy.entropy.orientational import OrientationalEntropy - -def test_orientational_skips_water_species(): - oe = OrientationalEntropy(None, None, None, None, None) - res = oe.calculate({"WAT": 10, "LIG": 2}) - assert res.total > 0 +_GAS_CONST: float = 8.3144598484848 def test_orientational_negative_count_raises(): - oe = OrientationalEntropy(None, None, None, None, None) + oe = OrientationalEntropy(_GAS_CONST) with pytest.raises(ValueError): - oe.calculate({"LIG": -1}) + 
oe.calculate(-1, 1, False) def test_orientational_zero_count_contributes_zero(): - oe = OrientationalEntropy(None, None, None, None, None) - res = oe.calculate({"LIG": 0}) - assert res.total == 0.0 - - -def test_orientational_skips_water_resname_all_uppercase(): - oe = OrientationalEntropy(None, None, None, None, None) - res = oe.calculate({"WAT": 10}) - assert res.total == 0.0 - - -def test_orientational_entropy_skips_water_species(): - oe = OrientationalEntropy(None, None, None, None, None) - res = oe.calculate({"WAT": 10, "Na+": 2}) - assert res.total > 0.0 - - -def test_orientational_calculate_only_water_returns_zero(): - oe = OrientationalEntropy(None, None, None, None, None) - res = oe.calculate({"WAT": 5}) - assert res.total == 0.0 - + oe = OrientationalEntropy(_GAS_CONST) + assert oe.calculate(0, 1, False) == 0.0 -def test_calculate_skips_water_species_branch(): - oe = OrientationalEntropy(None, None, None, None, None) - out = oe.calculate({"WAT": 10, "Na+": 2}) - assert out.total > 0.0 +def test_omega_linear(): + oe = OrientationalEntropy(_GAS_CONST) + omega = oe._omega(6, 2, True) + assert omega == 3.0 -def test_entropy_contribution_returns_zero_when_omega_nonpositive(monkeypatch): - oe = OrientationalEntropy(None, None, None, None, None) +def test_omega_nonlinear(): + oe = OrientationalEntropy(_GAS_CONST) + omega = oe._omega(6, 2, False) + assert omega == pytest.approx(13.02482) - monkeypatch.setattr(OrientationalEntropy, "_omega", staticmethod(lambda n: 0.0)) - assert oe._entropy_contribution(5) == 0.0 +def test_omega_no_symmetry(): + oe = OrientationalEntropy(_GAS_CONST) + omega = oe._omega(6, 0, False) + assert omega == 1.0 diff --git a/tests/unit/CodeEntropy/levels/test_neighbors.py b/tests/unit/CodeEntropy/levels/test_neighbors.py new file mode 100644 index 00000000..9f1c7d83 --- /dev/null +++ b/tests/unit/CodeEntropy/levels/test_neighbors.py @@ -0,0 +1,238 @@ +import contextlib +from unittest.mock import MagicMock, patch + +import numpy as np 
+import pytest + +from CodeEntropy.levels.neighbors import Neighbors + + +class _FakeProgress: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def add_task(self, *args, **kwargs): + return 1 + + def advance(self, *args, **kwargs): + return None + + +@contextlib.contextmanager +def _fake_progress_bar(*_args, **_kwargs): + yield _FakeProgress() + + +def test_raises_error_unknown_search_type(): + neighbors = Neighbors() + + universe = MagicMock() + universe.trajectory.__len__.return_value = 2 + levels = {0: ["united_atom"]} + groups = {0: [0]} + search_type = "weird" + + with pytest.raises(ValueError): + neighbors.get_neighbors(universe, levels, groups, search_type) + + +def test_average_number_neighbors_RAD(): + neighbors = Neighbors() + + universe = MagicMock() + universe.trajectory.__len__.return_value = 2 + levels = {0: ["united_atom"]} + groups = {0: [0]} + search_type = "RAD" + + neighbors._search.get_RAD_neighbors = MagicMock(side_effect=[[1, 2, 3], [1, 3]]) + + result = neighbors.get_neighbors(universe, levels, groups, search_type) + + assert result == {0: np.float64(2.5)} + + +def test_average_number_neighbors_grid(): + neighbors = Neighbors() + + universe = MagicMock() + universe.trajectory.__len__.return_value = 2 + levels = {0: ["united_atom"]} + groups = {0: [0]} + search_type = "grid" + + neighbors._search.get_grid_neighbors = MagicMock(side_effect=[[1, 2, 3], [1, 3]]) + + result = neighbors.get_neighbors(universe, levels, groups, search_type) + + assert result == {0: np.float64(2.5)} + + +def test_average_number_neighbors_RAD_multiple(): + neighbors = Neighbors() + + universe = MagicMock() + universe.trajectory.__len__.return_value = 2 + levels = {0: ["united_atom"]} + groups = {0: [0, 1]} + search_type = "RAD" + + neighbors._search.get_RAD_neighbors = MagicMock( + side_effect=[[1, 2, 3, 5], [1, 3], [2, 3, 4, 5], [3, 5]] + ) + + result = neighbors.get_neighbors(universe, levels, groups, search_type) + + 
assert result == {0: np.float64(3.0)} + + +def test_get_symmetry_number_res(): + neighbors = Neighbors() + rdkit_mol = MagicMock() + number_heavy = 3 + number_hydrogen = 8 + + rdkit_mol.GetSubstructMatches = MagicMock( + side_effect=[((0, 1, 2), (0, 2, 1), (1, 0, 2))] + ) + + class _FakeRDKit_Chem: + """Class to mock rdkit functionality.""" + + def RemoveHs(mol): + rdkit_heavy = MagicMock() + return rdkit_heavy + + with patch("CodeEntropy.levels.neighbors.Chem", _FakeRDKit_Chem): + result = neighbors._get_symmetry_number( + rdkit_mol, number_heavy, number_hydrogen + ) + + assert result == 3 + + +def test_get_symmetry_number_ua(): + neighbors = Neighbors() + rdkit_mol = MagicMock() + number_heavy = 1 + number_hydrogen = 2 + + rdkit_mol.GetSubstructMatches = MagicMock(side_effect=[((0, 1, 2), (0, 2, 1))]) + + result = neighbors._get_symmetry_number(rdkit_mol, number_heavy, number_hydrogen) + + assert result == 2 + + +def test_get_symmetry_number_sphere(): + neighbors = Neighbors() + rdkit_mol = MagicMock() + number_heavy = 1 + number_hydrogen = 0 + + rdkit_mol.GetSubstructMatches = MagicMock(side_effect=[((0, 1, 2), (0, 2, 1))]) + + result = neighbors._get_symmetry_number(rdkit_mol, number_heavy, number_hydrogen) + + assert result == 0 + + +def test_get_linear_ua(): + neighbors = Neighbors() + rdkit_mol = MagicMock() + number_heavy = 1 + + class _FakeRDKit_Chem: + """Class to mock rdkit functionality.""" + + def RemoveHs(mol): + rdkit_heavy = MagicMock() + return rdkit_heavy + + with patch("CodeEntropy.levels.neighbors.Chem", _FakeRDKit_Chem): + result = neighbors._get_linear(rdkit_mol, number_heavy) + + assert not result + + +def test_get_linear_diatomic(): + neighbors = Neighbors() + rdkit_mol = MagicMock() + number_heavy = 2 + + class _FakeRDKit_Chem: + """Class to mock rdkit functionality.""" + + def RemoveHs(mol): + rdkit_heavy = MagicMock() + return rdkit_heavy + + with patch("CodeEntropy.levels.neighbors.Chem", _FakeRDKit_Chem): + result = 
neighbors._get_linear(rdkit_mol, number_heavy) + + assert result + + +def test_get_linear_true(): + neighbors = Neighbors() + rdkit_mol = MagicMock() + rdkit_heavy = MagicMock() + number_heavy = 3 + a1 = MagicMock() + a2 = MagicMock() + a3 = MagicMock() + + class _FakeRDKit_Chem: + """Class to mock rdkit functionality.""" + + def RemoveHs(mol): + rdkit_heavy = MagicMock() + return rdkit_heavy + + class HybridizationType: + def SP(): + return "SP" + + rdkit_heavy.GetAtoms = MagicMock(return_value=[a1, a2, a3]) + a1.GetHybridization = MagicMock(return_value="SP2") + a2.GetHybridization = MagicMock(return_value="SP") + a3.GetHybridization = MagicMock(return_value="SP3") + + with patch("CodeEntropy.levels.neighbors.Chem", _FakeRDKit_Chem): + result = neighbors._get_linear(rdkit_mol, number_heavy) + + assert result + + +def test_get_linear_false(): + neighbors = Neighbors() + rdkit_mol = MagicMock() + rdkit_heavy = MagicMock() + number_heavy = 3 + a1 = MagicMock() + a2 = MagicMock() + a3 = MagicMock() + + class _FakeRDKit_Chem: + """Class to mock rdkit functionality.""" + + def RemoveHs(mol): + rdkit_heavy = MagicMock() + return rdkit_heavy + + class HybridizationType: + def SP(): + return "SP" + + rdkit_heavy.GetAtoms = MagicMock(return_value=[a1, a2, a3]) + a1.GetHybridization = MagicMock(return_value="SP3") + a2.GetHybridization = MagicMock(return_value="SP3") + a3.GetHybridization = MagicMock(return_value="SP3") + + with patch("CodeEntropy.levels.neighbors.Chem", _FakeRDKit_Chem): + result = neighbors._get_linear(rdkit_mol, number_heavy) + + assert not result diff --git a/tests/unit/CodeEntropy/levels/test_search.py b/tests/unit/CodeEntropy/levels/test_search.py new file mode 100644 index 00000000..5f99c096 --- /dev/null +++ b/tests/unit/CodeEntropy/levels/test_search.py @@ -0,0 +1,48 @@ +import numpy as np +import pytest + +from CodeEntropy.levels.search import Search + +# some dummy atom positions +a = np.array([0, 0, 1]) +b = np.array([0, 1, 0]) +c = np.array([1, 
0, 0]) +d = np.array([0, 1, 1]) +e = np.array([0, 11, 11]) +dimensions = np.array([10, 10, 10]) + + +def test_get_angle(): + search = Search() + result1 = search.get_angle(a, b, c, dimensions) + result2 = search.get_angle(a, b, d, dimensions) + + assert result1 == 0.5 + assert result2 == pytest.approx(0.7071067811865477) + + +def test_angle_boundary_conditions(): + search = Search() + + result = search.get_angle(a, b, e, dimensions) + + assert result == pytest.approx(0.7071067811865477) + + +def test_distance(): + search = Search() + distance1 = search.get_distance(a, b, dimensions) + distance2 = search.get_distance(a, d, dimensions) + distance3 = search.get_distance(c, d, dimensions) + + assert distance1 == pytest.approx(1.4142135623730951) + assert distance2 == 1.0 + assert distance3 == pytest.approx(1.7320508075688772) + + +def test_distance_boundary_conditions(): + search = Search() + + distance4 = search.get_distance(c, e, dimensions) + + assert distance4 == pytest.approx(1.7320508075688772) diff --git a/tests/unit/CodeEntropy/results/test_reporter.py b/tests/unit/CodeEntropy/results/test_reporter.py index f21b0148..a0c4f26e 100644 --- a/tests/unit/CodeEntropy/results/test_reporter.py +++ b/tests/unit/CodeEntropy/results/test_reporter.py @@ -329,8 +329,7 @@ def fake_resolve(self): assert ResultsReporter._try_get_git_sha() == "sha" _args, kwargs = mock_run.call_args - assert "stdout" in kwargs - assert "stderr" in kwargs + assert "capture_output" in kwargs assert kwargs.get("text") is True