Skip to content

Commit 2afd667

Browse files
committed
CM-59844: add some basic test coverage to validate updates
1 parent 15b2da6 commit 2afd667

5 files changed

Lines changed: 927 additions & 0 deletions

File tree

tests/cli/apps/__init__.py

Whitespace-only changes.

tests/cli/apps/mcp/__init__.py

Whitespace-only changes.
Lines changed: 314 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,314 @@
1+
import json
2+
import os
3+
import sys
4+
from unittest.mock import AsyncMock, patch
5+
6+
import pytest
7+
8+
if sys.version_info < (3, 10):
9+
pytest.skip('MCP requires Python 3.10+', allow_module_level=True)
10+
11+
from cycode.cli.apps.mcp.mcp_command import (
12+
_sanitize_file_path,
13+
_TempFilesManager,
14+
)
15+
16+
pytestmark = pytest.mark.anyio
17+
18+
19+
@pytest.fixture
20+
def anyio_backend() -> str:
21+
return 'asyncio'
22+
23+
24+
# --- _sanitize_file_path input validation ---
25+
26+
27+
def test_sanitize_file_path_rejects_empty_string() -> None:
28+
with pytest.raises(ValueError, match='non-empty string'):
29+
_sanitize_file_path('')
30+
31+
32+
def test_sanitize_file_path_rejects_none() -> None:
33+
with pytest.raises(ValueError, match='non-empty string'):
34+
_sanitize_file_path(None)
35+
36+
37+
def test_sanitize_file_path_rejects_non_string() -> None:
38+
with pytest.raises(ValueError, match='non-empty string'):
39+
_sanitize_file_path(123)
40+
41+
42+
def test_sanitize_file_path_strips_null_bytes() -> None:
43+
result = _sanitize_file_path('foo/bar\x00baz.py')
44+
assert '\x00' not in result
45+
46+
47+
def test_sanitize_file_path_passes_valid_path_through() -> None:
48+
assert _sanitize_file_path('src/main.py') == 'src/main.py'
49+
50+
51+
# --- _TempFilesManager: path traversal prevention ---
52+
#
53+
# _sanitize_file_path delegates to pathvalidate which does NOT block
54+
# path traversal (../ passes through). The real security boundary is
55+
# the normpath containment check in _TempFilesManager.__enter__ (lines 136-139).
56+
# These tests verify that the two layers together prevent escaping the temp dir.
57+
58+
59+
def test_traversal_simple_dotdot_rejected() -> None:
60+
"""../../../etc/passwd must not escape the temp directory."""
61+
files = {
62+
'../../../etc/passwd': 'malicious',
63+
'safe.py': 'ok',
64+
}
65+
with _TempFilesManager(files, 'test-traversal') as temp_files:
66+
assert len(temp_files) == 1
67+
assert temp_files[0].endswith('safe.py')
68+
for tf in temp_files:
69+
assert '/etc/passwd' not in tf
70+
71+
72+
def test_traversal_backslash_dotdot_rejected() -> None:
73+
"""..\\..\\windows\\system32 must not escape the temp directory."""
74+
files = {
75+
'..\\..\\windows\\system32\\config': 'malicious',
76+
'safe.py': 'ok',
77+
}
78+
with _TempFilesManager(files, 'test-backslash') as temp_files:
79+
assert len(temp_files) == 1
80+
assert temp_files[0].endswith('safe.py')
81+
82+
83+
def test_traversal_embedded_dotdot_rejected() -> None:
84+
"""foo/../../../etc/passwd resolves outside temp dir and must be rejected."""
85+
files = {
86+
'foo/../../../etc/passwd': 'malicious',
87+
'safe.py': 'ok',
88+
}
89+
with _TempFilesManager(files, 'test-embedded') as temp_files:
90+
assert len(temp_files) == 1
91+
assert temp_files[0].endswith('safe.py')
92+
93+
94+
def test_traversal_absolute_path_rejected() -> None:
95+
"""Absolute paths must not be written outside the temp directory."""
96+
files = {
97+
'/etc/passwd': 'malicious',
98+
'safe.py': 'ok',
99+
}
100+
with _TempFilesManager(files, 'test-absolute') as temp_files:
101+
assert len(temp_files) == 1
102+
assert temp_files[0].endswith('safe.py')
103+
104+
105+
def test_traversal_dotdot_only_rejected() -> None:
106+
"""A bare '..' path must be rejected."""
107+
files = {
108+
'..': 'malicious',
109+
'safe.py': 'ok',
110+
}
111+
with _TempFilesManager(files, 'test-bare-dotdot') as temp_files:
112+
assert len(temp_files) == 1
113+
114+
115+
def test_traversal_all_malicious_raises() -> None:
116+
"""If every file path is a traversal attempt, no files are created and ValueError is raised."""
117+
files = {
118+
'../../../etc/passwd': 'malicious',
119+
'../../shadow': 'also malicious',
120+
}
121+
with pytest.raises(ValueError, match='No valid files'), _TempFilesManager(files, 'test-all-malicious'):
122+
pass
123+
124+
125+
def test_all_created_files_are_inside_temp_dir() -> None:
126+
"""Every created file must be under the temp base directory."""
127+
files = {
128+
'a.py': 'aaa',
129+
'sub/b.py': 'bbb',
130+
'sub/deep/c.py': 'ccc',
131+
}
132+
manager = _TempFilesManager(files, 'test-containment')
133+
with manager as temp_files:
134+
base = os.path.normcase(os.path.normpath(manager.temp_base_dir))
135+
for tf in temp_files:
136+
normalized = os.path.normcase(os.path.normpath(tf))
137+
assert normalized.startswith(base + os.sep), f'{tf} escaped temp dir {base}'
138+
139+
140+
def test_mixed_valid_and_traversal_only_creates_valid() -> None:
141+
"""Valid files are created, traversal attempts are silently skipped."""
142+
files = {
143+
'../escape.py': 'bad',
144+
'legit.py': 'good',
145+
'foo/../../escape2.py': 'bad',
146+
'src/app.py': 'good',
147+
}
148+
manager = _TempFilesManager(files, 'test-mixed')
149+
with manager as temp_files:
150+
base = os.path.normcase(os.path.normpath(manager.temp_base_dir))
151+
assert len(temp_files) == 2
152+
for tf in temp_files:
153+
assert os.path.normcase(os.path.normpath(tf)).startswith(base + os.sep)
154+
basenames = [os.path.basename(tf) for tf in temp_files]
155+
assert 'legit.py' in basenames
156+
assert 'app.py' in basenames
157+
158+
159+
# --- _TempFilesManager: general functionality ---
160+
161+
162+
def test_temp_files_manager_creates_files() -> None:
163+
files = {
164+
'test1.py': 'print("hello")',
165+
'subdir/test2.js': 'console.log("world")',
166+
}
167+
with _TempFilesManager(files, 'test-call-id') as temp_files:
168+
assert len(temp_files) == 2
169+
for tf in temp_files:
170+
assert os.path.exists(tf)
171+
172+
173+
def test_temp_files_manager_writes_correct_content() -> None:
174+
files = {'hello.py': 'print("hello world")'}
175+
with _TempFilesManager(files, 'test-content') as temp_files, open(temp_files[0]) as f:
176+
assert f.read() == 'print("hello world")'
177+
178+
179+
def test_temp_files_manager_cleans_up_on_exit() -> None:
180+
files = {'cleanup.py': 'code'}
181+
manager = _TempFilesManager(files, 'test-cleanup')
182+
with manager as temp_files:
183+
temp_dir = manager.temp_base_dir
184+
assert os.path.exists(temp_dir)
185+
assert len(temp_files) == 1
186+
assert not os.path.exists(temp_dir)
187+
188+
189+
def test_temp_files_manager_empty_path_raises() -> None:
190+
files = {'': 'empty path'}
191+
with pytest.raises(ValueError, match='No valid files'), _TempFilesManager(files, 'test-empty-path'):
192+
pass
193+
194+
195+
def test_temp_files_manager_preserves_subdirectory_structure() -> None:
196+
files = {
197+
'src/main.py': 'main',
198+
'src/utils/helper.py': 'helper',
199+
}
200+
with _TempFilesManager(files, 'test-dirs') as temp_files:
201+
assert len(temp_files) == 2
202+
paths = [os.path.basename(tf) for tf in temp_files]
203+
assert 'main.py' in paths
204+
assert 'helper.py' in paths
205+
206+
207+
# --- _run_cycode_command (async) ---
208+
209+
210+
@pytest.mark.anyio
211+
async def test_run_cycode_command_returns_dict() -> None:
212+
from cycode.cli.apps.mcp.mcp_command import _run_cycode_command
213+
214+
mock_process = AsyncMock()
215+
mock_process.communicate.return_value = (b'', b'error output')
216+
mock_process.returncode = 1
217+
218+
with patch('asyncio.create_subprocess_exec', return_value=mock_process):
219+
result = await _run_cycode_command('--invalid-flag-for-test')
220+
assert isinstance(result, dict)
221+
assert 'error' in result
222+
223+
224+
@pytest.mark.anyio
225+
async def test_run_cycode_command_parses_json_output() -> None:
226+
from cycode.cli.apps.mcp.mcp_command import _run_cycode_command
227+
228+
mock_process = AsyncMock()
229+
mock_process.communicate.return_value = (b'{"status": "ok"}', b'')
230+
mock_process.returncode = 0
231+
232+
with patch('asyncio.create_subprocess_exec', return_value=mock_process):
233+
result = await _run_cycode_command('version')
234+
assert result == {'status': 'ok'}
235+
236+
237+
@pytest.mark.anyio
238+
async def test_run_cycode_command_handles_invalid_json() -> None:
239+
from cycode.cli.apps.mcp.mcp_command import _run_cycode_command
240+
241+
mock_process = AsyncMock()
242+
mock_process.communicate.return_value = (b'not json{', b'')
243+
mock_process.returncode = 0
244+
245+
with patch('asyncio.create_subprocess_exec', return_value=mock_process):
246+
result = await _run_cycode_command('version')
247+
assert result['error'] == 'Failed to parse JSON output'
248+
249+
250+
@pytest.mark.anyio
251+
async def test_run_cycode_command_timeout() -> None:
252+
import asyncio
253+
254+
from cycode.cli.apps.mcp.mcp_command import _run_cycode_command
255+
256+
async def slow_communicate() -> tuple[bytes, bytes]:
257+
await asyncio.sleep(10)
258+
return b'', b''
259+
260+
mock_process = AsyncMock()
261+
mock_process.communicate = slow_communicate
262+
263+
with patch('asyncio.create_subprocess_exec', return_value=mock_process):
264+
result = await _run_cycode_command('status', timeout=0.001)
265+
assert isinstance(result, dict)
266+
assert 'error' in result
267+
assert 'timeout' in result['error'].lower()
268+
269+
270+
# --- _cycode_scan_tool ---
271+
272+
273+
@pytest.mark.anyio
274+
async def test_cycode_scan_tool_no_files() -> None:
275+
from cycode.cli.apps.mcp.mcp_command import _cycode_scan_tool
276+
from cycode.cli.cli_types import ScanTypeOption
277+
278+
result = await _cycode_scan_tool(ScanTypeOption.SECRET, {})
279+
parsed = json.loads(result)
280+
assert 'error' in parsed
281+
assert 'No files provided' in parsed['error']
282+
283+
284+
@pytest.mark.anyio
285+
async def test_cycode_scan_tool_invalid_files() -> None:
286+
from cycode.cli.apps.mcp.mcp_command import _cycode_scan_tool
287+
from cycode.cli.cli_types import ScanTypeOption
288+
289+
result = await _cycode_scan_tool(ScanTypeOption.SECRET, {'': 'content'})
290+
parsed = json.loads(result)
291+
assert 'error' in parsed
292+
293+
294+
# --- _create_mcp_server ---
295+
296+
297+
def test_create_mcp_server() -> None:
298+
from cycode.cli.apps.mcp.mcp_command import _create_mcp_server
299+
300+
server = _create_mcp_server('127.0.0.1', 8000)
301+
assert server is not None
302+
assert server.name == 'cycode'
303+
304+
305+
def test_create_mcp_server_registers_tools() -> None:
306+
from cycode.cli.apps.mcp.mcp_command import _create_mcp_server
307+
308+
server = _create_mcp_server('127.0.0.1', 8000)
309+
tool_names = [t.name for t in server._tool_manager._tools.values()]
310+
assert 'cycode_status' in tool_names
311+
assert 'cycode_secret_scan' in tool_names
312+
assert 'cycode_sca_scan' in tool_names
313+
assert 'cycode_iac_scan' in tool_names
314+
assert 'cycode_sast_scan' in tool_names

0 commit comments

Comments
 (0)