Skip to content

Commit 1309725

Browse files
lovasoaclaude
andcommitted
use .instrument() instead of .entered() for async spans
Span guards from .entered() do not propagate correctly across await points. Switch to tracing::Instrument to ensure spans are properly associated with their async tasks throughout their lifetime. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 3b0c37c commit 1309725

4 files changed

Lines changed: 142 additions & 134 deletions

File tree

src/webserver/database/execute_queries.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use anyhow::{anyhow, Context};
22
use futures_util::stream::Stream;
33
use futures_util::StreamExt;
4+
use tracing::Instrument;
45
use serde_json::Value;
56
use std::borrow::Cow;
67
use std::path::Path;
@@ -68,24 +69,23 @@ pub fn stream_query_results_with_conn<'a>(
6869
request.server_timing.record("bind_params");
6970
let connection = take_connection(&request.app_state.db, db_connection, request).await?;
7071
log::trace!("Executing query {:?}", query.sql);
71-
let _query_span = tracing::info_span!(
72+
let query_span = tracing::info_span!(
7273
"db.query",
7374
db.query.text = query.sql,
7475
db.system.name = request.app_state.db.info.database_type.otel_name(),
7576
code.file.path = %source_file.display(),
7677
code.line.number = source_line_number(stmt.query_position.start.line),
77-
)
78-
.entered();
78+
);
7979
let mut stream = connection.fetch_many(query);
8080
let mut error = None;
81-
while let Some(elem) = stream.next().await {
81+
while let Some(elem) = stream.next().instrument(query_span.clone()).await {
8282
let mut query_result = parse_single_sql_result(source_file, stmt, elem);
8383
if let DbItem::Error(e) = query_result {
8484
error = Some(e);
8585
break;
8686
}
8787
apply_json_columns(&mut query_result, &stmt.json_columns);
88-
apply_delayed_functions(request, &stmt.delayed_functions, &mut query_result).await?;
88+
apply_delayed_functions(request, &stmt.delayed_functions, &mut query_result).instrument(query_span.clone()).await?;
8989
for db_item in parse_dynamic_rows(query_result) {
9090
yield db_item;
9191
}
@@ -229,15 +229,14 @@ async fn execute_set_variable_query<'a>(
229229
query.sql
230230
);
231231

232-
let _query_span = tracing::info_span!(
232+
let query_span = tracing::info_span!(
233233
"db.query",
234234
db.query.text = query.sql,
235235
db.system.name = request.app_state.db.info.database_type.otel_name(),
236236
code.file.path = %source_file.display(),
237237
code.line.number = source_line_number(statement.query_position.start.line),
238-
)
239-
.entered();
240-
let value = match connection.fetch_optional(query).await {
238+
);
239+
let value = match connection.fetch_optional(query).instrument(query_span).await {
241240
Ok(Some(row)) => row_to_string(&row),
242241
Ok(None) => None,
243242
Err(e) => {
@@ -309,8 +308,8 @@ async fn take_connection<'a>(
309308
return Ok(c);
310309
}
311310
let pool_size = db.connection.size();
312-
let _acquire_span = tracing::info_span!("db.pool.acquire", db.pool.size = pool_size,).entered();
313-
match db.connection.acquire().await {
311+
let acquire_span = tracing::info_span!("db.pool.acquire", db.pool.size = pool_size,);
312+
match db.connection.acquire().instrument(acquire_span).await {
314313
Ok(c) => {
315314
log::debug!("Acquired a database connection");
316315
request.server_timing.record("db_conn");

src/webserver/database/sqlpage_functions/functions.rs

Lines changed: 119 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::webserver::{
1212
};
1313
use anyhow::{anyhow, Context};
1414
use futures_util::StreamExt;
15+
use tracing::Instrument;
1516
use mime_guess::mime;
1617
use std::fmt::Write;
1718
use std::{borrow::Cow, ffi::OsStr, str::FromStr};
@@ -136,16 +137,16 @@ async fn exec<'a>(
136137
Make sure you understand the security implications before enabling it, and never allow user input to be passed as the first argument to this function.
137138
You can enable it by setting the allow_exec option to true in the sqlpage.json configuration file.")
138139
}
139-
let _exec_span = tracing::info_span!(
140+
let exec_span = tracing::info_span!(
140141
"subprocess",
141142
otel.name = format!("EXEC {program_name}"),
142143
process.command = %program_name,
143144
process.args_count = args.len(),
144-
)
145-
.entered();
145+
);
146146
let res = tokio::process::Command::new(&*program_name)
147147
.args(args.iter().map(|x| &**x))
148148
.output()
149+
.instrument(exec_span)
149150
.await
150151
.with_context(|| {
151152
let mut s = format!("Unable to execute command: {program_name}");
@@ -220,46 +221,49 @@ async fn fetch(
220221
url.full = %http_request.url,
221222
http.response.status_code = tracing::field::Empty,
222223
);
223-
let _guard = fetch_span.enter();
224224

225-
let client = make_http_client(&request.app_state.config)
226-
.with_context(|| "Unable to create an HTTP client")?;
227-
let req = build_request(&client, &http_request)?;
225+
async {
226+
let client = make_http_client(&request.app_state.config)
227+
.with_context(|| "Unable to create an HTTP client")?;
228+
let req = build_request(&client, &http_request)?;
228229

229-
log::info!("Fetching {}", http_request.url);
230-
let mut response = if let Some(body) = &http_request.body {
231-
let (body, req) = prepare_request_body(body, req)?;
232-
req.send_body(body)
233-
} else {
234-
req.send()
235-
}
236-
.await
237-
.map_err(|e| anyhow!("Unable to fetch {}: {e}", http_request.url))?;
230+
log::info!("Fetching {}", http_request.url);
231+
let mut response = if let Some(body) = &http_request.body {
232+
let (body, req) = prepare_request_body(body, req)?;
233+
req.send_body(body)
234+
} else {
235+
req.send()
236+
}
237+
.await
238+
.map_err(|e| anyhow!("Unable to fetch {}: {e}", http_request.url))?;
238239

239-
fetch_span.record(
240-
"http.response.status_code",
241-
i64::from(response.status().as_u16()),
242-
);
240+
tracing::Span::current().record(
241+
"http.response.status_code",
242+
i64::from(response.status().as_u16()),
243+
);
243244

244-
log::debug!(
245-
"Finished fetching {}. Status: {}",
246-
http_request.url,
247-
response.status()
248-
);
245+
log::debug!(
246+
"Finished fetching {}. Status: {}",
247+
http_request.url,
248+
response.status()
249+
);
249250

250-
let body = response
251-
.body()
252-
.await
253-
.with_context(|| {
254-
format!(
255-
"Unable to read the body of the response from {}",
256-
http_request.url
257-
)
258-
})?
259-
.to_vec();
260-
let response_str = decode_response(body, http_request.response_encoding.as_deref())?;
261-
log::debug!("Fetch response: {response_str}");
262-
Ok(Some(response_str))
251+
let body = response
252+
.body()
253+
.await
254+
.with_context(|| {
255+
format!(
256+
"Unable to read the body of the response from {}",
257+
http_request.url
258+
)
259+
})?
260+
.to_vec();
261+
let response_str = decode_response(body, http_request.response_encoding.as_deref())?;
262+
log::debug!("Fetch response: {response_str}");
263+
Ok(Some(response_str))
264+
}
265+
.instrument(fetch_span)
266+
.await
263267
}
264268

265269
fn decode_response(response: Vec<u8>, encoding: Option<&str>) -> anyhow::Result<String> {
@@ -315,84 +319,88 @@ async fn fetch_with_meta(
315319
url.full = %http_request.url,
316320
http.response.status_code = tracing::field::Empty,
317321
);
318-
let _guard = fetch_span.enter();
319322

320-
let client = make_http_client(&request.app_state.config)
321-
.with_context(|| "Unable to create an HTTP client")?;
322-
let req = build_request(&client, &http_request)?;
323+
async {
324+
let client = make_http_client(&request.app_state.config)
325+
.with_context(|| "Unable to create an HTTP client")?;
326+
let req = build_request(&client, &http_request)?;
323327

324-
log::info!("Fetching {} with metadata", http_request.url);
325-
let response_result = if let Some(body) = &http_request.body {
326-
let (body, req) = prepare_request_body(body, req)?;
327-
req.send_body(body).await
328-
} else {
329-
req.send().await
330-
};
331-
332-
let mut resp_str = Vec::new();
333-
let mut encoder = serde_json::Serializer::new(&mut resp_str);
334-
let mut obj = encoder.serialize_map(Some(3))?;
335-
match response_result {
336-
Ok(mut response) => {
337-
let status = response.status();
338-
fetch_span.record("http.response.status_code", i64::from(status.as_u16()));
339-
obj.serialize_entry("status", &status.as_u16())?;
340-
let mut has_error = false;
341-
if status.is_server_error() {
342-
has_error = true;
343-
obj.serialize_entry("error", &format!("Server error: {status}"))?;
344-
}
328+
log::info!("Fetching {} with metadata", http_request.url);
329+
let response_result = if let Some(body) = &http_request.body {
330+
let (body, req) = prepare_request_body(body, req)?;
331+
req.send_body(body).await
332+
} else {
333+
req.send().await
334+
};
335+
336+
let mut resp_str = Vec::new();
337+
let mut encoder = serde_json::Serializer::new(&mut resp_str);
338+
let mut obj = encoder.serialize_map(Some(3))?;
339+
match response_result {
340+
Ok(mut response) => {
341+
let status = response.status();
342+
tracing::Span::current()
343+
.record("http.response.status_code", i64::from(status.as_u16()));
344+
obj.serialize_entry("status", &status.as_u16())?;
345+
let mut has_error = false;
346+
if status.is_server_error() {
347+
has_error = true;
348+
obj.serialize_entry("error", &format!("Server error: {status}"))?;
349+
}
345350

346-
let headers = response.headers();
347-
348-
let is_json = headers
349-
.get("content-type")
350-
.and_then(|v| v.to_str().ok())
351-
.unwrap_or_default()
352-
.starts_with("application/json");
353-
354-
obj.serialize_entry(
355-
"headers",
356-
&headers
357-
.iter()
358-
.map(|(k, v)| (k.to_string(), v.to_str().unwrap_or_default()))
359-
.collect::<std::collections::HashMap<_, _>>(),
360-
)?;
361-
362-
match response.body().await {
363-
Ok(body) => {
364-
let body_bytes = body.to_vec();
365-
let body_str =
366-
decode_response(body_bytes, http_request.response_encoding.as_deref())?;
367-
if is_json {
368-
obj.serialize_entry(
369-
"json_body",
370-
&serde_json::value::RawValue::from_string(body_str)?,
371-
)?;
372-
} else {
373-
obj.serialize_entry("body", &body_str)?;
351+
let headers = response.headers();
352+
353+
let is_json = headers
354+
.get("content-type")
355+
.and_then(|v| v.to_str().ok())
356+
.unwrap_or_default()
357+
.starts_with("application/json");
358+
359+
obj.serialize_entry(
360+
"headers",
361+
&headers
362+
.iter()
363+
.map(|(k, v)| (k.to_string(), v.to_str().unwrap_or_default()))
364+
.collect::<std::collections::HashMap<_, _>>(),
365+
)?;
366+
367+
match response.body().await {
368+
Ok(body) => {
369+
let body_bytes = body.to_vec();
370+
let body_str =
371+
decode_response(body_bytes, http_request.response_encoding.as_deref())?;
372+
if is_json {
373+
obj.serialize_entry(
374+
"json_body",
375+
&serde_json::value::RawValue::from_string(body_str)?,
376+
)?;
377+
} else {
378+
obj.serialize_entry("body", &body_str)?;
379+
}
374380
}
375-
}
376-
Err(e) => {
377-
log::warn!("Failed to read response body: {e}");
378-
if !has_error {
379-
obj.serialize_entry(
380-
"error",
381-
&format!("Failed to read response body: {e}"),
382-
)?;
381+
Err(e) => {
382+
log::warn!("Failed to read response body: {e}");
383+
if !has_error {
384+
obj.serialize_entry(
385+
"error",
386+
&format!("Failed to read response body: {e}"),
387+
)?;
388+
}
383389
}
384390
}
385391
}
392+
Err(e) => {
393+
log::warn!("Request failed: {e}");
394+
obj.serialize_entry("error", &format!("Request failed: {e}"))?;
395+
}
386396
}
387-
Err(e) => {
388-
log::warn!("Request failed: {e}");
389-
obj.serialize_entry("error", &format!("Request failed: {e}"))?;
390-
}
391-
}
392397

393-
obj.end()?;
394-
let return_value = String::from_utf8(resp_str)?;
395-
Ok(Some(return_value))
398+
obj.end()?;
399+
let return_value = String::from_utf8(resp_str)?;
400+
Ok(Some(return_value))
401+
}
402+
.instrument(fetch_span)
403+
.await
396404
}
397405

398406
pub(crate) async fn hash_password(password: Option<String>) -> anyhow::Result<Option<String>> {
@@ -610,12 +618,11 @@ async fn run_sql<'a>(
610618
log::debug!("run_sql: first argument is NULL, returning NULL");
611619
return Ok(None);
612620
};
613-
let _run_sql_span = tracing::info_span!(
621+
let run_sql_span = tracing::info_span!(
614622
"sqlpage.file",
615623
otel.name = format!("SQL {sql_file_path}"),
616624
code.file.path = %sql_file_path,
617-
)
618-
.entered();
625+
);
619626
let app_state = &request.app_state;
620627
let sql_file = app_state
621628
.sql_file_cache
@@ -624,6 +631,7 @@ async fn run_sql<'a>(
624631
std::path::Path::new(sql_file_path.as_ref()),
625632
true,
626633
)
634+
.instrument(run_sql_span.clone())
627635
.await
628636
.with_context(|| format!("run_sql: invalid path {sql_file_path:?}"))?;
629637
let tmp_req = if let Some(variables) = variables {
@@ -652,7 +660,7 @@ async fn run_sql<'a>(
652660
let mut json_results_bytes = Vec::new();
653661
let mut json_encoder = serde_json::Serializer::new(&mut json_results_bytes);
654662
let mut seq = json_encoder.serialize_seq(None)?;
655-
while let Some(db_item) = results_stream.next().await {
663+
while let Some(db_item) = results_stream.next().instrument(run_sql_span.clone()).await {
656664
use crate::webserver::database::DbItem::{Error, FinishedQuery, Row};
657665
match db_item {
658666
Row(row) => {

0 commit comments

Comments
 (0)