Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,69 @@ Alternatively, to run the script immediately:

This will trigger the cronjob to spawn a job manually.


### scaled-nb-culler
1. Ensure you are logged in to your OpenShift account via the CLI and that you have access to the ope-rhods-testing namespace.
Then run:
```
oc project ope-rhods-testing
```

2. Ensure the environment variables are correctly set in `cronjobs/scaled-nb-culler/cronjob`: <br>

Create a JSON dict entry for each class; the key must be the group name of the class.
The first value should be "cutoff", the cutoff time for the class in seconds.
The next value should be "ns", the namespace in which the class is running — or, if there are multiple namespaces, the prefix that those namespaces should be matched against.
The last value should be "multiple-ns", which should be set to `true` if the class runs in multiple namespaces or `false` if the class runs in a single namespace (booleans, without quotes).

For example:
```
value: |
{
"cs391": {
"cutoff": 43200,
"ns": "bu-cs391-pmpp",
"multiple-ns": true
},
"ds100": {
"cutoff": 7200,
"ns": "rhods-notebooks",
"multiple-ns": false
},
"cs210": {
"cutoff": 43200,
"ns": "rhods-notebooks",
"multiple-ns": false
},
"dsp562": {
"cutoff": 10800,
"ns": "rhods-notebooks",
"multiple-ns": false
}
}
```
3. Ensure that the namespace value in `kustomization.yaml` is correct.

4. From cronjobs/scaled-nb-culler/ directory run:
```
oc apply -k . --as system:admin
```

This will deploy all the necessary resources for the cronjob to run on the specified schedule.

Alternatively, to run the script immediately:

1. Ensure you followed the steps above
2. Verify the cronjob `scaled-culler` exists
```
oc get cronjob scaled-culler
```

3. Run:
```
kubectl create -n rhods-notebooks job --from=cronjob/scaled-culler scaled-culler
```

### multiple-ns-group-sync
This cronjob runs once every hour at the top of the hour, adding all users with the edit rolebinding in the specified namespaces to the specified group. This cronjob differs from the original `group-sync` cronjob by syncing with multiple namespaces rather than just one.

Expand Down
14 changes: 14 additions & 0 deletions container-images/scaled-nb-culler/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# NOTE(review): base image presumably provides the `oc` CLI that the
# openshift_client library shells out to at runtime — confirm before
# changing the tag.
FROM quay.io/operate-first/opf-toolbox:v0.8.0

# All application files live under /scripts.
WORKDIR /scripts

# Install pip so Python requirements can be installed below.
# NOTE(review): assumes python3 itself is already present in the base image.
RUN dnf install -y python3-pip

# Copy and install requirements before the source files so the pip layer
# stays cached until requirements.txt itself changes.
COPY requirements.txt ./requirements.txt
RUN pip3 install -r requirements.txt

# Copy the culler entry point and its helper module.
COPY nb-culler.py helpers.py ./
121 changes: 121 additions & 0 deletions container-images/scaled-nb-culler/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
from datetime import datetime, timezone
import openshift_client as oc
from typing import Optional
import logging
import json
import sys

LOG = logging.getLogger(__name__)


def build_user_cutoff_map(single_ns: dict[str, dict]) -> dict[str, dict]:
    """Build a user -> {class, cutoff} lookup from all single-namespace class configs.

    If a user belongs to multiple groups, the more lenient (higher) cutoff wins.

    Args:
        single_ns: class name -> config dict containing an integer-like
            "cutoff" value (seconds).

    Returns:
        Mapping of username -> {"class": <class name>, "cutoff": <seconds>}.
    """
    user_to_info: dict[str, dict] = {}

    for class_name, config in single_ns.items():
        cutoff = int(config["cutoff"])
        users = get_group_users(class_name)

        LOG.info("Loaded group %s (%d users, cutoff=%ss)", class_name, len(users), cutoff)

        for raw_user in users:
            user = str(raw_user).strip()
            if not user:
                continue

            existing = user_to_info.get(user)
            if existing is None:
                user_to_info[user] = {"class": class_name, "cutoff": cutoff}
                continue

            # Duplicate membership: only replace when this class is strictly
            # more lenient, so the highest cutoff wins regardless of the
            # iteration order of the config dict.
            if cutoff > existing["cutoff"]:
                LOG.warning(
                    "User %s in multiple groups (%s, %s). Using more lenient cutoff %ss from %s.",
                    user, existing["class"], class_name, cutoff, class_name,
                )
                user_to_info[user] = {"class": class_name, "cutoff": cutoff}

    LOG.info("Built user_to_info map with %d total users", len(user_to_info))
    return user_to_info


def parse_rfc3339(ts: str) -> datetime:
    """Parse an RFC 3339 timestamp string into a timezone-aware datetime.

    The 'Z' UTC suffix is rewritten as an explicit "+00:00" offset first,
    since datetime.fromisoformat does not accept 'Z' on older Pythons.
    """
    normalized = ts.replace("Z", "+00:00")
    return datetime.fromisoformat(normalized)


def as_bool(v) -> bool:
    """Coerce *v* to a boolean.

    Booleans pass through unchanged.  Strings are matched case-insensitively
    (after stripping whitespace) against the truthy spellings "true", "1",
    "yes" and "y"; any other string is False.  Everything else falls back to
    Python truthiness.
    """
    if isinstance(v, bool):
        return v
    if not isinstance(v, str):
        return bool(v)
    normalized = v.strip().lower()
    return normalized in ("true", "1", "yes", "y")


def get_notebook_username(nb: dict) -> Optional[str]:
    """Return the notebook owner's username from known annotations, or None.

    Checks the "opendatahub.io/username" annotation first, then the
    "notebooks.opendatahub.io/username" fallback.  Missing metadata or
    annotations (including explicit None values) are tolerated.
    """
    metadata = nb.get("metadata") or {}
    annotations = metadata.get("annotations") or {}

    for key in ("opendatahub.io/username", "notebooks.opendatahub.io/username"):
        value = annotations.get(key)
        if value:
            return str(value).strip()
    return None


def get_group_users(group_name: str) -> set[str]:
    """Return the set of usernames in the given OpenShift group.

    On any lookup/parse failure, logs an error and returns an empty set so
    callers degrade gracefully.  An empty group is logged as a warning since
    it can cause member notebooks to be treated as unenrolled.
    """
    try:
        raw = oc.invoke("get", ["group", group_name, "-o", "json"]).out()
        group = json.loads(raw)
        members = group.get("users")
        if not members:
            LOG.warning("Group %s is empty. This could lead to notebooks being deleted.", group_name)
        return set(members or [])
    except Exception as e:
        LOG.error("Failed to get users for group %s: %s", group_name, e)
        return set()

def get_running_notebooks(namespace: Optional[str] = None) -> list[dict]:
    """
    Return running notebooks. If namespace is given, scope to that namespace;
    otherwise query all namespaces and include a 'namespace' key in each result.
    """
    # Build the oc arguments and the field layout of each tab-separated row.
    if namespace:
        ns_args = ["-n", namespace]
        jsonpath = (
            '{range .items[?(@.status.containerState.running)]}'
            '{.metadata.name}{"\\t"}{.status.containerState.running.startedAt}{"\\n"}{end}'
        )
        fields = ("name", "startedAt")
    else:
        ns_args = ["-A"]
        jsonpath = (
            '{range .items[?(@.status.containerState.running)]}'
            '{.metadata.namespace}{"\\t"}{.metadata.name}{"\\t"}'
            '{.status.containerState.running.startedAt}{"\\n"}{end}'
        )
        fields = ("namespace", "name", "startedAt")

    result = oc.invoke("get", ["notebooks"] + ns_args + ["-o", f"jsonpath={jsonpath}"])

    # Rows with an unexpected number of columns are silently skipped.
    notebooks = []
    for line in result.out().strip().splitlines():
        parts = line.strip().split("\t")
        if len(parts) == len(fields):
            notebooks.append(dict(zip(fields, parts)))
    return notebooks


def get_notebook_username_map(namespace: str) -> dict[str, str]:
    """
    Fetch all notebooks in a namespace and return a
    name -> username map derived from annotations.

    Notebooks without a recognizable username annotation are omitted.
    """
    raw = oc.invoke("get", ["notebooks", "-n", namespace, "-o", "json"]).out()
    mapping: dict[str, str] = {}
    for item in json.loads(raw).get("items", []):
        username = get_notebook_username(item)
        if username:
            mapping[item["metadata"]["name"]] = username
    return mapping
165 changes: 165 additions & 0 deletions container-images/scaled-nb-culler/nb-culler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import os
import sys
import openshift_client as oc
import logging
import json
from datetime import datetime, timezone
from helpers import (
parse_rfc3339,
as_bool,
build_user_cutoff_map,
get_running_notebooks,
get_notebook_username_map,
)

LOG = logging.getLogger(__name__)


def get_class_ns(culler: dict) -> tuple[dict, dict]:
    """Split the culler config into multi-namespace and single-namespace maps.

    Args:
        culler: class name -> {"cutoff": <seconds>, "ns": <name or prefix>,
            "multiple-ns": <bool-ish, optional>}.

    Returns:
        (multi_ns, single_ns) where multi_ns entries hold a namespace
        "prefix" and single_ns entries hold a concrete "namespace", both
        alongside an integer "cutoff".
    """
    multi_ns: dict[str, dict] = {}
    single_ns: dict[str, dict] = {}

    for class_name, config in culler.items():
        spans_multiple = as_bool(config.get("multiple-ns", False))
        # Multi-namespace classes store "ns" as a prefix to match against;
        # single-namespace classes store it as the exact namespace.
        target = multi_ns if spans_multiple else single_ns
        ns_key = "prefix" if spans_multiple else "namespace"
        target[class_name] = {
            "cutoff": int(config["cutoff"]),
            ns_key: config["ns"],
        }

    return multi_ns, single_ns


def stop_notebook(nb_name: str, started_at: str, namespace: str, cutoff_seconds: int) -> bool:
    """Annotate a notebook as stopped if it has run longer than its cutoff.

    Args:
        nb_name: notebook resource name.
        started_at: RFC 3339 start timestamp from the notebook status.
        namespace: namespace containing the notebook.
        cutoff_seconds: maximum allowed age in seconds.

    Returns:
        True if the notebook was patched (stopped), False otherwise.
    """
    age_seconds = int((datetime.now(timezone.utc) - parse_rfc3339(started_at)).total_seconds())

    if age_seconds <= cutoff_seconds:
        LOG.info(
            "Notebook %s/%s within cutoff (age=%ss < cutoff=%ss)",
            namespace, nb_name, age_seconds, cutoff_seconds
        )
        return False

    # Setting this annotation is how Kubeflow/ODH notebooks are told to stop.
    stop_patch = {
        "metadata": {
            "annotations": {
                "kubeflow-resource-stopped": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
            }
        }
    }

    try:
        oc.invoke("patch", ["notebook", nb_name, "-n", namespace, "--type=merge", "-p", json.dumps(stop_patch)])
        LOG.info("Patched notebook %s/%s (age=%ss > cutoff=%ss)", namespace, nb_name, age_seconds, cutoff_seconds)
        return True
    except Exception as e:
        LOG.error("Failed to patch notebook %s/%s: %s", namespace, nb_name, e)
        return False


def process_single(single_ns: dict[str, dict]) -> None:
    """Cull running notebooks for classes that each run in a single namespace.

    Notebooks owned by users enrolled in a configured class are stopped once
    past that class's cutoff; notebooks whose owner is in no configured group
    are deleted together with their workspace PVC.

    Fix: the previous version took the namespace from the *first* config
    entry only and processed just that one namespace, silently skipping any
    class configured for a different namespace.  Classes are now grouped by
    namespace and every namespace is processed.
    """
    if not single_ns:
        LOG.info("No single namespace classes configured")
        return

    # Group class configs by their target namespace.
    by_namespace: dict[str, dict[str, dict]] = {}
    for class_name, config in single_ns.items():
        by_namespace.setdefault(config["namespace"], {})[class_name] = config

    for namespace, classes in by_namespace.items():
        LOG.info("Processing shared single namespace %s for %d class(es)", namespace, len(classes))

        try:
            # Only the classes running in this namespace contribute cutoffs.
            user_to_info = build_user_cutoff_map(classes)

            with oc.project(namespace):
                running = get_running_notebooks(namespace)
                username_map = get_notebook_username_map(namespace)

                for nb in running:
                    nb_name = nb["name"]

                    username = username_map.get(nb_name)
                    if not username:
                        LOG.warning("Notebook %s missing username annotation, skipping", nb_name)
                        continue

                    info = user_to_info.get(username)
                    if not info:
                        # Owner is in no configured group: remove the notebook
                        # and its PVC entirely (best-effort; failures logged).
                        try:
                            pvc = f"jupyterhub-nb-{nb_name.removeprefix('jupyter-nb-')}-pvc"
                            oc.invoke("delete", ["notebook", nb_name, "-n", namespace])
                            oc.invoke("delete", ["pvc", pvc, "-n", namespace])

                            LOG.info("Deleted notebook %s and pvc %s in namespace %s", nb_name, pvc, namespace)
                        except Exception as e:
                            LOG.error("Failed deleting notebook %s or pvc in namespace %s: %s", nb_name, namespace, e)
                        continue

                    stop_notebook(nb_name, nb["startedAt"], namespace, info["cutoff"])

        except Exception as e:
            LOG.error("Error processing shared single namespace %s: %s", namespace, e)


def process_multi(multi_ns: dict[str, dict]) -> None:
    """Cull running notebooks for classes that span multiple namespaces.

    Each class config provides a namespace prefix; a notebook's namespace
    matches a class when it equals the prefix exactly or starts with
    "<prefix>-".  When several classes match the same namespace, the most
    lenient (highest) cutoff wins.

    Fix: removed a sort of the matcher list by prefix length — it was dead
    code, since the loop below already inspects *every* matcher and keeps
    the maximum cutoff, so ordering never affected the result.
    """
    matchers: list[tuple[str, int]] = []
    for class_name, cfg in multi_ns.items():
        prefix = str(cfg["prefix"]).strip()
        if prefix:
            matchers.append((prefix, int(cfg["cutoff"])))

    if not matchers:
        LOG.info("No multi-namespace classes configured")
        return

    try:
        running_all = get_running_notebooks()
    except Exception as e:
        LOG.error("Failed to list running notebooks across all namespaces: %s", e)
        return

    matched = 0
    for nb in running_all:
        ns = nb["namespace"]

        # Collect every class whose prefix matches and keep the most
        # lenient cutoff among them.
        cutoffs = [c for prefix, c in matchers if ns == prefix or ns.startswith(prefix + "-")]
        if not cutoffs:
            continue

        matched += 1
        stop_notebook(nb["name"], nb["startedAt"], ns, max(cutoffs))

    LOG.info("Matched %d running notebooks in multi namespaces", matched)


if __name__ == '__main__':
    logging.basicConfig(level='INFO')

    # Read via .get() so a missing variable reaches the friendly error below
    # instead of raising an unhandled KeyError (the previous version indexed
    # os.environ directly, so the `if not culler_dict` check could never fire
    # for a missing variable).
    culler_raw = os.environ.get("CULLER_DICT")
    if not culler_raw:
        LOG.error('CULLER_DICT environment variable is required.')
        sys.exit(1)

    try:
        culler_dict = json.loads(culler_raw)
    except json.JSONDecodeError as e:
        LOG.error('CULLER_DICT is not valid JSON: %s', e)
        sys.exit(1)

    # An empty dict parses fine but configures nothing — treat as an error.
    if not culler_dict:
        LOG.error('CULLER_DICT environment variable is required.')
        sys.exit(1)

    multi_ns, single_ns = get_class_ns(culler_dict)

    process_single(single_ns)
    process_multi(multi_ns)
1 change: 1 addition & 0 deletions container-images/scaled-nb-culler/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
openshift-client==2.0.5
Loading
Loading