forked from cairn-dev/cairn
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinteractive_worker_manager.py
More file actions
executable file
·1610 lines (1399 loc) · 74.6 KB
/
interactive_worker_manager.py
File metadata and controls
executable file
·1610 lines (1399 loc) · 74.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
import curses
import json
import logging
import os
import signal
import subprocess
import sys
import threading
import time
from curses import wrapper as curses_wrapper
from typing import List, Optional, Tuple
from datetime import datetime
from dotenv import load_dotenv
# Add the cairn_utils directory to the Python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'cairn_utils'))
from cairn_utils.task_storage import TaskStorage
# --- Application logger -------------------------------------------------
# Everything at DEBUG and above goes both to debug.log and to the console.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handlers = [logging.FileHandler('debug.log'), logging.StreamHandler()]
for handler in handlers:
    handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    logger.addHandler(handler)

# --- Subprocess lifecycle logger ----------------------------------------
# Kept in its own file. The directory has to exist before the FileHandler
# below opens the log file inside it.
os.makedirs('logs/subprocesses', exist_ok=True)
subprocess_handler = logging.FileHandler('logs/subprocesses/subprocess.log')
subprocess_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
subprocess_logger = logging.getLogger('subprocess_logger')
subprocess_logger.setLevel(logging.INFO)
subprocess_logger.addHandler(subprocess_handler)

# --- Environment --------------------------------------------------------
# Pull settings from the local .env file, then record what was picked up.
load_dotenv('.env')
logger.info("Environment variables loaded from .env")
logger.info(f"CONNECTED_REPOS from env: {os.getenv('CONNECTED_REPOS')}")
class WorkerManager:
def __init__(self):
    """Initialise UI state, task bookkeeping and the repository list.

    The repository list is parsed from the ``CONNECTED_REPOS`` environment
    variable; the owner of the first parsed repository becomes the default
    owner for new tasks ("unknown" when nothing was parsed).
    """
    logger.info("Initializing WorkerManager")

    # --- UI / navigation state ---
    self.selected_agent = 0  # Index into the agent menu (0 rather than None)
    self.selected_repos = []  # List of (owner, repo) tuples
    self.task_description = ""
    self.current_screen = "main"
    self.cursor_pos = 0
    self.selected_repo_idx = 0
    self.selected_task_id = None  # Track selected task for viewing logs
    self.selected_task_idx = 0  # Track selected task index in the list
    self.log_scroll_pos = 0  # Track log scroll position
    # The task and log screens read these attributes; give them defined
    # starting values so drawing never depends on a key handler having
    # assigned them first.
    self.task_scroll_pos = 0
    self.selected_subtask_idx = 0
    self.selected_task = None

    # --- Task bookkeeping ---
    self.active_tasks = {}
    self.running_tasks = {}  # task_id -> worker subprocess.Popen
    self.worker_threads = {}  # task_id -> output-reader threads
    self.task_storage = TaskStorage()

    # --- Connected repositories ---
    connected_repos_str = os.getenv("CONNECTED_REPOS", "")
    logger.info(f"Raw CONNECTED_REPOS value: '{connected_repos_str}'")
    self.connected_repos = self._parse_connected_repos(connected_repos_str)
    logger.info(f"Parsed repositories: {self.connected_repos}")

    # Get owner from first repo (assuming all repos have same owner)
    self.owner = self.connected_repos[0][0] if self.connected_repos else "unknown"
    logger.info(f"Owner set to: {self.owner}")
    logger.info("Worker manager initialized")
def add_debug_message(self, message: str):
    """Store *message* for the UI debug pane and echo it to the debug log.

    The stored copy carries a ``[HH:MM:SS]`` prefix; the logger receives
    the bare message.
    """
    now = time.strftime("%H:%M:%S")
    self.task_storage.add_debug_message(f"[{now}] {message}")
    logger.debug(message)
def _parse_connected_repos(self, repos_str: str) -> List[Tuple[str, str]]:
"""Parse the CONNECTED_REPOS string into a list of (owner, repo) tuples"""
if not repos_str:
print("Warning: CONNECTED_REPOS is empty in .env.local")
return []
repos = []
for repo_str in repos_str.split(','):
if '/' in repo_str:
owner, repo = repo_str.strip().split('/')
print(f"Found repository: {owner}/{repo}")
repos.append((owner, repo))
else:
print(f"Warning: Invalid repository format in CONNECTED_REPOS: {repo_str}")
if not repos:
print("Warning: No valid repositories found in CONNECTED_REPOS")
else:
print(f"Successfully parsed {len(repos)} repositories")
return repos
def log_worker_output(self, pipe, task_id: str, prefix: str):
    """Copy each line read from *pipe* into the application log.

    Runs until ``readline()`` returns an empty value (end of stream).
    Every line is logged at INFO as ``[task_id] prefix: <stripped line>``.
    Errors caused by the pipe being closed underneath the reader are
    logged at DEBUG; any other error is logged at ERROR. Nothing is
    re-raised.
    """
    try:
        while True:
            chunk = pipe.readline()
            if not chunk:
                # End of stream: nothing more to read.
                break
            logger.info(f"[{task_id}] {prefix}: {chunk.strip()}")
    except (BrokenPipeError, OSError, ValueError) as exc:
        # Handle pipe closed during cleanup
        logger.debug(f"Pipe closed for {task_id} {prefix}: {str(exc)}")
    except Exception as exc:
        logger.error(f"Error reading worker output for {task_id}: {str(exc)}")
def run_worker_process(self, task_id: str) -> subprocess.Popen:
    """Spawn the ``agent_worker`` module for *task_id* and log its output.

    The child runs ``<python> -m agent_worker <task_id>`` with a copy of
    the current environment and line-buffered text pipes for stdout and
    stderr. Where the platform supports it, the child is made the leader
    of a new session so the whole process group can be signalled later.
    Two non-daemon threads forward the child's output to the log and are
    recorded in ``self.worker_threads[task_id]``.

    Args:
        task_id: Identifier passed to the worker on its command line.

    Returns:
        The started ``subprocess.Popen`` object.

    Raises:
        Exception: Whatever went wrong while starting the process or its
            reader threads; it is logged and then re-raised.
    """
    try:
        print(f"[DEBUG] Manager creating worker process for task {task_id}")
        # The worker gets a snapshot of the manager's environment.
        env = os.environ.copy()

        # Start the worker process using the Python module approach for better reliability
        print(f"[DEBUG] Manager starting subprocess for task {task_id}")
        process = subprocess.Popen(
            [sys.executable, "-m", "agent_worker", task_id],
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            bufsize=1,
            # Same effect as preexec_fn=os.setsid (new session / process
            # group on Unix), but without running Python code between
            # fork and exec, which is unsafe once threads exist.
            start_new_session=hasattr(os, 'setsid'),
        )

        # Log subprocess creation
        subprocess_logger.info(f"Subprocess started: PID={process.pid}, task_id={task_id}, command=[{sys.executable}, -m, agent_worker, {task_id}]")
        print(f"[DEBUG] Manager created process PID {process.pid} for task {task_id}")

        # Start threads to log output. They are non-daemon so cleanup can
        # join them instead of having them killed mid-write at exit.
        stdout_thread = threading.Thread(
            target=self.log_worker_output,
            args=(process.stdout, task_id, "stdout"),
            daemon=False,
        )
        stderr_thread = threading.Thread(
            target=self.log_worker_output,
            args=(process.stderr, task_id, "stderr"),
            daemon=False,
        )
        stdout_thread.start()
        stderr_thread.start()

        # Store thread references for cleanup
        if not hasattr(self, 'worker_threads'):
            self.worker_threads = {}
        self.worker_threads[task_id] = {
            'stdout_thread': stdout_thread,
            'stderr_thread': stderr_thread,
        }

        logger.info(f"Started worker process for task {task_id}")
        return process
    except Exception as e:
        logger.error(f"Error starting worker process for task {task_id}: {str(e)}")
        # Log subprocess error
        subprocess_logger.error(f"Failed to start subprocess for task {task_id}: {str(e)}")
        raise
def create_task_sync(self, agent_type: str, description: str, repos: List[str], model_provider: str = "anthropic", model_name: Optional[str] = None) -> str:
    """Create a new task synchronously and start a worker process for it.

    Args:
        agent_type: "Fullstack Planner" gets a multi-repo payload; any
            other value (SWE or PM) gets a single-repo payload built from
            the first entry of *repos*.
        description: Free-text task description.
        repos: Repository names the task applies to.
        model_provider: Stored in the payload as ``model_provider``.
        model_name: Stored in the payload as ``model_name``.

    Returns:
        The new task id, or ``""`` when task creation itself failed. If
        only the worker process fails to start, the id is still returned
        and the task is marked ``"Failed - Process Error"``.
    """
    try:
        self.add_debug_message(f"Creating task with agent_type='{agent_type}', description='{description[:50]}...', repos={repos}")

        # Task ids are second-resolution timestamps. When several tasks are
        # created within the same second, add a numeric suffix so a later
        # task cannot overwrite an earlier one in active_tasks.
        base_id = f"task_{int(time.time())}"
        task_id = base_id
        collision = 0
        while task_id in self.active_tasks:
            collision += 1
            task_id = f"{base_id}_{collision}"
        logger.debug(f"Generated task_id: {task_id}")

        # One clock read so created_at and updated_at always start equal.
        now = time.strftime("%Y-%m-%d %H:%M:%S")

        # Create payload based on agent type
        if agent_type == "Fullstack Planner":
            payload = {
                "run_id": task_id,
                "created_at": now,
                "updated_at": now,
                "repos": repos,
                "owner": self.owner,
                "description": description,
                "subtask_ids": [],
                "agent_output": {},
                "agent_status": "Queued",
                "agent_type": agent_type,
                "raw_logs_dump": {},
                "model_provider": model_provider,
                "model_name": model_name,
            }
        else:  # SWE or PM
            payload = {
                "run_id": task_id,
                "created_at": now,
                "updated_at": now,
                "repo": repos[0] if repos else "",  # Single repo for SWE/PM
                "owner": self.owner,
                "description": description,
                "agent_output": {},
                "agent_status": "Queued",
                "agent_type": agent_type,
                "related_run_ids": [],
                "raw_logs_dump": {},
                "branch": None,
                "model_provider": model_provider,
                "model_name": model_name,
            }
        logger.debug(f"Created payload: {payload}")

        # Create auto-persisting payload and store in database
        persistent_payload = self.task_storage.create_active_task_persistent(task_id, payload)
        # Store reference to the persistent payload
        self.active_tasks[task_id] = persistent_payload
        self.add_debug_message(f"Added task {task_id} to active_tasks")

        # Start worker process. A failure here is recorded on the task
        # rather than aborting creation, so the task stays visible.
        try:
            process = self.run_worker_process(task_id)
            self.running_tasks[task_id] = process
            self.add_debug_message(f"Started worker process for task {task_id}")
        except Exception as e:
            self.add_debug_message(f"Failed to start worker process: {str(e)}")
            persistent_payload["agent_status"] = "Failed - Process Error"
            persistent_payload["updated_at"] = time.strftime("%Y-%m-%d %H:%M:%S")

        logger.info(f"Created and started task {task_id} with agent type: {agent_type}")
        return task_id
    except Exception as e:
        error_msg = f"Error creating task: {str(e)}"
        logger.error(error_msg)
        self.add_debug_message(error_msg)
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        return ""
def get_task(self, task_id: str) -> dict:
    """Get a task by ID, preferring the auto-persisting payload.

    Lookup order:
      1. the in-memory ``active_tasks`` cache,
      2. the storage layer's persistent payload (cached on success),
      3. the storage layer's plain payload.

    Args:
        task_id: Identifier of the task to look up.

    Returns:
        The task payload, or an empty dict when the task is unknown.
    """
    # A cached None counts as a miss, so a stale placeholder in the cache
    # can never be handed to callers that expect a dict.
    cached = self.active_tasks.get(task_id)
    if cached is not None:
        return cached

    # If not in memory, try to get from database as PersistentDict
    persistent_task = self.task_storage.get_active_task_persistent(task_id)
    if persistent_task:
        # Cache it in active_tasks for future access
        self.active_tasks[task_id] = persistent_task
        return persistent_task

    # Fallback to regular dict from database
    return self.task_storage.get_active_task(task_id) or {}
def get_task_status(self, task_id: str) -> Optional[dict]:
    """Backward-compatible alias that returns whatever ``get_task`` returns."""
    task = self.get_task(task_id)
    return task
def list_tasks(self) -> list:
    """List all active tasks.

    Reads every active task from storage and, as a side effect, fills the
    in-memory ``active_tasks`` cache with the persistent payload of any
    task that is not cached yet.

    Returns:
        The payload dictionaries as returned by storage, in storage order.
    """
    # Get all tasks from database (includes both in-memory and persisted)
    all_tasks = self.task_storage.get_all_active_tasks()

    for task_id in all_tasks:
        if self.active_tasks.get(task_id) is not None:
            continue
        persistent = self.task_storage.get_active_task_persistent(task_id)
        if persistent is not None:
            self.active_tasks[task_id] = persistent
        else:
            # Never cache a failed lookup: a None entry would later be
            # returned by cache readers as if it were the task.
            self.active_tasks.pop(task_id, None)

    # Return the original payload dictionaries (they already have correct format)
    return list(all_tasks.values())
def cleanup(self):
    """Stop every tracked worker process and release its resources.

    For each entry in ``running_tasks``: a process that is still running
    is sent SIGTERM (to its whole process group where ``os.killpg``
    exists), given 5 seconds to exit, and then sent SIGKILL with a further
    2-second wait. The process's pipes are closed afterwards. The
    output-reader threads recorded in ``worker_threads`` are then joined
    (3 seconds each) and both registries are emptied.

    Every failure is logged and swallowed; this method does not raise.
    """
    try:
        logger.info("Starting cleanup of running tasks...")
        # Terminate any running worker processes.
        # Iterate over a snapshot of the items rather than the live dict.
        for task_id, process in list(self.running_tasks.items()):
            try:
                # poll() is None while the process has not exited yet
                if process.poll() is None:
                    logger.info(f"Terminating worker process for task {task_id} (PID: {process.pid})")
                    # Try to terminate the process group (kills child processes too)
                    try:
                        if hasattr(os, 'killpg'):
                            os.killpg(os.getpgid(process.pid), signal.SIGTERM)
                        else:
                            process.terminate()
                        # Log subprocess termination
                        subprocess_logger.info(f"Subprocess termination requested: PID={process.pid}, task_id={task_id}, signal=SIGTERM")
                    except (OSError, ProcessLookupError):
                        # Process might already be dead; fall back to signalling
                        # just the process itself
                        process.terminate()
                        subprocess_logger.info(f"Subprocess termination fallback: PID={process.pid}, task_id={task_id}, signal=SIGTERM")
                    # Wait for clean termination
                    try:
                        process.wait(timeout=5)
                        logger.info(f"Cleanly terminated worker process for task {task_id}")
                        subprocess_logger.info(f"Subprocess terminated cleanly: PID={process.pid}, task_id={task_id}")
                    except subprocess.TimeoutExpired:
                        # Force kill if it doesn't terminate gracefully
                        logger.warning(f"Force killing worker process for task {task_id}")
                        subprocess_logger.warning(f"Subprocess force kill required: PID={process.pid}, task_id={task_id}")
                        try:
                            if hasattr(os, 'killpg'):
                                os.killpg(os.getpgid(process.pid), signal.SIGKILL)
                            else:
                                process.kill()
                            process.wait(timeout=2)
                            subprocess_logger.info(f"Subprocess force killed: PID={process.pid}, task_id={task_id}, signal=SIGKILL")
                        except (OSError, ProcessLookupError, subprocess.TimeoutExpired):
                            # Process is already dead or we can't kill it
                            subprocess_logger.warning(f"Subprocess kill failed or process already dead: PID={process.pid}, task_id={task_id}")
                            pass
                # Close pipes to prevent resource leaks
                try:
                    if process.stdout and not process.stdout.closed:
                        process.stdout.close()
                    if process.stderr and not process.stderr.closed:
                        process.stderr.close()
                    if process.stdin and not process.stdin.closed:
                        process.stdin.close()
                except Exception as e:
                    logger.warning(f"Error closing pipes for task {task_id}: {e}")
            except Exception as e:
                # One task failing to clean up must not stop the others
                logger.error(f"Error cleaning up task {task_id}: {str(e)}")
        # Wait for worker threads to finish
        # (worker_threads may not exist if no worker was ever started)
        if hasattr(self, 'worker_threads'):
            for task_id, threads in self.worker_threads.items():
                try:
                    logger.info(f"Waiting for worker threads for task {task_id} to finish...")
                    # Bounded joins: a stuck reader cannot hang shutdown
                    threads['stdout_thread'].join(timeout=3)
                    threads['stderr_thread'].join(timeout=3)
                except Exception as e:
                    logger.warning(f"Error joining worker threads for task {task_id}: {e}")
            self.worker_threads.clear()
        # Clear running tasks
        self.running_tasks.clear()
        logger.info("Cleanup completed successfully")
    except Exception as e:
        logger.error(f"Error during cleanup: {str(e)}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
def draw_agent_selection_screen(self, stdscr):
    """Render the agent-type menu.

    Draws a centred bold title on row 0, the menu entries from row 2
    downwards (the entry whose index equals ``self.selected_agent`` in
    reverse video), and a key hint on the bottom row.
    """
    rows, cols = stdscr.getmaxyx()

    heading = "Select Agent Type"
    stdscr.addstr(0, (cols - len(heading)) // 2, heading, curses.A_BOLD)

    menu_entries = (
        "1. Fullstack Planner",
        "2. PM (Project Manager)",
        "3. SWE (Software Engineer)",
        "q. Quit",
    )
    for position, label in enumerate(menu_entries):
        # Only the highlighted entry is drawn with an attribute argument.
        highlight = (curses.A_REVERSE,) if position == self.selected_agent else ()
        stdscr.addstr(position + 2, 2, label, *highlight)

    hint = "Use arrow keys to select, Enter to confirm, 'q' to quit"
    stdscr.addstr(rows - 1, 0, hint[:cols - 1])
def draw_repo_selection_screen(self, stdscr):
    """Render the repository picker.

    Shows a centred title, a key hint whose wording depends on whether the
    chosen agent is the Fullstack Planner (``selected_agent == 0``), and
    one checkbox row per connected repository starting at row 4. The row
    at ``selected_repo_idx`` is drawn in reverse video. When no
    repositories are configured, an error message is shown instead.
    """
    _, cols = stdscr.getmaxyx()

    heading = "Select Repository"
    stdscr.addstr(0, (cols - len(heading)) // 2, heading, curses.A_BOLD)

    # The Fullstack Planner may take several repositories; PM and SWE take one.
    hint = (
        "Use arrow keys to navigate, Space to select multiple, Enter to confirm, 'b' to go back"
        if self.selected_agent == 0
        else "Use arrow keys to navigate, Space to select (one only), Enter to confirm, 'b' to go back"
    )
    stdscr.addstr(2, 2, hint[:cols - 4])

    if not self.connected_repos:
        stdscr.addstr(
            4,
            2,
            "No repositories configured. Please set CONNECTED_REPOS in .env.local",
            curses.color_pair(2),
        )
        stdscr.addstr(6, 2, "Press 'b' to go back")
        return

    first_row = 4
    for position, (owner, repo) in enumerate(self.connected_repos):
        checkbox = "[x] " if (owner, repo) in self.selected_repos else "[ ] "
        entry = f"{checkbox}{owner}/{repo}"
        if position == self.selected_repo_idx:
            stdscr.addstr(first_row + position, 2, entry, curses.A_REVERSE)
        else:
            stdscr.addstr(first_row + position, 2, entry)
def draw_task_description_screen(self, stdscr):
    """Draw the task description editor.

    Layout: centred title (row 0), instructions (row 2), the wrapped
    description starting at row 4, the selected repositories below it and
    a key hint on the bottom row. The hardware cursor is placed last, at
    the on-screen position that corresponds to ``self.cursor_pos`` (a
    character offset into ``self.task_description``).

    Wrapping rules:
      * each logical line is cut into chunks of ``width - 4`` characters;
      * an empty logical line still occupies one display row, so text and
        cursor stay aligned;
      * at most ``height - 8`` rows of text are shown.
    """
    height, width = stdscr.getmaxyx()

    # Title
    title = "Enter Task Description"
    stdscr.addstr(0, (width - len(title)) // 2, title, curses.A_BOLD)

    # Instructions
    instructions = "Type your task description (press Enter for new line, Enter twice quickly when done):"
    stdscr.addstr(2, 2, instructions[:width - 4])

    text_top = 4
    # Never let the wrap width reach zero: a zero-length chunk would make
    # no progress through the line.
    wrap_width = max(1, width - 4)
    max_lines = max(0, height - 8)  # Leave space for header, instructions, and footer

    # Wrap the text and locate the cursor in the same pass, so the cursor
    # row counts every display row above it, including rows produced by
    # wrapping earlier lines and by blank lines.
    wrapped_lines = []
    cursor_line = 0
    cursor_col = 0
    cursor_found = False
    line_start = 0  # offset of the current logical line within the text
    for line in self.task_description.split('\n'):
        first_row = len(wrapped_lines)
        chunks = [line[k:k + wrap_width] for k in range(0, len(line), wrap_width)] or [""]
        wrapped_lines.extend(chunks)

        if not cursor_found and self.cursor_pos <= line_start + len(line):
            offset = max(0, self.cursor_pos - line_start)
            row_in_line, cursor_col = divmod(offset, wrap_width)
            if row_in_line == len(chunks):
                # Cursor is just past a completely filled last row: keep it
                # at the end of that row instead of on the next line's row.
                row_in_line -= 1
                cursor_col = wrap_width
            cursor_line = first_row + row_in_line
            cursor_found = True
        line_start += len(line) + 1  # +1 for newline

    # Display the wrapped lines
    visible_lines = wrapped_lines[:max_lines]
    for row, text in enumerate(visible_lines):
        if text:
            stdscr.addstr(text_top + row, 2, text)

    # Show selected repositories, never writing onto or past the footer row
    repo_start_line = text_top + len(visible_lines) + 1
    if repo_start_line < height - 1:
        stdscr.addstr(repo_start_line, 2, "Selected repositories:")
    for i, repo in enumerate(self.selected_repos):
        row = repo_start_line + 1 + i
        if row >= height - 1:
            break
        repo_str = f"{repo[0]}/{repo[1]}"
        if len(repo_str) > width - 6:
            repo_str = repo_str[:width - 9] + "..."
        stdscr.addstr(row, 4, repo_str)

    # Draw footer
    footer = "Use arrow keys to navigate, Enter for new line, Enter twice quickly to submit"
    stdscr.addstr(height - 1, 0, footer[:width - 1])

    # Place the cursor last: every addstr() above leaves the cursor at the
    # end of what it wrote, so an earlier move() would be overridden.
    if cursor_line < max_lines:
        try:
            stdscr.move(text_top + cursor_line, 2 + cursor_col)
        except curses.error:
            pass  # Ignore cursor positioning errors
def draw_main_screen(self, stdscr):
    """Draw the main screen: header, recent debug messages and task list.

    Layout:
      * rows 0-1: title and key help;
      * row 2 onwards: up to five recent debug messages;
      * row 9 onwards: one row per task (selection marker, id and status
        at column 2, description at column 40), followed by an optional
        row with the task's pull-request URL;
      * last row: footer.

    All text is reduced to a single line and cut to the available width
    before it is handed to curses.
    """
    height, width = stdscr.getmaxyx()
    desc_col = 40  # column where the description starts

    def _one_line(text):
        # addstr() acts on embedded newlines, which would break the row layout.
        return " ".join(str(text).splitlines())

    def _fit(text, limit):
        # Cut text to at most `limit` characters, marking the cut with "...".
        if limit <= 0:
            return ""
        if len(text) <= limit:
            return text
        if limit <= 3:
            return text[:limit]
        return text[:limit - 3] + "..."

    # Draw header
    stdscr.addstr(0, 0, "Worker Manager"[:width - 1], curses.A_BOLD)
    stdscr.addstr(1, 0, "Press 'n' to create new task, 'd' to delete task, 'q' to quit"[:width - 1])

    # Draw debug messages
    debug_start_line = 2
    stdscr.addstr(debug_start_line, 0, "Debug Messages:"[:width - 1], curses.A_UNDERLINE)
    debug_messages = self.task_storage.get_debug_messages(5)  # Get last 5 debug messages
    for i, msg in enumerate(debug_messages):
        if debug_start_line + 1 + i < height - 5:  # Leave space for tasks and footer
            stdscr.addstr(debug_start_line + 1 + i, 0, _one_line(msg)[:width - 1])

    # Draw task list
    task_start_line = debug_start_line + 7
    tasks = self.list_tasks()
    if not tasks:
        stdscr.addstr(task_start_line, 0, "No tasks found")
    else:
        stdscr.addstr(task_start_line - 1, 0, "Active Tasks:", curses.A_UNDERLINE)
        current_line = task_start_line
        # The id/status text lives between column 2 and the description
        # column, with one blank column in between.
        info_limit = min(desc_col - 3, width - 3)
        desc_limit = width - 45
        for i, task in enumerate(tasks):
            if current_line >= height - 2:  # Don't draw beyond screen
                break

            # Draw task selection indicator
            if i == self.selected_task_idx:
                stdscr.addstr(current_line, 0, "> ", curses.A_BOLD)
            else:
                stdscr.addstr(current_line, 0, "  ")

            # Pick the status colour. Failure statuses may carry a reason
            # suffix (e.g. "Failed - Process Error"), so match on the prefix.
            status = _one_line(task.get("agent_status") or "unknown")
            status_color = curses.A_NORMAL
            if status == "Completed":
                status_color = curses.A_BOLD | curses.color_pair(1)  # Green
            elif status.startswith("Failed"):
                status_color = curses.A_BOLD | curses.color_pair(2)  # Red
            elif status == "Running":
                status_color = curses.A_BOLD | curses.color_pair(3)  # Yellow

            # Get PR URL if available (either key may be used; the output
            # itself may be missing or None)
            agent_output = task.get("agent_output") or {}
            pr_url = agent_output.get("pull_request_url") or agent_output.get("pr_url")

            # Format task info
            task_info = f"{task.get('run_id', 'unknown')} - {status}"
            stdscr.addstr(current_line, 2, _fit(task_info, info_limit), status_color)

            # Draw description (skipped when the terminal is too narrow)
            desc = _one_line(task.get("description") or "")
            if desc and desc_limit > 0:
                stdscr.addstr(current_line, desc_col, _fit(desc, desc_limit))
            current_line += 1

            # Draw PR URL on next line if available
            if pr_url and current_line < height - 2:
                pr_indent = "    "  # 4 spaces indentation
                pr_text = _fit(f"{pr_indent}PR: {_one_line(pr_url)}", width - 2)
                stdscr.addstr(current_line, 0, pr_text, curses.A_UNDERLINE)
                current_line += 1

    # Draw footer
    footer = "Use arrow keys to navigate, Enter to select, 'd' to delete, 'q' to quit"
    stdscr.addstr(height - 1, 0, footer[:width - 1])
def draw_task_screen(self, stdscr):
    """Draw the detail view for the task at ``self.selected_task_idx``.

    Top to bottom: task id header, coloured status, PR URL and PR message
    (when the task's ``agent_output`` has them), the generated subtasks
    (Fullstack Planner tasks only, scrolled by ``self.task_scroll_pos``
    with ``self.selected_subtask_idx`` highlighted), general task details
    and a footer. Drawing stops two rows above the bottom of the screen.

    If there is no task at the selected index, or if anything raises while
    drawing, ``self.current_screen`` is set back to "main".
    """
    try:
        # Clear the entire screen first
        stdscr.clear()
        height, width = stdscr.getmaxyx()
        # Get selected task
        tasks = self.list_tasks()
        if not tasks or self.selected_task_idx >= len(tasks):
            # Nothing to show (e.g. the list shrank): fall back to the main screen
            self.current_screen = "main"
            return
        task = tasks[self.selected_task_idx]
        # current_line is the next free screen row
        current_line = 0
        # Draw header with task ID
        stdscr.addstr(current_line, 0, f"Task: {task['run_id']}", curses.A_BOLD)
        current_line += 1
        stdscr.addstr(current_line, 0, "Press 'b' to go back, 'l' to view logs")
        current_line += 2
        # Draw status prominently with color
        status = task.get('agent_status', 'unknown')
        status_color = curses.A_NORMAL
        if status == "Completed":
            status_color = curses.A_BOLD | curses.color_pair(1)  # Green
        elif status == "Failed":
            status_color = curses.A_BOLD | curses.color_pair(2)  # Red
        elif status == "Running":
            status_color = curses.A_BOLD | curses.color_pair(3)  # Yellow
        stdscr.addstr(current_line, 0, "Status:", curses.A_BOLD)
        stdscr.addstr(current_line, 8, status, status_color)
        current_line += 1
        # Draw PR URL if available ("pull_request_url" is preferred over "pr_url")
        pr_url = None
        if task.get("agent_output", {}).get("pull_request_url"):
            pr_url = task["agent_output"]["pull_request_url"]
        elif task.get("agent_output", {}).get("pr_url"):
            pr_url = task["agent_output"]["pr_url"]
        if pr_url:
            stdscr.addstr(current_line, 0, "PR URL:", curses.A_BOLD)
            # Truncate URL if too long
            display_url = pr_url
            if len(display_url) > width - 10:
                display_url = display_url[:width - 13] + "..."
            stdscr.addstr(current_line, 8, display_url, curses.A_UNDERLINE)
            current_line += 1
        # Draw pull request message if available
        # ("pull_request_message" is preferred over "pr_message")
        pr_message = None
        if task.get("agent_output", {}).get("pull_request_message"):
            pr_message = task["agent_output"]["pull_request_message"]
        elif task.get("agent_output", {}).get("pr_message"):
            pr_message = task["agent_output"]["pr_message"]
        if pr_message:
            stdscr.addstr(current_line, 0, "Pull Request Message:", curses.A_BOLD)
            current_line += 1
            # Format and wrap the PR message
            message_lines = pr_message.split('\n')
            for line in message_lines:
                # Handle long lines by wrapping them; empty lines produce no row
                while line:
                    if current_line >= height - 2:  # Leave space for footer
                        break
                    # Calculate how much of the line we can display
                    display_length = min(len(line), width - 2)
                    stdscr.addstr(current_line, 2, line[:display_length])
                    line = line[display_length:]
                    current_line += 1
                if current_line >= height - 2:
                    break
        # Draw subtasks for Fullstack Planner tasks
        if task.get('agent_type') == "Fullstack Planner" and task.get('agent_output'):
            output = task['agent_output']
            if output.get('list_of_subtasks') and output.get('list_of_subtask_titles'):
                current_line += 1
                stdscr.addstr(current_line, 0, "Generated Subtasks:", curses.A_BOLD)
                current_line += 1
                # Calculate total content height (in rows) of all subtasks,
                # which bounds how far the view may be scrolled
                total_content_height = 0
                for i, (title, subtask) in enumerate(zip(output['list_of_subtask_titles'], output['list_of_subtasks']), 1):
                    # Count lines for title
                    total_content_height += 1
                    # Count lines for description
                    subtask_lines = subtask.split('\n')
                    for line in subtask_lines:
                        # Count wrapped lines: ceiling of len(line) / (width - 6)
                        total_content_height += (len(line) + width - 7) // (width - 6)
                    # Count lines for difficulty and assignee
                    total_content_height += 2
                    # Add space between subtasks
                    total_content_height += 1
                # Calculate visible area
                visible_height = height - current_line - 2  # Leave space for footer
                # Calculate scroll position, clamped to [0, max_scroll].
                # task_scroll_pos must already exist on the instance; an
                # AttributeError here is caught by the handler at the bottom.
                max_scroll = max(0, total_content_height - visible_height)
                self.task_scroll_pos = min(max(0, self.task_scroll_pos), max_scroll)
                # Show subtasks with numbers and titles.
                # current_content_line counts rows of subtask content,
                # including the rows skipped because they are scrolled away.
                current_content_line = 0
                for i, (title, subtask) in enumerate(zip(output['list_of_subtask_titles'], output['list_of_subtasks']), 1):
                    # Skip if this subtask is above the visible area
                    # (a subtask is either skipped or drawn as a whole)
                    if current_content_line < self.task_scroll_pos:
                        # Count lines for this subtask
                        subtask_lines = subtask.split('\n')
                        for line in subtask_lines:
                            current_content_line += (len(line) + width - 7) // (width - 6)
                        current_content_line += 3  # Title, difficulty, assignee
                        current_content_line += 1  # Space between subtasks
                        continue
                    # Stop if we've reached the bottom of the screen
                    if current_line >= height - 2:
                        break
                    # Show subtask number and title
                    subtask_header = f"{i}. {title}"
                    if i == self.selected_subtask_idx + 1:  # +1 because i starts at 1
                        stdscr.addstr(current_line, 2, subtask_header, curses.A_REVERSE)
                    else:
                        stdscr.addstr(current_line, 2, subtask_header, curses.A_BOLD)
                    current_line += 1
                    current_content_line += 1
                    # Show subtask description
                    subtask_lines = subtask.split('\n')
                    for line in subtask_lines:
                        if current_line >= height - 2:
                            break
                        # Wrap long lines
                        while line:
                            display_length = min(len(line), width - 6)
                            stdscr.addstr(current_line, 4, line[:display_length])
                            line = line[display_length:]
                            current_line += 1
                            current_content_line += 1
                            if current_line >= height - 2:
                                break
                    # Show difficulty if available (the list is indexed in
                    # parallel with the subtasks; i is 1-based)
                    if output.get('assessment_of_subtask_difficulty') and i <= len(output['assessment_of_subtask_difficulty']):
                        difficulty = output['assessment_of_subtask_difficulty'][i-1]
                        if current_line < height - 2:
                            stdscr.addstr(current_line, 4, f"Difficulty: {difficulty}")
                            current_line += 1
                            current_content_line += 1
                    # Show assignee info
                    if current_line < height - 2:
                        # Default to Agent for all subtasks since they can be handled by agents
                        assignee = "Agent"
                        stdscr.addstr(current_line, 4, f"Assignee: {assignee}")
                        current_line += 1
                        current_content_line += 1
                    current_line += 1  # Add space between subtasks
                    current_content_line += 1
                # Add instructions for creating PM tasks
                if current_line < height - 2:
                    current_line += 1
                    stdscr.addstr(current_line, 0, "Use arrow keys to select a subtask, 'p' to create PM task for selected subtask", curses.A_BOLD)
                    current_line += 1
        # Draw other task details in a more organized way.
        # details_start is the next free row for this section; each item
        # is drawn only while there is room above the footer.
        details_start = current_line + 1
        if details_start < height - 2:
            stdscr.addstr(details_start, 0, "Task Details:", curses.A_BOLD)
            details_start += 1
        # Agent Type
        if details_start < height - 2:
            stdscr.addstr(details_start, 2, f"Agent Type: {task.get('agent_type', 'unknown')}")
            details_start += 1
        # Created/Updated times
        if details_start < height - 2:
            stdscr.addstr(details_start, 2, f"Created: {task.get('created_at', 'unknown')}")
            details_start += 1
        if details_start < height - 2:
            stdscr.addstr(details_start, 2, f"Updated: {task.get('updated_at', 'unknown')}")
            details_start += 1
        # Description
        if details_start < height - 2:
            desc = task.get('description', '')
            if desc:
                stdscr.addstr(details_start, 2, "Description:", curses.A_BOLD)
                details_start += 1
                # Split description into lines and display each line
                desc_lines = desc.split('\n')
                for line in desc_lines:
                    if details_start >= height - 2:
                        break
                    # Wrap long lines
                    while line:
                        display_length = min(len(line), width - 4)
                        stdscr.addstr(details_start, 4, line[:display_length])
                        line = line[display_length:]
                        details_start += 1
                        if details_start >= height - 2:
                            break
        # Repository info
        if details_start < height - 2:
            if task.get("repos"):  # Fullstack Planner
                stdscr.addstr(details_start, 2, "Repositories:")
                details_start += 1
                for repo in task["repos"]:
                    if details_start < height - 2:
                        stdscr.addstr(details_start, 4, repo)
                        details_start += 1
            elif task.get("repo"):  # SWE/PM
                stdscr.addstr(details_start, 2, f"Repository: {task['repo']}")
                details_start += 1
        # Branch info
        if details_start < height - 2 and task.get("branch"):
            stdscr.addstr(details_start, 2, f"Branch: {task['branch']}")
            details_start += 1
        # Draw footer
        footer = "Press 'b' to go back, 'l' to view logs, ↑/↓ to scroll"
        try:
            stdscr.addstr(height - 1, 0, footer[:width - 1])
        except curses.error:
            pass  # Ignore if we can't write the footer
        # Refresh the screen
        stdscr.refresh()
    except Exception as e:
        # Any drawing failure is logged, surfaced in the debug pane, and
        # handled by returning to the main screen rather than crashing the UI
        logger.error(f"Error drawing task screen: {str(e)}")
        self.add_debug_message(f"Error displaying task: {str(e)}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        # Return to main screen on error
        self.current_screen = "main"
def draw_log_screen(self, stdscr):
    """Draw the scrollable log screen for the currently selected task.

    The screen is assembled from three sources, in this order:

    1. log entries returned by ``self.task_storage.get_all_logs_for_task``,
    2. the task's ``agent_output`` payload field,
    3. the task's ``raw_logs_dump`` payload field (legacy).

    Layout: row 0 is the title, row 1 the key hint, row 2 is left blank and
    content starts on row 3. When the content does not fit, the bottom row
    is reserved for the ``[a-b of N lines]`` scroll indicator so it never
    overwrites a log line.

    Reads ``self.selected_task`` and ``self.log_scroll_pos``; the scroll
    position is clamped for display only and is not written back.

    Args:
        stdscr: curses window to draw on.
    """
    first_content_row = 3  # rows 0-1 hold the header, row 2 stays blank

    height, width = stdscr.getmaxyx()
    stdscr.clear()

    # Get the selected task
    task_id = self.selected_task
    task = self.task_storage.get_active_task(task_id) or {}

    def put(row, col, text, *attrs):
        """Write text, ignoring curses errors from writes that do not fit."""
        try:
            stdscr.addstr(row, col, text, *attrs)
        except curses.error:
            pass

    def as_lines(value):
        """Render a value as indented JSON lines, falling back to str()."""
        try:
            rendered = json.dumps(value, indent=2, ensure_ascii=False)
        except (TypeError, ValueError):
            # Not JSON-serializable: show the plain string form instead.
            rendered = str(value)
        # Split so an embedded newline can never reach addstr, where it
        # would move the cursor and spill over the following rows.
        return rendered.split('\n')

    # Draw header (clipped so it cannot wrap onto the next row)
    header_width = max(width - 1, 0)
    put(0, 0, f"Logs for Task: {task_id}"[:header_width], curses.A_BOLD)
    put(1, 0, "Press 'b' to go back, 'up/down' to scroll"[:header_width])

    output_lines = []
    # True once any source yields something worth showing (including a
    # retrieval error, which must stay visible to the user).
    found_anything = False

    # Section 1: logs stored in the database for this task id
    try:
        database_logs = self.task_storage.get_all_logs_for_task(task_id)
        output_lines.append("=== LOGS FROM DATABASE ===")
        if database_logs:
            found_anything = True
            for log_entry in database_logs:
                output_lines.append(f"Run ID: {log_entry['run_id']}")
                output_lines.append(f"Agent Type: {log_entry['agent_type']}")
                output_lines.append(f"Created: {log_entry['created_at']}")
                output_lines.append(f"Updated: {log_entry['updated_at']}")
                output_lines.append("Log Data:")
                output_lines.extend(as_lines(log_entry['log_data']))
                output_lines.append("")  # Empty line separator
                output_lines.append("-" * 50)  # Visual separator
                output_lines.append("")
        else:
            output_lines.append(f"No logs found in database for task: {task_id}")
            output_lines.append("")
    except Exception as e:
        # Best-effort screen: report the failure inline instead of crashing
        # the UI loop.
        found_anything = True
        output_lines.append("=== LOGS FROM DATABASE ===")
        output_lines.append(f"Error retrieving logs from database: {str(e)}")
        output_lines.append("")

    # Section 2: agent output carried in the task payload
    agent_output = task.get("agent_output", {})
    if agent_output:
        found_anything = True
        output_lines.append("=== AGENT OUTPUT (from payload) ===")
        output_lines.extend(as_lines(agent_output))
        output_lines.append("")  # Empty line separator

    # Section 3: raw logs dump carried in the task payload (legacy)
    raw_logs = task.get("raw_logs_dump", {})
    if raw_logs:
        found_anything = True
        output_lines.append("=== RAW LOGS DUMP (from payload) ===")
        output_lines.extend(as_lines(raw_logs))
        output_lines.append("")

    # If no source produced anything, show a single message. An explicit
    # flag is used because the section header is always present, so
    # inspecting the text of output_lines can never detect this case.
    if not found_anything:
        output_lines = ["No logs or output available for this task"]

    # Handle scrolling
    visible_lines = max(height - first_content_row, 0)
    needs_scroll = len(output_lines) > visible_lines
    if needs_scroll:
        # Keep the bottom row free for the scroll indicator.
        visible_lines = max(visible_lines - 1, 0)
    start_line = max(0, min(self.log_scroll_pos, len(output_lines) - visible_lines))
    end_line = min(len(output_lines), start_line + visible_lines)

    # Display the visible window, truncating lines too long for the screen
    max_len = max(width - 2, 0)
    for offset, line in enumerate(output_lines[start_line:end_line]):
        if len(line) > max_len:
            line = (line[:max(max_len - 3, 0)] + "...")[:max_len]
        put(first_content_row + offset, 0, line)

    # Show scroll indicator if there are more lines than fit
    if needs_scroll:
        scroll_info = f"[{start_line + 1}-{end_line} of {len(output_lines)} lines]"
        indicator_col = max(width - len(scroll_info) - 1, 0)
        put(height - 1, indicator_col, scroll_info, curses.A_DIM)

    stdscr.refresh()
def handle_task_description_input(self, stdscr, key: int) -> bool:
"""Handle input for task description screen"""
try:
if key == curses.KEY_BACKSPACE or key == 127:
if self.cursor_pos > 0:
# Handle backspace across multiple lines
lines = self.task_description.split('\n')
current_line = 0
remaining_pos = self.cursor_pos
# Find the current line
for i, line in enumerate(lines):
line_length = len(line)
if remaining_pos <= line_length:
current_line = i
break
remaining_pos -= line_length + 1
current_col = remaining_pos - 1
if current_col == 0 and current_line > 0:
# Backspace at start of line - merge with previous line
prev_line = lines[current_line - 1]
current_line_text = lines[current_line]
lines[current_line - 1] = prev_line + current_line_text
lines.pop(current_line)
else:
# Normal backspace within line
line = lines[current_line]
lines[current_line] = line[:current_col] + line[current_col + 1:]
self.task_description = '\n'.join(lines)
self.cursor_pos -= 1
elif key == curses.KEY_LEFT:
if self.cursor_pos > 0:
self.cursor_pos -= 1
elif key == curses.KEY_RIGHT:
if self.cursor_pos < len(self.task_description):
self.cursor_pos += 1
elif key == curses.KEY_UP:
# Move cursor up one line
lines = self.task_description.split('\n')
current_line = 0
remaining_pos = self.cursor_pos
# Find current line
for i, line in enumerate(lines):
line_length = len(line)
if remaining_pos <= line_length:
current_line = i
break
remaining_pos -= line_length + 1
current_col = remaining_pos - 1
if current_line > 0:
# Move to previous line
prev_line_length = len(lines[current_line - 1])
self.cursor_pos -= (current_col + 1) # Move to end of current line
self.cursor_pos -= 1 # Move past newline
self.cursor_pos -= prev_line_length # Move to start of previous line
self.cursor_pos += min(current_col, prev_line_length) # Move to same column
elif key == curses.KEY_DOWN:
# Move cursor down one line
lines = self.task_description.split('\n')
current_line = 0
remaining_pos = self.cursor_pos
# Find current line
for i, line in enumerate(lines):
line_length = len(line)
if remaining_pos <= line_length:
current_line = i
break