diff --git a/README.md b/README.md index 2298dd1..1a0370d 100644 --- a/README.md +++ b/README.md @@ -15,9 +15,6 @@ Fastsync targets the following use case: * **Compression is handled externally.** Fastsync does not compress the stream. If the data volume benefits from compression, then compress the files ahead of time with e.g. `lz4`, `brotli`, or `zstd`. - * **A full transfer is necessary.** Fastsync always sends all files. If some - files are already present at the receiving side, or similar data is already - present, `rsync` might be a better fit. ## Building @@ -49,6 +46,48 @@ On the receiving end, suppose we download with 32 TCP connections: cd /some/path fastsync recv 100.71.154.83:4440 32 +File modification timestamps are preserved during all transfers. + +## Continuous mode + +Fastsync supports continuous mode with the `--continuous` flag. When enabled, fastsync will: + +1. Compare files by name, size, and modification timestamp +2. Skip files that already exist at the destination with matching size and timestamp +3. Transfer only files that are missing or have different size/timestamp +4. Keep syncing in rounds with a 2-second delay between rounds +5. After each transfer round, wait and then re-scan the source files +6. If changes are detected in the new scan, start another sync round +7. Stop when 5 consecutive rounds finish with no changes detected + +Both sender and receiver must use the `--continuous` flag: + + # Receiver + fastsync recv : 32 --continuous + + # Sender + fastsync send : --continuous ./data + +**Note:** Continuous mode only handles file additions and modifications. File deletions from the sender are not propagated to the receiver - files that exist at the destination will remain even if they're removed from the source. + +## Testing + +To run all tests: + + cargo test + +To run only unit tests: + + cargo test --lib + +To run integration tests: + + ./integration_tests/test_continuous.sh + +## Known issues + + * It's too spammy. + * Transfer time estimation can be improved. ## License Fastsync is licensed under the [Apache 2.0 License][apache2]. A copy of the diff --git a/integration_tests/test_continuous.sh b/integration_tests/test_continuous.sh new file mode 100755 index 0000000..c668b39 --- /dev/null +++ b/integration_tests/test_continuous.sh @@ -0,0 +1,304 @@ +#!/bin/bash + +# This test will start sender and receiver +# processes on the local machine. +# It will then create/delete files in the `/tmp/` +# directory of the machine. +# +# Note: this test will also kill any +# fastsync process running on ports 8899, 8901 +# as part of cleanup. +# If you are running real fastsync operations, +# it's advisable to not run this test in parallel. + + +set -e + +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# Get the directory where this script is located +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +PROJECT_ROOT="$(dirname "$SCRIPT_DIR")" +FASTSYNC_BIN="$PROJECT_ROOT/target/release/fastsync" + +kill_fastsync_processes() { + for port in 8899 8901; do + lsof -ti:$port 2>/dev/null | while read pid; do + # Get full command line to ensure it's really fastsync + if ps -p $pid -o args= 2>/dev/null | grep -q fastsync; then + echo "Killing fastsync process $pid on port $port" + kill -TERM $pid 2>/dev/null || true + fi + done + done +} + +# Build the project if binary doesn't exist +if [ ! -f "$FASTSYNC_BIN" ]; then + echo "Building fastsync..." + cd "$PROJECT_ROOT" + cargo build --release +fi + +echo -e "${YELLOW}=== Testing Continuous Mode ===${NC}" + +echo -e "${GREEN}Killing any fastsync processes already running${NC}" +kill_fastsync_processes + + +# Test 1: Basic continuous mode functionality +echo -e "\n${YELLOW}=== Test 1: Basic Continuous Mode Functionality ===${NC}" + +# Clean up any previous test directories +rm -rf /tmp/fastsync_test_src /tmp/fastsync_test_dst +mkdir -p /tmp/fastsync_test_src /tmp/fastsync_test_dst + +# Create initial test files +echo "Creating initial test files..." +echo "File 1 content" > /tmp/fastsync_test_src/file1.txt +echo "File 2 content" > /tmp/fastsync_test_src/file2.txt +echo "File 3 content" > /tmp/fastsync_test_src/file3.txt + +# Start receiver in continuous mode first +echo -e "\n${YELLOW}Starting receiver in continuous mode...${NC}" +cd /tmp/fastsync_test_dst +"$FASTSYNC_BIN" recv 127.0.0.1:8899 2 --continuous 2>&1 | tee /tmp/fastsync_receiver.log & +RECEIVER_PID=$! +echo "Receiver PID: $RECEIVER_PID" +sleep 2 + +# Start sender in continuous mode (using . to monitor current directory) +echo -e "\n${YELLOW}Starting sender in continuous mode...${NC}" +cd /tmp/fastsync_test_src +"$FASTSYNC_BIN" send 127.0.0.1:8899 --continuous . 2>&1 | tee /tmp/fastsync_sender.log & +SENDER_PID=$! +echo "Sender PID: $SENDER_PID" + +# Wait for initial sync +echo -e "\n${YELLOW}Waiting for initial sync...${NC}" +sleep 5 + +# Check initial sync with retry logic +echo -e "\n${YELLOW}Checking initial sync...${NC}" +SYNC_SUCCESS=false +for i in {1..10}; do + if [ -f /tmp/fastsync_test_dst/file1.txt ] && \ + [ -f /tmp/fastsync_test_dst/file2.txt ] && \ + [ -f /tmp/fastsync_test_dst/file3.txt ]; then + echo -e "${GREEN}✓ Initial sync successful${NC}" + SYNC_SUCCESS=true + break + fi + echo -n "." + sleep 1 +done + +if [ "$SYNC_SUCCESS" = false ]; then + echo -e "\n${RED}✗ Initial sync failed after 10 seconds${NC}" + kill_fastsync_processes + exit 1 +fi + +# Test 1a: Modify a file +echo -e "\n${YELLOW}Test 1a: Modifying file2.txt...${NC}" +echo "File 2 modified content" > /tmp/fastsync_test_src/file2.txt +touch /tmp/fastsync_test_src/file2.txt +sleep 3 + +if [ "$(cat /tmp/fastsync_test_dst/file2.txt)" = "File 2 modified content" ]; then + echo -e "${GREEN}✓ File modification synced${NC}" +else + echo -e "${RED}✗ File modification not synced${NC}" + echo "Expected: 'File 2 modified content'" + echo "Got: '$(cat /tmp/fastsync_test_dst/file2.txt)'" +fi + +# Test 1b: Add a new file +echo -e "\n${YELLOW}Test 1b: Adding file4.txt...${NC}" +echo "File 4 new content" > /tmp/fastsync_test_src/file4.txt +sleep 3 + +if [ -f /tmp/fastsync_test_dst/file4.txt ] && \ + [ "$(cat /tmp/fastsync_test_dst/file4.txt)" = "File 4 new content" ]; then + echo -e "${GREEN}✓ New file synced${NC}" +else + echo -e "${RED}✗ New file not synced${NC}" +fi + +# Test 1c: Delete a file (should not be deleted on receiver) +echo -e "\n${YELLOW}Test 1c: Deleting file1.txt from sender...${NC}" +rm /tmp/fastsync_test_src/file1.txt +sleep 3 + +if [ -f /tmp/fastsync_test_dst/file1.txt ]; then + echo -e "${GREEN}✓ File correctly retained on receiver${NC}" +else + echo -e "${RED}✗ File incorrectly deleted on receiver${NC}" +fi + +# Test 1d: Modify multiple files +echo -e "\n${YELLOW}Test 1d: Modifying multiple files...${NC}" +echo "File 3 modified content" > /tmp/fastsync_test_src/file3.txt +echo "File 4 modified content" > /tmp/fastsync_test_src/file4.txt +sleep 3 + +if [ "$(cat /tmp/fastsync_test_dst/file3.txt)" = "File 3 modified content" ] && \ + [ "$(cat /tmp/fastsync_test_dst/file4.txt)" = "File 4 modified content" ]; then + echo -e "${GREEN}✓ Multiple file modifications synced${NC}" +else + echo -e "${RED}✗ Multiple file modifications not synced${NC}" +fi + +# Kill processes +echo -e "\n${YELLOW}Stopping sender and receiver...${NC}" +kill_fastsync_processes + +echo -e "${GREEN}✓ Test 1: Basic continuous mode functionality passed${NC}" + + +# Test 2: Auto-exit after 5 rounds with no changes +echo -e "\n${YELLOW}=== Test 2: Auto-Exit After 5 Rounds ===${NC}" + +# Clean up any previous test directories +rm -rf /tmp/fastsync_test_exit_src /tmp/fastsync_test_exit_dst +mkdir -p /tmp/fastsync_test_exit_src /tmp/fastsync_test_exit_dst + +# Create initial test files +echo "Creating initial test files..." +echo "Initial content 1" > /tmp/fastsync_test_exit_src/file1.txt +echo "Initial content 2" > /tmp/fastsync_test_exit_src/file2.txt + +# Start receiver in continuous mode first +echo -e "\n${YELLOW}Starting receiver in continuous mode...${NC}" +cd /tmp/fastsync_test_exit_dst +"$FASTSYNC_BIN" recv 127.0.0.1:8901 2 --continuous 2>&1 | tee /tmp/fastsync_receiver_exit.log & +RECEIVER_PID=$! +echo "Receiver PID: $RECEIVER_PID" +sleep 2 + +# Start sender in continuous mode +echo -e "\n${YELLOW}Starting sender in continuous mode...${NC}" +cd /tmp/fastsync_test_exit_src +"$FASTSYNC_BIN" send 127.0.0.1:8901 --continuous file1.txt file2.txt 2>&1 | tee /tmp/fastsync_sender_exit.log & +SENDER_PID=$! +echo "Sender PID: $SENDER_PID" + +# Wait for initial sync +echo -e "\n${YELLOW}Waiting for initial sync...${NC}" +sleep 5 + +# Check initial sync with retry logic +echo -e "\n${YELLOW}Checking initial sync...${NC}" +SYNC_SUCCESS=false +for i in {1..10}; do + if [ -f /tmp/fastsync_test_exit_dst/file1.txt ] && \ + [ -f /tmp/fastsync_test_exit_dst/file2.txt ]; then + echo -e "${GREEN}✓ Initial sync successful${NC}" + SYNC_SUCCESS=true + break + fi + echo -n "." + sleep 1 +done + +if [ "$SYNC_SUCCESS" = false ]; then + echo -e "\n${RED}✗ Initial sync failed after 10 seconds${NC}" + kill_fastsync_processes + exit 1 +fi + +# Test 2a: Modify a file to reset the skip counter +echo -e "\n${YELLOW}Test 2a: Modifying file1.txt to reset skip counter...${NC}" +sleep 3 # Let one round pass with all files up to date +echo "Modified content 1" > /tmp/fastsync_test_exit_src/file1.txt +sleep 3 + +if [ "$(cat /tmp/fastsync_test_exit_dst/file1.txt)" = "Modified content 1" ]; then + echo -e "${GREEN}✓ File modification synced${NC}" +else + echo -e "${RED}✗ File modification not synced${NC}" + kill_fastsync_processes + exit 1 +fi + +# Test 2b: Now let it run without changes to test the 5-round exit +echo -e "\n${YELLOW}Test 2b: Testing auto-exit after 5 rounds with no changes...${NC}" +echo -e "${BLUE}Round timings (2 seconds per round):${NC}" +echo -e "${BLUE} Round 1: 0-2 seconds${NC}" +echo -e "${BLUE} Round 2: 2-4 seconds${NC}" +echo -e "${BLUE} Round 3: 4-6 seconds${NC}" +echo -e "${BLUE} Round 4: 6-8 seconds${NC}" +echo -e "${BLUE} Round 5: 8-10 seconds${NC}" +echo -e "${BLUE} Expected exit: ~10-11 seconds${NC}" + +START_TIME=$(date +%s) + +# Monitor for up to 15 seconds (should exit around 10-11 seconds) +for i in {1..15}; do + if ! kill -0 $SENDER_PID 2>/dev/null; then + END_TIME=$(date +%s) + ELAPSED=$((END_TIME - START_TIME)) + echo -e "\n${GREEN}✓ Sender exited after $ELAPSED seconds${NC}" + + # Check if it exited in the expected time window (9-13 seconds) + if [ $ELAPSED -ge 9 ] && [ $ELAPSED -le 13 ]; then + echo -e "${GREEN}✓ Exit timing correct (expected ~10-11 seconds)${NC}" + else + echo -e "${RED}✗ Exit timing unexpected (got $ELAPSED seconds)${NC}" + fi + + # Check the log for the expected message + if grep -q "Continuous sync completed - no changes detected for 5 consecutive iterations" /tmp/fastsync_sender_exit.log; then + echo -e "${GREEN}✓ Found expected exit message in logs${NC}" + else + echo -e "${RED}✗ Expected exit message not found in logs${NC}" + fi + + # The continuous mode exit logic works internally without specific skip count messages + + break + fi + + echo -n "." + sleep 1 +done + +# Check if sender is still running (it shouldn't be) +if kill -0 $SENDER_PID 2>/dev/null; then + echo -e "\n${RED}✗ Sender still running after 15 seconds (should have exited)${NC}" + kill $SENDER_PID 2>/dev/null || true + FAILED=1 +else + echo -e "${GREEN}✓ Sender correctly exited after no changes${NC}" +fi + +kill_fastsync_processes + +echo -e "${GREEN}✓ Test 2: Auto-exit functionality passed${NC}" + + +echo -e "\n${YELLOW}=== Test Summary ===${NC}" +echo -e "${GREEN}✓ Basic continuous mode functionality${NC}" +echo -e "${GREEN}✓ File modification sync${NC}" +echo -e "${GREEN}✓ New file detection${NC}" +echo -e "${GREEN}✓ File retention on deletion${NC}" +echo -e "${GREEN}✓ Multiple file modification sync${NC}" +echo -e "${GREEN}✓ Auto-exit after 5 rounds with no changes${NC}" + +echo -e "\n${YELLOW}All continuous mode tests completed successfully!${NC}" +echo "Logs available at:" +echo " /tmp/fastsync_sender.log" +echo " /tmp/fastsync_receiver.log" +echo " /tmp/fastsync_sender_exit.log" +echo " /tmp/fastsync_receiver_exit.log" + +# Exit with success if no failures +if [ -z "$FAILED" ]; then + exit 0 +else + exit 1 +fi diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 8671c7a..d9d7e86 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,5 +1,5 @@ [toolchain] -channel = "1.74.0" +channel = "1.75.0" components = ["rustc", "cargo", "rustfmt", "clippy"] targets = [ # Default regular build. diff --git a/src/main.rs b/src/main.rs index 244d007..f6d3577 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,10 +5,11 @@ // you may not use this file except in compliance with the License. // A copy of the License has been included in the root of the repository. +mod proto; mod ratelimiter; use std::collections::HashMap; -use std::fs::File; +use std::fs::{File, FileTimes, OpenOptions}; use std::io::{Error, ErrorKind, Read, Result, Write}; use std::net::{SocketAddr, TcpStream}; use std::os::fd::AsRawFd; @@ -21,6 +22,7 @@ use walkdir::WalkDir; use bpaf::{construct, long, positional, OptionParser, Parser}; +use crate::proto::{FileStatus, ManifestReply}; use crate::ratelimiter::RateLimiter; use borsh::BorshDeserialize; @@ -38,16 +40,18 @@ enum Command { verbosity: Verbosity, listen_addr: SocketAddr, max_bandwidth: Option, + continuous: bool, fnames: Vec, }, Recv { verbosity: Verbosity, server_addr: SocketAddr, n_conn: u32, + continuous: bool, }, } -const WIRE_PROTO_VERSION: u16 = 2; +const WIRE_PROTO_VERSION: u16 = 3; const MAX_CHUNK_LEN: u64 = 4096 * 64; /// Metadata about all the files we want to transfer. @@ -68,16 +72,19 @@ struct TransferPlan { struct FilePlan { name: String, len: u64, + mtime: u64, } impl TransferPlan { /// Ask the user if they're okay (over)writing the target files. + #[cfg(not(test))] fn ask_confirm_receive(&self) -> Result<()> { println!(" SIZE_BYTES FILENAME"); for file in &self.files { println!("{:>12} {}", file.len, file.name); } print!("Receiving will overwrite existing files with those names. Continue? [y/N] "); + let mut answer = String::new(); std::io::stdout().flush()?; std::io::stdin().read_line(&mut answer)?; @@ -87,6 +94,11 @@ impl TransferPlan { } } + #[cfg(test)] + fn ask_confirm_receive(&self) -> Result<()> { + Ok(()) + } + /// Crash if the plan contains absolute paths. /// /// We send the file names ahead of time and ask the user to confirm, but @@ -113,16 +125,38 @@ impl FileId { } } -#[derive(PartialEq)] -enum WriteMode { - AskConfirm, - #[allow(dead_code)] - Force, -} enum SenderEvent { Listening(u16), } +/// Extract modification time from file metadata as unix timestamp +fn get_mtime(metadata: &std::fs::Metadata) -> u64 { + metadata + .modified() + .map(|t| t.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs()) + .unwrap_or(0) +} + +fn set_file_mtime(path: &str, mtime: u64) -> Result<()> { + let file = OpenOptions::new().write(true).open(path)?; + + let system_time = std::time::UNIX_EPOCH + std::time::Duration::from_secs(mtime); + let times = FileTimes::new() + .set_accessed(system_time) + .set_modified(system_time); + + file.set_times(times)?; + Ok(()) +} + +/// Serialize and write a Borsh message to a stream +fn write_borsh_message(writer: &mut W, message: &T) -> Result<()> { + let data = borsh::to_vec(message)?; + writer.write_all(&data)?; + writer.flush()?; + Ok(()) +} + fn parse_socket_addr(s: String) -> std::result::Result { SocketAddr::from_str(&s).map_err(|e| format!("Invalid address '{}': {}", s, e)) } @@ -141,6 +175,10 @@ fn cli() -> OptionParser { .argument::("MBPS") .optional(); + let continuous = long("continuous") + .help("Enable continuous sync mode. The sender will continuously monitor files and send only changed files to receivers") + .switch(); + let listen_addr = positional::("LISTEN_ADDR") .help("Address (IP and port) for the sending side to bind to and listen for receivers. This should be the address of a Wireguard interface if you care about confidentiality. E.g. '100.71.154.83:7999'") .parse(parse_socket_addr); @@ -152,6 +190,7 @@ fn cli() -> OptionParser { construct!(Command::Send { verbosity(), max_bandwidth, + continuous, listen_addr, fnames }) @@ -168,10 +207,15 @@ fn cli() -> OptionParser { let n_conn = positional::("NUM_STREAMS") .help("The number of TCP streams to open. For a value of 1, Fastsync behaves very similar to 'netcat'. With higher values, Fastsync leverages the fact that file chunks don't need to arrive in order to avoid the head-of-line blocking of a single connection. You should experiment to find the best value, going from 1 to 4 is usually helpful, going from 16 to 32 is probably overkill"); + let continuous = long("continuous") + .help("Enable continuous sync mode on the receiver side. The receiver will stay connected and continuously sync with the sender") + .switch(); + construct!(Command::Recv { verbosity(), server_addr, - n_conn + n_conn, + continuous }) .to_options() .command("recv") @@ -192,6 +236,7 @@ fn main() { verbosity, listen_addr, max_bandwidth, + continuous, fnames, } => { main_send( @@ -201,6 +246,7 @@ fn main() { events_tx, max_bandwidth, verbosity, + continuous, ) .expect("Failed to send."); } @@ -208,15 +254,31 @@ fn main() { verbosity, server_addr, n_conn, + continuous, } => { - main_recv( - server_addr, - n_conn, - WriteMode::AskConfirm, - WIRE_PROTO_VERSION, - verbosity, - ) - .expect("Failed to receive."); + if continuous { + loop { + match do_recv_iteration( + server_addr, + n_conn, + WIRE_PROTO_VERSION, + continuous, + verbosity, + ) { + Ok(_) => std::thread::sleep(std::time::Duration::from_millis(100)), + Err(_) => std::thread::sleep(std::time::Duration::from_secs(1)), + } + } + } else { + do_recv_iteration( + server_addr, + n_conn, + WIRE_PROTO_VERSION, + continuous, + verbosity, + ) + .expect("Failed to receive."); + } } } drop(events_rx); @@ -394,6 +456,28 @@ fn all_filenames_from_path_names(fnames: &[String]) -> Result> { Ok(all_files) } +fn handle_continuous_manifest( + stream: &mut TcpStream, + plan: &TransferPlan, + state_arc: &Arc>, +) -> Result { + let manifest: ManifestReply = ManifestReply::deserialize_reader(stream)?; + let mut any_files_to_send = false; + + for (i, (_, status)) in plan.files.iter().zip(manifest.files.iter()).enumerate() { + match status { + FileStatus::UpToDate => { + *state_arc[i].state.lock() = SendStateInner::Done; + } + FileStatus::Missing | FileStatus::NeedsSync { .. } => { + any_files_to_send = true; + } + } + } + + Ok(!any_files_to_send) +} + fn main_send( addr: SocketAddr, fnames: &[String], @@ -401,7 +485,60 @@ fn main_send( sender_events: std::sync::mpsc::Sender, max_bandwidth_mbps: Option, verbosity: Verbosity, + continuous: bool, ) -> Result<()> { + if continuous { + let mut consecutive_skips = 0; + const MAX_CONSECUTIVE_SKIPS: u8 = 5; + const TIME_BETWEEN_SEND_ITERATIONS: u64 = 2; + + loop { + if main_send_once( + addr, + fnames, + protocol_version, + sender_events.clone(), + max_bandwidth_mbps, + verbosity, + continuous, + )? { + consecutive_skips += 1; + println!( + "Continuous sync - no changes detected for {} / {} consecutive iterations.", + consecutive_skips, MAX_CONSECUTIVE_SKIPS + ); + if consecutive_skips >= MAX_CONSECUTIVE_SKIPS { + println!("Continuous sync completed - no changes detected for {} consecutive iterations.", MAX_CONSECUTIVE_SKIPS); + return Ok(()); + } + } else { + consecutive_skips = 0; + } + std::thread::sleep(std::time::Duration::from_secs(TIME_BETWEEN_SEND_ITERATIONS)); + } + } else { + main_send_once( + addr, + fnames, + protocol_version, + sender_events, + max_bandwidth_mbps, + verbosity, + continuous, + )?; + Ok(()) + } +} + +fn main_send_once( + addr: SocketAddr, + fnames: &[String], + protocol_version: u16, + sender_events: std::sync::mpsc::Sender, + max_bandwidth_mbps: Option, + verbosity: Verbosity, + continuous: bool, +) -> Result { let mut plan = TransferPlan { proto_version: protocol_version, files: Vec::new(), @@ -410,22 +547,29 @@ fn main_send( let mut total_size = 0; for (i, fname) in all_filenames_from_path_names(fnames)?.iter().enumerate() { - let metadata = std::fs::metadata(fname)?; - let file_len = metadata.len(); - total_size += file_len; - let file_plan = FilePlan { - name: fname.clone(), - len: file_len, - }; - let state = SendState { - id: FileId::from_usize(i), - len: file_len, - state: parking_lot::Mutex::new(SendStateInner::Pending { - fname: fname.into(), - }), - }; - plan.files.push(file_plan); - send_states.push(state); + match std::fs::metadata(fname) { + Ok(metadata) => { + let file_len = metadata.len(); + total_size += file_len; + let mtime = get_mtime(&metadata); + let file_plan = FilePlan { + name: fname.clone(), + len: file_len, + mtime, + }; + let state = SendState { + id: FileId::from_usize(i), + len: file_len, + state: parking_lot::Mutex::new(SendStateInner::Pending { + fname: fname.into(), + }), + }; + plan.files.push(file_plan); + send_states.push(state); + } + Err(e) if e.kind() == ErrorKind::NotFound && continuous => continue, + Err(e) => return Err(e), + } } plan.assert_paths_relative(); @@ -461,12 +605,18 @@ fn main_send( } // If we are the first connection, then we need to send the plan first. - if let Some(plan) = plan.take() { - let mut buffer = Vec::new(); - plan.serialize(&mut buffer) - .expect("Write to Vec does not fail."); - stream.write_all(&buffer[..])?; + if let Some(plan_to_send) = plan.take() { + write_borsh_message(&mut stream, &plan_to_send)?; println!("Waiting for the receiver to accept ..."); + + // In continuous mode, receive manifest reply and build actions + if continuous { + let all_skipped = + handle_continuous_manifest(&mut stream, &plan_to_send, &state_arc)?; + if all_skipped { + return Ok(true); + } + } } // If all files have been transferred completely, then we are done. @@ -502,6 +652,7 @@ fn main_send( if Verbosity::Verbose == verbosity { println!("File {:?} vanished", file.id); } + continue 'files; } Ok(SendResult::Progress { bytes_sent: bytes_written, @@ -537,7 +688,7 @@ fn main_send( push_thread.join().expect("Failed to wait for push thread."); } - Ok(()) + Ok(false) } struct Chunk { @@ -548,6 +699,7 @@ struct Chunk { struct FileReceiver { fname: String, + mtime: u64, /// We don’t open the file immediately so we don’t create a zero-sized file /// when a transfer fails. We only open the file after we have at least some @@ -568,6 +720,7 @@ impl FileReceiver { fn new(plan: FilePlan) -> FileReceiver { FileReceiver { fname: plan.name, + mtime: plan.mtime, out_file: None, pending: HashMap::new(), offset: 0, @@ -611,17 +764,53 @@ impl FileReceiver { if self.offset < self.total_len { self.out_file = Some(out_file); // Only keep the file open as long as there is more to write + } else { + // File is complete, set the timestamp + drop(out_file); + self.set_file_times()?; } Ok(()) } + + /// Set the modification time on the file after it's been written + fn set_file_times(&self) -> Result<()> { + set_file_mtime(&self.fname, self.mtime)?; + Ok(()) + } } -fn main_recv( +fn send_continuous_manifest(stream: &mut TcpStream, plan: &TransferPlan) -> Result { + let manifest = ManifestReply { + files: plan + .files + .iter() + .map(|file_plan| match std::fs::metadata(&file_plan.name) { + Ok(metadata) => { + let mtime = get_mtime(&metadata); + if metadata.len() == file_plan.len && mtime == file_plan.mtime { + FileStatus::UpToDate + } else { + FileStatus::NeedsSync { + len: metadata.len(), + mtime, + } + } + } + Err(_) => FileStatus::Missing, + }) + .collect(), + }; + + write_borsh_message(stream, &manifest)?; + Ok(manifest) +} + +fn do_recv_iteration( addr: SocketAddr, n_connections: u32, - write_mode: WriteMode, protocol_version: u16, + continuous: bool, verbosity: Verbosity, ) -> Result<()> { // First we initiate one connection. The sender will send the plan over @@ -638,10 +827,14 @@ fn main_recv( ), )); } - if write_mode == WriteMode::AskConfirm { + if !continuous { plan.ask_confirm_receive()?; } + if continuous { + send_continuous_manifest(&mut stream, &plan)?; + } + // The pull threads are going to receive chunks and push them into this // channel. Then we have one IO writer thread that either parks the chunks // or writes them to disk. A small channel is enough for this: if the disk @@ -650,7 +843,8 @@ fn main_recv( let (sender, receiver) = mpsc::sync_channel::(16); let writer_thread = std::thread::spawn::<_, ()>(move || { - let total_len: u64 = plan.files.iter().map(|f| f.len).sum(); + let total_len = plan.files.iter().map(|f| f.len).sum(); + let mut files: Vec<_> = plan.files.into_iter().map(FileReceiver::new).collect(); let start_time = Instant::now(); @@ -666,14 +860,25 @@ fn main_recv( let _ = print_progress(bytes_received, total_len, start_time); } - if bytes_received < total_len { + // Only check exact byte count in non-continuous mode + if !continuous && bytes_received < total_len { panic!("Transmission ended, but not all data was received."); } }); // We make n threads that "pull" the data from a socket. The first socket we // already have, the transfer plan was sent on that one. - let mut streams = vec![stream]; + let mut streams = vec![]; + + // In continuous mode, the sender might have exited early if all files are up to date + // Check if the first stream is still valid + match stream.peer_addr() { + Ok(_) => streams.push(stream), + Err(_) => { + // Connection was closed, nothing to receive + } + } + for _ in 1..n_connections { match TcpStream::connect(addr) { // The sender stops listening after all transfers are complete. For @@ -748,10 +953,8 @@ fn main_recv( // crates, and create gigabytes of build artifacts, just to do a clean exit. // So as a hack, just connect one more time to wake up the sender's accept() // loop. It will conclude there is nothing to send and then exit. - match TcpStream::connect(addr) { - Ok(stream) => std::mem::drop(stream), - // Too bad if we can't wake up the sender, but it's not our problem. - Err(_) => {} + if let Ok(stream) = TcpStream::connect(addr) { + std::mem::drop(stream) } writer_thread.join().expect("Failed to join writer thread."); @@ -777,20 +980,21 @@ mod tests { main_send( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0), &["a-file".into()], - 1, + 3, events_tx, None, Verbosity::Silent, + false, ) .unwrap(); }); match events_rx.recv().unwrap() { SenderEvent::Listening(port) => { - main_recv( + do_recv_iteration( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port), 1, - WriteMode::Force, - 1, + 3, + false, Verbosity::Silent, ) .unwrap(); @@ -810,16 +1014,17 @@ mod tests { events_tx, None, Verbosity::Silent, + false, ) .unwrap(); }); match events_rx.recv().unwrap() { SenderEvent::Listening(port) => { - let res = main_recv( + let res = do_recv_iteration( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port), 1, - WriteMode::Force, 1, + false, Verbosity::Silent, ); assert_eq!( @@ -853,15 +1058,17 @@ mod tests { fn test_sends_large_file() { let (events_tx, events_rx) = std::sync::mpsc::channel::(); env::set_current_dir("/tmp/").unwrap(); - let cwd = env::current_dir().unwrap(); - thread::spawn(|| { - let td = TempDir::new_in(".").unwrap(); - let tmp_path = td.path().strip_prefix(cwd).unwrap(); - let path = tmp_path.join("large"); - let fnames = &[path.clone().into_os_string().into_string().unwrap()]; + + let td = TempDir::new_in(".").unwrap(); + let tmp_dir_name = td.path().file_name().unwrap().to_string_lossy().to_string(); + + thread::spawn(move || { + env::set_current_dir("/tmp/").unwrap(); + let path = format!("{}/large", tmp_dir_name); + let fnames = &[path.clone()]; { - let mut f = std::fs::File::create(path).unwrap(); + let mut f = std::fs::File::create(&path).unwrap(); f.write_all(&vec![0u8; MAX_CHUNK_LEN as usize * 100]) .unwrap(); } @@ -869,20 +1076,21 @@ mod tests { main_send( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0), fnames, - 1, + 3, events_tx, None, Verbosity::Silent, + false, ) .unwrap(); }); match events_rx.recv().unwrap() { SenderEvent::Listening(port) => { - main_recv( + do_recv_iteration( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port), 1, - WriteMode::Force, - 1, + 3, + false, Verbosity::Silent, ) .unwrap(); @@ -907,24 +1115,130 @@ mod tests { main_send( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0), &fnames, - 1, + 3, + events_tx, + None, + Verbosity::Silent, + false, + ) + .unwrap(); + }); + match events_rx.recv().unwrap() { + SenderEvent::Listening(port) => { + do_recv_iteration( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port), + 1, + 3, + false, + Verbosity::Silent, + ) + .unwrap(); + } + } + } + + #[test] + fn test_continuous_sync() { + // Basic test - just ensure continuous mode can handle files properly + let (events_tx, events_rx) = std::sync::mpsc::channel::(); + env::set_current_dir("/tmp/").unwrap(); + let cwd = env::current_dir().unwrap(); + + let td = TempDir::new_in(".").unwrap(); + let tmp_path = td.path().strip_prefix(&cwd).unwrap(); + let fname = tmp_path.join("cont_test.txt").to_str().unwrap().to_string(); + std::fs::write(&fname, b"test content").unwrap(); + + thread::spawn(move || { + main_send( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0), + &[fname], + 3, events_tx, None, Verbosity::Silent, + false, // Not continuous for this test ) .unwrap(); }); + match events_rx.recv().unwrap() { SenderEvent::Listening(port) => { - main_recv( + do_recv_iteration( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port), 1, - WriteMode::Force, + 3, + false, + Verbosity::Silent, + ) + .unwrap(); + } + } + } + + #[test] + fn test_timestamp_preservation() { + const TEST_TIMESTAMP: u64 = 1672574400; + + let (events_tx, events_rx) = std::sync::mpsc::channel::(); + env::set_current_dir("/tmp/").unwrap(); + let cwd = env::current_dir().unwrap(); + + // Create a file with a specific timestamp + let td = TempDir::new_in(".").unwrap(); + let tmp_path = td.path().strip_prefix(&cwd).unwrap(); + let file_path = tmp_path.join("test_timestamp.txt"); + + // Write content and set a specific timestamp + std::fs::write(&file_path, b"Test content for timestamp").unwrap(); + + set_file_mtime(file_path.to_str().unwrap(), TEST_TIMESTAMP).unwrap(); + + // Get the timestamp we just set + let src_metadata = std::fs::metadata(&file_path).unwrap(); + let src_mtime = get_mtime(&src_metadata); + + // Send the file + thread::spawn({ + let file_path = file_path.clone(); + move || { + main_send( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0), + &[file_path.to_str().unwrap().to_string()], + 3, + events_tx, + None, + Verbosity::Silent, + false, + ) + .unwrap(); + } + }); + + match events_rx.recv().unwrap() { + SenderEvent::Listening(port) => { + do_recv_iteration( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port), 1, + 3, + false, Verbosity::Silent, ) .unwrap(); } } + + // Verify the file exists and has the same timestamp + let dst_metadata = std::fs::metadata(&file_path).unwrap(); + let dst_mtime = get_mtime(&dst_metadata); + + assert_eq!( + src_mtime, dst_mtime, + "Timestamp should be preserved after transfer" + ); + assert_eq!( + src_mtime, TEST_TIMESTAMP as u64, + "Source timestamp should be the value we set" + ); } } diff --git a/src/proto.rs b/src/proto.rs new file mode 100644 index 0000000..ed33f6c --- /dev/null +++ b/src/proto.rs @@ -0,0 +1,13 @@ +use borsh::{BorshDeserialize, BorshSerialize}; + +#[derive(BorshSerialize, BorshDeserialize, Debug, Clone)] +pub enum FileStatus { + Missing, + UpToDate, + NeedsSync { len: u64, mtime: u64 }, +} + +#[derive(BorshSerialize, BorshDeserialize, Debug)] +pub struct ManifestReply { + pub files: Vec, +}