Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
bab2a28
Revert "Forward session and turn headers to MCP HTTP requests (#15011)"
nicholasclark-openai Mar 19, 2026
7df3c80
Merge branch 'main' into revert-15011-nicholasclark/tool-call-task-he…
nicholasclark-openai Mar 19, 2026
9124fe9
Plumb MCP turn metadata through _meta
nicholasclark-openai Mar 19, 2026
edc8e17
Merge RMCP tool call _meta entries
nicholasclark-openai Mar 19, 2026
fb816b1
Merge branch 'main' into nicholasclark/tool-call-task-headers-_meta
nicholasclark-openai Mar 19, 2026
e84f542
Include session id in MCP turn metadata
nicholasclark-openai Mar 19, 2026
98f97df
Skip non-JSON apps requests in search tool test
nicholasclark-openai Mar 19, 2026
88653c9
Simplify RMCP tool call _meta plumbing
nicholasclark-openai Mar 19, 2026
db3ddea
Restore RMCP meta local variable name
nicholasclark-openai Mar 19, 2026
6e8ebdc
codex: seed turn metadata from app-server turn start
nicholasclark-openai Mar 19, 2026
8efc728
codex: narrow turn metadata propagation
nicholasclark-openai Mar 19, 2026
ec278ab
codex: tighten turn metadata staging
nicholasclark-openai Mar 19, 2026
cbd0700
Restore explicit RMCP tools/call request path
nicholasclark-openai Mar 19, 2026
b679c75
Merge branch 'main' into nicholasclark/tool-call-task-headers-_meta
nicholasclark-openai Mar 19, 2026
5bf604c
Keep parent request values in turn metadata only
nicholasclark-openai Mar 19, 2026
6bf3961
Merge branch 'main' into nicholasclark/tool-call-task-headers-_meta
nicholasclark-openai Mar 19, 2026
2bcc504
Merge branch 'nicholasclark/tool-call-task-headers-_meta' into nichol…
nicholasclark-openai Mar 19, 2026
656d2f5
Merge branch 'main' into nicholasclark/tool-call-task-headers-_meta
nicholasclark-openai Mar 19, 2026
5305a38
Merge branch 'main' into nicholasclark/tool-call-task-headers-_meta
nicholasclark-openai Mar 19, 2026
8ee421c
Merge branch 'nicholasclark/tool-call-task-headers-_meta' into nichol…
nicholasclark-openai Mar 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions codex-rs/app-server-protocol/schema/json/ClientRequest.json
Original file line number Diff line number Diff line change
Expand Up @@ -3116,6 +3116,16 @@
},
"type": "array"
},
"metadata": {
"additionalProperties": {
"type": "string"
},
"description": "Optional string metadata attached only to this turn.",
"type": [
"object",
"null"
]
},
"model": {
"description": "Override the model for this turn and subsequent turns.",
"type": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13954,6 +13954,16 @@
},
"type": "array"
},
"metadata": {
"additionalProperties": {
"type": "string"
},
"description": "Optional string metadata attached only to this turn.",
"type": [
"object",
"null"
]
},
"model": {
"description": "Override the model for this turn and subsequent turns.",
"type": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11714,6 +11714,16 @@
},
"type": "array"
},
"metadata": {
"additionalProperties": {
"type": "string"
},
"description": "Optional string metadata attached only to this turn.",
"type": [
"object",
"null"
]
},
"model": {
"description": "Override the model for this turn and subsequent turns.",
"type": [
Expand Down
10 changes: 10 additions & 0 deletions codex-rs/app-server-protocol/schema/json/v2/TurnStartParams.json
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,16 @@
},
"type": "array"
},
"metadata": {
"additionalProperties": {
"type": "string"
},
"description": "Optional string metadata attached only to this turn.",
"type": [
"object",
"null"
]
},
"model": {
"description": "Override the model for this turn and subsequent turns.",
"type": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import type { SandboxPolicy } from "./SandboxPolicy";
import type { UserInput } from "./UserInput";

export type TurnStartParams = {threadId: string, input: Array<UserInput>, /**
* Optional string metadata attached only to this turn.
*/
metadata?: { [key in string]?: string } | null, /**
* Override the working directory for this turn and subsequent turns.
*/
cwd?: string | null, /**
Expand Down
4 changes: 4 additions & 0 deletions codex-rs/app-server-protocol/src/protocol/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3831,6 +3831,9 @@ pub enum TurnStatus {
pub struct TurnStartParams {
pub thread_id: String,
pub input: Vec<UserInput>,
/// Optional string metadata attached only to this turn.
#[ts(optional = nullable)]
pub metadata: Option<BTreeMap<String, String>>,
/// Override the working directory for this turn and subsequent turns.
#[ts(optional = nullable)]
pub cwd: Option<PathBuf>,
Expand Down Expand Up @@ -7938,6 +7941,7 @@ mod tests {
let without_override = TurnStartParams {
thread_id: "thread_123".to_string(),
input: vec![],
metadata: None,
cwd: None,
approval_policy: None,
approvals_reviewer: None,
Expand Down
4 changes: 2 additions & 2 deletions codex-rs/app-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ Use the thread APIs to create, list, or archive conversations. Drive a conversat
- Initialize once per connection: Immediately after opening a transport connection, send an `initialize` request with your client metadata, then emit an `initialized` notification. Any other request on that connection before this handshake gets rejected.
- Start (or resume) a thread: Call `thread/start` to open a fresh conversation. The response returns the thread object and you’ll also get a `thread/started` notification. If you’re continuing an existing conversation, call `thread/resume` with its ID instead. If you want to branch from an existing conversation, call `thread/fork` to create a new thread id with copied history. Like `thread/start`, `thread/fork` also accepts `ephemeral: true` for an in-memory temporary thread.
The returned `thread.ephemeral` flag tells you whether the session is intentionally in-memory only; when it is `true`, `thread.path` is `null`.
- Begin a turn: To send user input, call `turn/start` with the target `threadId` and the user's input. Optional fields let you override model, cwd, sandbox policy, approval policy, approvals reviewer, etc. This immediately returns the new turn object. The app-server emits `turn/started` when that turn actually begins running.
- Begin a turn: To send user input, call `turn/start` with the target `threadId` and the user's input. Optional fields let you override model, cwd, sandbox policy, approval policy, approvals reviewer, etc. `turn/start.metadata` also lets callers attach arbitrary string metadata to that turn. This immediately returns the new turn object. The app-server emits `turn/started` when that turn actually begins running.
- Stream events: After `turn/start`, keep reading JSON-RPC notifications on stdout. You’ll see `item/started`, `item/completed`, deltas like `item/agentMessage/delta`, tool progress, etc. These represent streaming model output plus any side effects (commands, tool calls, reasoning notes).
- Finish the turn: When the model is done (or the turn is interrupted via making the `turn/interrupt` call), the server sends `turn/completed` with the final turn state and token usage.

Expand Down Expand Up @@ -139,7 +139,7 @@ Example with notification opt-out:
- `thread/shellCommand` — run a user-initiated `!` shell command against a thread; this runs unsandboxed with full access rather than inheriting the thread sandbox policy. Returns `{}` immediately while progress streams through standard turn/item notifications and any active turn receives the formatted output in its message stream.
- `thread/backgroundTerminals/clean` — terminate all running background terminals for a thread (experimental; requires `capabilities.experimentalApi`); returns `{}` when the cleanup request is accepted.
- `thread/rollback` — drop the last N turns from the agent’s in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode".
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. `metadata` accepts arbitrary string key/value pairs for turn-scoped observability and request forwarding. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode".
- `turn/steer` — add user input to an already in-flight turn without starting a new turn; returns the active `turnId` that accepted the input.
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
- `thread/realtime/start` — start a thread-scoped realtime session (experimental); returns `{}` and streams `thread/realtime/*` notifications.
Expand Down
33 changes: 33 additions & 0 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ use codex_state::ThreadMetadataBuilder;
use codex_state::log_db::LogDbLayer;
use codex_utils_json_to_toml::json_to_toml;
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::ffi::OsStr;
Expand Down Expand Up @@ -5973,6 +5974,8 @@ impl CodexMessageProcessor {
.into_iter()
.map(V2UserInput::into_core)
.collect();
let next_turn_metadata = params.metadata.clone();
let clear_next_turn_metadata_on_error = next_turn_metadata.is_some();

let has_any_overrides = params.cwd.is_some()
|| params.approval_policy.is_some()
Expand Down Expand Up @@ -6010,6 +6013,13 @@ impl CodexMessageProcessor {
.await;
}

if let Some(metadata) = next_turn_metadata
&& let Err(error) = Self::set_next_turn_metadata(thread.as_ref(), metadata).await
{
self.outgoing.send_error(request_id, error).await;
return;
}

// Start the turn by submitting the user input. Return its submission id as turn_id.
let turn_id = self
.submit_core_op(
Expand Down Expand Up @@ -6038,6 +6048,15 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
if clear_next_turn_metadata_on_error
&& let Err(clear_error) =
Self::set_next_turn_metadata(thread.as_ref(), BTreeMap::new()).await
{
warn!(
error = %clear_error.message,
"failed to clear next turn metadata after turn/start submission error"
);
}
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start turn: {err}"),
Expand All @@ -6062,6 +6081,20 @@ impl CodexMessageProcessor {
})
}

/// Forwards `metadata` to `CodexThread::set_next_turn_metadata` on `thread`.
///
/// # Errors
///
/// If the thread rejects the metadata, the failure is converted into a
/// `JSONRPCErrorError` with `INVALID_PARAMS_ERROR_CODE`, the error's display
/// text as the message, and no extra `data`.
async fn set_next_turn_metadata(
    thread: &CodexThread,
    metadata: BTreeMap<String, String>,
) -> Result<(), JSONRPCErrorError> {
    let staged = thread.set_next_turn_metadata(metadata).await;

    // Translate the thread-level failure into the JSON-RPC error shape that
    // callers send back to the client.
    staged.map_err(|source| {
        let message = source.to_string();
        JSONRPCErrorError {
            code: INVALID_PARAMS_ERROR_CODE,
            message,
            data: None,
        }
    })
}

async fn turn_steer(&self, request_id: ConnectionRequestId, params: TurnSteerParams) {
let (_, thread) = match self.load_thread(&params.thread_id).await {
Ok(v) => v,
Expand Down
174 changes: 174 additions & 0 deletions codex-rs/app-server/src/in_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,16 +730,21 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
#[cfg(test)]
mod tests {
use super::*;
use app_test_support::create_mock_responses_server_repeating_assistant;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ConfigRequirementsReadResponse;
use codex_app_server_protocol::SessionSource as ApiSessionSource;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput;
use codex_core::config::ConfigBuilder;
use pretty_assertions::assert_eq;
use tempfile::TempDir;

async fn build_test_config() -> Config {
match ConfigBuilder::default().build().await {
Expand Down Expand Up @@ -857,6 +862,175 @@ mod tests {
.expect("in-process runtime should shutdown cleanly");
}

/// Starts an in-process client against a mock responses server, sends a
/// `turn/start` request carrying string metadata, waits for the turn to
/// complete, and then checks that every request recorded by the mock server
/// has an `x-codex-turn-metadata` header whose JSON payload exposes each
/// metadata value under `/metadata/<key>`.
#[tokio::test]
async fn in_process_start_forwards_turn_metadata_on_turn_requests() {
    // Single source of truth for the metadata sent on `turn/start` and the
    // values asserted on the outbound requests below.
    let expected_metadata = [
        ("parentConversationId", "parent-conversation-123"),
        ("parentMessageId", "parent-message-123"),
        ("parentTurnId", "parent-turn-123"),
    ];

    let server = create_mock_responses_server_repeating_assistant("Done").await;
    let codex_home = TempDir::new().expect("tempdir should create");

    let config = ConfigBuilder::default()
        .codex_home(codex_home.path().to_path_buf())
        .build()
        .await
        .expect("test config should load");

    // Point the model provider at the mock server and disable retries.
    let cli_overrides = vec![
        (
            "model_provider".to_string(),
            TomlValue::String("mock_provider".to_string()),
        ),
        (
            "model_providers.mock_provider.name".to_string(),
            TomlValue::String("Mock provider for test".to_string()),
        ),
        (
            "model_providers.mock_provider.base_url".to_string(),
            TomlValue::String(format!("{}/v1", server.uri())),
        ),
        (
            "model_providers.mock_provider.wire_api".to_string(),
            TomlValue::String("responses".to_string()),
        ),
        (
            "model_providers.mock_provider.request_max_retries".to_string(),
            TomlValue::Integer(0),
        ),
        (
            "model_providers.mock_provider.stream_max_retries".to_string(),
            TomlValue::Integer(0),
        ),
    ];

    let start_args = InProcessStartArgs {
        arg0_paths: Arg0DispatchPaths::default(),
        config: Arc::new(config),
        cli_overrides,
        loader_overrides: LoaderOverrides::default(),
        cloud_requirements: CloudRequirementsLoader::default(),
        auth_manager: None,
        thread_manager: None,
        feedback: CodexFeedback::new(),
        config_warnings: Vec::new(),
        session_source: SessionSource::Cli,
        enable_codex_api_key_env: false,
        initialize: InitializeParams {
            client_info: ClientInfo {
                name: "codex-in-process-test".to_string(),
                title: None,
                version: "0.0.0".to_string(),
            },
            capabilities: None,
        },
        channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
    };
    let mut client = start(start_args)
        .await
        .expect("in-process runtime should start");

    // Open an ephemeral thread to host the turn.
    let thread_start_value = client
        .request(ClientRequest::ThreadStart {
            request_id: RequestId::Integer(5),
            params: ThreadStartParams {
                model: Some("mock-model".to_string()),
                ephemeral: Some(true),
                ..ThreadStartParams::default()
            },
        })
        .await
        .expect("thread/start request should work")
        .expect("thread/start should succeed");
    let thread_start_response: ThreadStartResponse =
        serde_json::from_value(thread_start_value).expect("thread/start response should parse");
    let thread_id = thread_start_response.thread.id;

    // Start the turn with the metadata attached.
    let turn_metadata_param: std::collections::BTreeMap<String, String> = expected_metadata
        .iter()
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect();
    let turn_start_value = client
        .request(ClientRequest::TurnStart {
            request_id: RequestId::Integer(6),
            params: TurnStartParams {
                thread_id,
                input: vec![UserInput::Text {
                    text: "Hello".to_string(),
                    text_elements: Vec::new(),
                }],
                metadata: Some(turn_metadata_param),
                ..TurnStartParams::default()
            },
        })
        .await
        .expect("turn/start request should work")
        .expect("turn/start should succeed");
    let _turn_start_response: TurnStartResponse =
        serde_json::from_value(turn_start_value).expect("turn/start response should parse");

    // Drain events until the turn-completed notification arrives; each wait
    // is bounded so a stalled runtime fails the test instead of hanging it.
    let turn_completed = loop {
        let event = timeout(std::time::Duration::from_secs(10), client.next_event())
            .await
            .expect("timed out waiting for turn completion")
            .expect("in-process runtime exited before turn completion");
        if let InProcessServerEvent::ServerNotification(ServerNotification::TurnCompleted(
            notification,
        )) = event
        {
            break notification;
        }
    };
    assert_eq!(
        turn_completed.turn.status,
        TurnStatus::Completed,
        "turn error: {:?}",
        turn_completed.turn.error
    );
    assert_eq!(turn_completed.turn.error, None);

    // Poll the mock server until it reports at least one recorded request.
    let wait_for_requests = async {
        loop {
            let recorded = server
                .received_requests()
                .await
                .expect("received requests should be available");
            if !recorded.is_empty() {
                break recorded;
            }
            tokio::task::yield_now().await;
        }
    };
    let requests = timeout(std::time::Duration::from_secs(5), wait_for_requests)
        .await
        .expect("timed out waiting for outbound requests");
    assert!(!requests.is_empty());

    for outbound in requests {
        let header_text = outbound
            .headers
            .get("x-codex-turn-metadata")
            .and_then(|value| value.to_str().ok())
            .expect("turn metadata header should be present");
        let turn_metadata: serde_json::Value =
            serde_json::from_str(header_text).expect("turn metadata should be JSON");

        for (key, value) in expected_metadata {
            assert_eq!(
                turn_metadata.pointer(&format!("/metadata/{key}")),
                Some(&serde_json::json!(value))
            );
        }
    }

    client
        .shutdown()
        .await
        .expect("in-process runtime should shutdown cleanly");
}
#[test]
fn guaranteed_delivery_helpers_cover_terminal_notifications() {
assert!(server_notification_requires_delivery(
Expand Down
1 change: 1 addition & 0 deletions codex-rs/app-server/src/message_processor/tracing_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,7 @@ async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
personality: None,
output_schema: None,
collaboration_mode: None,
metadata: None,
},
},
Some(remote_trace),
Expand Down
Loading
Loading