
Commit fc37dfb

hyperpolymath and claude committed
feat(ai): add Claude streaming + tool_use with BoJ cartridge routing
Phase 1: Rust SSE streaming provider for Anthropic Messages API
Phase 2: ReScript streaming types (streamChunk, toolCallState, streamingState)
Phase 3: sendMessageStreaming command wrapper
Phase 4: New Msg variants for stream chunks and tool calls
Phase 5: AiEngine streaming helpers (parse, append, default state)
Phase 6: Update.res handlers for streaming + tool use loop

Claude's tool_use requests route through BoJ MCP cartridges. Panel-N receives streaming neural tokens in real-time. Panel-L constraints become system prompt rules. Panel-W shows tool call results.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 3ed0554 commit fc37dfb

12 files changed

Lines changed: 876 additions & 1 deletion


src-tauri/Cargo.toml

Lines changed: 2 additions & 1 deletion
@@ -29,7 +29,8 @@ tauri-plugin-fs = "2"
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
 once_cell = "1"
-reqwest = { version = "0.12", features = ["json", "blocking"] }
+reqwest = { version = "0.12", features = ["json", "blocking", "stream"] }
+futures = "0.3"
 dirs = "6"
 notify = { version = "7", features = ["serde"] }
 notify-debouncer-mini = "0.5"

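The two dependency changes support incremental reads of the HTTP response body: reqwest's "stream" feature enables Response::bytes_stream(), and the futures crate supplies the StreamExt trait used to poll it. A minimal sketch of that pattern, assuming you already hold a reqwest::Response (the helper name is illustrative, not part of the commit):

use futures::StreamExt; // `.next()` on the byte stream comes from this trait

// Illustrative helper: drain a response body chunk by chunk.
async fn drain_body(resp: reqwest::Response) -> Result<(), String> {
    // `bytes_stream()` is only available when reqwest is built with the "stream" feature.
    let mut stream = resp.bytes_stream();
    while let Some(chunk) = stream.next().await {
        let bytes = chunk.map_err(|e| format!("stream read error: {e}"))?;
        // Each chunk is a partial slice of the body; `send_anthropic_streaming`
        // below buffers these and splits them into SSE lines.
        println!("received {} bytes", bytes.len());
    }
    Ok(())
}
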
src-tauri/src/ai/commands.rs

Lines changed: 82 additions & 0 deletions
@@ -255,3 +255,85 @@ pub async fn ai_get_state() -> Result<String, String> {
     let config_file = load_config()?;
     serde_json::to_string(&config_file).map_err(|e| format!("Serialisation error: {e}"))
 }
+
+/// Send a streaming message to the Anthropic provider with tool_use support.
+///
+/// Unlike `ai_send_message` which returns a complete response, this command
+/// fires and forgets — results arrive via Tauri events on the `ai:stream-chunk`
+/// channel. The frontend listens for these events and feeds them into the
+/// TEA update loop as `AiStreamChunkReceived` messages.
+///
+/// Returns `"streaming_started"` immediately. Errors during streaming are
+/// emitted as `StreamChunk::Error` events rather than returned here.
+#[tauri::command]
+pub async fn ai_send_message_streaming(
+    app_handle: tauri::AppHandle,
+    request: StreamingRequest,
+) -> Result<String, String> {
+    let config_file = load_config()?;
+
+    // Select provider: explicit choice or auto-select by priority.
+    // Currently, streaming only supports Anthropic. Other providers
+    // fall back to the non-streaming path.
+    let provider_config: ProviderConfig = if let Some(ref pid) = request.provider_id {
+        config_file
+            .providers
+            .iter()
+            .find(|p| &p.id == pid)
+            .ok_or(format!("Provider {:?} not configured", pid))?
+            .clone()
+    } else {
+        // Auto-select: highest priority, enabled, Anthropic preferred.
+        let mut candidates: Vec<&ProviderConfig> = config_file
+            .providers
+            .iter()
+            .filter(|p| p.enabled)
+            .collect();
+        candidates.sort_by_key(|p| p.priority);
+        (*candidates
+            .first()
+            .ok_or("No enabled providers configured")?)
+        .clone()
+    };
+
+    // Verify this is Anthropic — streaming is only implemented for Anthropic.
+    if provider_config.id != ProviderId::Anthropic {
+        return Err(format!(
+            "Streaming only supported for Anthropic, got {:?}",
+            provider_config.id
+        ));
+    }
+
+    // Clone owned data for the spawned task (must be 'static).
+    let tools_owned = request.tools.clone();
+    let tool_results_owned = request.tool_results.clone();
+    let system_prompt = request.system_prompt.clone();
+    let history = request.history.clone();
+    let content = request.content.clone();
+
+    // Spawn the streaming task so we can return immediately.
+    tauri::async_runtime::spawn(async move {
+        let tools_ref = tools_owned.as_deref();
+        let tool_results_ref = tool_results_owned.as_deref();
+
+        if let Err(e) = providers::send_anthropic_streaming(
+            &provider_config,
+            &system_prompt,
+            &history,
+            &content,
+            tools_ref,
+            tool_results_ref,
+            app_handle.clone(),
+        )
+        .await
+        {
+            use tauri::Emitter;
+            let _ = app_handle.emit(
+                "ai:stream-chunk",
+                &StreamChunk::Error(e),
+            );
+        }
+    });
+
+    Ok("streaming_started".to_string())
+}

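The StreamingRequest payload deserialised by this command is declared in one of the other changed files and is not shown in this excerpt. Judging from the fields the command reads, it plausibly looks like the sketch below; the exact field types and any serde attributes are assumptions rather than part of the commit:

// Hypothetical reconstruction; the real definition lives elsewhere in this commit.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct StreamingRequest {
    /// Explicit provider choice; `None` means auto-select by priority.
    pub provider_id: Option<ProviderId>,
    /// System prompt text (per the commit message, Panel-L constraints end up here).
    pub system_prompt: String,
    /// Prior conversation turns.
    pub history: Vec<AiMessage>,
    /// The new user message.
    pub content: String,
    /// Tool definitions exposed to Claude (BoJ MCP cartridges), if any.
    pub tools: Option<Vec<ToolDefinition>>,
    /// Results from a previous tool_use round, if this is a follow-up turn.
    pub tool_results: Option<Vec<ToolResult>>,
}
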
src-tauri/src/ai/providers.rs

Lines changed: 237 additions & 0 deletions
@@ -459,6 +459,243 @@ pub async fn send_local(
     })
 }
 
+// ---------------------------------------------------------------------------
+// Streaming provider — Anthropic Messages API with SSE + tool_use
+// ---------------------------------------------------------------------------
+
+/// Send a streaming message via the Anthropic Messages API.
+///
+/// Instead of waiting for a complete response, this function reads
+/// Server-Sent Events (SSE) from the Anthropic API and emits each
+/// chunk to the frontend as a Tauri event (`ai:stream-chunk`).
+///
+/// Supports tool_use: if `tools` is provided, Claude may request tool
+/// calls which the frontend routes through BoJ MCP cartridges.
+pub async fn send_anthropic_streaming(
+    config: &ProviderConfig,
+    system_prompt: &str,
+    messages: &[AiMessage],
+    user_content: &str,
+    tools: Option<&[ToolDefinition]>,
+    tool_results: Option<&[ToolResult]>,
+    app_handle: tauri::AppHandle,
+) -> Result<(), String> {
+    use futures::StreamExt;
+    use tauri::Emitter;
+
+    let api_key = resolve_api_key(config)?;
+
+    // Build a client without the standard timeout — streaming responses
+    // can take longer than 120s for large generations.
+    let client = Client::builder()
+        .build()
+        .map_err(|e| format!("HTTP client error: {e}"))?;
+
+    // Build conversation history in Anthropic format.
+    let mut api_messages: Vec<Value> = messages
+        .iter()
+        .filter(|m| m.role != MessageRole::System)
+        .map(|m| {
+            json!({
+                "role": match m.role {
+                    MessageRole::User => "user",
+                    MessageRole::Assistant => "assistant",
+                    _ => "user",
+                },
+                "content": m.content,
+            })
+        })
+        .collect();
+
+    // If tool_results are provided, append them as a user message with
+    // tool_result content blocks (Anthropic's multi-turn tool_use format).
+    if let Some(results) = tool_results {
+        let tool_result_blocks: Vec<Value> = results
+            .iter()
+            .map(|tr| {
+                json!({
+                    "type": "tool_result",
+                    "tool_use_id": tr.tool_use_id,
+                    "content": tr.content,
+                    "is_error": tr.is_error,
+                })
+            })
+            .collect();
+        api_messages.push(json!({
+            "role": "user",
+            "content": tool_result_blocks,
+        }));
+    } else {
+        // Append the new user message.
+        api_messages.push(json!({
+            "role": "user",
+            "content": user_content,
+        }));
+    }
+
+    let mut body = json!({
+        "model": config.model,
+        "max_tokens": 4096,
+        "system": system_prompt,
+        "messages": api_messages,
+        "stream": true,
+    });
+
+    // Include tool definitions if provided.
+    if let Some(tool_defs) = tools {
+        let tools_json: Vec<Value> = tool_defs
+            .iter()
+            .map(|t| {
+                json!({
+                    "name": t.name,
+                    "description": t.description,
+                    "input_schema": t.input_schema,
+                })
+            })
+            .collect();
+        body["tools"] = json!(tools_json);
+    }
+
+    let resp = client
+        .post("https://api.anthropic.com/v1/messages")
+        .header("x-api-key", &api_key)
+        .header("anthropic-version", "2023-06-01")
+        .header("content-type", "application/json")
+        .json(&body)
+        .send()
+        .await
+        .map_err(|e| format!("Anthropic streaming request failed: {e}"))?;
+
+    let status = resp.status();
+    if !status.is_success() {
+        let error_text = resp.text().await.unwrap_or_default();
+        let _ = app_handle.emit(
+            "ai:stream-chunk",
+            &StreamChunk::Error(format!("Anthropic HTTP {}: {}", status, error_text)),
+        );
+        return Err(format!("Anthropic HTTP {}: {}", status, error_text));
+    }
+
+    // Read SSE stream line by line.
+    let mut stream = resp.bytes_stream();
+    let mut buffer = String::new();
+    let mut input_tokens: u32 = 0;
+    let mut output_tokens: u32 = 0;
+    let mut in_tool_use = false;
+
+    while let Some(chunk_result) = stream.next().await {
+        let chunk_bytes = chunk_result.map_err(|e| format!("Stream read error: {e}"))?;
+        let chunk_str = String::from_utf8_lossy(&chunk_bytes);
+        buffer.push_str(&chunk_str);
+
+        // Process complete lines in the buffer.
+        while let Some(newline_pos) = buffer.find('\n') {
+            let line = buffer[..newline_pos].trim().to_string();
+            buffer = buffer[newline_pos + 1..].to_string();
+
+            // SSE format: lines starting with "data: " contain JSON.
+            if !line.starts_with("data: ") {
+                continue;
+            }
+            let json_str = &line[6..];
+            if json_str == "[DONE]" {
+                continue;
+            }
+
+            let parsed: Value = match serde_json::from_str(json_str) {
+                Ok(v) => v,
+                Err(_) => continue,
+            };
+
+            let event_type = parsed["type"].as_str().unwrap_or("");
+
+            match event_type {
+                "message_start" => {
+                    // Extract input token usage from the message start event.
+                    input_tokens = parsed["message"]["usage"]["input_tokens"]
+                        .as_u64()
+                        .unwrap_or(0) as u32;
+                }
+                "content_block_start" => {
+                    let block_type = parsed["content_block"]["type"].as_str().unwrap_or("");
+                    if block_type == "tool_use" {
+                        in_tool_use = true;
+                        let id = parsed["content_block"]["id"]
+                            .as_str()
+                            .unwrap_or("")
+                            .to_string();
+                        let name = parsed["content_block"]["name"]
+                            .as_str()
+                            .unwrap_or("")
+                            .to_string();
+                        let _ = app_handle.emit(
+                            "ai:stream-chunk",
+                            &StreamChunk::ToolUseStart { id, name },
+                        );
+                    }
+                    // text blocks: wait for deltas
+                }
+                "content_block_delta" => {
+                    let delta_type = parsed["delta"]["type"].as_str().unwrap_or("");
+                    match delta_type {
+                        "text_delta" => {
+                            let text = parsed["delta"]["text"]
+                                .as_str()
+                                .unwrap_or("")
+                                .to_string();
+                            let _ = app_handle
+                                .emit("ai:stream-chunk", &StreamChunk::TextDelta(text));
+                        }
+                        "input_json_delta" => {
+                            let partial_json = parsed["delta"]["partial_json"]
+                                .as_str()
+                                .unwrap_or("")
+                                .to_string();
+                            let _ = app_handle.emit(
+                                "ai:stream-chunk",
+                                &StreamChunk::ToolUseDelta(partial_json),
+                            );
+                        }
+                        _ => {}
+                    }
+                }
+                "content_block_stop" => {
+                    if in_tool_use {
+                        let _ =
+                            app_handle.emit("ai:stream-chunk", &StreamChunk::ToolUseEnd);
+                        in_tool_use = false;
+                    }
+                }
+                "message_delta" => {
+                    output_tokens = parsed["usage"]["output_tokens"]
+                        .as_u64()
+                        .unwrap_or(0) as u32;
+                }
+                "message_stop" => {
+                    let _ = app_handle.emit(
+                        "ai:stream-chunk",
+                        &StreamChunk::Complete {
+                            input_tokens,
+                            output_tokens,
+                        },
+                    );
+                }
+                "error" => {
+                    let error_msg = parsed["error"]["message"]
+                        .as_str()
+                        .unwrap_or("Unknown streaming error")
+                        .to_string();
+                    let _ = app_handle
+                        .emit("ai:stream-chunk", &StreamChunk::Error(error_msg));
+                }
+                _ => {}
+            }
+        }
+    }
+
+    Ok(())
+}
+
 /// Route a message to the correct provider's send function.
 pub async fn send_message(
     config: &ProviderConfig,

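The StreamChunk type emitted on the ai:stream-chunk channel is likewise defined outside this excerpt. From the variants constructed above it plausibly looks like the following sketch; the derive and serde tagging choices are assumptions about how the payload is serialised for the frontend:

// Hypothetical reconstruction of the event payload; attribute choices are guesses.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "kind", content = "payload")] // assumed tagging scheme
pub enum StreamChunk {
    /// A fragment of assistant text (Panel-N appends these as they arrive).
    TextDelta(String),
    /// Claude opened a tool_use block with this id and tool name.
    ToolUseStart { id: String, name: String },
    /// A fragment of the tool call's JSON input (from input_json_delta events).
    ToolUseDelta(String),
    /// The current tool_use block is complete.
    ToolUseEnd,
    /// The message finished; usage totals come from message_start and message_delta.
    Complete { input_tokens: u32, output_tokens: u32 },
    /// A streaming or HTTP error, also emitted when the spawned task fails.
    Error(String),
}

On a plain text turn the frontend would see a run of TextDelta chunks followed by Complete; on a tool_use turn it would see ToolUseStart, a series of ToolUseDelta fragments, ToolUseEnd, then Complete, after which it can execute the tool through a BoJ MCP cartridge and call ai_send_message_streaming again with tool_results.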