|
1 | 1 | import pytest |
2 | 2 | import os |
| 3 | +import sys |
3 | 4 | import numpy as np |
4 | 5 |
|
5 | 6 |
|
| 7 | +# =================================================================== |
| 8 | +# PID Registry Tests (Issue #391) |
| 9 | +# =================================================================== |
| 10 | + |
| 11 | +class TestPidRegistry: |
| 12 | + """Tests for the Windows PID registry mechanism that replaces the |
| 13 | + old single-overwrite concorekill.bat approach.""" |
| 14 | + |
| 15 | + @pytest.fixture(autouse=True) |
| 16 | + def use_temp_dir(self, temp_dir, monkeypatch): |
| 17 | + """Run each test in an isolated temp directory.""" |
| 18 | + self.temp_dir = temp_dir |
| 19 | + monkeypatch.chdir(temp_dir) |
| 20 | + |
| 21 | + def test_register_pid_creates_registry_file(self): |
| 22 | + """_register_pid should create concorekill_pids.txt with current PID.""" |
| 23 | + from concore import _register_pid, _PID_REGISTRY_FILE |
| 24 | + _register_pid() |
| 25 | + assert os.path.exists(_PID_REGISTRY_FILE) |
| 26 | + with open(_PID_REGISTRY_FILE) as f: |
| 27 | + pids = [line.strip() for line in f if line.strip()] |
| 28 | + assert str(os.getpid()) in pids |
| 29 | + |
| 30 | + def test_register_pid_appends_not_overwrites(self): |
| 31 | + """Multiple calls to _register_pid should append, not overwrite.""" |
| 32 | + from concore import _register_pid, _PID_REGISTRY_FILE |
| 33 | + # Simulate two different PIDs by writing manually then registering |
| 34 | + with open(_PID_REGISTRY_FILE, "w") as f: |
| 35 | + f.write("11111\n") |
| 36 | + f.write("22222\n") |
| 37 | + _register_pid() |
| 38 | + with open(_PID_REGISTRY_FILE) as f: |
| 39 | + pids = [line.strip() for line in f if line.strip()] |
| 40 | + assert "11111" in pids |
| 41 | + assert "22222" in pids |
| 42 | + assert str(os.getpid()) in pids |
| 43 | + assert len(pids) == 3 |
| 44 | + |
| 45 | + def test_cleanup_pid_removes_current_pid(self): |
| 46 | + """_cleanup_pid should remove only the current PID from the registry.""" |
| 47 | + from concore import _cleanup_pid, _PID_REGISTRY_FILE |
| 48 | + current_pid = str(os.getpid()) |
| 49 | + with open(_PID_REGISTRY_FILE, "w") as f: |
| 50 | + f.write("99999\n") |
| 51 | + f.write(current_pid + "\n") |
| 52 | + f.write("88888\n") |
| 53 | + _cleanup_pid() |
| 54 | + with open(_PID_REGISTRY_FILE) as f: |
| 55 | + pids = [line.strip() for line in f if line.strip()] |
| 56 | + assert current_pid not in pids |
| 57 | + assert "99999" in pids |
| 58 | + assert "88888" in pids |
| 59 | + |
| 60 | + def test_cleanup_pid_deletes_files_when_last_pid(self): |
| 61 | + """When the current PID is the only one left, cleanup should |
| 62 | + remove both the registry file and the kill script.""" |
| 63 | + from concore import _cleanup_pid, _PID_REGISTRY_FILE, _KILL_SCRIPT_FILE |
| 64 | + current_pid = str(os.getpid()) |
| 65 | + with open(_PID_REGISTRY_FILE, "w") as f: |
| 66 | + f.write(current_pid + "\n") |
| 67 | + # Create a dummy kill script to verify it gets cleaned up |
| 68 | + with open(_KILL_SCRIPT_FILE, "w") as f: |
| 69 | + f.write("@echo off\n") |
| 70 | + _cleanup_pid() |
| 71 | + assert not os.path.exists(_PID_REGISTRY_FILE) |
| 72 | + assert not os.path.exists(_KILL_SCRIPT_FILE) |
| 73 | + |
| 74 | + def test_cleanup_pid_handles_missing_registry(self): |
| 75 | + """_cleanup_pid should not crash when registry file doesn't exist.""" |
| 76 | + from concore import _cleanup_pid, _PID_REGISTRY_FILE |
| 77 | + assert not os.path.exists(_PID_REGISTRY_FILE) |
| 78 | + _cleanup_pid() # Should not raise |
| 79 | + |
| 80 | + def test_write_kill_script_generates_bat_file(self): |
| 81 | + """_write_kill_script should create concorekill.bat with validation logic.""" |
| 82 | + from concore import _write_kill_script, _KILL_SCRIPT_FILE, _PID_REGISTRY_FILE |
| 83 | + _write_kill_script() |
| 84 | + assert os.path.exists(_KILL_SCRIPT_FILE) |
| 85 | + with open(_KILL_SCRIPT_FILE) as f: |
| 86 | + content = f.read() |
| 87 | + # Script should reference the PID registry file |
| 88 | + assert _PID_REGISTRY_FILE in content |
| 89 | + # Script should validate processes before killing |
| 90 | + assert "tasklist" in content |
| 91 | + assert "taskkill" in content |
| 92 | + assert "python" in content.lower() |
| 93 | + |
| 94 | + def test_multi_node_registration(self): |
| 95 | + """Simulate 3 nodes registering PIDs — all should be present.""" |
| 96 | + from concore import _register_pid, _PID_REGISTRY_FILE |
| 97 | + fake_pids = ["1204", "1932", "8120"] |
| 98 | + with open(_PID_REGISTRY_FILE, "w") as f: |
| 99 | + for pid in fake_pids: |
| 100 | + f.write(pid + "\n") |
| 101 | + _register_pid() # Current process is the 4th |
| 102 | + with open(_PID_REGISTRY_FILE) as f: |
| 103 | + pids = [line.strip() for line in f if line.strip()] |
| 104 | + for pid in fake_pids: |
| 105 | + assert pid in pids |
| 106 | + assert str(os.getpid()) in pids |
| 107 | + assert len(pids) == 4 |
| 108 | + |
| 109 | + def test_cleanup_preserves_other_pids(self): |
| 110 | + """After cleanup, only the current process PID should be removed.""" |
| 111 | + from concore import _cleanup_pid, _PID_REGISTRY_FILE |
| 112 | + current_pid = str(os.getpid()) |
| 113 | + other_pids = ["1111", "2222", "3333"] |
| 114 | + with open(_PID_REGISTRY_FILE, "w") as f: |
| 115 | + for pid in other_pids: |
| 116 | + f.write(pid + "\n") |
| 117 | + f.write(current_pid + "\n") |
| 118 | + _cleanup_pid() |
| 119 | + with open(_PID_REGISTRY_FILE) as f: |
| 120 | + pids = [line.strip() for line in f if line.strip()] |
| 121 | + assert len(pids) == 3 |
| 122 | + assert current_pid not in pids |
| 123 | + for pid in other_pids: |
| 124 | + assert pid in pids |
| 125 | + |
| 126 | + @pytest.mark.skipif(not hasattr(sys, 'getwindowsversion'), |
| 127 | + reason="Windows-only test") |
| 128 | + def test_import_registers_pid_on_windows(self): |
| 129 | + """On Windows, importing concore should register the PID.""" |
| 130 | + from concore import _PID_REGISTRY_FILE |
| 131 | + # The import already happened, so just verify the registry exists |
| 132 | + # in our temp dir (we can't easily test the import-time side effect |
| 133 | + # since concore was already imported — we test the functions directly) |
| 134 | + from concore import _register_pid |
| 135 | + _register_pid() |
| 136 | + assert os.path.exists(_PID_REGISTRY_FILE) |
| 137 | + |
| 138 | + |
6 | 139 | class TestSafeLiteralEval: |
7 | 140 | def test_reads_dictionary_from_file(self, temp_dir): |
8 | 141 | test_file = os.path.join(temp_dir, "config.txt") |
|
0 commit comments