From 495691a0cd09cd8203f31ba140e67bdd8cb4d9fc Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 2 Apr 2026 02:36:03 +0000 Subject: [PATCH 1/2] Enforce agent state transitions at compile time via enum-carried data Move state-specific fields (active_stream, pending_tool_calls, tool_responses) into StreamState enum variants so the compiler prevents accessing data that only exists in a particular state. This eliminates runtime state guards in submit_tool_result and makes invalid field access a compile error rather than a logic bug. https://claude.ai/code/session_014n42dEnaKvcjB6Qj4Pw7PM --- src/llm/agent.rs | 257 +++++++++++++++++++++++------------------------ 1 file changed, 128 insertions(+), 129 deletions(-) diff --git a/src/llm/agent.rs b/src/llm/agent.rs index 819ad1e..e335d1b 100644 --- a/src/llm/agent.rs +++ b/src/llm/agent.rs @@ -119,14 +119,25 @@ pub enum AgentStep { Error(String), } -/// Internal state for the agent stream +/// Internal state for the agent stream. +/// +/// State-specific data is carried inside each variant, enforcing at compile time +/// that fields like the active stream or pending tool responses can only be +/// accessed when the agent is in the corresponding state. enum StreamState { /// Need to make a new chat API request NeedsChatRequest, - /// Currently streaming response from API (stream stored separately for cancel-safety) - Streaming, + /// Currently streaming response from API + Streaming { + stream: futures::stream::BoxStream<'static, Result>, + }, /// All tool requests emitted, waiting for decisions - AwaitingToolDecision, + AwaitingToolDecision { + /// The tool calls the model requested (used for count-checking and message building) + pending_tool_calls: Vec, + /// Tool results submitted so far + tool_responses: Vec, + }, } /// Request mode controlling agent behavior for a single request @@ -182,16 +193,13 @@ pub struct Agent { // Streaming state (Some when actively processing) state: Option, - /// Active response stream (stored separately from state for cancel-safety) - active_stream: - Option>>, mode: RequestMode, - // Accumulated during streaming, consumed when tools complete + // Accumulated during streaming, consumed when tools complete or stream ends. + // These span multiple states (built during Streaming, read during AwaitingToolDecision + // completion), so they live on Agent rather than inside a single variant. streaming_text: String, - streaming_tool_calls: Vec, streaming_thinking: Vec, - tool_responses: Vec, /// When set, fast mode is cooling down until this instant. /// During cooldown, the fast mode beta header is omitted from requests. @@ -222,14 +230,11 @@ impl Agent { // Streaming state starts empty state: None, - active_stream: None, mode: RequestMode::Normal, // Accumulated during streaming streaming_text: String::new(), - streaming_tool_calls: Vec::new(), streaming_thinking: Vec::new(), - tool_responses: Vec::new(), fast_mode_cooldown_until: None, retry_attempt: 0, @@ -259,14 +264,11 @@ impl Agent { // Streaming state starts empty state: None, - active_stream: None, mode: RequestMode::Normal, // Accumulated during streaming streaming_text: String::new(), - streaming_tool_calls: Vec::new(), streaming_thinking: Vec::new(), - tool_responses: Vec::new(), fast_mode_cooldown_until: None, retry_attempt: 0, @@ -393,11 +395,11 @@ impl Agent { self.state = Some(StreamState::NeedsChatRequest); } - /// Cancel the current streaming operation + /// Cancel the current streaming operation. + /// Dropping the state also drops any active stream or pending tool data. pub fn cancel(&mut self) { debug!("Agent::cancel"); self.state = None; - self.active_stream = None; } /// Refresh OAuth token if expired. Returns true if refresh was needed and succeeded. @@ -671,8 +673,7 @@ impl Agent { /// the agent remains in a valid state and can be polled again. pub async fn next(&mut self) -> Option { loop { - // Check state without taking it (cancel-safe) - match self.state.as_ref()? { + match self.state.as_mut()? { StreamState::NeedsChatRequest => { debug!("Agent state: NeedsChatRequest, clearing streaming data"); @@ -686,18 +687,16 @@ impl Agent { // Refresh dynamic system prompt before each request self.refresh_system_prompt(); - // Clear accumulated streaming data for new request + // Clear accumulated cross-state data for new request self.streaming_text.clear(); - self.streaming_tool_calls.clear(); self.streaming_thinking.clear(); - self.tool_responses.clear(); match self.exec_chat_with_retry().await { Ok(response) => { debug!("Agent state: NeedsChatRequest -> Streaming"); - // Store stream separately and update state - self.active_stream = Some(Box::pin(response.stream)); - self.state = Some(StreamState::Streaming); + self.state = Some(StreamState::Streaming { + stream: Box::pin(response.stream), + }); // Continue to process streaming state }, Err(step) => { @@ -710,10 +709,7 @@ impl Agent { } }, - StreamState::Streaming => { - // Get the stream (must exist if we're in Streaming state) - let stream = self.active_stream.as_mut()?; - + StreamState::Streaming { stream } => { match stream.next().await { Some(Ok(event)) => match event { ChatStreamEvent::Start => { @@ -722,7 +718,6 @@ impl Agent { }, ChatStreamEvent::Chunk(chunk) => { self.streaming_text.push_str(&chunk.content); - // State remains Streaming, stream remains in active_stream return Some(match self.mode { RequestMode::Compaction => { AgentStep::CompactionDelta(chunk.content) @@ -735,7 +730,6 @@ impl Agent { // Continue polling }, ChatStreamEvent::ReasoningChunk(chunk) => { - // State remains Streaming return Some(AgentStep::ThinkingDelta(chunk.content)); }, ChatStreamEvent::ThoughtSignatureChunk(_) => { @@ -753,16 +747,32 @@ impl Agent { if let Some(captured) = end.captured_thinking_blocks.take() { self.streaming_thinking = captured; } - if let Some(captured) = end.captured_into_tool_calls() { - self.streaming_tool_calls = captured; + // Capture tool calls from the End event; used below + // when the stream closes (None branch). + let streaming_tool_calls = end + .captured_into_tool_calls() + .unwrap_or_default(); + + if !streaming_tool_calls.is_empty() { + // Stream is done and we have tool calls — transition + // directly to AwaitingToolDecision. + let tool_calls: Vec = streaming_tool_calls + .iter() + .map(ToolCall::from) + .collect(); + self.state = Some(StreamState::AwaitingToolDecision { + pending_tool_calls: streaming_tool_calls, + tool_responses: Vec::new(), + }); + return Some(AgentStep::ToolRequest(tool_calls)); } - // Continue to process stream end + // No tool calls — continue polling; the stream will + // yield None next and we'll handle finish there. }, }, Some(Err(e)) => { let err = format!("{:#}", e); error!("Stream error (attempt {}): {}", self.retry_attempt, err); - self.active_stream = None; self.retry_attempt += 1; if self.retry_attempt >= self.config.max_retries { @@ -781,70 +791,54 @@ impl Agent { }, None => { debug!("Agent: stream returned None (closed)"); - // Stream ended, clean up stream - self.active_stream = None; + // Stream ended with no (remaining) tool calls — finish. + match self.mode { + RequestMode::Compaction => { + self.reset_with_summary(&self.streaming_text.clone()) + }, + RequestMode::Normal => { + let has_content = !self.streaming_thinking.is_empty() + || !self.streaming_text.is_empty(); + if has_content { + let mut msg_content = MessageContent::default(); + + for thinking in &self.streaming_thinking { + msg_content = msg_content.append( + ContentPart::Thinking(thinking.clone()), + ); + } - if self.streaming_tool_calls.is_empty() { - match self.mode { - RequestMode::Compaction => { - self.reset_with_summary(&self.streaming_text.clone()) - }, - RequestMode::Normal => { - // Build message with thinking blocks + text (same pattern as tool use) - // Only push if there's actual content - let has_content = !self.streaming_thinking.is_empty() - || !self.streaming_text.is_empty(); - if has_content { - let mut msg_content = MessageContent::default(); - - // Add thinking blocks first - for thinking in &self.streaming_thinking { - msg_content = msg_content.append( - ContentPart::Thinking(thinking.clone()), - ); - } - - // Add text if non-empty - if !self.streaming_text.is_empty() { - msg_content = msg_content.append( - ContentPart::Text(self.streaming_text.clone()), - ); - } - - self.messages.push(ChatMessage { - role: ChatRole::Assistant, - content: msg_content, - options: None, - }); - } else { - debug!( - "Agent: no content to push (no thinking, no text)" + if !self.streaming_text.is_empty() { + msg_content = msg_content.append( + ContentPart::Text(self.streaming_text.clone()), ); } - }, - } - debug!( - "Agent state: Streaming -> None (Finished), messages={}", - self.messages.len() - ); - self.state = None; - return Some(AgentStep::Finished { - usage: self.total_usage, - }); - } - let tool_calls: Vec = self - .streaming_tool_calls - .iter() - .map(ToolCall::from) - .collect(); - self.state = Some(StreamState::AwaitingToolDecision); - return Some(AgentStep::ToolRequest(tool_calls)); + self.messages.push(ChatMessage { + role: ChatRole::Assistant, + content: msg_content, + options: None, + }); + } else { + debug!( + "Agent: no content to push (no thinking, no text)" + ); + } + }, + } + debug!( + "Agent state: Streaming -> None (Finished), messages={}", + self.messages.len() + ); + self.state = None; + return Some(AgentStep::Finished { + usage: self.total_usage, + }); }, } }, - StreamState::AwaitingToolDecision => { + StreamState::AwaitingToolDecision { .. } => { // Blocked waiting for tool results return None; }, @@ -852,78 +846,83 @@ impl Agent { } } - /// Submit a tool execution result - /// Called by App after ToolExecutor runs the tool + /// Submit a tool execution result. + /// + /// Called by App after ToolExecutor runs the tool. The pending tool calls + /// and accumulated responses live inside the `AwaitingToolDecision` variant, + /// so the compiler ensures this data is only accessible in the correct state. pub fn submit_tool_result(&mut self, call_id: &str, content: String) { debug!("Agent: submit_tool_result call_id={}", call_id); - let state_name = match &self.state { - Some(StreamState::NeedsChatRequest) => "NeedsChatRequest", - Some(StreamState::Streaming { .. }) => "Streaming", - Some(StreamState::AwaitingToolDecision) => "AwaitingToolDecision", - None => "None", + // Extract the mutable state data from the AwaitingToolDecision variant. + // Any other state is a caller bug — warn and bail. + let (pending_tool_calls, tool_responses) = match &mut self.state { + Some(StreamState::AwaitingToolDecision { + pending_tool_calls, + tool_responses, + }) => (pending_tool_calls, tool_responses), + other => { + let state_name = match other { + Some(StreamState::NeedsChatRequest) => "NeedsChatRequest", + Some(StreamState::Streaming { .. }) => "Streaming", + Some(StreamState::AwaitingToolDecision { .. }) => unreachable!(), + None => "None", + }; + tracing::warn!( + "submit_tool_result called in unexpected state: {}", + state_name + ); + return; + }, }; - if !matches!(self.state, Some(StreamState::AwaitingToolDecision)) { - tracing::warn!( - "submit_tool_result called in unexpected state: {}", - state_name - ); - } // Store the response - self.tool_responses - .push(ToolResponse::new(call_id.to_string(), content)); - - // Check if all tools have been decided - // Guard: streaming_tool_calls must be non-empty (otherwise we shouldn't be receiving results) - if self.streaming_tool_calls.is_empty() { - tracing::warn!("submit_tool_result called but no tool calls pending"); - return; - } + tool_responses.push(ToolResponse::new(call_id.to_string(), content)); debug!( "Agent: tool_responses={}/{}", - self.tool_responses.len(), - self.streaming_tool_calls.len() + tool_responses.len(), + pending_tool_calls.len() ); - if self.tool_responses.len() >= self.streaming_tool_calls.len() { + if tool_responses.len() >= pending_tool_calls.len() { debug!( "Agent: all tools complete, building message. thinking_blocks={}, text_len={}, tool_calls={}", self.streaming_thinking.len(), self.streaming_text.len(), - self.streaming_tool_calls.len() + pending_tool_calls.len() ); - // All tools processed - build the assistant message - // Per Anthropic docs: thinking blocks must come first, then text/tool_use + + // All tools processed — build the assistant message. + // Per Anthropic docs: thinking blocks must come first, then text/tool_use. let mut msg_content = MessageContent::default(); - // Add thinking blocks first for thinking in &self.streaming_thinking { msg_content = msg_content.append(ContentPart::Thinking(thinking.clone())); } - // Add text if non-empty if !self.streaming_text.is_empty() { msg_content = msg_content.append(ContentPart::Text(self.streaming_text.clone())); } - // Add tool calls - for tc in &self.streaming_tool_calls { + for tc in pending_tool_calls.iter() { msg_content = msg_content.append(ContentPart::ToolCall(tc.clone())); } - // Add assistant message self.messages.push(ChatMessage { role: ChatRole::Assistant, content: msg_content, options: None, }); - // Add tool responses - for response in std::mem::take(&mut self.tool_responses) { - debug!("Agent: adding tool response - call_id={}", response.call_id); - self.messages.push(ChatMessage::from(response)); + // Take ownership of the tool responses by swapping the state. + // This consumes the AwaitingToolDecision data cleanly. + let old_state = self.state.take(); + if let Some(StreamState::AwaitingToolDecision { tool_responses, .. }) = old_state { + for response in tool_responses { + debug!("Agent: adding tool response - call_id={}", response.call_id); + self.messages.push(ChatMessage::from(response)); + } } debug!( From 22a28702e1e96c8be98f9eaf0be28fe80d7b69c6 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 2 Apr 2026 03:19:32 +0000 Subject: [PATCH 2/2] Extract per-turn transient state into TurnState struct Move streaming_text, streaming_thinking, mode, and retry_attempt out of Agent into a TurnState struct. These fields only exist while processing a request and are dead weight when idle. Agent now has phase/turn as paired Options (both Some while processing, both None when idle) rather than scattered loose fields. Also capture last_result on turn completion so last_message() works after the turn state is consumed. https://claude.ai/code/session_014n42dEnaKvcjB6Qj4Pw7PM --- src/llm/agent.rs | 310 ++++++++++++++++++++++++----------------------- 1 file changed, 158 insertions(+), 152 deletions(-) diff --git a/src/llm/agent.rs b/src/llm/agent.rs index e335d1b..3b1b0a3 100644 --- a/src/llm/agent.rs +++ b/src/llm/agent.rs @@ -119,12 +119,12 @@ pub enum AgentStep { Error(String), } -/// Internal state for the agent stream. +/// The current phase of the agent's request-response cycle. /// -/// State-specific data is carried inside each variant, enforcing at compile time +/// Phase-specific data is carried inside each variant, enforcing at compile time /// that fields like the active stream or pending tool responses can only be -/// accessed when the agent is in the corresponding state. -enum StreamState { +/// accessed when the agent is in the corresponding phase. +enum StreamPhase { /// Need to make a new chat API request NeedsChatRequest, /// Currently streaming response from API @@ -140,6 +140,18 @@ enum StreamState { }, } +/// Per-turn processing state that spans stream phases. +/// +/// Created when a request begins (`send_request`), consumed on completion. +/// Lives as a separate field from `StreamPhase` so the borrow checker allows +/// `&mut self` method calls while the phase enum is being matched. +struct TurnState { + mode: RequestMode, + text: String, + thinking: Vec, + retry_attempt: u32, +} + /// Request mode controlling agent behavior for a single request #[derive(Debug, Clone, Copy, Default)] pub enum RequestMode { @@ -178,36 +190,37 @@ impl RequestMode { /// Called before each LLM request to allow prompt content to change. pub type SystemPromptBuilder = Box String + Send + Sync>; -/// Agent for handling conversations +/// Agent for handling conversations. +/// +/// Fields are organized by lifecycle: +/// - **Configuration**: set at creation, rarely changes (`client`, `config`, `tools`, etc.) +/// - **Conversation**: grows over the session (`messages`, `total_usage`) +/// - **Active processing**: present only while handling a request (`phase`, `turn`) +/// - **Cross-turn operational state**: spans multiple request cycles (`fast_mode_cooldown_until`) pub struct Agent { + // Configuration client: Client, config: AgentRuntimeConfig, tools: ToolRegistry, - messages: Vec, system_prompt: String, - total_usage: Usage, - /// OAuth credentials for Claude Max (if available) - oauth: Option, - /// Optional dynamic prompt builder - called before each request system_prompt_builder: Option, + oauth: Option, - // Streaming state (Some when actively processing) - state: Option, - mode: RequestMode, + // Conversation + messages: Vec, + total_usage: Usage, - // Accumulated during streaming, consumed when tools complete or stream ends. - // These span multiple states (built during Streaming, read during AwaitingToolDecision - // completion), so they live on Agent rather than inside a single variant. - streaming_text: String, - streaming_thinking: Vec, + // Active processing (both Some while handling a request, both None when idle). + // Split into two fields so the borrow checker allows &mut self method calls + // while matching on the phase enum. + phase: Option, + turn: Option, + + /// Result text from the last completed turn, for `last_message()`. + last_result: Option, /// When set, fast mode is cooling down until this instant. - /// During cooldown, the fast mode beta header is omitted from requests. fast_mode_cooldown_until: Option, - - /// Retry attempt counter, persists across calls to exec_chat_with_retry. - /// Reset on successful request or new user message. - retry_attempt: u32, } impl Agent { @@ -224,20 +237,13 @@ impl Agent { tools, messages: vec![ChatMessage::system(system_prompt)], system_prompt: system_prompt.to_string(), - total_usage: Usage::default(), - oauth, system_prompt_builder: None, - - // Streaming state starts empty - state: None, - mode: RequestMode::Normal, - - // Accumulated during streaming - streaming_text: String::new(), - streaming_thinking: Vec::new(), - + oauth, + total_usage: Usage::default(), + phase: None, + turn: None, + last_result: None, fast_mode_cooldown_until: None, - retry_attempt: 0, } } @@ -258,20 +264,13 @@ impl Agent { tools, messages: vec![ChatMessage::system(&system_prompt)], system_prompt, - total_usage: Usage::default(), - oauth, system_prompt_builder: Some(prompt_builder), - - // Streaming state starts empty - state: None, - mode: RequestMode::Normal, - - // Accumulated during streaming - streaming_text: String::new(), - streaming_thinking: Vec::new(), - + oauth, + total_usage: Usage::default(), + phase: None, + turn: None, + last_result: None, fast_mode_cooldown_until: None, - retry_attempt: 0, } } @@ -386,20 +385,25 @@ impl Agent { .collect() } - /// Send a user message to the agent - /// Call next() repeatedly to get AgentSteps until None + /// Send a user message to the agent. + /// Call `next()` repeatedly to get `AgentStep`s until `None`. pub fn send_request(&mut self, user_input: &str, mode: RequestMode) { self.messages.push(ChatMessage::user(user_input)); - self.mode = mode; - self.retry_attempt = 0; - self.state = Some(StreamState::NeedsChatRequest); + self.phase = Some(StreamPhase::NeedsChatRequest); + self.turn = Some(TurnState { + mode, + text: String::new(), + thinking: Vec::new(), + retry_attempt: 0, + }); } /// Cancel the current streaming operation. - /// Dropping the state also drops any active stream or pending tool data. + /// Dropping the phase/turn also drops any active stream or pending tool data. pub fn cancel(&mut self) { debug!("Agent::cancel"); - self.state = None; + self.phase = None; + self.turn = None; } /// Refresh OAuth token if expired. Returns true if refresh was needed and succeeded. @@ -446,13 +450,9 @@ impl Agent { } /// Get the last assistant message text (for returning sub-agent results). - /// Returns the accumulated streaming text if present. + /// Returns the result text captured when the last turn completed. pub fn last_message(&self) -> Option { - if self.streaming_text.is_empty() { - None - } else { - Some(self.streaming_text.clone()) - } + self.last_result.clone() } /// Reset the agent with a new context after compaction @@ -563,7 +563,8 @@ impl Agent { } let mut request = ChatRequest::new(messages); - let mode_opts = self.mode.options(&self.config); + let mode = self.turn.as_ref().expect("exec_chat_with_retry called without active turn").mode; + let mode_opts = mode.options(&self.config); if mode_opts.tools_enabled { request = request.with_tools(self.get_tools()); } @@ -619,7 +620,11 @@ impl Agent { .with_reasoning_effort(ReasoningEffort::Budget(mode_opts.thinking_budget)); } - self.retry_attempt += 1; + let turn = self.turn.as_mut().expect("exec_chat_with_retry called without active turn"); + turn.retry_attempt += 1; + let attempt = turn.retry_attempt; + let max_retries = self.config.max_retries; + match self .client .exec_chat_stream(&self.config.model, request.clone(), Some(&chat_options)) @@ -627,12 +632,12 @@ impl Agent { { Ok(resp) => { info!("Chat request successful"); - self.retry_attempt = 0; + self.turn.as_mut().unwrap().retry_attempt = 0; Ok(resp) }, Err(e) => { let err = format!("{:#}", e); - error!("Chat request failed (attempt {}): {}", self.retry_attempt, err); + error!("Chat request failed (attempt {}): {}", attempt, err); // If fast mode is active and we hit a rate limit or overloaded // error, trigger cooldown and retry without the fast mode header. @@ -643,15 +648,16 @@ impl Agent { ); self.fast_mode_cooldown_until = Some(Instant::now() + FAST_MODE_COOLDOWN); // Don't count fast mode fallback as a retry attempt - self.retry_attempt -= 1; + let turn = self.turn.as_mut().unwrap(); + turn.retry_attempt -= 1; return Err(AgentStep::Retrying { - attempt: self.retry_attempt, + attempt: turn.retry_attempt, error: format!("Fast mode rate limited, falling back to standard speed"), }); } - if self.retry_attempt >= self.config.max_retries { - self.retry_attempt = 0; + if attempt >= max_retries { + self.turn.as_mut().unwrap().retry_attempt = 0; return Err(AgentStep::Error(format!( "API error ({}): {}", self.config.model, err @@ -659,66 +665,71 @@ impl Agent { } // Return retry step, caller should call next() again Err(AgentStep::Retrying { - attempt: self.retry_attempt, + attempt, error: err, }) }, } } - /// Get the next step from the agent - /// Returns None when streaming is complete or awaiting tool decisions + /// Get the next step from the agent. + /// Returns None when streaming is complete or awaiting tool decisions. /// /// This method is cancel-safe: if the future is dropped mid-poll, /// the agent remains in a valid state and can be polled again. pub async fn next(&mut self) -> Option { loop { - match self.state.as_mut()? { - StreamState::NeedsChatRequest => { - debug!("Agent state: NeedsChatRequest, clearing streaming data"); - - // Exponential backoff before retrying: 2s, 4s, 8s, 16s, ... - if self.retry_attempt > 0 { - let delay = Duration::from_secs(2u64.pow(self.retry_attempt)); - info!("Backoff: waiting {}s before retry attempt {}", delay.as_secs(), self.retry_attempt + 1); + // NeedsChatRequest doesn't capture data from the match, so the + // borrow on self.phase is released — allowing &mut self method calls. + // Streaming captures `stream`, holding the borrow, but only accesses + // other fields (turn, messages, total_usage) via disjoint field borrows. + match self.phase.as_mut()? { + StreamPhase::NeedsChatRequest => { + let turn = self.turn.as_mut().expect("phase without turn"); + debug!("Agent phase: NeedsChatRequest"); + + // Exponential backoff before retrying + if turn.retry_attempt > 0 { + let delay = Duration::from_secs(2u64.pow(turn.retry_attempt)); + info!("Backoff: waiting {}s before retry attempt {}", delay.as_secs(), turn.retry_attempt + 1); tokio::time::sleep(delay).await; } - // Refresh dynamic system prompt before each request - self.refresh_system_prompt(); + // Clear accumulated cross-phase data for new request + turn.text.clear(); + turn.thinking.clear(); - // Clear accumulated cross-state data for new request - self.streaming_text.clear(); - self.streaming_thinking.clear(); + // Borrow on `turn` is dropped here (NeedsChatRequest captured + // nothing from phase), so &mut self methods are available. + self.refresh_system_prompt(); match self.exec_chat_with_retry().await { Ok(response) => { - debug!("Agent state: NeedsChatRequest -> Streaming"); - self.state = Some(StreamState::Streaming { + debug!("Agent phase: NeedsChatRequest -> Streaming"); + self.phase = Some(StreamPhase::Streaming { stream: Box::pin(response.stream), }); - // Continue to process streaming state }, Err(step) => { - // Retrying or Error - state stays NeedsChatRequest for retry if !matches!(step, AgentStep::Retrying { .. }) { - self.state = None; + self.phase = None; + self.turn = None; } return Some(step); }, } }, - StreamState::Streaming { stream } => { + StreamPhase::Streaming { stream } => { match stream.next().await { Some(Ok(event)) => match event { ChatStreamEvent::Start => { debug!("Agent: got ChatStreamEvent::Start"); - // Continue polling }, ChatStreamEvent::Chunk(chunk) => { - self.streaming_text.push_str(&chunk.content); - return Some(match self.mode { + let turn = self.turn.as_mut().expect("phase without turn"); + turn.text.push_str(&chunk.content); + return Some(match turn.mode { RequestMode::Compaction => { AgentStep::CompactionDelta(chunk.content) }, @@ -727,14 +738,11 @@ impl Agent { }, ChatStreamEvent::ToolCallChunk(_) => { debug!("Agent: got ToolCallChunk"); - // Continue polling }, ChatStreamEvent::ReasoningChunk(chunk) => { return Some(AgentStep::ThinkingDelta(chunk.content)); }, - ChatStreamEvent::ThoughtSignatureChunk(_) => { - // Gemini thought signatures - continue polling - }, + ChatStreamEvent::ThoughtSignatureChunk(_) => {}, ChatStreamEvent::End(mut end) => { debug!("Agent: got ChatStreamEvent::End"); if let Some(ref genai_usage) = end.captured_usage { @@ -744,73 +752,71 @@ impl Agent { } else { debug!("No captured_usage in End event"); } + let turn = self.turn.as_mut().expect("phase without turn"); if let Some(captured) = end.captured_thinking_blocks.take() { - self.streaming_thinking = captured; + turn.thinking = captured; } - // Capture tool calls from the End event; used below - // when the stream closes (None branch). let streaming_tool_calls = end .captured_into_tool_calls() .unwrap_or_default(); if !streaming_tool_calls.is_empty() { - // Stream is done and we have tool calls — transition - // directly to AwaitingToolDecision. let tool_calls: Vec = streaming_tool_calls .iter() .map(ToolCall::from) .collect(); - self.state = Some(StreamState::AwaitingToolDecision { + self.phase = Some(StreamPhase::AwaitingToolDecision { pending_tool_calls: streaming_tool_calls, tool_responses: Vec::new(), }); return Some(AgentStep::ToolRequest(tool_calls)); } - // No tool calls — continue polling; the stream will - // yield None next and we'll handle finish there. }, }, Some(Err(e)) => { let err = format!("{:#}", e); - error!("Stream error (attempt {}): {}", self.retry_attempt, err); - - self.retry_attempt += 1; - if self.retry_attempt >= self.config.max_retries { - self.retry_attempt = 0; - self.state = None; + let turn = self.turn.as_mut().expect("phase without turn"); + error!("Stream error (attempt {}): {}", turn.retry_attempt, err); + + turn.retry_attempt += 1; + if turn.retry_attempt >= self.config.max_retries { + turn.retry_attempt = 0; + self.phase = None; + self.turn = None; return Some(AgentStep::Error(format!( "Stream error ({}): {}", self.config.model, err ))); } - // Go back to NeedsChatRequest so the retry loop picks it up - self.state = Some(StreamState::NeedsChatRequest); + let attempt = turn.retry_attempt; + self.phase = Some(StreamPhase::NeedsChatRequest); return Some(AgentStep::Retrying { - attempt: self.retry_attempt, + attempt, error: err, }); }, None => { debug!("Agent: stream returned None (closed)"); - // Stream ended with no (remaining) tool calls — finish. - match self.mode { + let turn = self.turn.as_ref().expect("phase without turn"); + match turn.mode { RequestMode::Compaction => { - self.reset_with_summary(&self.streaming_text.clone()) + let summary = turn.text.clone(); + self.reset_with_summary(&summary); }, RequestMode::Normal => { - let has_content = !self.streaming_thinking.is_empty() - || !self.streaming_text.is_empty(); + let has_content = !turn.thinking.is_empty() + || !turn.text.is_empty(); if has_content { let mut msg_content = MessageContent::default(); - for thinking in &self.streaming_thinking { + for thinking in &turn.thinking { msg_content = msg_content.append( ContentPart::Thinking(thinking.clone()), ); } - if !self.streaming_text.is_empty() { + if !turn.text.is_empty() { msg_content = msg_content.append( - ContentPart::Text(self.streaming_text.clone()), + ContentPart::Text(turn.text.clone()), ); } @@ -827,10 +833,16 @@ impl Agent { }, } debug!( - "Agent state: Streaming -> None (Finished), messages={}", + "Agent phase: Streaming -> None (Finished), messages={}", self.messages.len() ); - self.state = None; + // Capture result before clearing turn state + let result_text = self.turn.as_ref() + .map(|t| t.text.clone()) + .filter(|t| !t.is_empty()); + self.last_result = result_text; + self.phase = None; + self.turn = None; return Some(AgentStep::Finished { usage: self.total_usage, }); @@ -838,8 +850,7 @@ impl Agent { } }, - StreamState::AwaitingToolDecision { .. } => { - // Blocked waiting for tool results + StreamPhase::AwaitingToolDecision { .. } => { return None; }, } @@ -850,33 +861,30 @@ impl Agent { /// /// Called by App after ToolExecutor runs the tool. The pending tool calls /// and accumulated responses live inside the `AwaitingToolDecision` variant, - /// so the compiler ensures this data is only accessible in the correct state. + /// so the compiler ensures this data is only accessible in the correct phase. pub fn submit_tool_result(&mut self, call_id: &str, content: String) { debug!("Agent: submit_tool_result call_id={}", call_id); - // Extract the mutable state data from the AwaitingToolDecision variant. - // Any other state is a caller bug — warn and bail. - let (pending_tool_calls, tool_responses) = match &mut self.state { - Some(StreamState::AwaitingToolDecision { + let (pending_tool_calls, tool_responses) = match &mut self.phase { + Some(StreamPhase::AwaitingToolDecision { pending_tool_calls, tool_responses, }) => (pending_tool_calls, tool_responses), other => { - let state_name = match other { - Some(StreamState::NeedsChatRequest) => "NeedsChatRequest", - Some(StreamState::Streaming { .. }) => "Streaming", - Some(StreamState::AwaitingToolDecision { .. }) => unreachable!(), + let phase_name = match other { + Some(StreamPhase::NeedsChatRequest) => "NeedsChatRequest", + Some(StreamPhase::Streaming { .. }) => "Streaming", + Some(StreamPhase::AwaitingToolDecision { .. }) => unreachable!(), None => "None", }; tracing::warn!( - "submit_tool_result called in unexpected state: {}", - state_name + "submit_tool_result called in unexpected phase: {}", + phase_name ); return; }, }; - // Store the response tool_responses.push(ToolResponse::new(call_id.to_string(), content)); debug!( @@ -886,23 +894,22 @@ impl Agent { ); if tool_responses.len() >= pending_tool_calls.len() { + let turn = self.turn.as_ref().expect("phase without turn"); debug!( "Agent: all tools complete, building message. thinking_blocks={}, text_len={}, tool_calls={}", - self.streaming_thinking.len(), - self.streaming_text.len(), + turn.thinking.len(), + turn.text.len(), pending_tool_calls.len() ); - // All tools processed — build the assistant message. - // Per Anthropic docs: thinking blocks must come first, then text/tool_use. let mut msg_content = MessageContent::default(); - for thinking in &self.streaming_thinking { + for thinking in &turn.thinking { msg_content = msg_content.append(ContentPart::Thinking(thinking.clone())); } - if !self.streaming_text.is_empty() { - msg_content = msg_content.append(ContentPart::Text(self.streaming_text.clone())); + if !turn.text.is_empty() { + msg_content = msg_content.append(ContentPart::Text(turn.text.clone())); } for tc in pending_tool_calls.iter() { @@ -915,10 +922,9 @@ impl Agent { options: None, }); - // Take ownership of the tool responses by swapping the state. - // This consumes the AwaitingToolDecision data cleanly. - let old_state = self.state.take(); - if let Some(StreamState::AwaitingToolDecision { tool_responses, .. }) = old_state { + // Take ownership of the tool responses by consuming the phase. + let old_phase = self.phase.take(); + if let Some(StreamPhase::AwaitingToolDecision { tool_responses, .. }) = old_phase { for response in tool_responses { debug!("Agent: adding tool response - call_id={}", response.call_id); self.messages.push(ChatMessage::from(response)); @@ -930,9 +936,9 @@ impl Agent { self.messages.len() ); - debug!("Agent: state -> NeedsChatRequest (ready for continuation)"); - self.retry_attempt = 0; - self.state = Some(StreamState::NeedsChatRequest); + debug!("Agent: phase -> NeedsChatRequest (ready for continuation)"); + self.turn.as_mut().unwrap().retry_attempt = 0; + self.phase = Some(StreamPhase::NeedsChatRequest); } } }