Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions python/pacemaker/_cts/CTS.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pacemaker._cts.environment import EnvFactory
from pacemaker._cts.input import should_continue
from pacemaker._cts import logging
from pacemaker._cts.remote import RemoteFactory
from pacemaker._cts.remote import RemoteExec


class CtsLab:
Expand Down Expand Up @@ -60,9 +60,9 @@ def __contains__(self, key):
def __getitem__(self, key):
"""Return the given environment key, or raise KeyError if it does not exist."""
# Throughout this file, pylint has trouble understanding that EnvFactory
# and RemoteFactory are singleton instances that can be treated as callable
# and subscriptable objects. Various warnings are disabled because of this.
# See also a comment about self._rsh in environment.py.
# is a singleton instance that can be treated as a callable and subscriptable
# object. Various warnings are disabled because of this. See also a comment
# about self._rsh in environment.py.
# pylint: disable=unsubscriptable-object
return self._env[key]

Expand Down Expand Up @@ -133,14 +133,12 @@ def __init__(self, env):

def _node_booted(self, node):
"""Return True if the given node is booted (responds to pings)."""
# pylint: disable=not-callable
(rc, _) = RemoteFactory().getInstance()("localhost", f"ping -nq -c1 -w1 {node}", verbose=0)
(rc, _) = RemoteExec().call("localhost", f"ping -nq -c1 -w1 {node}", verbose=0)
return rc == 0

def _sshd_up(self, node):
"""Return true if sshd responds on the given node."""
# pylint: disable=not-callable
(rc, _) = RemoteFactory().getInstance()(node, "true", verbose=0)
(rc, _) = RemoteExec().call(node, "true", verbose=0)
return rc == 0

def wait_for_node(self, node, timeout=300):
Expand Down Expand Up @@ -225,7 +223,7 @@ def signal(self, sig, node):
search_re = f"({word_begin}valgrind )?.*{word_begin}{self.name}{word_end}"

if sig in ["SIGKILL", "KILL", 9, "SIGTERM", "TERM", 15]:
(rc, _) = self._cm.rsh(node, f"pgrep --full '{search_re}'")
(rc, _) = self._cm.rsh.call(node, f"pgrep --full '{search_re}'")
if rc == 1:
# No matching process, so nothing to kill/terminate
return
Expand All @@ -237,6 +235,6 @@ def signal(self, sig, node):
# 0: One or more processes were successfully signaled.
# 1: No processes matched or none of them could be signalled.
# This is why we check for no matching process above.
(rc, _) = self._cm.rsh(node, f"pkill --signal {sig} --full '{search_re}'")
(rc, _) = self._cm.rsh.call(node, f"pkill --signal {sig} --full '{search_re}'")
if rc != 0:
self._cm.log(f"ERROR: Sending signal {sig} to {self.name} failed on node {node}")
53 changes: 25 additions & 28 deletions python/pacemaker/_cts/audits.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Auditing classes for Pacemaker's Cluster Test Suite (CTS)."""

__all__ = ["AuditConstraint", "AuditResource", "ClusterAudit", "audit_list"]
__copyright__ = "Copyright 2000-2025 the Pacemaker project contributors"
__copyright__ = "Copyright 2000-2026 the Pacemaker project contributors"
__license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"

import re
Expand Down Expand Up @@ -82,16 +82,16 @@ def _restart_cluster_logging(self, nodes=None):

for node in nodes:
if self._cm.env["have_systemd"]:
(rc, _) = self._cm.rsh(node, "systemctl stop systemd-journald.socket")
(rc, _) = self._cm.rsh.call(node, "systemctl stop systemd-journald.socket")
if rc != 0:
self._cm.log(f"ERROR: Cannot stop 'systemd-journald' on {node}")

(rc, _) = self._cm.rsh(node, "systemctl start systemd-journald.service")
(rc, _) = self._cm.rsh.call(node, "systemctl start systemd-journald.service")
if rc != 0:
self._cm.log(f"ERROR: Cannot start 'systemd-journald' on {node}")

if "syslogd" in self._cm.env:
(rc, _) = self._cm.rsh(node, f"service {self._cm.env['syslogd']} restart")
(rc, _) = self._cm.rsh.call(node, f"service {self._cm.env['syslogd']} restart")
if rc != 0:
self._cm.log(f"""ERROR: Cannot restart '{self._cm.env["syslogd"]}' on {node}""")

Expand Down Expand Up @@ -136,10 +136,7 @@ def _test_logging(self):

for node in self._cm.env["nodes"]:
cmd = f"logger -p {self._cm.env['syslog_facility']}.info {prefix} {node} {suffix}"

(rc, _) = self._cm.rsh(node, cmd, synchronous=False, verbose=0)
if rc != 0:
self._cm.log(f"ERROR: Cannot execute remote command [{cmd}] on {node}")
self._cm.rsh.call_async(node, cmd)

for k, w in watch.items():
if watch_pref is None:
Expand Down Expand Up @@ -213,7 +210,7 @@ def __call__(self):

self._cm.ns.wait_for_all_nodes(self._cm.env["nodes"])
for node in self._cm.env["nodes"]:
(_, dfout) = self._cm.rsh(node, dfcmd, verbose=1)
(_, dfout) = self._cm.rsh.call(node, dfcmd, verbose=1)
if not dfout:
self._cm.log(f"ERROR: Cannot execute remote df command [{dfcmd}] on {node}")
continue
Expand Down Expand Up @@ -282,13 +279,13 @@ def _output_has_core(self, output, node):

def _find_core_with_coredumpctl(self, node):
"""Use coredumpctl to find core dumps on the given node."""
(_, lsout) = self._cm.rsh(node, "coredumpctl --no-legend --no-pager")
(_, lsout) = self._cm.rsh.call(node, "coredumpctl --no-legend --no-pager")
return self._output_has_core(lsout, node)

def _find_core_on_fs(self, node, paths):
"""Check for core dumps on the given node, under any of the given paths."""
(_, lsout) = self._cm.rsh(node, f"ls -al {' '.join(paths)} | grep core.[0-9]",
verbose=1)
(_, lsout) = self._cm.rsh.call(node, f"ls -al {' '.join(paths)} | grep core.[0-9]",
verbose=1)
return self._output_has_core(lsout, node)

def __call__(self):
Expand Down Expand Up @@ -320,20 +317,20 @@ def __call__(self):

if self._cm.expected_status.get(node) == "down":
clean = False
(_, lsout) = self._cm.rsh(node, "ls -al /dev/shm | grep qb-", verbose=1)
(_, lsout) = self._cm.rsh.call(node, "ls -al /dev/shm | grep qb-", verbose=1)

for line in lsout:
passed = False
clean = True
self._cm.log(f"Warning: Stale IPC file on {node}: {line}")

if clean:
(_, lsout) = self._cm.rsh(node, "ps axf | grep -e pacemaker -e corosync", verbose=1)
(_, lsout) = self._cm.rsh.call(node, "ps axf | grep -e pacemaker -e corosync", verbose=1)

for line in lsout:
self._cm.debug(f"ps[{node}]: {line}")

self._cm.rsh(node, "rm -rf /dev/shm/qb-*")
self._cm.rsh.call(node, "rm -rf /dev/shm/qb-*")

else:
self._cm.debug(f"Skipping {node}")
Expand Down Expand Up @@ -511,8 +508,8 @@ def _setup(self):
self.debug(f"No nodes active - skipping {self.name}")
return False

(_, lines) = self._cm.rsh(self._target, "crm_resource --list-cts",
verbose=1)
(_, lines) = self._cm.rsh.call(self._target, "crm_resource --list-cts",
verbose=1)

for line in lines:
if re.search("^Resource", line):
Expand Down Expand Up @@ -670,9 +667,9 @@ def __init__(self, cm):

def _crm_location(self, resource):
"""Return a list of cluster nodes where a given resource is running."""
(rc, lines) = self._cm.rsh(self._target,
f"crm_resource --locate -r {resource} -Q",
verbose=1)
(rc, lines) = self._cm.rsh.call(self._target,
f"crm_resource --locate -r {resource} -Q",
verbose=1)
hosts = []

if rc == 0:
Expand Down Expand Up @@ -823,7 +820,7 @@ def _audit_cib_contents(self, hostlist):
passed = False

else:
(rc, result) = self._cm.rsh(
(rc, result) = self._cm.rsh.call(
node0, f"crm_diff -VV -cf --new {node_xml} --original {node0_xml}", verbose=1)

if rc != 0:
Expand All @@ -850,16 +847,16 @@ def _store_remote_cib(self, node, target):
if not target:
target = node

(rc, lines) = self._cm.rsh(node, self._cm.templates["CibQuery"], verbose=1)
(rc, lines) = self._cm.rsh.call(node, self._cm.templates["CibQuery"], verbose=1)
if rc != 0:
self._cm.log("Could not retrieve configuration")
return None

self._cm.rsh("localhost", f"rm -f {filename}")
self._cm.rsh.call("localhost", f"rm -f {filename}")
for line in lines:
self._cm.rsh("localhost", f"echo \'{line[:-1]}\' >> {filename}", verbose=0)
self._cm.rsh.call("localhost", f"echo \'{line[:-1]}\' >> {filename}", verbose=0)

if self._cm.rsh.copy(filename, f"root@{target}:{filename}", silent=True) != 0:
if self._cm.rsh.copy(filename, f"root@{target}:{filename}") != 0:
self._cm.log("Could not store configuration")
return None

Expand Down Expand Up @@ -960,13 +957,13 @@ def _audit_partition(self, partition):
# not in itself a reason to fail the audit (not what we're
# checking for in this audit)

(_, out) = self._cm.rsh(node, self._cm.templates["StatusCmd"] % node, verbose=1)
(_, out) = self._cm.rsh.call(node, self._cm.templates["StatusCmd"] % node, verbose=1)
self._node_state[node] = out[0].strip()

(_, out) = self._cm.rsh(node, self._cm.templates["EpochCmd"], verbose=1)
(_, out) = self._cm.rsh.call(node, self._cm.templates["EpochCmd"], verbose=1)
self._node_epoch[node] = out[0].strip()

(_, out) = self._cm.rsh(node, self._cm.templates["QuorumCmd"], verbose=1)
(_, out) = self._cm.rsh.call(node, self._cm.templates["QuorumCmd"], verbose=1)
self._node_quorum[node] = out[0].strip()

self.debug(f"Node {node}: {self._node_state[node]} - {self._node_epoch[node]} - {self._node_quorum[node]}.")
Expand Down
10 changes: 5 additions & 5 deletions python/pacemaker/_cts/cib.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def __init__(self, cm, version, factory, tmpfile=None):
def _show(self):
"""Query a cluster node for its generated CIB; log and return the result."""
output = ""
(_, result) = self._factory.rsh(self._factory.target, f"HOME=/root CIB_file={self._factory.tmpfile} cibadmin -Q", verbose=1)
(_, result) = self._factory.rsh.call(self._factory.target, f"HOME=/root CIB_file={self._factory.tmpfile} cibadmin -Q", verbose=1)

for line in result:
output += line
Expand Down Expand Up @@ -105,7 +105,7 @@ def get_node_id(self, node_name):
r"""{gsub(/.*nodeid:\s*/,"");gsub(/\s+.*$/,"");print}' %s""" \
% (node_name, BuildOptions.COROSYNC_CONFIG_FILE)

(rc, output) = self._factory.rsh(self._factory.target, awk, verbose=1)
(rc, output) = self._factory.rsh.call(self._factory.target, awk, verbose=1)

if rc == 0 and len(output) == 1:
try:
Expand All @@ -124,7 +124,7 @@ def install(self, target):

self._factory.tmpfile = f"{BuildOptions.CIB_DIR}/cib.xml"
self.contents(target)
self._factory.rsh(self._factory.target, f"chown {BuildOptions.DAEMON_USER} {self._factory.tmpfile}")
self._factory.rsh.call(self._factory.target, f"chown {BuildOptions.DAEMON_USER} {self._factory.tmpfile}")

self._factory.tmpfile = old

Expand All @@ -138,7 +138,7 @@ def contents(self, target):
if target:
self._factory.target = target

self._factory.rsh(self._factory.target, f"HOME=/root cibadmin --empty {self.version} > {self._factory.tmpfile}")
self._factory.rsh.call(self._factory.target, f"HOME=/root cibadmin --empty {self.version} > {self._factory.tmpfile}")
self._num_nodes = len(self._cm.env["nodes"])

no_quorum = "stop"
Expand Down Expand Up @@ -300,7 +300,7 @@ def contents(self, target):
self._cib = self._show()

if self._factory.tmpfile != f"{BuildOptions.CIB_DIR}/cib.xml":
self._factory.rsh(self._factory.target, f"rm -f {self._factory.tmpfile}")
self._factory.rsh.call(self._factory.target, f"rm -f {self._factory.tmpfile}")

return self._cib

Expand Down
2 changes: 1 addition & 1 deletion python/pacemaker/_cts/cibxml.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def _run(self, operation, xml, section, options=""):
fixed = f"HOME=/root CIB_file={self._factory.tmpfile}"
fixed += f" cibadmin --{operation} --scope {section} {options} --xml-text '{xml}'"

(rc, _) = self._factory.rsh(self._factory.target, fixed)
(rc, _) = self._factory.rsh.call(self._factory.target, fixed)
if rc != 0:
raise RuntimeError(f"Configure call failed: {fixed}")

Expand Down
Loading