diff --git a/.gitignore b/.gitignore index a0c251a..d41f21e 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ challenges/*/terraform/versions.tf .vscode/ .idea +stats.html \ No newline at end of file diff --git a/ctf/__main__.py b/ctf/__main__.py index 0606e1a..daeaceb 100644 --- a/ctf/__main__.py +++ b/ctf/__main__.py @@ -13,6 +13,7 @@ from typing_extensions import Annotated from ctf import ENV, STATE +from ctf.askgod import app as askgod_app from ctf.check import app as check_app from ctf.deploy import app as deploy_app from ctf.destroy import app as destroy_app @@ -33,18 +34,23 @@ help="CLI tool to manage CTF challenges as code. Run from the root CTF repo directory or set the CTF_ROOT_DIR environment variable to run the tool.", no_args_is_help=True, ) -app.add_typer(validate_app) -app.add_typer(init_app) -app.add_typer(new_app) +app.add_typer( + askgod_app, + name="askgod", + help="Commands for interacting with a live askgod server (github.com/nsec/askgod).", +) +app.add_typer(check_app) +app.add_typer(deploy_app) app.add_typer(destroy_app) app.add_typer(flags_app) -app.add_typer(services_app) app.add_typer(generate_app) -app.add_typer(deploy_app) +app.add_typer(init_app) +app.add_typer(list_app) +app.add_typer(new_app) app.add_typer(redeploy_app) -app.add_typer(check_app) +app.add_typer(services_app) app.add_typer(stats_app) -app.add_typer(list_app) +app.add_typer(validate_app) app.add_typer(version_app) diff --git a/ctf/askgod/__init__.py b/ctf/askgod/__init__.py new file mode 100644 index 0000000..c22cf6b --- /dev/null +++ b/ctf/askgod/__init__.py @@ -0,0 +1,6 @@ +import typer + +from ctf.askgod.stats import app as stats_app + +app = typer.Typer() +app.add_typer(stats_app) diff --git a/ctf/askgod/stats.py b/ctf/askgod/stats.py new file mode 100644 index 0000000..070943e --- /dev/null +++ b/ctf/askgod/stats.py @@ -0,0 +1,426 @@ +import json +from datetime import datetime, timezone + +import requests +import rich +import typer +from typing_extensions import Annotated + +from ctf.logger import LOG + +app = typer.Typer() + + +@app.command( + help="Show stats from askgod, specifically regarding to AI agent flag submissions." +) +def stats( + askgod_url: Annotated[ + str, typer.Option("--askgod-url", "-u", help="Askgod server URL.") + ] = "https://askgod.nsec", + html: Annotated[ + bool, typer.Option("--html", help="Generate an HTML report (stats.html).") + ] = False, +) -> None: + stats = {} + session = requests.Session() + session.base_url = askgod_url + "/1.0" + LOG.info(f"Fetching stats from {session.base_url}") + flags = get(session, "/flags") + scores = get(session, "/scores") + scoreboard = get(session, "/scoreboard") + # rich.print(flags) + # rich.print(scores) + # rich.print(scoreboard) + + # Join the flags and scores data together based on flag's `id` and score's `flag_id` by modifying the `scores` list in place + for score in scores: + flag = next((f for f in flags if f["id"] == score["flag_id"]), None) + if flag: + score["flag"] = flag["flag"] + score["description"] = flag["description"] + score["return_string"] = flag["return_string"] + else: + LOG.warning( + f"Could not find flag for score with flag_id {score['flag_id']}" + ) + LOG.info(f"Analyzing {len(scores)} scores...") + ai_agent_scores = [s for s in scores if s["ai_agent"]] + stats["total_scores"] = len(scores) + stats["ai_agent_scores"] = len(ai_agent_scores) + stats["ai_agent_score_percentage"] = ( + round(len(ai_agent_scores) / len(scores) * 100) if scores else 0 + ) + + stats["total_points"] = sum(s["value"] for s in scores) + stats["ai_agent_points"] = sum(s["value"] for s in ai_agent_scores) + stats["ai_agent_points_percentage"] = ( + round(stats["ai_agent_points"] / stats["total_points"] * 100) + if stats["total_points"] + else 0 + ) + + stats["total_teams"] = len(set(s["team_id"] for s in scores)) + stats["teams_with_ai_agent_scores"] = len( + set(s["team_id"] for s in ai_agent_scores) + ) + stats["teams_with_ai_agent_scores_percentage"] = ( + round(stats["teams_with_ai_agent_scores"] / stats["total_teams"] * 100) + if stats["total_teams"] + else 0 + ) + + teams_per_quintile = {} + # Separate teams into quintiles based on the scoreboard. The rank of a team is its position in the index of the scoreboard + for i in range(5): + teams_per_quintile[4 - i] = scoreboard[ + len(scoreboard) // 5 * i : len(scoreboard) // 5 * (i + 1) + ] + # rich.print(teams_per_quintile) + + stats["ai_agent_points_per_quintile"] = {} + for i in range(5): + quintile_team_ids = set(t["team"]["id"] for t in teams_per_quintile[i]) + ai_agent_points_in_quintile = sum( + s["value"] for s in ai_agent_scores if s["team_id"] in quintile_team_ids + ) + total_points_in_quintile = sum( + s["value"] for s in scores if s["team_id"] in quintile_team_ids + ) + stats["ai_agent_points_per_quintile"][f"quintile_{i + 1}"] = { + "ai_agent_points": ai_agent_points_in_quintile, + "total_points": total_points_in_quintile, + "ai_agent_points_percentage": ( + round(ai_agent_points_in_quintile / total_points_in_quintile * 100) + if total_points_in_quintile + else 0 + ), + } + + stats["ai_agent_scores_per_quintile"] = {} + for i in range(5): + quintile_team_ids = set(t["team"]["id"] for t in teams_per_quintile[i]) + ai_agent_scores_in_quintile = sum( + 1 for s in ai_agent_scores if s["team_id"] in quintile_team_ids + ) + total_scores_in_quintile = sum( + 1 for s in scores if s["team_id"] in quintile_team_ids + ) + stats["ai_agent_scores_per_quintile"][f"quintile_{i + 1}"] = { + "ai_agent_scores": ai_agent_scores_in_quintile, + "total_scores": total_scores_in_quintile, + "ai_agent_scores_percentage": ( + round(ai_agent_scores_in_quintile / total_scores_in_quintile * 100) + if total_scores_in_quintile + else 0 + ), + } + + stats["ai_agent_solve_per_point"] = {} + for i in range(21): + stats["ai_agent_solve_per_point"][i] = { + "ai_agent_solves": 0, + "total_solves": 0, + "ai_agent_solve_percentage": 0, + } + for score in scores: + stats["ai_agent_solve_per_point"][score["value"]]["total_solves"] += 1 + if score["ai_agent"]: + stats["ai_agent_solve_per_point"][score["value"]]["ai_agent_solves"] += 1 + stats["ai_agent_solve_per_point"][score["value"]][ + "ai_agent_solve_percentage" + ] = round( + stats["ai_agent_solve_per_point"][score["value"]]["ai_agent_solves"] + / stats["ai_agent_solve_per_point"][score["value"]]["total_solves"] + * 100 + ) + stats["ai_agent_solve_per_point"] = dict( + sorted(stats["ai_agent_solve_per_point"].items(), key=lambda item: item[0]) + ) + + flags_with_ai_solves = len(set(s["flag_id"] for s in ai_agent_scores)) + total_flags_solved = len(set(s["flag_id"] for s in scores)) + stats["flags_with_ai_agent_solves"] = flags_with_ai_solves + stats["total_flags_solved"] = total_flags_solved + stats["percentage_of_flags_with_ai_agent_solves"] = ( + round(flags_with_ai_solves / total_flags_solved * 100) if scores else 0 + ) + + # Bucket submissions into 4-second intervals and compute AI% per bucket + bucket_size = 10 + buckets: dict[int, dict] = {} + for score in scores: + t = datetime.fromisoformat(score["submit_time"].replace("Z", "+00:00")) + epoch = int(t.timestamp()) + bucket_key = (epoch // bucket_size) * bucket_size + if bucket_key not in buckets: + buckets[bucket_key] = {"ai_count": 0, "total_count": 0} + buckets[bucket_key]["total_count"] += 1 + if score["ai_agent"]: + buckets[bucket_key]["ai_count"] += 1 + stats["ai_agent_percentage_over_time"] = [ + { + "bucket_start": datetime.fromtimestamp(k, tz=timezone.utc).strftime( + "%a %H:%M:%S" + ), + "ai_count": v["ai_count"], + "total_count": v["total_count"], + "ai_percentage": round(v["ai_count"] / v["total_count"] * 100), + } + for k, v in sorted(buckets.items()) + ] + + rich.print(stats) + + if html: + html_content = generate_html(stats) + with open("stats.html", "w") as f: + f.write(html_content) + LOG.info("HTML report written to stats.html") + + +def generate_html(stats: dict) -> str: + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + quintile_labels = ["Bottom 20%", "20-40%", "40-60%", "60-80%", "Top 20%"] + + points_ai_pct = [ + stats["ai_agent_points_per_quintile"][f"quintile_{i}"][ + "ai_agent_points_percentage" + ] + for i in range(1, 6) + ] + points_human_pct = [ + 100 + - stats["ai_agent_points_per_quintile"][f"quintile_{i}"][ + "ai_agent_points_percentage" + ] + for i in range(1, 6) + ] + + scores_ai_pct = [ + stats["ai_agent_scores_per_quintile"][f"quintile_{i}"][ + "ai_agent_scores_percentage" + ] + for i in range(1, 6) + ] + scores_human_pct = [ + 100 + - stats["ai_agent_scores_per_quintile"][f"quintile_{i}"][ + "ai_agent_scores_percentage" + ] + for i in range(1, 6) + ] + + per_point = stats["ai_agent_solve_per_point"] + point_labels = [str(k) for k in per_point if k > 0] + point_ai_pct = [ + per_point[k]["ai_agent_solve_percentage"] for k in per_point if k > 0 + ] + point_human_pct = [100 - v for v in point_ai_pct] + + over_time = stats["ai_agent_percentage_over_time"] + time_labels = [b["bucket_start"] for b in over_time] + time_ai_pct = [b["ai_percentage"] for b in over_time] + time_human_pct = [100 - v for v in time_ai_pct] + + return f""" + +
+ + +Generated {timestamp}
+ +