From 4f55c6ce0538e9bdaa6f15a811464f8e78bf9f7d Mon Sep 17 00:00:00 2001 From: sp1npx Date: Tue, 3 Mar 2026 11:45:30 +0800 Subject: [PATCH] update --- hopper-core/src/execute/forkcli.rs | 144 +++++++++++++++++++++++++++++ 1 file changed, 144 insertions(+) diff --git a/hopper-core/src/execute/forkcli.rs b/hopper-core/src/execute/forkcli.rs index f8acb68..958f5c1 100644 --- a/hopper-core/src/execute/forkcli.rs +++ b/hopper-core/src/execute/forkcli.rs @@ -131,6 +131,7 @@ impl ForkCli { compiler_fence(Ordering::SeqCst); crate::log!(trace, "receive status {:?} from fork server (fast)", status); self.usage.add_time(&t); + let mut wait_outer_status = false; if status.is_loop_end() { status = io_utils::receive_line(reader).with_context(|| { format!( @@ -138,6 +139,13 @@ impl ForkCli { self.history.serialize().unwrap() ) })?; + wait_outer_status = true; + } else if !status.is_normal() { + // In fast mode, non-normal inner status breaks the loop in server side. + // The server will still send one outer status for stopping the loop. + wait_outer_status = true; + } + if wait_outer_status { // wait for outer ping for finish process let _: StatusType = io_utils::receive_line(reader).context("stop process status")?; @@ -276,3 +284,139 @@ fn socket_path() -> PathBuf { // dir.join("hopper") dir.join(format!("hopper_socket_{}", since_the_epoch.as_millis())) } + +#[cfg(test)] +mod tests { + use super::*; + use std::io::{BufRead, Write}; + use std::time::Duration; + + #[cfg(target_family = "unix")] + fn read_fast_request(reader: &mut BufReader) { + let mut cmd = String::new(); + reader.read_line(&mut cmd).unwrap(); + assert_eq!(cmd.trim_end(), ForkCmd::Loop.serialize().unwrap()); + + let mut line = String::new(); + loop { + line.clear(); + let n = reader.read_line(&mut line).unwrap(); + assert!(n > 0); + if line == "\n" { + break; + } + } + } + + #[cfg(target_family = "unix")] + #[test] + fn test_fast_mode_missing_outer_status_times_out() { + let (normal_cli, _normal_srv) = UnixStream::pair().unwrap(); + let (fast_cli, fast_srv) = UnixStream::pair().unwrap(); + fast_cli + .set_read_timeout(Some(Duration::from_millis(80))) + .unwrap(); + fast_cli + .set_write_timeout(Some(Duration::from_millis(80))) + .unwrap(); + + let mut cli = ForkCli { + socket_path: std::env::temp_dir().join("hopper_socket_unit_test_do_not_exist"), + reader: BufReader::new(normal_cli.try_clone().unwrap()), + writer: BufWriter::new(normal_cli), + fast_io: Some(( + BufReader::new(fast_cli.try_clone().unwrap()), + BufWriter::new(fast_cli), + )), + history: vec![], + usage: TimeUsage::default(), + }; + + let server = std::thread::spawn(move || { + let mut reader = BufReader::new(fast_srv.try_clone().unwrap()); + let mut writer = BufWriter::new(fast_srv); + + read_fast_request(&mut reader); + + writeln!(writer, "{}", StatusType::LoopEnd.serialize().unwrap()).unwrap(); + writeln!(writer, "{}", StatusType::Normal.serialize().unwrap()).unwrap(); + writer.flush().unwrap(); + std::thread::sleep(Duration::from_millis(200)); + }); + + let program = FuzzProgram::default(); + let err = cli.execute_program_fast(&program).unwrap_err(); + let err_msg = format!("{err:#?}"); + assert!( + err_msg.contains("stop process status"), + "unexpected error: {err_msg}" + ); + assert!( + err_msg.contains("fail to read line"), + "unexpected error: {err_msg}" + ); + + server.join().unwrap(); + } + + #[cfg(target_family = "unix")] + #[test] + fn test_fast_mode_consumes_outer_status_after_non_normal_inner_status() { + let (normal_cli, _normal_srv) = UnixStream::pair().unwrap(); + let (fast_cli, fast_srv) = UnixStream::pair().unwrap(); + fast_cli + .set_read_timeout(Some(Duration::from_millis(300))) + .unwrap(); + fast_cli + .set_write_timeout(Some(Duration::from_millis(300))) + .unwrap(); + + let mut cli = ForkCli { + socket_path: std::env::temp_dir().join("hopper_socket_unit_test_do_not_exist"), + reader: BufReader::new(normal_cli.try_clone().unwrap()), + writer: BufWriter::new(normal_cli), + fast_io: Some(( + BufReader::new(fast_cli.try_clone().unwrap()), + BufWriter::new(fast_cli), + )), + history: vec![], + usage: TimeUsage::default(), + }; + + let server = std::thread::spawn(move || { + let mut reader = BufReader::new(fast_srv.try_clone().unwrap()); + let mut writer = BufWriter::new(fast_srv); + + read_fast_request(&mut reader); + writeln!( + writer, + "{}", + StatusType::Crash { + signal: Signal::SIGABRT + } + .serialize() + .unwrap() + ) + .unwrap(); + writeln!(writer, "{}", StatusType::Normal.serialize().unwrap()).unwrap(); + writer.flush().unwrap(); + + read_fast_request(&mut reader); + writeln!(writer, "{}", StatusType::Ignore.serialize().unwrap()).unwrap(); + writeln!(writer, "{}", StatusType::Normal.serialize().unwrap()).unwrap(); + writer.flush().unwrap(); + }); + + let program = FuzzProgram::default(); + let first = cli.execute_program_fast(&program).unwrap(); + assert!(first.is_crash(), "unexpected first status: {first:?}"); + + let second = cli.execute_program_fast(&program).unwrap(); + assert!( + second.is_ignore(), + "expected second inner status from current round, got: {second:?}" + ); + + server.join().unwrap(); + } +}