-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcommand_executor.py
More file actions
139 lines (112 loc) · 4.75 KB
/
command_executor.py
File metadata and controls
139 lines (112 loc) · 4.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import shlex
import subprocess
import threading
from flask import request, session
from auth import AUTH_SESSION_KEY, SUDO_SESSION_KEY, is_authenticated
ALLOWED_COMMANDS = {
'ls': '/bin/ls',
'ps': '/bin/ps',
'df': '/bin/df',
'free': '/usr/bin/free',
'top': '/usr/bin/top',
'systemctl': '/bin/systemctl',
'journalctl': '/bin/journalctl',
'cat': '/bin/cat',
'grep': '/bin/grep',
'uptime': '/usr/bin/uptime',
'who': '/usr/bin/who',
'date': '/bin/date',
'pwd': '/bin/pwd',
}
class CommandExecutor:
@staticmethod
def execute_command(socketio, command: str, socket_id: str, sudo_password=None, sudo_enabled: bool = False):
try:
args = shlex.split(command)
if not args:
return "[ERROR] Empty command"
base_command = args[0]
if base_command not in ALLOWED_COMMANDS:
return f"[ERROR] Command '{base_command}' not allowed"
args[0] = ALLOWED_COMMANDS[base_command]
if base_command in ['systemctl', 'journalctl']:
if not all(arg.isalnum() or arg in ['-', '_', '.'] for arg in args[1:]):
return 'Error: Invalid characters in arguments'
popen_args = args
popen_input = None
if sudo_enabled and sudo_password and base_command in ['systemctl', 'journalctl']:
popen_args = ['sudo', '-S', '-p', ''] + args
popen_input = sudo_password + '\n'
process = subprocess.Popen(
popen_args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE if popen_input is not None else None,
text=True,
bufsize=1,
universal_newlines=True,
)
if popen_input is not None:
try:
process.stdin.write(popen_input)
process.stdin.flush()
except Exception:
pass
while True:
output = process.stdout.readline()
if output:
if any(code in output for code in ['\x1b[31m', '\x1b[32m', '\x1b[33m', '\x1b[34m']):
formatted_output = output.strip()
else:
formatted_output = f"[INFO] {output.strip()}"
socketio.emit('console_output', {'output': formatted_output}, room=socket_id)
socketio.sleep(0)
error = process.stderr.readline()
if error:
socketio.emit('console_output', {'output': f"[ERROR] {error.strip()}"}, room=socket_id)
socketio.sleep(0)
if output == '' and error == '' and process.poll() is not None:
break
return_code = process.poll()
if return_code != 0:
socketio.emit('console_output', {'output': f"[ERROR] Command exited with status {return_code}"}, room=socket_id)
except Exception as e:
return f"[ERROR] Error executing command: {str(e)}"
def register_console_socket_handlers(socketio):
@socketio.on('console_command')
def handle_console_command(data):
try:
if not is_authenticated():
socketio.emit('console_output', {'output': '[ERROR] Not authenticated'}, room=request.sid)
return
command = (data or {}).get('command', '').strip()
if not command:
return
sudo_password = session.get(SUDO_SESSION_KEY)
sudo_enabled = bool(session.get(AUTH_SESSION_KEY) and sudo_password)
thread = threading.Thread(
target=CommandExecutor.execute_command,
args=(socketio, command, request.sid, sudo_password, sudo_enabled),
)
thread.daemon = True
thread.start()
except Exception as e:
socketio.emit('console_output', {'output': f"Error: {str(e)}"}, room=request.sid)
@socketio.on('join_console')
def on_join_console():
if not is_authenticated():
return
socketio.emit(
'console_output',
{'output': "[SUCCESS] Connected to console. Type 'help' for available commands."},
room=request.sid,
)
@socketio.on('console_help')
def handle_console_help():
help_text = (
'[INFO] Available commands:\n'
+ '\n'.join(f"- {cmd}" for cmd in sorted(ALLOWED_COMMANDS.keys()))
+ '\n\n[WARNING] Note: All commands are executed with restricted privileges.'
)
socketio.emit('console_output', {'output': help_text}, room=request.sid)
__all__ = ['register_console_socket_handlers']