Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions packages/opencode/src/tasks/job-commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,18 @@ import { Worktree } from "../worktree"

const log = Log.create({ service: "taskctl.tool.job-commands" })

// Branch name validation for security
const BRANCH_REGEX = /^[a-zA-Z0-9_\-\/\+.]+$/

function safeBranch(name: string): string | null {
return BRANCH_REGEX.test(name) ? name : null
}

export async function executeStart(projectId: string, params: any, ctx: any): Promise<{ title: string; output: string; metadata: {} }> {
const issueNumber = params.issueNumber
if (!issueNumber) throw new Error("start requires issueNumber")
if (!issueNumber || !Number.isInteger(Number(issueNumber)) || Number(issueNumber) <= 0) {
throw new Error("issueNumber must be a valid positive integer")
}

const existingJob = await Store.findJobByIssue(projectId, issueNumber)
if (existingJob) {
Expand Down Expand Up @@ -50,15 +59,32 @@ export async function executeStart(projectId: string, params: any, ctx: any): Pr
const jobId = `job-${Date.now()}`
const featureBranch = `feature/issue-${issueNumber}`

// Validate feature branch name before using in git operations
const safeFeatureBranch = safeBranch(featureBranch)
if (!safeFeatureBranch) {
log.error("feature branch name failed validation", { issueNumber, featureBranch })
throw new Error(`Invalid feature branch name: ${featureBranch}`)
}

// Create the feature branch in the main repository
try {
const { $ } = await import("bun")
const result = await $`git checkout -b ${featureBranch} dev`.cwd(ctx.session.directory).quiet().nothrow()
const result = await $`git checkout -b ${safeFeatureBranch} dev`.cwd(ctx.session.directory).quiet().nothrow()
if (result.exitCode !== 0) {
log.error("failed to create feature branch", { issueNumber, featureBranch })
log.error("failed to create feature branch", { issueNumber, featureBranch: safeFeatureBranch })
} else {
// Push the feature branch to origin - MUST succeed before creating job
const pushResult = await $`git push -u origin ${safeFeatureBranch}`.cwd(ctx.session.directory).quiet().nothrow()
if (pushResult.exitCode !== 0) {
const stderr = pushResult.stderr ? new TextDecoder().decode(pushResult.stderr) : "Unknown error"
log.error("failed to push feature branch to origin", { issueNumber, featureBranch: safeFeatureBranch, error: stderr })
throw new Error(`Failed to push feature branch ${safeFeatureBranch} to origin: ${stderr}`)
}
log.info("feature branch created and pushed", { issueNumber, featureBranch: safeFeatureBranch })
}
} catch (e) {
log.error("error creating feature branch", { issueNumber, featureBranch, error: String(e) })
log.error("error creating feature branch", { issueNumber, featureBranch: safeFeatureBranch, error: String(e) })
throw e
}

await Store.createJob(projectId, {
Expand All @@ -70,7 +96,7 @@ export async function executeStart(projectId: string, params: any, ctx: any): Pr
pulse_pid: null,
max_workers: 3,
pm_session_id: ctx.sessionID,
feature_branch: featureBranch,
feature_branch: safeFeatureBranch,
})

enableAutoWakeup(ctx.sessionID)
Expand Down
115 changes: 110 additions & 5 deletions packages/opencode/src/tasks/pulse-verdicts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,15 @@ import { isSessionActivelyRunning, lockFilePath } from "./pulse-scheduler"
// Allow 6 attempts to resolve minor test flakiness before escalating to PM
const MAX_ADVERSARIAL_ATTEMPTS = 6

// Branch name validation: only alphanumeric, hyphen, underscore, slash, dot, plus (anchors ensure full string match)
const BRANCH_REGEX = /^[a-zA-Z0-9_\-\/\+.]+$/

const log = Log.create({ service: "taskctl.pulse.verdicts" })

function safeBranch(name: string): string | null {
return BRANCH_REGEX.test(name) ? name : null
}

export { MAX_ADVERSARIAL_ATTEMPTS }

async function notifyPM(pmSessionId: string, text: string): Promise<{ ok: true } | { ok: false; error: string }> {
Expand Down Expand Up @@ -322,6 +329,75 @@ async function escalateCommitFailure(
})
}

async function mergeTaskBranchesToFeatureBranch(projectRoot: string, featureBranch: string, tasks: Task[]): Promise<{ ok: true } | { ok: false; error: string }> {
// Validate feature branch name
const safeFeatureBranch = safeBranch(featureBranch)
if (!safeFeatureBranch) {
log.error("feature branch name contains invalid characters", { featureBranch })
return { ok: false, error: `Invalid feature branch name: ${featureBranch}` }
}

const branches: string[] = []
for (const task of tasks) {
if (task.branch && task.branch !== safeFeatureBranch) {
const safeTaskBranch = safeBranch(task.branch)
if (!safeTaskBranch) {
log.warn("task branch name contains invalid characters, skipping", { taskId: task.id, branch: task.branch })
continue
}
branches.push(safeTaskBranch)
}
}
if (!branches.length) {
log.debug("no task branches to merge", { featureBranch: safeFeatureBranch })
return { ok: true }
}

try {
const { $ } = await import("bun")

// Verify origin remote exists
const remoteCheck = await $`git ls-remote origin HEAD`.cwd(projectRoot).quiet().nothrow()
if (remoteCheck.exitCode !== 0) {
log.warn("origin remote not configured", { projectRoot })
return { ok: false, error: "origin remote not configured" }
}

// Checkout feature branch
const checkoutRes = await $`git checkout ${safeFeatureBranch}`.cwd(projectRoot).nothrow()
if (checkoutRes.exitCode !== 0) {
log.error("failed to checkout feature branch", { featureBranch: safeFeatureBranch })
return { ok: false, error: `Failed to checkout feature branch ${safeFeatureBranch}` }
}

// Merge each task branch
for (const branch of branches) {
const mergeRes = await $`git merge --no-ff ${branch} -m "merge task branch ${branch}"`.cwd(projectRoot).nothrow()
if (mergeRes.exitCode !== 0) {
log.error("failed to merge task branch, aborting", { branch, featureBranch: safeFeatureBranch })
// Abort merge to leave repository in clean state
await $`git merge --abort`.cwd(projectRoot).nothrow()
return { ok: false, error: `Merge conflict with branch ${branch}, aborting` }
}
log.info("merged task branch", { branch, featureBranch: safeFeatureBranch })
}

// Push feature branch
const pushResult = await $`git push origin ${safeFeatureBranch}`.cwd(projectRoot).nothrow()
if (pushResult.exitCode !== 0) {
const stderr = pushResult.stderr ? new TextDecoder().decode(pushResult.stderr) : "Unknown error"
log.error("failed to push feature branch", { featureBranch: safeFeatureBranch, error: stderr })
return { ok: false, error: `Failed to push feature branch: ${stderr}` }
}

log.info("pushed feature branch after merging task branches", { featureBranch: safeFeatureBranch })
return { ok: true }
} catch (e) {
log.error("error merging task branches", { featureBranch, error: String(e) })
return { ok: false, error: String(e) }
}
}

async function createPRForJob(projectId: string, tasks: Task[], pmSessionId: string, issueNumber: number): Promise<{ ok: true; prUrl: string } | { ok: false; error: string }> {
if (tasks.length === 0) {
return { ok: false, error: "No tasks found in job" }
Expand All @@ -332,29 +408,50 @@ async function createPRForJob(projectId: string, tasks: Task[], pmSessionId: str

// Fallback to first task branch if job or feature_branch is missing
if (!featureBranch) {
const firstTaskWithBranch = tasks.find((t) => t.branch)
const firstTaskWithBranch = tasks.find((t) => t.branch && t.branch.trim().length > 0)
if (firstTaskWithBranch) {
featureBranch = firstTaskWithBranch.branch
} else {
return { ok: false, error: "No feature branch found for job" }
}
}

// After fallback, validate featureBranch is not empty
if (!featureBranch || featureBranch.trim().length === 0) {
return { ok: false, error: "Invalid feature branch name for job" }
}

// Validate branch name contains only safe characters
const safeFeatureBranch = safeBranch(featureBranch)
if (!safeFeatureBranch) {
log.error("feature branch name contains invalid characters", { featureBranch })
return { ok: false, error: `Invalid feature branch name: ${featureBranch}` }
}

try {
const parentSession = await Session.get(pmSessionId).catch(() => null)
if (!parentSession?.directory) {
return { ok: false, error: `PM session not found for PR creation (session: ${pmSessionId}, branch: ${featureBranch})` }
return { ok: false, error: `PM session not found for PR creation (session: ${pmSessionId}, branch: ${safeFeatureBranch})` }
}

const { $ } = await import("bun")

// Check if feature branch has commits ahead of dev
const ahead = await $`git rev-list --count dev..${safeFeatureBranch}`.cwd(parentSession.directory).quiet().nothrow()
const count = parseInt(new TextDecoder().decode(ahead.stdout).trim() || "0")
if (count === 0) {
log.warn("feature branch has no commits ahead of dev, skipping PR creation", { featureBranch: safeFeatureBranch })
return { ok: false, error: `Feature branch ${safeFeatureBranch} has no commits ahead of dev` }
}

const repo = "randomm/opencode"
const prTitle = `Issue #${issueNumber}: Automated PR from taskctl`
const prBody = `Closes #${issueNumber}

This PR was automatically created by the taskctl pipeline after all tasks completed.`

// Use proper shell escaping to prevent command injection
const result = await $`gh pr create --repo ${repo} --base dev --head ${featureBranch} --title ${prTitle} --body ${prBody}`
// Bun Shell auto-escapes interpolated values; no manual escaping needed
const result = await $`gh pr create --repo ${repo} --base dev --head ${safeFeatureBranch} --title ${prTitle} --body ${prBody}`
.cwd(parentSession.directory)
.quiet()
.nothrow()
Expand Down Expand Up @@ -413,4 +510,12 @@ async function processAdversarialVerdicts(jobId: string, projectId: string, pmSe
}
}

export { processAdversarialVerdicts, commitTask, escalateToPM, escalateCommitFailure, notifyPM, createPRForJob }
export {
processAdversarialVerdicts,
commitTask,
escalateToPM,
escalateCommitFailure,
notifyPM,
createPRForJob,
mergeTaskBranchesToFeatureBranch,
}
79 changes: 62 additions & 17 deletions packages/opencode/src/tasks/pulse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { BackgroundTaskEvent } from "../session/async-tasks"
import { Instance, context as instanceContext } from "../project/instance"
import { Store } from "./store"
import { MessageV2 } from "../session/message-v2"
import { Session } from "../session"
import { Provider } from "../provider/provider"
import {
writeLockFile,
Expand All @@ -15,7 +16,7 @@ import {
scheduleReadyTasks,
sanitizeWorktree,
} from "./pulse-scheduler"
import { processAdversarialVerdicts, notifyPM, escalateToPM, createPRForJob } from "./pulse-verdicts"
import { processAdversarialVerdicts, notifyPM, escalateToPM, createPRForJob, mergeTaskBranchesToFeatureBranch } from "./pulse-verdicts"
import { heartbeatActiveAgents, checkTimeouts, checkSteering, gracefulStop } from "./pulse-monitoring"

// Re-exports for backward compatibility with tests
Expand All @@ -28,7 +29,13 @@ export {
scheduleReadyTasks,
sanitizeWorktree,
} from "./pulse-scheduler"
export { processAdversarialVerdicts, notifyPM, escalateToPM, createPRForJob } from "./pulse-verdicts"
export {
processAdversarialVerdicts,
notifyPM,
escalateToPM,
createPRForJob,
mergeTaskBranchesToFeatureBranch,
} from "./pulse-verdicts"
export { heartbeatActiveAgents, checkTimeouts, checkSteering, gracefulStop } from "./pulse-monitoring"

const log = Log.create({ service: "taskctl.pulse" })
Expand Down Expand Up @@ -215,27 +222,65 @@ export async function checkCompletion(
await Store.updateJob(projectId, jobId, { status: "complete" })
Bus.publish(BackgroundTaskEvent.Completed, { taskID: jobId, sessionID: pmSessionId, parentSessionID: pmSessionId })

// Create PR for the job
const issueNumber = jobTasks[0]?.parent_issue ?? 0
const prResult = await createPRForJob(projectId, jobTasks, pmSessionId, issueNumber)
if (prResult.ok) {
log.info("PR created successfully", { jobId, prUrl: prResult.prUrl })
// Guard: Cannot create PR if no tasks exist (all skipped/overridden)
if (jobTasks.length === 0) {
log.warn("job completed with no tasks, skipping PR creation", { jobId })
const notifyResult = await notifyPM(
pmSessionId,
`🎉 Job complete: all tasks done for issue #${jobTasks[0]?.parent_issue ?? "unknown"}\n\nPR created: ${prResult.prUrl}`,
)
if (!notifyResult.ok) {
log.warn("failed to notify PM of job completion with PR", { jobId, error: notifyResult.error })
}
} else {
log.warn("failed to create PR for completed job", { jobId, error: prResult.error })
const notifyResult = await notifyPM(
pmSessionId,
`🎉 Job complete: all tasks done for issue #${jobTasks[0]?.parent_issue ?? "unknown"}\n\n⚠️ PR creation failed: ${prResult.error}`,
`🎉 Job complete: all tasks were skipped/overridden. No PR created.`,
)
if (!notifyResult.ok) {
log.warn("failed to notify PM of job completion", { jobId, error: notifyResult.error })
}
return
}

// Merge task branches into feature branch before creating PR
const job = await Store.getJob(projectId, jobId)
const featureBranch = job?.feature_branch
let mergeSuccess = true
let mergeError = ""

if (featureBranch) {
const pmSession = await Session.get(pmSessionId).catch(() => null)
if (pmSession?.directory) {
const mergeResult = await mergeTaskBranchesToFeatureBranch(pmSession.directory, featureBranch, jobTasks)
if (!mergeResult.ok) {
mergeSuccess = false
mergeError = mergeResult.error
log.warn("failed to merge task branches", { jobId, error: mergeError })
}
} else {
log.warn("PM session not found for merging task branches", { pmSessionId })
}
} else {
log.warn("No feature branch found for job, skipping task branch merge", { jobId })
}

// Create PR for the job (only if merge succeeded)
const issueNumber = jobTasks[0]?.parent_issue ?? 0
let prResult: { ok: true; prUrl: string } | { ok: false; error: string }

if (!mergeSuccess) {
prResult = { ok: false, error: `Merge failed: ${mergeError}` }
} else {
prResult = await createPRForJob(projectId, jobTasks, pmSessionId, issueNumber)
}

// Build appropriate completion message based on result
let completionMessage: string
if (prResult.ok) {
completionMessage = `🎉 Job complete: all tasks done for issue #${jobTasks[0]?.parent_issue ?? "unknown"}\n\nPR created: ${prResult.prUrl}`
log.info("PR created successfully", { jobId, prUrl: prResult.prUrl })
} else if (!mergeSuccess) {
completionMessage = `🎉 Job complete: all tasks done for issue #${jobTasks[0]?.parent_issue ?? "unknown"}\n\n⚠️ Merge failed: ${prResult.error}`
} else {
completionMessage = `🎉 Job complete: all tasks merged for issue #${jobTasks[0]?.parent_issue ?? "unknown"}\n\n⚠️ PR creation failed: ${prResult.error}`
}

const notifyResult = await notifyPM(pmSessionId, completionMessage)
if (!notifyResult.ok) {
log.warn("failed to notify PM of job completion", { jobId, error: notifyResult.error })
}
} catch (e) {
activeTicks.get(projectId)?.delete(jobId)
Expand Down
Loading