diff --git a/.gitignore b/.gitignore index a0c251a..d41f21e 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ challenges/*/terraform/versions.tf .vscode/ .idea +stats.html \ No newline at end of file diff --git a/ctf/__main__.py b/ctf/__main__.py index 0606e1a..daeaceb 100644 --- a/ctf/__main__.py +++ b/ctf/__main__.py @@ -13,6 +13,7 @@ from typing_extensions import Annotated from ctf import ENV, STATE +from ctf.askgod import app as askgod_app from ctf.check import app as check_app from ctf.deploy import app as deploy_app from ctf.destroy import app as destroy_app @@ -33,18 +34,23 @@ help="CLI tool to manage CTF challenges as code. Run from the root CTF repo directory or set the CTF_ROOT_DIR environment variable to run the tool.", no_args_is_help=True, ) -app.add_typer(validate_app) -app.add_typer(init_app) -app.add_typer(new_app) +app.add_typer( + askgod_app, + name="askgod", + help="Commands for interacting with a live askgod server (github.com/nsec/askgod).", +) +app.add_typer(check_app) +app.add_typer(deploy_app) app.add_typer(destroy_app) app.add_typer(flags_app) -app.add_typer(services_app) app.add_typer(generate_app) -app.add_typer(deploy_app) +app.add_typer(init_app) +app.add_typer(list_app) +app.add_typer(new_app) app.add_typer(redeploy_app) -app.add_typer(check_app) +app.add_typer(services_app) app.add_typer(stats_app) -app.add_typer(list_app) +app.add_typer(validate_app) app.add_typer(version_app) diff --git a/ctf/askgod/__init__.py b/ctf/askgod/__init__.py new file mode 100644 index 0000000..c22cf6b --- /dev/null +++ b/ctf/askgod/__init__.py @@ -0,0 +1,6 @@ +import typer + +from ctf.askgod.stats import app as stats_app + +app = typer.Typer() +app.add_typer(stats_app) diff --git a/ctf/askgod/stats.py b/ctf/askgod/stats.py new file mode 100644 index 0000000..070943e --- /dev/null +++ b/ctf/askgod/stats.py @@ -0,0 +1,426 @@ +import json +from datetime import datetime, timezone + +import requests +import rich +import typer +from typing_extensions 
from typing_extensions import Annotated

from ctf.logger import LOG

app = typer.Typer()


@app.command(
    help="Show stats from askgod, specifically regarding AI agent flag submissions."
)
def stats(
    askgod_url: Annotated[
        str, typer.Option("--askgod-url", "-u", help="Askgod server URL.")
    ] = "https://askgod.nsec",
    html: Annotated[
        bool, typer.Option("--html", help="Generate an HTML report (stats.html).")
    ] = False,
) -> None:
    """Fetch flags, scores and the scoreboard from askgod and report how much
    of the event was driven by AI-agent flag submissions.

    Prints the collected stats with rich; with --html also writes stats.html
    in the current directory via generate_html().
    """
    results: dict = {}
    session = requests.Session()
    # Non-standard attribute on the Session: get() below reads it to build
    # absolute URLs against the askgod v1.0 API.
    session.base_url = askgod_url + "/1.0"
    LOG.info(f"Fetching stats from {session.base_url}")
    flags = get(session, "/flags")
    scores = get(session, "/scores")
    scoreboard = get(session, "/scoreboard")

    # Join flag metadata onto each score (flag["id"] == score["flag_id"]),
    # mutating `scores` in place. Index by id once instead of a linear scan
    # per score (was O(n*m)).
    flags_by_id = {f["id"]: f for f in flags}
    for score in scores:
        flag = flags_by_id.get(score["flag_id"])
        if flag:
            score["flag"] = flag["flag"]
            score["description"] = flag["description"]
            score["return_string"] = flag["return_string"]
        else:
            LOG.warning(
                f"Could not find flag for score with flag_id {score['flag_id']}"
            )

    LOG.info(f"Analyzing {len(scores)} scores...")
    ai_agent_scores = [s for s in scores if s["ai_agent"]]
    results["total_scores"] = len(scores)
    results["ai_agent_scores"] = len(ai_agent_scores)
    results["ai_agent_score_percentage"] = (
        round(len(ai_agent_scores) / len(scores) * 100) if scores else 0
    )

    results["total_points"] = sum(s["value"] for s in scores)
    results["ai_agent_points"] = sum(s["value"] for s in ai_agent_scores)
    results["ai_agent_points_percentage"] = (
        round(results["ai_agent_points"] / results["total_points"] * 100)
        if results["total_points"]
        else 0
    )

    results["total_teams"] = len(set(s["team_id"] for s in scores))
    results["teams_with_ai_agent_scores"] = len(
        set(s["team_id"] for s in ai_agent_scores)
    )
    results["teams_with_ai_agent_scores_percentage"] = (
        round(results["teams_with_ai_agent_scores"] / results["total_teams"] * 100)
        if results["total_teams"]
        else 0
    )

    # Split the scoreboard (assumed ordered best team first — TODO confirm
    # against the askgod API) into quintiles: key 4 is the top 20%, key 0 the
    # bottom 20%.
    # BUGFIX: the last slice now runs to the end of the list, so the up-to-4
    # teams left over when len(scoreboard) is not a multiple of 5 are no
    # longer silently dropped from every quintile.
    teams_per_quintile: dict[int, list] = {}
    chunk = len(scoreboard) // 5
    for i in range(5):
        end = len(scoreboard) if i == 4 else chunk * (i + 1)
        teams_per_quintile[4 - i] = scoreboard[chunk * i : end]

    results["ai_agent_points_per_quintile"] = {}
    for i in range(5):
        quintile_team_ids = set(t["team"]["id"] for t in teams_per_quintile[i])
        ai_agent_points_in_quintile = sum(
            s["value"] for s in ai_agent_scores if s["team_id"] in quintile_team_ids
        )
        total_points_in_quintile = sum(
            s["value"] for s in scores if s["team_id"] in quintile_team_ids
        )
        results["ai_agent_points_per_quintile"][f"quintile_{i + 1}"] = {
            "ai_agent_points": ai_agent_points_in_quintile,
            "total_points": total_points_in_quintile,
            "ai_agent_points_percentage": (
                round(ai_agent_points_in_quintile / total_points_in_quintile * 100)
                if total_points_in_quintile
                else 0
            ),
        }

    results["ai_agent_scores_per_quintile"] = {}
    for i in range(5):
        quintile_team_ids = set(t["team"]["id"] for t in teams_per_quintile[i])
        ai_agent_scores_in_quintile = sum(
            1 for s in ai_agent_scores if s["team_id"] in quintile_team_ids
        )
        total_scores_in_quintile = sum(
            1 for s in scores if s["team_id"] in quintile_team_ids
        )
        results["ai_agent_scores_per_quintile"][f"quintile_{i + 1}"] = {
            "ai_agent_scores": ai_agent_scores_in_quintile,
            "total_scores": total_scores_in_quintile,
            "ai_agent_scores_percentage": (
                round(ai_agent_scores_in_quintile / total_scores_in_quintile * 100)
                if total_scores_in_quintile
                else 0
            ),
        }

    # AI solve rate per flag point value. Pre-seed 0..20 (the expected range
    # of flag values); setdefault keeps this from raising KeyError should a
    # flag ever be worth more than 20 points.
    per_point = {
        v: {"ai_agent_solves": 0, "total_solves": 0, "ai_agent_solve_percentage": 0}
        for v in range(21)
    }
    for score in scores:
        bucket = per_point.setdefault(
            score["value"],
            {"ai_agent_solves": 0, "total_solves": 0, "ai_agent_solve_percentage": 0},
        )
        bucket["total_solves"] += 1
        if score["ai_agent"]:
            bucket["ai_agent_solves"] += 1
        # total_solves was just incremented, so this division is safe.
        bucket["ai_agent_solve_percentage"] = round(
            bucket["ai_agent_solves"] / bucket["total_solves"] * 100
        )
    results["ai_agent_solve_per_point"] = dict(
        sorted(per_point.items(), key=lambda item: item[0])
    )

    flags_with_ai_solves = len(set(s["flag_id"] for s in ai_agent_scores))
    total_flags_solved = len(set(s["flag_id"] for s in scores))
    results["flags_with_ai_agent_solves"] = flags_with_ai_solves
    results["total_flags_solved"] = total_flags_solved
    # BUGFIX: guard the division with its actual denominator
    # (total_flags_solved), not the unrelated `scores` list.
    results["percentage_of_flags_with_ai_agent_solves"] = (
        round(flags_with_ai_solves / total_flags_solved * 100)
        if total_flags_solved
        else 0
    )

    # Bucket submissions into 10-second intervals (bucket_size) and compute
    # the AI share per bucket. (The previous comment incorrectly said 4s.)
    bucket_size = 10
    buckets: dict[int, dict] = {}
    for score in scores:
        t = datetime.fromisoformat(score["submit_time"].replace("Z", "+00:00"))
        bucket_key = (int(t.timestamp()) // bucket_size) * bucket_size
        entry = buckets.setdefault(bucket_key, {"ai_count": 0, "total_count": 0})
        entry["total_count"] += 1
        if score["ai_agent"]:
            entry["ai_count"] += 1
    results["ai_agent_percentage_over_time"] = [
        {
            "bucket_start": datetime.fromtimestamp(k, tz=timezone.utc).strftime(
                "%a %H:%M:%S"
            ),
            "ai_count": v["ai_count"],
            "total_count": v["total_count"],
            # total_count >= 1 for every materialized bucket.
            "ai_percentage": round(v["ai_count"] / v["total_count"] * 100),
        }
        for k, v in sorted(buckets.items())
    ]

    rich.print(results)

    if html:
        with open("stats.html", "w") as f:
            f.write(generate_html(results))
        LOG.info("HTML report written to stats.html")
def generate_html(stats: dict) -> str:
    """Render the collected askgod stats as a standalone HTML report.

    The page holds four headline cards (AI score share, AI point share,
    teams using agents, flags with at least one agent solve) and four
    stacked percentage bar charts (AI vs human), drawn by Chart.js loaded
    from a CDN: points per quintile, solves per quintile, solve share by
    flag point value, and submission share over time.

    NOTE(review): the template that was here before reached review garbled
    (markup stripped); the HTML below is a reconstruction that binds the
    same data series — confirm the visual styling against the original.

    Args:
        stats: the dict assembled by the `stats` command.

    Returns:
        A complete HTML document as a string.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    quintile_labels = ["Bottom 20%", "20-40%", "40-60%", "60-80%", "Top 20%"]

    # Extract each AI percentage series; the "human" series is its complement.
    points_ai_pct = [
        stats["ai_agent_points_per_quintile"][f"quintile_{i}"][
            "ai_agent_points_percentage"
        ]
        for i in range(1, 6)
    ]
    points_human_pct = [100 - v for v in points_ai_pct]

    scores_ai_pct = [
        stats["ai_agent_scores_per_quintile"][f"quintile_{i}"][
            "ai_agent_scores_percentage"
        ]
        for i in range(1, 6)
    ]
    scores_human_pct = [100 - v for v in scores_ai_pct]

    per_point = stats["ai_agent_solve_per_point"]
    # Skip the 0-point bucket: it never holds a scored solve.
    point_labels = [str(k) for k in per_point if k > 0]
    point_ai_pct = [
        per_point[k]["ai_agent_solve_percentage"] for k in per_point if k > 0
    ]
    point_human_pct = [100 - v for v in point_ai_pct]

    over_time = stats["ai_agent_percentage_over_time"]
    time_labels = [b["bucket_start"] for b in over_time]
    time_ai_pct = [b["ai_percentage"] for b in over_time]
    time_human_pct = [100 - v for v in time_ai_pct]

    def _card(pct: int, label: str, num: int, den: int) -> str:
        # One headline stat card: big percentage, caption, raw counts.
        return (
            f'<div class="card"><div class="big">{pct}%</div>'
            f'<div class="label">{label}</div>'
            f'<div class="detail">{num} / {den}</div></div>'
        )

    def _chart_js(canvas_id: str, labels: list, ai: list, human: list) -> str:
        # Emit one Chart.js stacked bar chart. Building the config as a dict
        # and JSON-encoding it sidesteps f-string brace escaping inside JS.
        config = {
            "type": "bar",
            "data": {
                "labels": labels,
                "datasets": [
                    {"label": "AI agent", "data": ai, "backgroundColor": "#e4572e"},
                    {"label": "Human", "data": human, "backgroundColor": "#4c9f70"},
                ],
            },
            "options": {
                "responsive": True,
                "scales": {
                    "x": {"stacked": True},
                    "y": {"stacked": True, "min": 0, "max": 100},
                },
            },
        }
        return (
            f"new Chart(document.getElementById({json.dumps(canvas_id)}), "
            f"{json.dumps(config)});"
        )

    cards = "".join(
        [
            _card(
                stats["ai_agent_score_percentage"],
                "Valid Flags submitted by an AI agent",
                stats["ai_agent_scores"],
                stats["total_scores"],
            ),
            _card(
                stats["ai_agent_points_percentage"],
                "Points scored by AI Agents",
                stats["ai_agent_points"],
                stats["total_points"],
            ),
            _card(
                stats["teams_with_ai_agent_scores_percentage"],
                "Teams Using AI Agents",
                stats["teams_with_ai_agent_scores"],
                stats["total_teams"],
            ),
            _card(
                stats["percentage_of_flags_with_ai_agent_solves"],
                "Solved Flags w/ at least one agent Solve",
                stats["flags_with_ai_agent_solves"],
                stats["total_flags_solved"],
            ),
        ]
    )

    # (canvas id, title, labels, AI series, human series) — one per chart.
    # BUGFIX: the over-time label previously said "4s buckets" but the stats
    # command buckets submissions into 10-second intervals.
    sections = [
        ("pointsQuintile", "Points per Quintile",
         quintile_labels, points_ai_pct, points_human_pct),
        ("solvesQuintile", "Solves per Quintile",
         quintile_labels, scores_ai_pct, scores_human_pct),
        ("solvesByPoint", "AI Solve % by Flag Point Value",
         point_labels, point_ai_pct, point_human_pct),
        ("overTime", "AI Submission % over Time (10s buckets)",
         time_labels, time_ai_pct, time_human_pct),
    ]
    chart_divs = "".join(
        f'<div class="chart-box"><h2>{title}</h2><canvas id="{cid}"></canvas></div>'
        for cid, title, _labels, _ai, _human in sections
    )
    chart_js = "\n".join(
        _chart_js(cid, labels, ai, human)
        for cid, _title, labels, ai, human in sections
    )

    return f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>AI Agent Stats</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<style>
body {{ font-family: sans-serif; margin: 2em; }}
.cards {{ display: flex; gap: 1em; flex-wrap: wrap; }}
.card {{ border: 1px solid #ccc; border-radius: 8px; padding: 1em; text-align: center; }}
.big {{ font-size: 2.5em; font-weight: bold; }}
.chart-box {{ max-width: 720px; margin-top: 2em; }}
</style>
</head>
<body>
<h1>AI Agent Stats</h1>
<p>Generated {timestamp}</p>
<div class="cards">{cards}</div>
{chart_divs}
<script>
{chart_js}
</script>
</body>
</html>
"""
+ + + +""" + + +def get(session: requests.Session, url: str) -> dict: + try: + response = session.get(url=f"{session.base_url}{url}") + response.raise_for_status() + return response.json() + except requests.HTTPError as e: + e.add_note(f"Failed to fetch stats from {e.request.url}: {e.text}") + raise e diff --git a/poetry.lock b/poetry.lock index df1785b..7f80110 100644 --- a/poetry.lock +++ b/poetry.lock @@ -81,10 +81,9 @@ uvloop = ["uvloop (>=0.15.2)"] name = "certifi" version = "2025.7.14" description = "Python package for providing Mozilla's CA Bundle." -optional = true +optional = false python-versions = ">=3.7" groups = ["main"] -markers = "extra == \"workflow\"" files = [ {file = "certifi-2025.7.14-py3-none-any.whl", hash = "sha256:6b31f564a415d79ee77df69d757bb49a5bb53bd9f756cbbe24394ffd6fc1f4b2"}, {file = "certifi-2025.7.14.tar.gz", hash = "sha256:8ea99dbdfaaf2ba2f9bac77b9249ef62ec5218e7c2b2e903378ed5fccf765995"}, @@ -94,10 +93,9 @@ files = [ name = "charset-normalizer" version = "3.4.2" description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet." 
-optional = true +optional = false python-versions = ">=3.7" groups = ["main"] -markers = "extra == \"workflow\"" files = [ {file = "charset_normalizer-3.4.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:7c48ed483eb946e6c04ccbe02c6b4d1d48e51944b6db70f697e089c193404941"}, {file = "charset_normalizer-3.4.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b2d318c11350e10662026ad0eb71bb51c7812fc8590825304ae0bdd4ac283acd"}, @@ -386,10 +384,9 @@ woff = ["brotli (>=1.0.1) ; platform_python_implementation == \"CPython\"", "bro name = "idna" version = "3.10" description = "Internationalized Domain Names in Applications (IDNA)" -optional = true +optional = false python-versions = ">=3.6" groups = ["main"] -markers = "extra == \"workflow\"" files = [ {file = "idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3"}, {file = "idna-3.10.tar.gz", hash = "sha256:12f65c9b470abda6dc35cf8e63cc574b1c52b11df2c86030af0ac09b01b13ea9"}, @@ -1263,10 +1260,9 @@ typing-extensions = {version = ">=4.4.0", markers = "python_version < \"3.13\""} name = "requests" version = "2.32.4" description = "Python HTTP for Humans." -optional = true +optional = false python-versions = ">=3.8" groups = ["main"] -markers = "extra == \"workflow\"" files = [ {file = "requests-2.32.4-py3-none-any.whl", hash = "sha256:27babd3cda2a6d50b30443204ee89830707d396671944c998b5975b031ac2b2c"}, {file = "requests-2.32.4.tar.gz", hash = "sha256:27d0316682c8a29834d3264820024b62a36942083d52caf2f14c0591336d3422"}, @@ -1591,10 +1587,9 @@ typing-extensions = ">=4.12.0" name = "urllib3" version = "2.5.0" description = "HTTP library with thread-safe connection pooling, file post, and more." 
-optional = true +optional = false python-versions = ">=3.9" groups = ["main"] -markers = "extra == \"workflow\"" files = [ {file = "urllib3-2.5.0-py3-none-any.whl", hash = "sha256:e6b01673c0fa6a13e374b50871808eb3bf7046c4b125b216f6bf1cc604cff0dc"}, {file = "urllib3-2.5.0.tar.gz", hash = "sha256:3fc47733c7e419d4bc3f6b3dc2b4f890bb743906a30d56ba4a5bfa4bbff92760"}, @@ -1612,4 +1607,4 @@ workflow = ["matplotlib", "pybadges", "standard-imghdr"] [metadata] lock-version = "2.1" python-versions = ">=3.11" -content-hash = "f1c4f5b6299867368584046e3684bf12839972b619fe5b6b14e7b5b21ef590db" +content-hash = "caed93386cad5888f8abea3bc094974913cce04350749aca2857832c4b3a140d" diff --git a/pyproject.toml b/pyproject.toml index f997e27..0f554c4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,7 @@ dependencies = [ "jinja2==3.1.5", "jsonschema==4.23.0", "pyyaml<7", + "requests", "rich==14.1.0", "ruff==0.12.8", "setuptools",