Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions queue_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def from_data_dir(cls, data_dir: Path) -> "QueuePaths":
pid INTEGER,
server_id TEXT,
child_pid INTEGER,
command TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
Expand All @@ -62,6 +63,11 @@ def from_data_dir(cls, data_dir: Path) -> "QueuePaths":
ALTER TABLE queue ADD COLUMN server_id TEXT
"""

# Migration to add command column to existing databases
QUEUE_MIGRATION_COMMAND = """
ALTER TABLE queue ADD COLUMN command TEXT
"""

QUEUE_INDEX = """
CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(queue_name, status)
"""
Expand Down Expand Up @@ -91,11 +97,12 @@ def init_db(paths: QueuePaths):
with get_db(paths.db_path) as conn:
conn.execute(QUEUE_SCHEMA)
conn.execute(QUEUE_INDEX)
# Run migration for existing databases without server_id column
try:
conn.execute(QUEUE_MIGRATION_SERVER_ID)
except sqlite3.OperationalError:
pass # Column already exists
# Run migrations for existing databases
for migration in [QUEUE_MIGRATION_SERVER_ID, QUEUE_MIGRATION_COMMAND]:
try:
conn.execute(migration)
except sqlite3.OperationalError:
pass # Column already exists


def ensure_db(paths: QueuePaths):
Expand Down
229 changes: 229 additions & 0 deletions tests/test_tq_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
"""

import json
import os
import signal
import subprocess
import sys
import tempfile
import time
from pathlib import Path

import pytest
Expand Down Expand Up @@ -153,6 +156,176 @@ def test_list_no_database(self, temp_data_dir):
assert result.returncode == 0
assert "No queue database" in result.stdout or "empty" in result.stdout.lower()

def test_list_json_empty_queue(self, temp_data_dir):
"""Test list --json command with DB exists but queue is empty."""
# Initialize DB by running a task that completes
run_tq("echo", "init", data_dir=temp_data_dir)

result = run_tq("list", "--json", data_dir=temp_data_dir)

assert result.returncode == 0
output = json.loads(result.stdout)
assert output == {
"tasks": [],
"summary": {"total": 0, "running": 0, "waiting": 0}
}

def test_list_json_no_database(self, temp_data_dir):
"""Test list --json command when database doesn't exist."""
result = run_tq("list", "--json", data_dir=temp_data_dir)

assert result.returncode == 0
output = json.loads(result.stdout)
assert output["tasks"] == []
assert output["summary"]["total"] == 0


class TestJsonSchemaContracts:
"""
Schema contract tests for JSON output.

These tests ensure the JSON structure remains stable for programmatic consumers
(e.g., Claude Code status lines). Any changes to these schemas should be
intentional and backward-compatible.
"""

# Expected fields for each schema - used to enforce contracts
LIST_REQUIRED_KEYS = {"tasks", "summary"}
LIST_SUMMARY_REQUIRED_KEYS = {"total", "running", "waiting"}
LIST_TASK_REQUIRED_KEYS = {"id", "queue_name", "status", "command", "pid", "child_pid", "created_at", "updated_at"}

LOGS_REQUIRED_KEYS = {"entries"}
LOGS_ENTRY_REQUIRED_KEYS = {"event", "timestamp"} # Base keys all entries must have

CLEAR_REQUIRED_KEYS = {"cleared", "success"}

def test_list_json_schema_empty(self, temp_data_dir):
"""Verify list --json schema structure when empty."""
result = run_tq("list", "--json", data_dir=temp_data_dir)
output = json.loads(result.stdout)

# Top-level keys
assert set(output.keys()) == self.LIST_REQUIRED_KEYS, \
f"list --json must have exactly keys {self.LIST_REQUIRED_KEYS}"

# Summary keys
assert set(output["summary"].keys()) == self.LIST_SUMMARY_REQUIRED_KEYS, \
f"list --json summary must have exactly keys {self.LIST_SUMMARY_REQUIRED_KEYS}"

# Type checks
assert isinstance(output["tasks"], list)
assert isinstance(output["summary"]["total"], int)
assert isinstance(output["summary"]["running"], int)
assert isinstance(output["summary"]["waiting"], int)

def test_list_json_schema_with_running_task(self, temp_data_dir):
"""Verify list --json task schema with an active task."""
# Start a long-running task
proc = subprocess.Popen(
[sys.executable, str(TQ_PATH), f"--data-dir={temp_data_dir}", "sleep", "30"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
start_new_session=True,
)

try:
# Wait for it to start
time.sleep(0.5)

result = run_tq("list", "--json", data_dir=temp_data_dir)
output = json.loads(result.stdout)

# Verify structure
assert set(output.keys()) == self.LIST_REQUIRED_KEYS
assert len(output["tasks"]) >= 1

# Verify task object schema
task = output["tasks"][0]
assert set(task.keys()) == self.LIST_TASK_REQUIRED_KEYS, \
f"Task object must have exactly keys {self.LIST_TASK_REQUIRED_KEYS}, got {set(task.keys())}"

# Verify task field types
assert isinstance(task["id"], int)
assert isinstance(task["queue_name"], str)
assert task["status"] in ("running", "waiting")
assert task["command"] is None or isinstance(task["command"], str)
assert task["pid"] is None or isinstance(task["pid"], int)
assert task["child_pid"] is None or isinstance(task["child_pid"], int)
assert task["created_at"] is None or isinstance(task["created_at"], str)
assert task["updated_at"] is None or isinstance(task["updated_at"], str)

# Verify command is populated for the running task
assert task["command"] == "sleep 30", f"Expected command 'sleep 30', got {task['command']}"

# Verify summary counts are accurate
assert output["summary"]["total"] == len(output["tasks"])
running_count = sum(1 for t in output["tasks"] if t["status"] == "running")
waiting_count = sum(1 for t in output["tasks"] if t["status"] == "waiting")
assert output["summary"]["running"] == running_count
assert output["summary"]["waiting"] == waiting_count

finally:
# Clean up
try:
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
except Exception:
proc.terminate()
proc.wait(timeout=5)

def test_logs_json_schema_empty(self, temp_data_dir):
"""Verify logs --json schema structure when empty."""
result = run_tq("logs", "--json", data_dir=temp_data_dir)
output = json.loads(result.stdout)

assert set(output.keys()) == self.LOGS_REQUIRED_KEYS, \
f"logs --json must have exactly keys {self.LOGS_REQUIRED_KEYS}"
assert isinstance(output["entries"], list)

def test_logs_json_schema_with_entries(self, temp_data_dir):
"""Verify logs --json entry schema with actual log entries."""
# Generate some logs
run_tq("echo", "test", data_dir=temp_data_dir)

result = run_tq("logs", "--json", data_dir=temp_data_dir)
output = json.loads(result.stdout)

assert set(output.keys()) == self.LOGS_REQUIRED_KEYS
assert len(output["entries"]) >= 3 # queued, started, completed

# Verify each entry has required base keys
for entry in output["entries"]:
assert self.LOGS_ENTRY_REQUIRED_KEYS.issubset(set(entry.keys())), \
f"Log entry must have at least keys {self.LOGS_ENTRY_REQUIRED_KEYS}, got {set(entry.keys())}"
assert isinstance(entry["event"], str)
assert isinstance(entry["timestamp"], str)

# Verify specific event schemas
for entry in output["entries"]:
if entry["event"] == "task_queued":
assert "task_id" in entry
assert "queue_name" in entry
elif entry["event"] == "task_started":
assert "task_id" in entry
assert "queue_name" in entry
assert "wait_time_seconds" in entry
elif entry["event"] == "task_completed":
assert "task_id" in entry
assert "queue_name" in entry
assert "exit_code" in entry
assert "duration_seconds" in entry

def test_clear_json_schema(self, temp_data_dir):
"""Verify clear --json schema structure."""
result = run_tq("clear", "--json", data_dir=temp_data_dir)
output = json.loads(result.stdout)

assert set(output.keys()) == self.CLEAR_REQUIRED_KEYS, \
f"clear --json must have exactly keys {self.CLEAR_REQUIRED_KEYS}"
assert isinstance(output["cleared"], int)
assert isinstance(output["success"], bool)
assert output["cleared"] >= 0
assert output["success"] is True


class TestTqLogs:
"""Tests for the tq logs command."""
Expand Down Expand Up @@ -189,6 +362,43 @@ def test_logs_n_option(self, temp_data_dir):
lines = [line for line in result.stdout.strip().split("\n") if line]
assert len(lines) == 3

def test_logs_json_no_file(self, temp_data_dir):
"""Test logs --json command when no log file exists."""
result = run_tq("logs", "--json", data_dir=temp_data_dir)

assert result.returncode == 0
output = json.loads(result.stdout)
assert output == {"entries": []}

def test_logs_json_shows_activity(self, temp_data_dir):
"""Test logs --json command shows task activity."""
# Run a task first to generate logs
run_tq("echo", "test", data_dir=temp_data_dir)

result = run_tq("logs", "--json", data_dir=temp_data_dir)

assert result.returncode == 0
output = json.loads(result.stdout)
assert "entries" in output
assert len(output["entries"]) >= 3 # queued, started, completed

events = [e["event"] for e in output["entries"]]
assert "task_queued" in events
assert "task_started" in events
assert "task_completed" in events

def test_logs_json_n_option(self, temp_data_dir):
"""Test logs --json -n option to limit entries."""
# Run multiple tasks
for i in range(5):
run_tq("echo", f"test {i}", data_dir=temp_data_dir)

result = run_tq("logs", "--json", "-n", "3", data_dir=temp_data_dir)

assert result.returncode == 0
output = json.loads(result.stdout)
assert len(output["entries"]) == 3


class TestTqClear:
"""Tests for the tq clear command."""
Expand All @@ -203,6 +413,25 @@ def test_clear_empty_queue(self, temp_data_dir):
assert result.returncode == 0
assert "already empty" in result.stdout.lower()

def test_clear_json_empty_queue(self, temp_data_dir):
"""Test clear --json command with empty queue."""
# Initialize database by running a task that completes
run_tq("echo", "init", data_dir=temp_data_dir)

result = run_tq("clear", "--json", data_dir=temp_data_dir, timeout=5)

assert result.returncode == 0
output = json.loads(result.stdout)
assert output == {"cleared": 0, "success": True}

def test_clear_json_no_database(self, temp_data_dir):
"""Test clear --json command when no database exists."""
result = run_tq("clear", "--json", data_dir=temp_data_dir, timeout=5)

assert result.returncode == 0
output = json.loads(result.stdout)
assert output == {"cleared": 0, "success": True}


class TestTqHelp:
"""Tests for help output."""
Expand Down
Loading