forked from OCP-on-NERC/python-batchtools
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbps.py
More file actions
117 lines (99 loc) · 3.91 KB
/
bps.py
File metadata and controls
117 lines (99 loc) · 3.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from typing_extensions import override
from typing import cast
from collections import defaultdict
import sys
import argparse
import openshift_client as oc
from .basecommand import Command
from .basecommand import SubParserFactory
class ListPodsCommandArgs(argparse.Namespace):
    """Typed view of the parsed CLI arguments for ListPodsCommand.

    Used only as a `typing.cast` target in `ListPodsCommand.run`;
    instances are populated by argparse, not constructed here.
    """

    # Count of -v/--verbose occurrences (0 = quiet).
    verbose: int = 0
    # Positional node names. Annotation only: a class-level `= []` default
    # would be one mutable list object shared by every instance.
    node_names: list[str]
class ListPodsCommand(Command):
    """
    List active GPU pods per node. By default prints only BUSY nodes.
    With -v/--verbose, prints FREE for nodes seen with Running pods but 0 GPUs.
    """

    name: str = "bps"
    help: str = "List active GPU pods per node"

    @classmethod
    @override
    def build_parser(cls, subparsers: SubParserFactory):
        """Register this subcommand's parser and its positional arguments."""
        p = super().build_parser(subparsers)
        p.add_argument("node_names", nargs="*", help="Optional list of node names")
        return p

    @staticmethod
    @override
    def run(args: argparse.Namespace):
        """Fetch Running pods and print a per-node GPU usage summary.

        With node names given, reports each requested node individually;
        otherwise prints one global summary. Exits the process with an
        error message if the OpenShift API interaction fails.
        """
        args = cast(ListPodsCommandArgs, args)
        try:
            # Bound the API call; the summary work below is local.
            with oc.timeout(120):
                all_pods = oc.selector("pods", all_namespaces=True).objects()
            if args.node_names:
                # Deduplicate requested node names.
                node_set = set(args.node_names)
                # Keep only Running pods scheduled on the requested nodes.
                pods_for_nodes = [
                    p
                    for p in all_pods
                    if getattr(p.model.status, "phase", None) == "Running"
                    and (getattr(p.model.spec, "nodeName", None) or "") in node_set
                ]
                # Group by node.
                pods_by_node = defaultdict(list)
                for p in pods_for_nodes:
                    n = getattr(p.model.spec, "nodeName", None) or ""
                    pods_by_node[n].append(p)
                # Sort so output order is deterministic — a bare set
                # iterates in arbitrary order between runs.
                for node in sorted(node_set):
                    lines = summarize_gpu_pods(
                        pods_by_node.get(node, []), args.verbose > 0
                    )
                    # A requested node with no Running pods yields no lines;
                    # surface it as FREE when verbose.
                    if not lines and args.verbose:
                        print(f"{node}: FREE")
                    else:
                        for ln in lines:
                            print(ln)
            else:
                # One global summary over all Running pods.
                running = [
                    p
                    for p in all_pods
                    if getattr(p.model.status, "phase", None) == "Running"
                ]
                for ln in summarize_gpu_pods(running, args.verbose > 0):
                    print(ln)
        except oc.OpenShiftPythonException as e:
            sys.exit(f"Error interacting with OpenShift: {e}")
def summarize_gpu_pods(pods, verbose: bool) -> list[str]:
    """Summarize requested GPU counts per node for a set of pods.

    Args:
        pods: Iterable of openshift_client pod APIObjects (or None/empty).
            Only pods in phase "Running" with a non-blank nodeName count.
        verbose: When True, also emit a FREE line for nodes that have
            Running pods but zero GPU requests.

    Returns:
        One line per node, sorted by node name:
        "node: BUSY <total> <ns/pod ...>" for nodes with GPU requests,
        "node: FREE" (verbose only) for seen nodes without any.
    """
    totals: defaultdict[str, int] = defaultdict(int)
    busy_pods: defaultdict[str, set[str]] = defaultdict(set)
    seen_nodes: set[str] = set()
    for pod in pods or []:
        try:
            if pod.model.status.phase != "Running":
                continue
            node = (pod.model.spec.nodeName or "").strip()
            if not node:
                continue
            seen_nodes.add(node)
            ns = (pod.model.metadata.namespace or "").strip()
            name = (pod.model.metadata.name or "").strip()
            pod_id = f"{ns}/{name}" if ns and name else name or ns
            # Sum this pod's GPU requests into a local first, then commit
            # atomically below: a parse failure on one container must not
            # leave the pod partially counted in `totals`.
            pod_gpus = 0
            for ctr in pod.model.spec.containers or []:
                reqs = getattr(ctr.resources, "requests", {}) or {}
                pod_gpus += int(reqs.get("nvidia.com/gpu", 0) or 0)
            if pod_gpus > 0:
                totals[node] += pod_gpus
                busy_pods[node].add(pod_id)
        except Exception:
            # Best-effort: skip pods whose model is missing expected fields
            # or carries an unparsable GPU request.
            continue
    lines: list[str] = []
    # Every node in `totals` was first added to `seen_nodes`, so sorting
    # the seen set alone covers all reportable nodes.
    for node in sorted(seen_nodes):
        total = totals.get(node, 0)
        if total > 0:
            pods_str = " ".join(sorted(busy_pods.get(node, [])))
            lines.append(f"{node}: BUSY {total} {pods_str}".rstrip())
        elif verbose:
            lines.append(f"{node}: FREE")
    return lines