-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathagent-aider-worktree
More file actions
executable file
·967 lines (770 loc) · 39.2 KB
/
agent-aider-worktree
File metadata and controls
executable file
·967 lines (770 loc) · 39.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
#!/usr/bin/env python3
import argparse
import os
import re
import subprocess
import sys
import traceback
from datetime import datetime
from pathlib import Path
from shlex import quote
from types import SimpleNamespace
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
from rich.table import Table
# Initialize Rich console
console = Console()
# Constants
# DEFAULT_MODEL = "openrouter/deepseek/deepseek-r1-distill-qwen-32b"
# WEAK_MODEL = "openrouter/google/gemini-2.0-flash-001"
DEFAULT_MODEL = "r1"
DEFAULT_WEAK_MODEL = "deepseek"
COMMAND_TIMEOUT = 300 # 5 minute timeout
# Command execution utilities
def run_command(cmd, cwd=None, capture_output=True, check=True, env=None, shell=False):
    """Run a command and return a ``subprocess.CompletedProcess``.

    Failures are reported on the console and returned as a result object with
    a non-zero ``returncode`` instead of being raised, so callers can simply
    inspect ``result.returncode``.

    Args:
        cmd: Command as a string or as an argument list. Strings are split
            with shell-like quoting rules unless ``shell`` is true.
        cwd: Working directory for the command.
        capture_output: Capture stdout/stderr as text on the result.
        check: Treat a non-zero exit status as a failure (logged, then
            returned with the command's real status and output).
        env: Extra environment variables layered over ``os.environ``.
        shell: Pass ``cmd`` to the system shell unchanged.

    Returns:
        The ``CompletedProcess`` of the command, or a synthetic one describing
        the failure (timeout -> 1, executable not found -> 127).
    """
    import shlex  # local import: the module header only imports shlex.quote

    command_env = os.environ.copy()
    if env:
        command_env.update(env)

    if shell or not isinstance(cmd, str):
        command = cmd
    else:
        # shlex keeps quoted arguments (e.g. commit messages) together,
        # which a plain str.split() would tear apart.
        command = shlex.split(cmd)

    display = cmd if isinstance(cmd, str) else " ".join(str(part) for part in cmd)

    try:
        with Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]Running:[/bold blue] {task.description}"),
            TimeElapsedColumn(),
            transient=True,
        ) as progress:
            task = progress.add_task(f"[cyan]{display}[/cyan]", total=None)
            result = subprocess.run(
                command,
                cwd=cwd,
                text=True,
                capture_output=capture_output,
                check=check,
                env=command_env,
                timeout=COMMAND_TIMEOUT,
                shell=shell,
            )
            progress.update(task, completed=True)
            return result
    except subprocess.TimeoutExpired:
        console.print(f"[bold red]Command timed out:[/bold red] {display}")
        return subprocess.CompletedProcess(command, 1, "", "Command timed out")
    except subprocess.CalledProcessError as e:
        if capture_output:
            console.print(f"[bold red]Command failed:[/bold red] {display}")
            console.print(f"[red]Error:[/red] {e.stderr}")
        # Preserve the real exit status and output so callers can report them.
        return subprocess.CompletedProcess(
            command, e.returncode or 1, e.stdout or "", e.stderr or str(e)
        )
    except FileNotFoundError as e:
        # Raised when the executable (or cwd) does not exist; report it the
        # same way as any other failed command instead of crashing.
        console.print(f"[bold red]Command could not be started:[/bold red] {display}")
        return subprocess.CompletedProcess(command, 127, "", str(e))
# Git repository utilities
def _update_gitignore(worktree_path):
    """Ensure generated helper files are listed in the repository's .gitignore.

    Missing patterns are appended (one per line, file left newline-terminated)
    and the .gitignore is then staged and committed on its own, so unrelated
    staged changes are not swept into the commit.

    Args:
        worktree_path: Directory of the checkout whose .gitignore is updated.
    """
    gitignore_path = os.path.join(worktree_path, ".gitignore")
    files_to_ignore = [
        "context.txt",
        "custom_aider_history.md",
        "aider_history.md",
        ".aider*"
    ]

    existing_text = ""
    if os.path.exists(gitignore_path):
        with open(gitignore_path, 'r') as f:
            existing_text = f.read()
    existing_ignores = {line.strip() for line in existing_text.splitlines()}

    missing = [pattern for pattern in files_to_ignore if pattern not in existing_ignores]
    if missing:
        with open(gitignore_path, 'a') as f:
            # Do not glue the first new entry onto an unterminated last line.
            if existing_text and not existing_text.endswith("\n"):
                f.write("\n")
            f.write("\n".join(missing) + "\n")

    # Argument lists keep the commit message intact regardless of how
    # run_command tokenizes command strings.
    run_command(["git", "add", ".gitignore"], cwd=worktree_path, check=False)
    run_command(
        ["git", "commit", "-m", "Update .gitignore to exclude generated files",
         "--", ".gitignore"],
        cwd=worktree_path,
        check=False,
    )
def get_repo_name(repo_path):
    """Return the repository name from the ``origin`` URL or the directory name.

    Handles HTTPS URLs, ``ssh://`` URLs and scp-style ``host:path`` remotes.
    Only a trailing ``.git`` is removed, so names that merely contain
    ``.git`` (for example ``user.github.io``) are preserved.

    Args:
        repo_path: Path to the git repository.

    Returns:
        The repository name; the directory's base name when there is no
        usable ``origin`` remote.
    """
    fallback = os.path.basename(os.path.abspath(repo_path))

    result = run_command("git remote get-url origin", cwd=repo_path, check=False)
    if result.returncode != 0:
        return fallback

    remote_url = result.stdout.strip().rstrip('/')
    # Split on ':' as well so "git@host:repo.git" yields "repo.git".
    repo_name = re.split(r'[/:]', remote_url)[-1]
    if repo_name.endswith('.git'):
        repo_name = repo_name[:-len('.git')]
    return repo_name or fallback
def sanitize_name(name):
    """Turn free text into a name usable for branches and directories.

    Every character that is not a word character, whitespace or a hyphen is
    replaced by an underscore; surrounding whitespace is dropped; each run of
    hyphens/whitespace collapses to one underscore; the result is cut to at
    most 50 characters.
    """
    max_length = 50
    without_specials = re.sub(r'[^\w\s-]', '_', name)
    trimmed = without_specials.strip()
    collapsed = re.sub(r'[-\s]+', '_', trimmed)
    return collapsed[:max_length]
def create_worktree(repo_path, task_name):
    """Create a new branch and git worktree for the task.

    The worktree lives under ``~/worktrees/<repo name>/`` in a directory named
    after the sanitized task plus a timestamp. If ``git worktree add`` fails,
    the repository is cloned into that directory instead and the branch is
    created in the clone.

    Args:
        repo_path: Path to the main repository.
        task_name: Free-text task description used to derive names.

    Returns:
        Tuple ``(worktree_path, branch_name, current_branch)`` where
        ``current_branch`` is the branch checked out in the main repository
        ("main" when it cannot be determined).
    """
    repo_name = get_repo_name(repo_path)
    worktree_base = os.path.expanduser(f"~/worktrees/{repo_name}")
    os.makedirs(worktree_base, exist_ok=True)

    safe_task_name = sanitize_name(task_name)
    # Timestamp keeps repeated runs of the same task from colliding.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    worktree_path = os.path.join(worktree_base, f"{safe_task_name}_{timestamp}")

    # Commands are passed as argument lists so paths containing whitespace
    # reach git as single arguments.
    result = run_command(["git", "branch", "--show-current"], cwd=repo_path)
    current_branch = result.stdout.strip() or "main"  # Default to main if empty

    branch_name = f"task/{safe_task_name}_{timestamp}"
    console.print(f"[bold green]Creating new branch:[/bold green] {branch_name}")
    run_command(["git", "branch", branch_name], cwd=repo_path)

    console.print(f"[bold green]Creating worktree at:[/bold green] {worktree_path}")
    result = run_command(
        ["git", "worktree", "add", worktree_path, branch_name], cwd=repo_path
    )
    if result.returncode != 0:
        console.print("[bold red]Failed to create worktree. Falling back to clone method.[/bold red]")
        run_command(
            ["git", "clone", os.path.abspath(repo_path), worktree_path], check=False
        )
        run_command(["git", "checkout", "-b", branch_name], cwd=worktree_path, check=False)

    # Keep generated helper files out of version control in the new checkout.
    _update_gitignore(worktree_path)
    return worktree_path, branch_name, current_branch
# Testing utilities
def run_tests(worktree_path):
    """Run pytest in the worktree and report whether it succeeded.

    Args:
        worktree_path: Directory in which pytest is executed.

    Returns:
        True when pytest exits with status 0; False when pytest is not
        installed or exits with any other status. On failure the last ten
        lines of captured output are shown.
    """
    import shutil  # local import: not part of the module header

    console.print(Panel("[bold]Running Tests[/bold]", style="blue"))

    # shutil.which avoids spawning a shell and works without a `which` binary.
    if shutil.which("pytest") is None:
        console.print("[bold red]Error: pytest not found. Please install pytest.[/bold red]")
        return False

    result = run_command("pytest", cwd=worktree_path, check=False)
    if result.returncode == 0:
        console.print(Panel("[bold]All Tests Passed![/bold]", style="green"))
        return True

    console.print(Panel("[bold]Tests Failed[/bold]", style="red"))
    if result.stdout:
        tail = "\n".join(result.stdout.split("\n")[-10:])
        console.print(Panel.fit(
            "[bold]Test Output:[/bold]\n" + tail,
            style="red"
        ))
    return False
# Context generation
def generate_context(worktree_path):
    """Write ``context.txt`` into the worktree.

    The file contains, in order: a timestamp header, test results, a
    separator line, linting output, a fixed focus directive and samples of
    the test files.
    """
    console.print(Panel("[bold]Generating context.txt[/bold]", style="blue"))

    target = os.path.join(worktree_path, "context.txt")
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    separator = "\n\n=====================================================================\n"

    with open(target, 'w') as out:
        out.write(f"============={stamp}===================\n\n")
        _add_test_results_to_context(out, worktree_path)
        out.write(separator)
        _add_linting_to_context(out, worktree_path)
        out.write("\nFocus on fixing bugs and improving code quality\n")
        _add_test_samples_to_context(out, worktree_path)

    console.print("[dim]Context file generated[/dim]")
def _add_test_results_to_context(f, worktree_path):
    """Run the test suite and write the tail of its output to ``f``.

    Writes a short notice instead when the worktree has no ``tests``
    directory or pytest cannot be located. At most the last 200 lines of
    captured stdout are written.
    """
    tests_dir = os.path.join(worktree_path, "tests")

    # Guard: nothing to run without a tests directory.
    if not os.path.isdir(tests_dir):
        console.print("[dim]No tests directory found, skipping test execution[/dim]")
        f.write("No tests directory found\n")
        return

    # Guard: pytest must be available on PATH.
    probe = run_command("which pytest", shell=True, check=False)
    if probe.returncode != 0:
        console.print("[dim]pytest not found, skipping test execution[/dim]")
        f.write("pytest not installed, skipping test execution\n")
        return

    console.print("[dim]Running tests for context...[/dim]")
    outcome = run_command(
        "python -m pytest --timeout=10",
        cwd=worktree_path,
        check=False,
        shell=True
    )
    captured = outcome.stdout or ""
    tail_lines = captured.strip().split('\n')[-200:]
    f.write('\n'.join(tail_lines) + '\n')
def _add_linting_to_context(f, worktree_path):
    """Run pylint on a few tracked Python files and write the findings to ``f``.

    Up to five of the first ten tracked ``*.py`` files are linted and the last
    twenty output lines of each are kept. A notice is written instead when
    there are no Python files or pylint is not installed.

    Args:
        f: Writable text file object for the context file.
        worktree_path: Directory of the checkout to lint.
    """
    import shutil  # local import: not part of the module header

    result = run_command("git ls-files '*.py' | head -10", cwd=worktree_path, shell=True)
    py_files = [name for name in result.stdout.splitlines() if name.strip()]

    if not py_files:
        console.print("[dim]No Python files found, skipping linting[/dim]")
        f.write("No Python files found for linting\n")
        return

    # Without this check a missing pylint would surface as an exception from
    # the subprocess layer rather than as a readable note in the context.
    if shutil.which("pylint") is None:
        console.print("[dim]pylint not found, skipping linting[/dim]")
        f.write("pylint not installed, skipping linting\n")
        return

    console.print("[dim]Running pylint for context...[/dim]")
    lint_output = []
    for py_file in py_files[:5]:  # Limit to first 5 files
        # Only focus on important issues (errors and warnings), ignore
        # style/convention issues. The argument list keeps file names that
        # contain spaces intact.
        lint_result = run_command(
            ["pylint", py_file,
             "--disable=C0303,C0111,C0103,C0301,C0411,C0412,C0413,W0311"],
            cwd=worktree_path,
            check=False
        )
        if lint_result.stdout:
            lint_output.extend(lint_result.stdout.strip().split('\n')[-20:])

    if lint_output:
        f.write('\n'.join(lint_output) + '\n')
        f.write("\nNote: Minor style issues like trailing whitespace, import order, and line length are ignored.\n")
    else:
        f.write("No significant linting issues found\n")
def _add_test_samples_to_context(f, worktree_path):
"""Add test file samples to the context file."""
tests_dir = os.path.join(worktree_path, "tests")
if os.path.exists(tests_dir) and os.path.isdir(tests_dir):
test_files = [f for f in os.listdir(tests_dir) if f.endswith('.py')][:3]
if test_files:
f.write("\n\n# Sample test files:\n")
for test_file in test_files:
test_file_path = os.path.join(tests_dir, test_file)
f.write(f"\n# {test_file}\n")
try:
with open(test_file_path, 'r') as tf:
content = tf.read(50000) # Read at most 50KB
f.write(content)
if len(content) == 50000:
f.write("\n# ... (content truncated due to size)")
except Exception as e:
f.write(f"\n# Error reading file: {str(e)}")
# Aider integration
def run_aider(worktree_path, task, args, model=None, weak_model=None, inner_loop_count=3):
    """Run one aider session for ``task`` inside ``worktree_path``.

    The session is considered to have made changes when new tracked files
    appear, the working tree is dirty afterwards, or HEAD moved (aider
    committed its edits itself). If nothing changed, the task is retried with
    a more insistent prompt up to ``inner_loop_count`` times.

    Args:
        worktree_path: Checkout in which aider runs.
        task: Task description passed to aider.
        args: Namespace providing ``model``, ``weak_model`` and
            ``no_python_files``; ``read`` (extra files to pass via --read) is
            optional.
        model: Overrides ``args.model`` when given.
        weak_model: Overrides ``args.weak_model`` when given.
        inner_loop_count: Remaining retries when no change is detected.

    Returns:
        True when changes were detected, otherwise False.
    """
    import shutil  # local import: not part of the module header

    if model is None:
        model = args.model
    if weak_model is None:
        weak_model = args.weak_model

    def _head_commit():
        # Empty string when HEAD cannot be resolved (e.g. no commits yet).
        return run_command(
            "git rev-parse HEAD", cwd=worktree_path, check=False
        ).stdout.strip()

    # Snapshot of the repository before aider runs.
    before_files = set(run_command("git ls-files", cwd=worktree_path).stdout.strip().split('\n'))
    before_head = _head_commit()

    console.print(Panel(f"[bold]Running aider with task:[/bold]\n{task}", style="cyan"))

    # Formatting is best-effort; skip it when black is not installed.
    console.print("[dim]Running black formatter...[/dim]")
    if shutil.which("black"):
        run_command("black .", cwd=worktree_path, check=False)

    instruction_file = os.path.expanduser("~/dotfiles/instruction.md")
    context_file = os.path.join(worktree_path, "context.txt")
    history_file = os.path.join(worktree_path, "custom_aider_history.md")

    # Every path is shell-quoted because the command runs through the shell.
    read_paths = [p for p in (instruction_file, context_file) if os.path.exists(p)]
    # Conflict-resolution callers pass a minimal namespace without ``read``.
    read_paths.extend(getattr(args, "read", None) or [])
    read_args = [f"--read {quote(p)}" for p in read_paths]

    files_to_include = _prepare_files_for_aider(worktree_path, args)

    message = f"TDD! Create tests first for all features or functionality you want to implement or modify if they do not already exist. Improve code quality and fix any issues found in tests or linting. Keep it very simple. Please do the minimal, cleanest, low complexity of the task possible. Please also keep the tests simple. Both tests and tasks do actually need to be implemented, do not just mock. Task: {task}"

    aider_cmd = (
        f"aider --architect "
        f"--model {quote(model)} "
        f"--weak-model {quote(weak_model)} "
        f"{' '.join(read_args)} "
        f"--yes-always "
        f"--no-show-model-warnings "
        f"--no-show-release-notes "
        f"--chat-history-file {quote(history_file)} "
        f"--restore-chat-history "
        f"--edit-format diff "
        f"--auto-lint --lint-cmd \"pylint --disable=C0303,C0111,C0103,C0301,C0411,C0412,C0413,W0311\" "
        f"--auto-test --test-cmd pytest "
        f"--message {quote(message)} "
        f"{' '.join(files_to_include)}"
    )

    console.print(f"[dim]Running aider in {worktree_path}[/dim]")
    # cwd= runs the child in the worktree without changing this process's
    # working directory, so nothing has to be restored afterwards. Output is
    # not captured: aider writes straight to the terminal. The exit status is
    # deliberately not checked; success is judged by the resulting changes.
    subprocess.run(
        aider_cmd,
        shell=True,
        cwd=worktree_path,
        env={**os.environ, "PYTHONUNBUFFERED": "1", "PYTHONINSPECT": "0"},
        check=False,
    )
    console.print("[bold yellow]Aider session completed[/bold yellow]")

    # Stage everything so the status/diff below reflect all edits.
    run_command("git add -A", cwd=worktree_path, check=False)

    after_files = set(run_command("git ls-files", cwd=worktree_path).stdout.strip().split('\n'))
    after_status = run_command("git status --porcelain", cwd=worktree_path).stdout
    after_head = _head_commit()
    head_moved = bool(after_head) and after_head != before_head

    new_files = after_files - before_files
    if new_files:
        console.print("[bold green]New files created:[/bold green]")
        for file in sorted(new_files):
            console.print(f" - [green]{file}[/green]")

    if head_moved:
        console.print("[bold green]New commits created during the aider session[/bold green]")

    if after_status:
        console.print("[bold yellow]Changes detected:[/bold yellow]")
        for line in after_status.strip().split('\n'):
            if line.strip():
                console.print(f" - [yellow]{line}[/yellow]")

        color_flag = "--color=always" if console.is_terminal else "--color=never"

        # Diff text is arbitrary file content: print it without Rich markup
        # parsing so bracketed text in the diff cannot be taken for tags.
        console.print("[bold yellow]File changes (git diff):[/bold yellow]")
        diff_result = run_command(f"git diff --staged {color_flag}", cwd=worktree_path, check=False)
        if diff_result.stdout.strip():
            console.print(diff_result.stdout, markup=False, highlight=False)

        unstaged_diff = run_command(f"git diff {color_flag}", cwd=worktree_path, check=False)
        if unstaged_diff.stdout.strip():
            console.print("[bold yellow]Unstaged changes:[/bold yellow]")
            console.print(unstaged_diff.stdout, markup=False, highlight=False)

    changes_made = bool(new_files or after_status or head_moved)
    if not changes_made:
        console.print("[bold red]Warning: No changes detected after aider run[/bold red]")
        if inner_loop_count > 0:
            console.print("[bold yellow]Retrying with a more explicit task...[/bold yellow]")
            enhanced_task = f"IMPORTANT: You must create or modify at least one file. {task}. Create a new test file if needed."
            return run_aider(worktree_path, enhanced_task, args, model, weak_model, inner_loop_count - 1)
        console.print("[bold red]Failed to make any changes after multiple attempts.[/bold red]")

    return changes_made
def _prepare_files_for_aider(worktree_path, args):
    """Return the shell-quoted tracked Python files to hand to aider.

    Generated helper files are excluded by matching each file's base name
    against glob patterns. (They are globs, so they are matched with
    ``fnmatch`` rather than as regular expressions, which would wrongly
    exclude ordinary files.)

    Args:
        worktree_path: Checkout whose tracked files are listed.
        args: Namespace with a boolean ``no_python_files`` attribute.

    Returns:
        List of shell-quoted relative paths; empty when
        ``args.no_python_files`` is set.
    """
    import fnmatch  # local import: not part of the module header

    if args.no_python_files:
        return []

    excluded_patterns = [
        "custom_aider_history.md",
        "aider_history.md",
        ".aider*",
        "context.txt"
    ]

    def should_include(filename):
        base = os.path.basename(filename)
        return not any(
            fnmatch.fnmatchcase(base, pattern) for pattern in excluded_patterns
        )

    result = run_command("git ls-files", cwd=worktree_path)
    return [
        quote(path)
        for path in result.stdout.splitlines()
        if path and path.endswith('.py') and should_include(path)
    ]
# Git merge and push operations
def merge_and_push(worktree_path, main_repo_path, branch_name, main_branch, task, args):
    """Bring the task branch up to date, push it and merge it into main.

    Runs four stages in order (update main, update worktree, push branch,
    merge to main) and stops at the first one that reports failure.

    Returns:
        True when every stage succeeded; False when a stage failed or raised.
    """
    # Each stage is deferred so later stages only run if earlier ones passed.
    stages = (
        lambda: _update_main_branch(main_repo_path, main_branch),
        lambda: _update_worktree(worktree_path, main_branch, branch_name, task, args),
        lambda: _push_branch(worktree_path, branch_name),
        lambda: _merge_to_main(main_repo_path, main_branch, branch_name),
    )
    try:
        for stage in stages:
            if not stage():
                return False
        console.print(f"[bold green]Successfully merged {branch_name} into {main_branch} and pushed![/bold green]")
        return True
    except Exception as e:
        console.print(f"[bold red]Error during merge and push:[/bold red] {str(e)}")
        return False
def _update_main_branch(main_repo_path, main_branch):
    """Check out ``main_branch`` in the main repository and pull it.

    Returns:
        True when both the checkout and the pull report success, else False.
    """
    console.print(f"[bold blue]Pulling latest changes from {main_branch}...[/bold blue]")

    checkout = run_command(f"git checkout {main_branch}", cwd=main_repo_path)
    if checkout.returncode != 0:
        console.print(f"[bold red]Failed to checkout {main_branch}[/bold red]")
        return False

    pull = run_command("git pull", cwd=main_repo_path)
    pulled_ok = pull.returncode == 0
    if not pulled_ok:
        console.print("[bold red]Failed to pull latest changes[/bold red]")
    return pulled_ok
def _update_worktree(worktree_path, main_branch, branch_name, task, args):
    """Commit pending work in the worktree and merge ``origin/<main_branch>``.

    When the merge produces conflicts, aider is asked to resolve them and the
    resolution is committed.

    Args:
        worktree_path: Checkout of the task branch.
        main_branch: Name of the branch whose remote state is merged in.
        branch_name: Name of the task branch (not used by this step).
        task: Original task text, included in the conflict-resolution prompt.
        args: Namespace providing ``model`` and ``weak_model``.

    Returns:
        False when the merge failed for a reason other than conflicts or when
        conflicts remain after aider ran; True otherwise.
    """
    console.print("[bold blue]Updating worktree with latest changes...[/bold blue]")

    _update_gitignore(worktree_path)

    # Untrack generated files. --ignore-unmatch keeps git from aborting the
    # whole removal when some of the listed files are not tracked.
    run_command(
        ["git", "rm", "-r", "--cached", "--quiet", "--ignore-unmatch", "--",
         "context.txt", "custom_aider_history.md", "aider_history.md", ".aider*"],
        cwd=worktree_path,
        check=False,
    )

    # Argument lists keep the commit messages intact as single arguments.
    run_command("git add -A", cwd=worktree_path)
    run_command(["git", "commit", "-m", "Commit changes before merge"],
                cwd=worktree_path, check=False)

    def stash_tip():
        # Commit id of the newest stash entry, or "" when there is none.
        return run_command(
            ["git", "rev-parse", "--quiet", "--verify", "refs/stash"],
            cwd=worktree_path, check=False,
        ).stdout.strip()

    # Stashes are shared between worktrees, so only pop later if this push
    # really created a new entry.
    stash_before = stash_tip()
    run_command("git stash push --include-untracked", cwd=worktree_path, check=False)
    stashed = stash_tip() != stash_before

    console.print(f"[bold blue]Merging changes from {main_branch}...[/bold blue]")
    merge_result = run_command(f"git merge origin/{main_branch}", cwd=worktree_path, check=False)

    if stashed:
        run_command("git stash pop", cwd=worktree_path, check=False)

    # In a linked worktree ".git" is a file, so ask git where MERGE_HEAD is.
    merge_head_path = run_command(
        ["git", "rev-parse", "--git-path", "MERGE_HEAD"],
        cwd=worktree_path, check=False,
    ).stdout.strip()
    merge_head_exists = bool(merge_head_path) and os.path.exists(
        os.path.join(worktree_path, merge_head_path)
    )

    unmerged_files = []
    if merge_result.returncode != 0 or merge_head_exists:
        diff_result = run_command("git diff --name-only --diff-filter=U", cwd=worktree_path, check=False)
        unmerged_files = [name for name in diff_result.stdout.strip().split('\n') if name]
    has_conflicts = bool(unmerged_files)

    if has_conflicts:
        console.print("[bold yellow]Merge conflicts detected in the following files:[/bold yellow]")
        for file in unmerged_files:
            console.print(f" - [yellow]{file}[/yellow]")

        conflict_task = f"Resolve all git merge conflicts while preserving functionality. Original task: {task}"

        # Minimal settings for the conflict-resolution session; ``read`` is
        # included because run_aider iterates over it.
        conflict_args = SimpleNamespace(
            model=args.model,
            weak_model=args.weak_model,
            no_python_files=False,
            inner_loop=1,
            max_iterations=1,
            read=[],
        )
        run_aider(worktree_path, conflict_task, conflict_args,
                  model=conflict_args.model, weak_model=conflict_args.weak_model)

        diff_result = run_command("git diff --name-only --diff-filter=U", cwd=worktree_path, check=False)
        remaining_conflicts = [name for name in diff_result.stdout.strip().split('\n') if name]
        if remaining_conflicts:
            console.print("[bold red]Unresolved conflicts remain after aider run:[/bold red]")
            for file in remaining_conflicts:
                console.print(f" - [red]{file}[/red]")
            return False

        run_command("git add -A", cwd=worktree_path)
        run_command(["git", "commit", "-m", "Resolved merge conflicts"],
                    cwd=worktree_path, check=False)
    elif merge_result.returncode != 0:
        console.print("[bold red]Merge failed for reasons other than conflicts:[/bold red]")
        console.print(merge_result.stderr)
        return False

    return True
def _push_branch(worktree_path, branch_name):
    """Push the task branch to ``origin``, retrying once with a force push.

    Returns:
        True when the normal push or the forced retry succeeds, else False.
    """
    console.print(f"[bold blue]Pushing changes to branch {branch_name}...[/bold blue]")

    first_try = run_command(f"git push -u origin {branch_name}", cwd=worktree_path)
    if first_try.returncode == 0:
        return True

    console.print("[bold red]Failed to push changes. Trying force push...[/bold red]")
    forced = run_command(f"git push -f -u origin {branch_name}", cwd=worktree_path, check=False)
    if forced.returncode == 0:
        return True

    console.print("[bold red]Force push also failed. Please push manually.[/bold red]")
    return False
def _merge_to_main(main_repo_path, main_branch, branch_name, args=None):
    """Merge the task branch into ``main_branch`` in the main repo and push.

    When the merge produces conflicts, aider is asked to resolve them; if
    conflicts remain, or the merge fails for another reason, the merge is
    aborted.

    Args:
        main_repo_path: Path to the main repository.
        main_branch: Branch that receives the merge.
        branch_name: Task branch to merge.
        args: Optional namespace providing ``model`` and ``weak_model`` for
            conflict resolution. When omitted, the module defaults
            ``DEFAULT_MODEL`` / ``DEFAULT_WEAK_MODEL`` are used.

    Returns:
        True when the merge and the push succeeded, else False.
    """
    console.print(f"[bold blue]Merging {branch_name} into {main_branch}...[/bold blue]")

    _update_gitignore(main_repo_path)

    # Untrack generated files. --ignore-unmatch keeps git from aborting the
    # whole removal when some of the listed files are not tracked.
    run_command(
        ["git", "rm", "-r", "--cached", "--quiet", "--ignore-unmatch", "--",
         "context.txt", "custom_aider_history.md", "aider_history.md", ".aider*"],
        cwd=main_repo_path,
        check=False,
    )

    checkout_result = run_command(f"git checkout {main_branch}", cwd=main_repo_path)
    if checkout_result.returncode != 0:
        console.print(f"[bold red]Failed to checkout {main_branch}[/bold red]")
        return False

    def stash_tip():
        # Commit id of the newest stash entry, or "" when there is none.
        return run_command(
            ["git", "rev-parse", "--quiet", "--verify", "refs/stash"],
            cwd=main_repo_path, check=False,
        ).stdout.strip()

    # Only pop later if this push really created a stash entry; otherwise an
    # older, unrelated stash would be applied.
    stash_before = stash_tip()
    run_command("git stash push --include-untracked", cwd=main_repo_path, check=False)
    stashed = stash_tip() != stash_before

    merge_result = run_command(f"git merge --no-ff {branch_name}", cwd=main_repo_path, check=False)

    if stashed:
        run_command("git stash pop", cwd=main_repo_path, check=False)

    # Ask git for the MERGE_HEAD location instead of assuming ".git/" is a
    # directory directly under the repository path.
    merge_head_path = run_command(
        ["git", "rev-parse", "--git-path", "MERGE_HEAD"],
        cwd=main_repo_path, check=False,
    ).stdout.strip()
    merge_head_exists = bool(merge_head_path) and os.path.exists(
        os.path.join(main_repo_path, merge_head_path)
    )

    unmerged_files = []
    if merge_result.returncode != 0 or merge_head_exists:
        diff_result = run_command("git diff --name-only --diff-filter=U", cwd=main_repo_path, check=False)
        unmerged_files = [name for name in diff_result.stdout.strip().split('\n') if name]
    has_conflicts = bool(unmerged_files)

    if has_conflicts:
        console.print("[bold yellow]Merge conflict detected in main repo. Running aider to resolve...[/bold yellow]")
        for file in unmerged_files:
            console.print(f" - [yellow]{file}[/yellow]")

        conflict_task = "Resolve all git merge conflicts while preserving functionality"

        # ``args`` is optional here; fall back to the module-level defaults.
        # ``read`` is included because run_aider iterates over it.
        conflict_args = SimpleNamespace(
            model=getattr(args, "model", None) or DEFAULT_MODEL,
            weak_model=getattr(args, "weak_model", None) or DEFAULT_WEAK_MODEL,
            no_python_files=False,
            inner_loop=1,
            max_iterations=1,
            read=[],
        )
        run_aider(main_repo_path, conflict_task, conflict_args)

        diff_result = run_command("git diff --name-only --diff-filter=U", cwd=main_repo_path, check=False)
        remaining_conflicts = [name for name in diff_result.stdout.strip().split('\n') if name]
        if remaining_conflicts:
            console.print("[bold red]Unresolved conflicts remain. Aborting merge.[/bold red]")
            run_command("git merge --abort", cwd=main_repo_path, check=False)
            return False

        # Argument list keeps the commit message intact as one argument.
        run_command("git add -A", cwd=main_repo_path)
        run_command(["git", "commit", "-m", "Resolved merge conflicts"],
                    cwd=main_repo_path, check=False)
    elif merge_result.returncode != 0:
        console.print("[bold red]Merge failed for reasons other than conflicts:[/bold red]")
        console.print(merge_result.stderr)
        run_command("git merge --abort", cwd=main_repo_path, check=False)
        return False

    push_result = run_command("git push", cwd=main_repo_path, check=False)
    if push_result.returncode != 0:
        console.print("[bold red]Failed to push to main. Please push manually.[/bold red]")
        return False
    return True
# Main application
def parse_arguments():
    """Parse the command line and return the resulting namespace.

    Returns:
        ``argparse.Namespace`` with ``task``, ``path``, ``no_push``,
        ``model``, ``weak_model``, ``min_iterations``, ``max_iterations``,
        ``inner_loop``, ``exponential_retries``, ``no_python_files`` and
        ``read`` (list of extra files, empty by default).
    """
    parser = argparse.ArgumentParser(
        description="Create a git worktree and run aider until tests pass, then merge back to main.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s "Add user authentication feature"
%(prog)s -p /path/to/repo "Fix bug in login form"
%(prog)s --model claude-3-opus "Implement new feature"
%(prog)s --inner-loop 5 "Refactor database code"
"""
    )
    parser.add_argument("task", help="The task description to pass to aider")
    parser.add_argument("-p", "--path", default=".",
                        help="Path to the main git repository (default: current directory)")
    parser.add_argument("--no-push", action="store_true",
                        help="Don't push changes back to main repository")
    parser.add_argument("--model", default=DEFAULT_MODEL,
                        help=f"Model to use with aider (default: {DEFAULT_MODEL})")
    parser.add_argument("--weak-model", default=DEFAULT_WEAK_MODEL,
                        help=f"Weak model to use with aider (default: {DEFAULT_WEAK_MODEL})")
    parser.add_argument("--min-iterations", type=int, default=1,
                        help="Minimum number of iterations to run before accepting success (default: 1)")
    parser.add_argument("--max-iterations", type=int, default=10,
                        help="Maximum number of iterations to run (default: 10)")
    # %(default)s keeps the help text in step with the actual default value.
    parser.add_argument("--inner-loop", type=int, default=10,
                        help="Number of inner loop iterations to run (default: %(default)s)")
    parser.add_argument("--exponential-retries", action="store_true",
                        help="If task fails, retry with double the iterations each time (creates new worktrees)")
    parser.add_argument("--no-python-files", action="store_true",
                        help="Disable automatic inclusion of Python files")
    parser.add_argument("--read", action="append", default=[],
                        help="Additional files to read (can be used multiple times)")
    return parser.parse_args()
def setup_signal_handlers():
    """Install a SIGINT handler that prints a notice and exits with status 0."""
    import signal

    def _on_interrupt(signum, stack_frame):
        # Both arguments are supplied by the signal machinery and unused here.
        console.print("\n[bold red]Received interrupt signal. Cleaning up...[/bold red]")
        sys.exit(0)

    signal.signal(signal.SIGINT, _on_interrupt)
def display_configuration(args, worktree_path, branch_name, current_iterations=None):
    """Print a two-column table summarising the settings of this run.

    The "Current Iterations" row is only shown when ``current_iterations``
    is not None.
    """
    rows = [
        ("Task", args.task),
        ("Repository", args.path),
        ("Worktree", worktree_path),
        ("Branch", branch_name),
        ("Model", args.model),
        ("Weak Model", args.weak_model),
    ]
    if current_iterations is not None:
        rows.append(("Current Iterations", str(current_iterations)))
    rows.extend([
        ("Min Iterations", str(args.min_iterations)),
        ("Max Iterations", str(args.max_iterations)),
        ("Inner Loop", str(args.inner_loop)),
        ("Push Changes", "No" if args.no_push else "Yes"),
        ("Exponential Retries", "Yes" if args.exponential_retries else "No"),
    ])

    table = Table(title="Configuration")
    table.add_column("Setting", style="cyan")
    table.add_column("Value", style="green")
    for setting, value in rows:
        table.add_row(setting, value)
    console.print(table)
def run_iteration(iteration, max_iterations, worktree_path, task, args, start_time):
    """Run one pass: reset chat history, build context, run aider, run tests.

    Returns:
        False when aider made no changes; otherwise the result of the test
        run (True when the tests pass).
    """
    # Elapsed wall time, shown without the fractional seconds.
    elapsed_str = str(datetime.now() - start_time).split('.')[0]
    console.print(Panel(
        f"[bold]Iteration {iteration}/{max_iterations}[/bold]\nTotal time: {elapsed_str}",
        style="blue"
    ))

    # Truncate the chat history from the previous pass, if there is one.
    history_file = os.path.join(worktree_path, "custom_aider_history.md")
    if os.path.exists(history_file):
        with open(history_file, 'w'):
            pass

    generate_context(worktree_path)

    changed = run_aider(worktree_path, task, args, model=args.model, weak_model=args.weak_model)
    if changed:
        return run_tests(worktree_path)

    console.print("[bold red]No changes were made during this iteration.[/bold red]")
    return False
def handle_success(worktree_path, main_repo_path, branch_name, main_branch, task, args, iteration, start_time):
    """Finalize a passing run: commit leftovers and, unless disabled, merge and push.

    Args:
        worktree_path: Worktree in which the final commit is made.
        main_repo_path: Main repository path, forwarded to merge_and_push.
        branch_name: Worktree branch, forwarded to merge_and_push.
        main_branch: Target branch, forwarded to merge_and_push.
        task: Task description (forwarded, and shown in the pushed summary).
        args: Parsed arguments; ``no_push`` decides whether to merge and push.
        iteration: Number of iterations used, shown in the summary.
        start_time: datetime the overall run started.

    Returns:
        True once the success summary has been printed; False when
        merge_and_push reports failure, so the caller can keep iterating.
    """
    # Stage and commit whatever is left; both commands are allowed to fail
    # (check=False), e.g. when there is nothing to commit.
    for git_cmd in ("git add -A", "git commit -m 'Final changes'"):
        run_command(git_cmd, cwd=worktree_path, check=False)

    skip_push = args.no_push
    if not skip_push:
        merged = merge_and_push(worktree_path, main_repo_path, branch_name, main_branch, task, args)
        if not merged:
            console.print("[yellow]Merge issues detected. Running aider again...[/yellow]")
            return False

    # Timestamp is taken after any merge/push so it reflects the real finish.
    finish_time = datetime.now()
    total_time = str(finish_time - start_time).split('.')[0]
    if skip_push:
        summary = (
            f"[bold]Task completed successfully in {iteration} iterations![/bold]\n"
            f"Finished at: {finish_time.strftime('%Y-%m-%d %H:%M:%S')}\n"
            f"Total time: {total_time}"
        )
    else:
        # The pushed variant additionally echoes the original task.
        summary = (
            f"[bold]Task completed successfully in {iteration} iterations![/bold]\n"
            f"Finished at: {finish_time.strftime('%Y-%m-%d %H:%M:%S')}\n"
            f"Total time: {total_time}\n"
            f"Original task: {task}"
        )
    console.print(Panel(summary, style="green"))
    return True
def run_with_iterations(main_repo_path, args, iterations_to_use, start_time=None):
    """Attempt the task in a fresh worktree with a fixed iteration budget.

    A passing test run only ends the attempt once at least
    ``args.min_iterations`` iterations have been performed.

    Args:
        main_repo_path: Root of the main repository.
        args: Parsed arguments; ``task`` and ``min_iterations`` are read here.
        iterations_to_use: Upper bound on iterations for this attempt.
        start_time: datetime the overall run started; defaults to now.

    Returns:
        True when handle_success reports success; False when the budget is
        exhausted or any exception is raised (the error is printed).
    """
    if start_time is None:
        start_time = datetime.now()

    console.print(Panel(
        f"[bold cyan]Attempting task with {iterations_to_use} iterations[/bold cyan]",
        style="blue"
    ))

    def _escape_markup(text):
        # Backslash-escape square brackets so they are printed literally
        # instead of being interpreted as console markup.
        return text.replace("[", "\\[").replace("]", "\\]")

    try:
        worktree_path, branch_name, main_branch = create_worktree(main_repo_path, args.task)
        display_configuration(args, worktree_path, branch_name, iterations_to_use)

        attempt = 1
        while attempt <= iterations_to_use:
            passed = run_iteration(
                attempt, iterations_to_use, worktree_path, args.task, args, start_time
            )

            if passed:
                if attempt >= args.min_iterations:
                    finished = handle_success(
                        worktree_path, main_repo_path, branch_name, main_branch,
                        args.task, args, attempt, start_time
                    )
                    if finished:
                        return True
                    # Otherwise the merge failed: fall through and iterate again.
                else:
                    console.print(f"[yellow]Tests passed but continuing for minimum {args.min_iterations} iterations[/yellow]")

            attempt += 1
            if attempt > iterations_to_use:
                # Budget exhausted: report and leave the loop.
                finish_time = datetime.now()
                console.print(Panel(
                    f"[bold]Reached maximum number of iterations ({iterations_to_use})[/bold]\n"
                    f"Finished at: {finish_time.strftime('%Y-%m-%d %H:%M:%S')}\n"
                    f"Total time spent: {str(finish_time - start_time).split('.')[0]}\n"
                    f"Original task: {args.task}",
                    style="yellow"
                ))
                break

        return False
    except Exception as exc:
        # Report the failure (message first, then the traceback) and signal
        # failure to the caller instead of propagating.
        error_msg = _escape_markup(str(exc))
        console.print(f"[bold red]Error:[/bold red] {error_msg}")
        traceback_text = _escape_markup(traceback.format_exc())
        console.print(f"[dim]{traceback_text}[/dim]")
        return False
def main():
    """Command-line entry point.

    Installs the signal handlers, prints the banner, parses arguments and
    resolves the git repository root, then runs the task:

    * When ``args.exponential_retries`` is set, attempts are made with an
      iteration budget that starts at ``args.min_iterations`` and doubles
      after each failure, capped at ``args.max_iterations``; reaching the cap
      triggers one final attempt. A failure summary is printed if no attempt
      succeeds.
    * Otherwise a single attempt is made with ``args.max_iterations``.

    Exits with status 1 when the repository root cannot be determined, or
    when the single-attempt path raises an exception.
    """
    # Setup
    setup_signal_handlers()

    console.print(Panel.fit(
        "[bold cyan]Agent Aider Worktree[/bold cyan]\n"
        "[dim]Create a git worktree and run aider until tests pass, then merge back to main.[/dim]",
        border_style="blue"
    ))

    # Parse arguments
    args = parse_arguments()

    # Find git root using git rev-parse
    try:
        main_repo_path = run_command("git rev-parse --show-toplevel", shell=True).stdout.strip()
    except (Exception, SystemExit):
        # run_command is defined elsewhere; whether it raises or exits on
        # failure is not visible here, so both are treated as "no repository".
        # KeyboardInterrupt is deliberately left to propagate.
        console.print("[bold red]Error:[/bold red] Not in a git repository")
        sys.exit(1)

    # Start time for the entire process
    start_time = datetime.now()

    if args.exponential_retries:
        # Start with min_iterations
        iterations = args.min_iterations
        success = False

        while iterations <= args.max_iterations and not success:
            # Try with current number of iterations
            success = run_with_iterations(main_repo_path, args, iterations, start_time)
            if success:
                break

            # Double the budget for the next attempt, capped at max_iterations.
            # Always advance by at least one so a zero (or negative) starting
            # value cannot leave the loop stuck on the same budget forever.
            iterations = min(max(iterations * 2, iterations + 1), args.max_iterations)

            # Once the cap is reached, make one last attempt at the cap and stop.
            if iterations == args.max_iterations:
                console.print(Panel(
                    f"[bold yellow]Final attempt with {args.max_iterations} iterations...[/bold yellow]",
                    style="yellow"
                ))
                success = run_with_iterations(main_repo_path, args, args.max_iterations, start_time)
                break

        if not success:
            finish_time = datetime.now()
            console.print(Panel(
                f"[bold red]Task failed after all retry attempts.[/bold red]\n"
                f"Finished at: {finish_time.strftime('%Y-%m-%d %H:%M:%S')}\n"
                f"Total time spent: {str(finish_time - start_time).split('.')[0]}\n"
                f"Original task: {args.task}",
                style="red"
            ))
    else:
        # Standard approach - just run with max_iterations
        try:
            run_with_iterations(main_repo_path, args, args.max_iterations, start_time)
        except Exception as e:
            # Escape the error message to prevent markup interpretation
            error_msg = str(e).replace("[", "\\[").replace("]", "\\]")
            console.print(f"[bold red]Error:[/bold red] {error_msg}")
            # Also escape the traceback
            traceback_text = traceback.format_exc().replace("[", "\\[").replace("]", "\\]")
            console.print(f"[dim]{traceback_text}[/dim]")
            sys.exit(1)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()