Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions hopper-core/src/execute/forkcli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,21 @@ impl ForkCli {
compiler_fence(Ordering::SeqCst);
crate::log!(trace, "receive status {:?} from fork server (fast)", status);
self.usage.add_time(&t);
let mut wait_outer_status = false;
if status.is_loop_end() {
status = io_utils::receive_line(reader).with_context(|| {
format!(
"program: {program}\n history: {}",
self.history.serialize().unwrap()
)
})?;
wait_outer_status = true;
} else if !status.is_normal() {
// In fast mode, non-normal inner status breaks the loop in server side.
// The server will still send one outer status for stopping the loop.
wait_outer_status = true;
}
if wait_outer_status {
// wait for outer ping for finish process
let _: StatusType =
io_utils::receive_line(reader).context("stop process status")?;
Expand Down Expand Up @@ -276,3 +284,139 @@ fn socket_path() -> PathBuf {
// dir.join("hopper")
dir.join(format!("hopper_socket_{}", since_the_epoch.as_millis()))
}

#[cfg(test)]
mod tests {
use super::*;
use std::io::{BufRead, Write};
use std::time::Duration;

#[cfg(target_family = "unix")]
fn read_fast_request(reader: &mut BufReader<UnixStream>) {
let mut cmd = String::new();
reader.read_line(&mut cmd).unwrap();
assert_eq!(cmd.trim_end(), ForkCmd::Loop.serialize().unwrap());

let mut line = String::new();
loop {
line.clear();
let n = reader.read_line(&mut line).unwrap();
assert!(n > 0);
if line == "<END>\n" {
break;
}
}
}

#[cfg(target_family = "unix")]
#[test]
fn test_fast_mode_missing_outer_status_times_out() {
let (normal_cli, _normal_srv) = UnixStream::pair().unwrap();
let (fast_cli, fast_srv) = UnixStream::pair().unwrap();
fast_cli
.set_read_timeout(Some(Duration::from_millis(80)))
.unwrap();
fast_cli
.set_write_timeout(Some(Duration::from_millis(80)))
.unwrap();

let mut cli = ForkCli {
socket_path: std::env::temp_dir().join("hopper_socket_unit_test_do_not_exist"),
reader: BufReader::new(normal_cli.try_clone().unwrap()),
writer: BufWriter::new(normal_cli),
fast_io: Some((
BufReader::new(fast_cli.try_clone().unwrap()),
BufWriter::new(fast_cli),
)),
history: vec![],
usage: TimeUsage::default(),
};

let server = std::thread::spawn(move || {
let mut reader = BufReader::new(fast_srv.try_clone().unwrap());
let mut writer = BufWriter::new(fast_srv);

read_fast_request(&mut reader);

writeln!(writer, "{}", StatusType::LoopEnd.serialize().unwrap()).unwrap();
writeln!(writer, "{}", StatusType::Normal.serialize().unwrap()).unwrap();
writer.flush().unwrap();
std::thread::sleep(Duration::from_millis(200));
});

let program = FuzzProgram::default();
let err = cli.execute_program_fast(&program).unwrap_err();
let err_msg = format!("{err:#?}");
assert!(
err_msg.contains("stop process status"),
"unexpected error: {err_msg}"
);
assert!(
err_msg.contains("fail to read line"),
"unexpected error: {err_msg}"
);

server.join().unwrap();
}

#[cfg(target_family = "unix")]
#[test]
fn test_fast_mode_consumes_outer_status_after_non_normal_inner_status() {
let (normal_cli, _normal_srv) = UnixStream::pair().unwrap();
let (fast_cli, fast_srv) = UnixStream::pair().unwrap();
fast_cli
.set_read_timeout(Some(Duration::from_millis(300)))
.unwrap();
fast_cli
.set_write_timeout(Some(Duration::from_millis(300)))
.unwrap();

let mut cli = ForkCli {
socket_path: std::env::temp_dir().join("hopper_socket_unit_test_do_not_exist"),
reader: BufReader::new(normal_cli.try_clone().unwrap()),
writer: BufWriter::new(normal_cli),
fast_io: Some((
BufReader::new(fast_cli.try_clone().unwrap()),
BufWriter::new(fast_cli),
)),
history: vec![],
usage: TimeUsage::default(),
};

let server = std::thread::spawn(move || {
let mut reader = BufReader::new(fast_srv.try_clone().unwrap());
let mut writer = BufWriter::new(fast_srv);

read_fast_request(&mut reader);
writeln!(
writer,
"{}",
StatusType::Crash {
signal: Signal::SIGABRT
}
.serialize()
.unwrap()
)
.unwrap();
writeln!(writer, "{}", StatusType::Normal.serialize().unwrap()).unwrap();
writer.flush().unwrap();

read_fast_request(&mut reader);
writeln!(writer, "{}", StatusType::Ignore.serialize().unwrap()).unwrap();
writeln!(writer, "{}", StatusType::Normal.serialize().unwrap()).unwrap();
writer.flush().unwrap();
});

let program = FuzzProgram::default();
let first = cli.execute_program_fast(&program).unwrap();
assert!(first.is_crash(), "unexpected first status: {first:?}");

let second = cli.execute_program_fast(&program).unwrap();
assert!(
second.is_ignore(),
"expected second inner status from current round, got: {second:?}"
);

server.join().unwrap();
}
}