diff --git a/src/llm/agent.rs b/src/llm/agent.rs index 819ad1e..3b1b0a3 100644 --- a/src/llm/agent.rs +++ b/src/llm/agent.rs @@ -119,14 +119,37 @@ pub enum AgentStep { Error(String), } -/// Internal state for the agent stream -enum StreamState { +/// The current phase of the agent's request-response cycle. +/// +/// Phase-specific data is carried inside each variant, enforcing at compile time +/// that fields like the active stream or pending tool responses can only be +/// accessed when the agent is in the corresponding phase. +enum StreamPhase { /// Need to make a new chat API request NeedsChatRequest, - /// Currently streaming response from API (stream stored separately for cancel-safety) - Streaming, + /// Currently streaming response from API + Streaming { + stream: futures::stream::BoxStream<'static, Result>, + }, /// All tool requests emitted, waiting for decisions - AwaitingToolDecision, + AwaitingToolDecision { + /// The tool calls the model requested (used for count-checking and message building) + pending_tool_calls: Vec, + /// Tool results submitted so far + tool_responses: Vec, + }, +} + +/// Per-turn processing state that spans stream phases. +/// +/// Created when a request begins (`send_request`), consumed on completion. +/// Lives as a separate field from `StreamPhase` so the borrow checker allows +/// `&mut self` method calls while the phase enum is being matched. +struct TurnState { + mode: RequestMode, + text: String, + thinking: Vec, + retry_attempt: u32, } /// Request mode controlling agent behavior for a single request @@ -167,39 +190,37 @@ impl RequestMode { /// Called before each LLM request to allow prompt content to change. pub type SystemPromptBuilder = Box String + Send + Sync>; -/// Agent for handling conversations +/// Agent for handling conversations. +/// +/// Fields are organized by lifecycle: +/// - **Configuration**: set at creation, rarely changes (`client`, `config`, `tools`, etc.) +/// - **Conversation**: grows over the session (`messages`, `total_usage`) +/// - **Active processing**: present only while handling a request (`phase`, `turn`) +/// - **Cross-turn operational state**: spans multiple request cycles (`fast_mode_cooldown_until`) pub struct Agent { + // Configuration client: Client, config: AgentRuntimeConfig, tools: ToolRegistry, - messages: Vec, system_prompt: String, - total_usage: Usage, - /// OAuth credentials for Claude Max (if available) - oauth: Option, - /// Optional dynamic prompt builder - called before each request system_prompt_builder: Option, + oauth: Option, - // Streaming state (Some when actively processing) - state: Option, - /// Active response stream (stored separately from state for cancel-safety) - active_stream: - Option>>, - mode: RequestMode, + // Conversation + messages: Vec, + total_usage: Usage, + + // Active processing (both Some while handling a request, both None when idle). + // Split into two fields so the borrow checker allows &mut self method calls + // while matching on the phase enum. + phase: Option, + turn: Option, - // Accumulated during streaming, consumed when tools complete - streaming_text: String, - streaming_tool_calls: Vec, - streaming_thinking: Vec, - tool_responses: Vec, + /// Result text from the last completed turn, for `last_message()`. + last_result: Option, /// When set, fast mode is cooling down until this instant. - /// During cooldown, the fast mode beta header is omitted from requests. fast_mode_cooldown_until: Option, - - /// Retry attempt counter, persists across calls to exec_chat_with_retry. - /// Reset on successful request or new user message. - retry_attempt: u32, } impl Agent { @@ -216,23 +237,13 @@ impl Agent { tools, messages: vec![ChatMessage::system(system_prompt)], system_prompt: system_prompt.to_string(), - total_usage: Usage::default(), - oauth, system_prompt_builder: None, - - // Streaming state starts empty - state: None, - active_stream: None, - mode: RequestMode::Normal, - - // Accumulated during streaming - streaming_text: String::new(), - streaming_tool_calls: Vec::new(), - streaming_thinking: Vec::new(), - tool_responses: Vec::new(), - + oauth, + total_usage: Usage::default(), + phase: None, + turn: None, + last_result: None, fast_mode_cooldown_until: None, - retry_attempt: 0, } } @@ -253,23 +264,13 @@ impl Agent { tools, messages: vec![ChatMessage::system(&system_prompt)], system_prompt, - total_usage: Usage::default(), - oauth, system_prompt_builder: Some(prompt_builder), - - // Streaming state starts empty - state: None, - active_stream: None, - mode: RequestMode::Normal, - - // Accumulated during streaming - streaming_text: String::new(), - streaming_tool_calls: Vec::new(), - streaming_thinking: Vec::new(), - tool_responses: Vec::new(), - + oauth, + total_usage: Usage::default(), + phase: None, + turn: None, + last_result: None, fast_mode_cooldown_until: None, - retry_attempt: 0, } } @@ -384,20 +385,25 @@ impl Agent { .collect() } - /// Send a user message to the agent - /// Call next() repeatedly to get AgentSteps until None + /// Send a user message to the agent. + /// Call `next()` repeatedly to get `AgentStep`s until `None`. pub fn send_request(&mut self, user_input: &str, mode: RequestMode) { self.messages.push(ChatMessage::user(user_input)); - self.mode = mode; - self.retry_attempt = 0; - self.state = Some(StreamState::NeedsChatRequest); + self.phase = Some(StreamPhase::NeedsChatRequest); + self.turn = Some(TurnState { + mode, + text: String::new(), + thinking: Vec::new(), + retry_attempt: 0, + }); } - /// Cancel the current streaming operation + /// Cancel the current streaming operation. + /// Dropping the phase/turn also drops any active stream or pending tool data. pub fn cancel(&mut self) { debug!("Agent::cancel"); - self.state = None; - self.active_stream = None; + self.phase = None; + self.turn = None; } /// Refresh OAuth token if expired. Returns true if refresh was needed and succeeded. @@ -444,13 +450,9 @@ impl Agent { } /// Get the last assistant message text (for returning sub-agent results). - /// Returns the accumulated streaming text if present. + /// Returns the result text captured when the last turn completed. pub fn last_message(&self) -> Option { - if self.streaming_text.is_empty() { - None - } else { - Some(self.streaming_text.clone()) - } + self.last_result.clone() } /// Reset the agent with a new context after compaction @@ -561,7 +563,8 @@ impl Agent { } let mut request = ChatRequest::new(messages); - let mode_opts = self.mode.options(&self.config); + let mode = self.turn.as_ref().expect("exec_chat_with_retry called without active turn").mode; + let mode_opts = mode.options(&self.config); if mode_opts.tools_enabled { request = request.with_tools(self.get_tools()); } @@ -617,7 +620,11 @@ impl Agent { .with_reasoning_effort(ReasoningEffort::Budget(mode_opts.thinking_budget)); } - self.retry_attempt += 1; + let turn = self.turn.as_mut().expect("exec_chat_with_retry called without active turn"); + turn.retry_attempt += 1; + let attempt = turn.retry_attempt; + let max_retries = self.config.max_retries; + match self .client .exec_chat_stream(&self.config.model, request.clone(), Some(&chat_options)) @@ -625,12 +632,12 @@ impl Agent { { Ok(resp) => { info!("Chat request successful"); - self.retry_attempt = 0; + self.turn.as_mut().unwrap().retry_attempt = 0; Ok(resp) }, Err(e) => { let err = format!("{:#}", e); - error!("Chat request failed (attempt {}): {}", self.retry_attempt, err); + error!("Chat request failed (attempt {}): {}", attempt, err); // If fast mode is active and we hit a rate limit or overloaded // error, trigger cooldown and retry without the fast mode header. @@ -641,15 +648,16 @@ impl Agent { ); self.fast_mode_cooldown_until = Some(Instant::now() + FAST_MODE_COOLDOWN); // Don't count fast mode fallback as a retry attempt - self.retry_attempt -= 1; + let turn = self.turn.as_mut().unwrap(); + turn.retry_attempt -= 1; return Err(AgentStep::Retrying { - attempt: self.retry_attempt, + attempt: turn.retry_attempt, error: format!("Fast mode rate limited, falling back to standard speed"), }); } - if self.retry_attempt >= self.config.max_retries { - self.retry_attempt = 0; + if attempt >= max_retries { + self.turn.as_mut().unwrap().retry_attempt = 0; return Err(AgentStep::Error(format!( "API error ({}): {}", self.config.model, err @@ -657,73 +665,71 @@ impl Agent { } // Return retry step, caller should call next() again Err(AgentStep::Retrying { - attempt: self.retry_attempt, + attempt, error: err, }) }, } } - /// Get the next step from the agent - /// Returns None when streaming is complete or awaiting tool decisions + /// Get the next step from the agent. + /// Returns None when streaming is complete or awaiting tool decisions. /// /// This method is cancel-safe: if the future is dropped mid-poll, /// the agent remains in a valid state and can be polled again. pub async fn next(&mut self) -> Option { loop { - // Check state without taking it (cancel-safe) - match self.state.as_ref()? { - StreamState::NeedsChatRequest => { - debug!("Agent state: NeedsChatRequest, clearing streaming data"); - - // Exponential backoff before retrying: 2s, 4s, 8s, 16s, ... - if self.retry_attempt > 0 { - let delay = Duration::from_secs(2u64.pow(self.retry_attempt)); - info!("Backoff: waiting {}s before retry attempt {}", delay.as_secs(), self.retry_attempt + 1); + // NeedsChatRequest doesn't capture data from the match, so the + // borrow on self.phase is released — allowing &mut self method calls. + // Streaming captures `stream`, holding the borrow, but only accesses + // other fields (turn, messages, total_usage) via disjoint field borrows. + match self.phase.as_mut()? { + StreamPhase::NeedsChatRequest => { + let turn = self.turn.as_mut().expect("phase without turn"); + debug!("Agent phase: NeedsChatRequest"); + + // Exponential backoff before retrying + if turn.retry_attempt > 0 { + let delay = Duration::from_secs(2u64.pow(turn.retry_attempt)); + info!("Backoff: waiting {}s before retry attempt {}", delay.as_secs(), turn.retry_attempt + 1); tokio::time::sleep(delay).await; } - // Refresh dynamic system prompt before each request - self.refresh_system_prompt(); + // Clear accumulated cross-phase data for new request + turn.text.clear(); + turn.thinking.clear(); - // Clear accumulated streaming data for new request - self.streaming_text.clear(); - self.streaming_tool_calls.clear(); - self.streaming_thinking.clear(); - self.tool_responses.clear(); + // Borrow on `turn` is dropped here (NeedsChatRequest captured + // nothing from phase), so &mut self methods are available. + self.refresh_system_prompt(); match self.exec_chat_with_retry().await { Ok(response) => { - debug!("Agent state: NeedsChatRequest -> Streaming"); - // Store stream separately and update state - self.active_stream = Some(Box::pin(response.stream)); - self.state = Some(StreamState::Streaming); - // Continue to process streaming state + debug!("Agent phase: NeedsChatRequest -> Streaming"); + self.phase = Some(StreamPhase::Streaming { + stream: Box::pin(response.stream), + }); }, Err(step) => { - // Retrying or Error - state stays NeedsChatRequest for retry if !matches!(step, AgentStep::Retrying { .. }) { - self.state = None; + self.phase = None; + self.turn = None; } return Some(step); }, } }, - StreamState::Streaming => { - // Get the stream (must exist if we're in Streaming state) - let stream = self.active_stream.as_mut()?; - + StreamPhase::Streaming { stream } => { match stream.next().await { Some(Ok(event)) => match event { ChatStreamEvent::Start => { debug!("Agent: got ChatStreamEvent::Start"); - // Continue polling }, ChatStreamEvent::Chunk(chunk) => { - self.streaming_text.push_str(&chunk.content); - // State remains Streaming, stream remains in active_stream - return Some(match self.mode { + let turn = self.turn.as_mut().expect("phase without turn"); + turn.text.push_str(&chunk.content); + return Some(match turn.mode { RequestMode::Compaction => { AgentStep::CompactionDelta(chunk.content) }, @@ -732,15 +738,11 @@ impl Agent { }, ChatStreamEvent::ToolCallChunk(_) => { debug!("Agent: got ToolCallChunk"); - // Continue polling }, ChatStreamEvent::ReasoningChunk(chunk) => { - // State remains Streaming return Some(AgentStep::ThinkingDelta(chunk.content)); }, - ChatStreamEvent::ThoughtSignatureChunk(_) => { - // Gemini thought signatures - continue polling - }, + ChatStreamEvent::ThoughtSignatureChunk(_) => {}, ChatStreamEvent::End(mut end) => { debug!("Agent: got ChatStreamEvent::End"); if let Some(ref genai_usage) = end.captured_usage { @@ -750,180 +752,183 @@ impl Agent { } else { debug!("No captured_usage in End event"); } + let turn = self.turn.as_mut().expect("phase without turn"); if let Some(captured) = end.captured_thinking_blocks.take() { - self.streaming_thinking = captured; + turn.thinking = captured; } - if let Some(captured) = end.captured_into_tool_calls() { - self.streaming_tool_calls = captured; + let streaming_tool_calls = end + .captured_into_tool_calls() + .unwrap_or_default(); + + if !streaming_tool_calls.is_empty() { + let tool_calls: Vec = streaming_tool_calls + .iter() + .map(ToolCall::from) + .collect(); + self.phase = Some(StreamPhase::AwaitingToolDecision { + pending_tool_calls: streaming_tool_calls, + tool_responses: Vec::new(), + }); + return Some(AgentStep::ToolRequest(tool_calls)); } - // Continue to process stream end }, }, Some(Err(e)) => { let err = format!("{:#}", e); - error!("Stream error (attempt {}): {}", self.retry_attempt, err); - self.active_stream = None; - - self.retry_attempt += 1; - if self.retry_attempt >= self.config.max_retries { - self.retry_attempt = 0; - self.state = None; + let turn = self.turn.as_mut().expect("phase without turn"); + error!("Stream error (attempt {}): {}", turn.retry_attempt, err); + + turn.retry_attempt += 1; + if turn.retry_attempt >= self.config.max_retries { + turn.retry_attempt = 0; + self.phase = None; + self.turn = None; return Some(AgentStep::Error(format!( "Stream error ({}): {}", self.config.model, err ))); } - // Go back to NeedsChatRequest so the retry loop picks it up - self.state = Some(StreamState::NeedsChatRequest); + let attempt = turn.retry_attempt; + self.phase = Some(StreamPhase::NeedsChatRequest); return Some(AgentStep::Retrying { - attempt: self.retry_attempt, + attempt, error: err, }); }, None => { debug!("Agent: stream returned None (closed)"); - // Stream ended, clean up stream - self.active_stream = None; + let turn = self.turn.as_ref().expect("phase without turn"); + match turn.mode { + RequestMode::Compaction => { + let summary = turn.text.clone(); + self.reset_with_summary(&summary); + }, + RequestMode::Normal => { + let has_content = !turn.thinking.is_empty() + || !turn.text.is_empty(); + if has_content { + let mut msg_content = MessageContent::default(); + + for thinking in &turn.thinking { + msg_content = msg_content.append( + ContentPart::Thinking(thinking.clone()), + ); + } - if self.streaming_tool_calls.is_empty() { - match self.mode { - RequestMode::Compaction => { - self.reset_with_summary(&self.streaming_text.clone()) - }, - RequestMode::Normal => { - // Build message with thinking blocks + text (same pattern as tool use) - // Only push if there's actual content - let has_content = !self.streaming_thinking.is_empty() - || !self.streaming_text.is_empty(); - if has_content { - let mut msg_content = MessageContent::default(); - - // Add thinking blocks first - for thinking in &self.streaming_thinking { - msg_content = msg_content.append( - ContentPart::Thinking(thinking.clone()), - ); - } - - // Add text if non-empty - if !self.streaming_text.is_empty() { - msg_content = msg_content.append( - ContentPart::Text(self.streaming_text.clone()), - ); - } - - self.messages.push(ChatMessage { - role: ChatRole::Assistant, - content: msg_content, - options: None, - }); - } else { - debug!( - "Agent: no content to push (no thinking, no text)" + if !turn.text.is_empty() { + msg_content = msg_content.append( + ContentPart::Text(turn.text.clone()), ); } - }, - } - debug!( - "Agent state: Streaming -> None (Finished), messages={}", - self.messages.len() - ); - self.state = None; - return Some(AgentStep::Finished { - usage: self.total_usage, - }); - } - let tool_calls: Vec = self - .streaming_tool_calls - .iter() - .map(ToolCall::from) - .collect(); - self.state = Some(StreamState::AwaitingToolDecision); - return Some(AgentStep::ToolRequest(tool_calls)); + self.messages.push(ChatMessage { + role: ChatRole::Assistant, + content: msg_content, + options: None, + }); + } else { + debug!( + "Agent: no content to push (no thinking, no text)" + ); + } + }, + } + debug!( + "Agent phase: Streaming -> None (Finished), messages={}", + self.messages.len() + ); + // Capture result before clearing turn state + let result_text = self.turn.as_ref() + .map(|t| t.text.clone()) + .filter(|t| !t.is_empty()); + self.last_result = result_text; + self.phase = None; + self.turn = None; + return Some(AgentStep::Finished { + usage: self.total_usage, + }); }, } }, - StreamState::AwaitingToolDecision => { - // Blocked waiting for tool results + StreamPhase::AwaitingToolDecision { .. } => { return None; }, } } } - /// Submit a tool execution result - /// Called by App after ToolExecutor runs the tool + /// Submit a tool execution result. + /// + /// Called by App after ToolExecutor runs the tool. The pending tool calls + /// and accumulated responses live inside the `AwaitingToolDecision` variant, + /// so the compiler ensures this data is only accessible in the correct phase. pub fn submit_tool_result(&mut self, call_id: &str, content: String) { debug!("Agent: submit_tool_result call_id={}", call_id); - let state_name = match &self.state { - Some(StreamState::NeedsChatRequest) => "NeedsChatRequest", - Some(StreamState::Streaming { .. }) => "Streaming", - Some(StreamState::AwaitingToolDecision) => "AwaitingToolDecision", - None => "None", + let (pending_tool_calls, tool_responses) = match &mut self.phase { + Some(StreamPhase::AwaitingToolDecision { + pending_tool_calls, + tool_responses, + }) => (pending_tool_calls, tool_responses), + other => { + let phase_name = match other { + Some(StreamPhase::NeedsChatRequest) => "NeedsChatRequest", + Some(StreamPhase::Streaming { .. }) => "Streaming", + Some(StreamPhase::AwaitingToolDecision { .. }) => unreachable!(), + None => "None", + }; + tracing::warn!( + "submit_tool_result called in unexpected phase: {}", + phase_name + ); + return; + }, }; - if !matches!(self.state, Some(StreamState::AwaitingToolDecision)) { - tracing::warn!( - "submit_tool_result called in unexpected state: {}", - state_name - ); - } - // Store the response - self.tool_responses - .push(ToolResponse::new(call_id.to_string(), content)); - - // Check if all tools have been decided - // Guard: streaming_tool_calls must be non-empty (otherwise we shouldn't be receiving results) - if self.streaming_tool_calls.is_empty() { - tracing::warn!("submit_tool_result called but no tool calls pending"); - return; - } + tool_responses.push(ToolResponse::new(call_id.to_string(), content)); debug!( "Agent: tool_responses={}/{}", - self.tool_responses.len(), - self.streaming_tool_calls.len() + tool_responses.len(), + pending_tool_calls.len() ); - if self.tool_responses.len() >= self.streaming_tool_calls.len() { + if tool_responses.len() >= pending_tool_calls.len() { + let turn = self.turn.as_ref().expect("phase without turn"); debug!( "Agent: all tools complete, building message. thinking_blocks={}, text_len={}, tool_calls={}", - self.streaming_thinking.len(), - self.streaming_text.len(), - self.streaming_tool_calls.len() + turn.thinking.len(), + turn.text.len(), + pending_tool_calls.len() ); - // All tools processed - build the assistant message - // Per Anthropic docs: thinking blocks must come first, then text/tool_use + let mut msg_content = MessageContent::default(); - // Add thinking blocks first - for thinking in &self.streaming_thinking { + for thinking in &turn.thinking { msg_content = msg_content.append(ContentPart::Thinking(thinking.clone())); } - // Add text if non-empty - if !self.streaming_text.is_empty() { - msg_content = msg_content.append(ContentPart::Text(self.streaming_text.clone())); + if !turn.text.is_empty() { + msg_content = msg_content.append(ContentPart::Text(turn.text.clone())); } - // Add tool calls - for tc in &self.streaming_tool_calls { + for tc in pending_tool_calls.iter() { msg_content = msg_content.append(ContentPart::ToolCall(tc.clone())); } - // Add assistant message self.messages.push(ChatMessage { role: ChatRole::Assistant, content: msg_content, options: None, }); - // Add tool responses - for response in std::mem::take(&mut self.tool_responses) { - debug!("Agent: adding tool response - call_id={}", response.call_id); - self.messages.push(ChatMessage::from(response)); + // Take ownership of the tool responses by consuming the phase. + let old_phase = self.phase.take(); + if let Some(StreamPhase::AwaitingToolDecision { tool_responses, .. }) = old_phase { + for response in tool_responses { + debug!("Agent: adding tool response - call_id={}", response.call_id); + self.messages.push(ChatMessage::from(response)); + } } debug!( @@ -931,9 +936,9 @@ impl Agent { self.messages.len() ); - debug!("Agent: state -> NeedsChatRequest (ready for continuation)"); - self.retry_attempt = 0; - self.state = Some(StreamState::NeedsChatRequest); + debug!("Agent: phase -> NeedsChatRequest (ready for continuation)"); + self.turn.as_mut().unwrap().retry_attempt = 0; + self.phase = Some(StreamPhase::NeedsChatRequest); } } }