Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 153 additions & 23 deletions src/profiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
};
use std::{
fs::File,
io, mem,
io,
path::{Path, PathBuf},
time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
};
Expand Down Expand Up @@ -590,6 +590,55 @@ pub(crate) trait ProfilerEngine: Send + Sync + 'static {
fn stop_async_profiler() -> Result<(), asprof::AsProfError>;
}

/// Holds the profiler task state and performs a final synchronous report
/// when the task is cancelled (e.g. Tokio runtime shutdown) before a
/// graceful stop. The local reporter will flush its contents on drop.
/// For other reporters, you must call `RunningProfiler::stop().await`
/// to ensure the last sample is uploaded.
///
/// The final report is driven by this type's `Drop` implementation, which
/// is a no-op when `completed_normally` is set or the profiler state is not
/// started.
struct ProfilerTaskState<E: ProfilerEngine> {
/// Profiler state. The final report stops it and reads its `jfr_file`
/// to locate the data to hand to the reporter.
state: ProfilerState<E>,
/// Reporter used on every tick; its `report_blocking` method is used for
/// the final report on drop.
reporter: Box<dyn Reporter + Send + Sync>,
/// Agent metadata, if known. The final report falls back to
/// `AgentMetadata::NoMetadata` when this is `None`.
agent_metadata: Option<AgentMetadata>,
/// Interval between reports; copied into the `ReportMetadata` of the
/// final report.
reporting_interval: Duration,
/// Set to `true` by the task once its loop has finished on its own, so
/// that dropping the state does not attempt another report.
completed_normally: bool,
}

impl<E: ProfilerEngine> ProfilerTaskState<E> {
    /// Stops the profiler and synchronously hands the active JFR file to the
    /// reporter via `Reporter::report_blocking`.
    ///
    /// Returns `Ok(())` without reporting when the active JFR file is empty.
    ///
    /// # Errors
    ///
    /// Fails when stopping the profiler fails, when the profiler was not
    /// running, when no JFR file is present, when the file metadata cannot be
    /// read, when a timestamp precedes the Unix epoch, or when the reporter
    /// itself fails (its error is carried over as a string).
    fn try_final_report(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // Stopping yields the start time of the sample being flushed.
        let Some(started_at) = self.state.stop()? else {
            return Err("profiler was not running".into());
        };
        let Some(jfr_file) = self.state.jfr_file.as_ref() else {
            return Err("jfr file missing".into());
        };

        let active_path = jfr_file.active_path();
        let recorded_bytes = active_path.metadata()?.len();
        if recorded_bytes == 0 {
            // Nothing was recorded, so there is nothing to report.
            return Ok(());
        }

        let instance = self
            .agent_metadata
            .as_ref()
            .unwrap_or(&AgentMetadata::NoMetadata);
        let start = started_at.duration_since(UNIX_EPOCH)?;
        let end = SystemTime::now().duration_since(UNIX_EPOCH)?;
        let metadata = ReportMetadata {
            instance,
            start,
            end,
            reporting_interval: self.reporting_interval,
        };

        // The reporter's error type is only `Send`; stringify it so that it
        // fits this function's `Send + Sync` error type.
        match self.reporter.report_blocking(&active_path, &metadata) {
            Ok(()) => Ok(()),
            Err(report_err) => Err(report_err.to_string().into()),
        }
    }
}

impl<E: ProfilerEngine> Drop for ProfilerTaskState<E> {
    /// Attempts one last synchronous report when the state is dropped while
    /// the profiler is still started and the task did not finish on its own.
    /// A failed report is logged, never propagated.
    fn drop(&mut self) {
        let cancelled_while_running = !self.completed_normally && self.state.is_started();
        if cancelled_while_running {
            tracing::info!("profiler task cancelled, attempting final report on drop");
            self.try_final_report().unwrap_or_else(|err| {
                tracing::warn!(?err, "failed to report on drop");
            });
        }
    }
}

#[derive(Debug, Error)]
#[non_exhaustive]
enum TickError {
Expand Down Expand Up @@ -1092,18 +1141,6 @@ impl Profiler {
}

fn spawn_inner<E: ProfilerEngine>(self, asprof: E) -> Result<RunningProfiler, SpawnError> {
struct LogOnDrop;
impl Drop for LogOnDrop {
fn drop(&mut self) {
// Tokio will call destructors during runtime shutdown. Have something that will
// emit a log in that case
tracing::info!(
"unable to upload the last jfr due to Tokio runtime shutdown. \
Add a call to `RunningProfiler::stop` to wait for jfr upload to finish."
);
}
}

// Initialize async profiler - needs to be done once.
E::init_async_profiler()?;
tracing::info!("successfully initialized async profiler.");
Expand All @@ -1113,19 +1150,23 @@ impl Profiler {

// Get profiles at the configured interval rate.
let join_handle = tokio::spawn(async move {
let mut state = match ProfilerState::new(asprof, self.profiler_options) {
let state = match ProfilerState::new(asprof, self.profiler_options) {
Ok(state) => state,
Err(err) => {
tracing::error!(?err, "unable to create profiler state");
return;
}
};

// Lazily-loaded if not specified up front.
let mut agent_metadata = self.agent_metadata;
let mut done = false;
let mut task = ProfilerTaskState {
state,
reporter: self.reporter,
agent_metadata: self.agent_metadata,
reporting_interval: self.reporting_interval,
completed_normally: false,
};

let guard = LogOnDrop;
let mut done = false;
while !done {
// Wait until a timer or exit event
tokio::select! {
Expand All @@ -1145,10 +1186,10 @@ impl Profiler {
}

if let Err(err) = profiler_tick(
&mut state,
&mut agent_metadata,
&*self.reporter,
self.reporting_interval,
&mut task.state,
&mut task.agent_metadata,
&*task.reporter,
task.reporting_interval,
)
.await
{
Expand All @@ -1165,7 +1206,7 @@ impl Profiler {
}
}

mem::forget(guard);
task.completed_normally = true;
tracing::info!("profiling task finished");
});

Expand Down Expand Up @@ -1575,4 +1616,93 @@ mod tests {
bad => panic!("{bad:?}"),
};
}

/// A reporter that tracks both async and blocking reports separately.
struct BlockingMockReporter {
/// Channel on which `report` sends each JFR payload, decoded as UTF-8.
async_tx: tokio::sync::mpsc::Sender<String>,
/// Payloads received through `report_blocking`, decoded as UTF-8. Shared
/// behind an `Arc` so a test can inspect it after the reporter is moved.
blocking_reports: Arc<std::sync::Mutex<Vec<String>>>,
}
impl std::fmt::Debug for BlockingMockReporter {
    /// Formats the reporter as its bare type name; no fields are printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("BlockingMockReporter");
        builder.finish()
    }
}

#[async_trait::async_trait]
impl Reporter for BlockingMockReporter {
    /// Forwards the JFR payload, decoded as UTF-8, over the async channel.
    ///
    /// Panics if the payload is not valid UTF-8 or if sending on the channel
    /// fails.
    async fn report(
        &self,
        jfr: Vec<u8>,
        _metadata: &ReportMetadata,
    ) -> Result<(), Box<dyn std::error::Error + Send>> {
        let payload = String::from_utf8(jfr).unwrap();
        let delivery = self.async_tx.send(payload).await;
        delivery.unwrap();
        Ok(())
    }

    /// Reads the JFR file at `jfr_path` and records its contents, decoded as
    /// UTF-8, in the shared list of blocking reports.
    ///
    /// Returns the I/O error if the file cannot be read. Panics if the mutex
    /// is poisoned or the contents are not valid UTF-8.
    fn report_blocking(
        &self,
        jfr_path: &Path,
        _metadata: &ReportMetadata,
    ) -> Result<(), Box<dyn std::error::Error + Send>> {
        let bytes = match std::fs::read(jfr_path) {
            Ok(bytes) => bytes,
            Err(io_err) => return Err(Box::new(io_err)),
        };
        // Take the lock before decoding, so decoding happens under the lock.
        let mut recorded = self.blocking_reports.lock().unwrap();
        recorded.push(String::from_utf8(bytes).unwrap());
        Ok(())
    }
}

/// Simulates a runtime shutdown while the profiler is running.
/// The profiler should call report_blocking on drop to flush the
/// last sample.
#[test]
fn test_profiler_report_on_drop() {
// Shared list that the mock reporter appends to from `report_blocking`.
let blocking_reports = Arc::new(std::sync::Mutex::new(Vec::new()));

// Build the runtime by hand (rather than via a test attribute macro) so the
// test can drop it explicitly below.
// NOTE(review): `start_paused(true)` presumably lets the reporting interval
// elapse without real waiting; confirm against Tokio's paused-clock docs.
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.start_paused(true)
.build()
.unwrap();

// The reporter is moved into the profiler, so keep a second handle here.
let reports_clone = blocking_reports.clone();
rt.block_on(async {
let (async_tx, mut async_rx) = tokio::sync::mpsc::channel::<String>(10);
let agent = ProfilerBuilder::default()
.with_reporter(BlockingMockReporter {
async_tx,
blocking_reports: reports_clone,
})
.with_custom_agent_metadata(AgentMetadata::NoMetadata)
.build();
// Detach so the stop channel doesn't trigger a graceful stop
// when the block_on future returns.
agent
.spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
counter: AtomicU32::new(0),
})
.unwrap()
.detach();

// Wait for first async report to confirm profiler is running
let jfr = async_rx.recv().await.unwrap();
assert_eq!(jfr, "JFR0");
// Return without stopping — runtime drop will cancel the task.
});

// Runtime shutdown cancels all tasks, triggering ProfilerTaskState::Drop.
drop(rt);

// Exactly one report must have arrived through the blocking path.
// NOTE(review): "JFR1" presumably reflects the mock engine's counter having
// advanced past the first sample ("JFR0"); confirm against MockProfilerEngine.
let reports = blocking_reports.lock().unwrap();
assert_eq!(
reports.len(),
1,
"expected exactly one blocking report on drop"
);
assert_eq!(reports[0], "JFR1");
}
}
58 changes: 51 additions & 7 deletions src/reporter/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use async_trait::async_trait;
use chrono::SecondsFormat;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use thiserror::Error;

Expand All @@ -23,12 +23,19 @@ enum LocalReporterError {
///
/// The files are reported with the filename `yyyy-mm-ddTHH-MM-SSZ.jfr`
///
/// Unlike other reporters (e.g. S3), the local reporter will
/// flush any pending JFR data on drop when the profiler task is cancelled
/// (for example, during Tokio runtime shutdown). Other reporters require
/// an explicit call to [`RunningProfiler::stop`] to ensure the last sample
/// is uploaded.
///
/// It does not currently use the metadata, so if you are using
/// [LocalReporter] alone, rather than inside a [MultiReporter], you
/// can just use [AgentMetadata::NoMetadata] as metadata.
///
/// [AgentMetadata::NoMetadata]: crate::metadata::AgentMetadata::NoMetadata
/// [MultiReporter]: crate::reporter::multi::MultiReporter
/// [`RunningProfiler::stop`]: crate::profiler::RunningProfiler::stop
///
/// ### Example
///
Expand Down Expand Up @@ -61,18 +68,22 @@ impl LocalReporter {
}
}

/// Builds the output file name for a report written now, in the form
/// `yyyy-mm-ddTHH-MM-SSZ.jfr` (current UTC time at second precision, with
/// the colons of the RFC 3339 timestamp replaced by dashes).
fn jfr_file_name() -> String {
    let now = chrono::DateTime::<chrono::Utc>::from(SystemTime::now());
    let stamp = now.to_rfc3339_opts(SecondsFormat::Secs, true);
    let mut file_name = stamp.replace(':', "-");
    file_name.push_str(".jfr");
    file_name
}

/// Writes the jfr file to disk.
async fn report_profiling_data(
&self,
jfr: Vec<u8>,
_metadata_obj: &ReportMetadata<'_>,
) -> Result<(), std::io::Error> {
let time: chrono::DateTime<chrono::Utc> = SystemTime::now().into();
let time = time
.to_rfc3339_opts(SecondsFormat::Secs, true)
.replace(":", "-");
tracing::debug!("reporting {time}.jfr");
let file_name = format!("{time}.jfr");
let file_name = Self::jfr_file_name();
tracing::debug!("reporting {file_name}");
tokio::fs::write(self.directory.join(file_name), jfr).await?;
Ok(())
}
Expand All @@ -89,6 +100,18 @@ impl Reporter for LocalReporter {
.await
.map_err(|e| Box::new(LocalReporterError::IoError(e)) as _)
}

/// Synchronously copies the JFR file at `jfr_path` into the reporter's
/// directory under a freshly generated timestamped name.
///
/// The metadata is not used. An I/O failure during the copy is returned
/// wrapped in `LocalReporterError::IoError`.
fn report_blocking(
    &self,
    jfr_path: &Path,
    _metadata: &ReportMetadata,
) -> Result<(), Box<dyn std::error::Error + Send>> {
    let file_name = Self::jfr_file_name();
    tracing::debug!("reporting {file_name} (blocking)");
    let destination = self.directory.join(file_name);
    match std::fs::copy(jfr_path, destination) {
        // The number of bytes copied is not needed.
        Ok(_copied) => Ok(()),
        Err(io_err) => Err(Box::new(LocalReporterError::IoError(io_err))),
    }
}
}

#[cfg(test)]
Expand Down Expand Up @@ -120,4 +143,25 @@ mod test {
.unwrap();
assert_eq!(tokio::fs::read(jfr_file.path()).await.unwrap(), b"JFR");
}

#[test]
fn test_local_reporter_report_blocking() {
let dir = tempfile::tempdir().unwrap();
let src = dir.path().join("input.jfr");
std::fs::write(&src, b"JFR-DROP").unwrap();
let out_dir = tempfile::tempdir().unwrap();
let reporter = LocalReporter::new(out_dir.path());
reporter.report_blocking(&src, &DUMMY_METADATA).unwrap();
let jfr_file = std::fs::read_dir(out_dir.path())
.unwrap()
.flat_map(|f| f.ok())
.filter(|f| {
Path::new(&f.file_name())
.extension()
.is_some_and(|e| e == "jfr")
})
.next()
.unwrap();
assert_eq!(std::fs::read(jfr_file.path()).unwrap(), b"JFR-DROP");
Comment on lines +150 to +165
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this test testing? It does not seem to actually test that dropping the local reporter does something.

}
}
19 changes: 19 additions & 0 deletions src/reporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
//! 3. [multi::MultiReporter], which allows combining multiple reporters.

use std::fmt;
use std::path::Path;

use async_trait::async_trait;

Expand Down Expand Up @@ -38,4 +39,22 @@ pub trait Reporter: fmt::Debug {
jfr: Vec<u8>,
metadata: &ReportMetadata,
) -> Result<(), Box<dyn std::error::Error + Send>>;

/// Synchronously report profiling data. Called during drop when the
/// async runtime is shutting down and async reporting is not possible.
///
/// The default implementation does nothing. Reporters that can perform
/// synchronous I/O (like [`local::LocalReporter`]) should override this.
#[doc(hidden)]
fn report_blocking(
&self,
_jfr_path: &Path,
_metadata: &ReportMetadata,
) -> Result<(), Box<dyn std::error::Error + Send>> {
tracing::info!(
"reporter does not support synchronous reporting, last sample will be lost. \
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this message is only reached if a sample is actually lost.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this say to call `.stop().await`? Do we already have something that does that? Do we know anything about the lost sample, such as its time range?

Call `RunningProfiler::stop().await` before shutdown to ensure all samples are uploaded."
);
Ok(())
}
}
Loading
Loading