From dc345e0820717b268ee2f495409a7b596f359a95 Mon Sep 17 00:00:00 2001 From: "Igor P." Date: Sat, 24 May 2025 00:47:12 +0100 Subject: [PATCH] Add continous transfer mode Allows the transfer to happen continuously. Files can change on the source and will be picked up and transferred as long as fastsync is running. This is also means fastsync now supports incremental transfers i.e. stopping the operation and resuming it later. Closes: https://github.com/ChorusOne/fastsync/issues/16 --- README.md | 45 ++- integration_tests/test_continuous.sh | 304 ++++++++++++++++++ rust-toolchain.toml | 2 +- src/main.rs | 452 +++++++++++++++++++++++---- src/proto.rs | 13 + 5 files changed, 743 insertions(+), 73 deletions(-) create mode 100755 integration_tests/test_continuous.sh create mode 100644 src/proto.rs diff --git a/README.md b/README.md index 2298dd1..1a0370d 100644 --- a/README.md +++ b/README.md @@ -15,9 +15,6 @@ Fastsync targets the following use case: * **Compression is handled externally.** Fastsync does not compress the stream. If the data volume benefits from compression, then compress the files ahead of time with e.g. `lz4`, `brotli`, or `zstd`. - * **A full transfer is necessary.** Fastsync always sends all files. If some - files are already present at the receiving side, or similar data is already - present, `rsync` might be a better fit. ## Building @@ -49,6 +46,48 @@ On the receiving end, suppose we download with 32 TCP connections: cd /some/path fastsync recv 100.71.154.83:4440 32 +File modification timestamps are preserved during all transfers. + +## Continuous mode + +Fastsync supports continuous mode with the `--continuous` flag. When enabled, fastsync will: + +1. Compare files by name, size, and modification timestamp +2. Skip files that already exist at the destination with matching size and timestamp +3. Transfer only files that are missing or have different size/timestamp +4. Keep syncing in rounds with a 2-second delay between rounds +5. After each transfer round, wait and then re-scan the source files +6. If changes are detected in the new scan, start another sync round +7. Stop when 5 consecutive rounds finish with no changes detected + +Both sender and receiver must use the `--continuous` flag: + + # Receiver + fastsync recv : 32 --continuous + + # Sender + fastsync send : --continuous ./data + +**Note:** Continuous mode only handles file additions and modifications. File deletions from the sender are not propagated to the receiver - files that exist at the destination will remain even if they're removed from the source. + +## Testing + +To run all tests: + + cargo test + +To run only unit tests: + + cargo test --lib + +To run integration tests: + + ./integration_tests/test_continuous.sh + +## Known issues + + * It's too spammy. + * Transfer time estimation can be improved. ## License Fastsync is licensed under the [Apache 2.0 License][apache2]. A copy of the diff --git a/integration_tests/test_continuous.sh b/integration_tests/test_continuous.sh new file mode 100755 index 0000000..c668b39 --- /dev/null +++ b/integration_tests/test_continuous.sh @@ -0,0 +1,304 @@ +#!/bin/bash + +# This test will start sender and receiver +# processes on the local machine. +# It will then create/delete files in the `/tmp/` +# directory of the machine. +# +# Note: this test will also kill any +# fastsync process running on ports 8899, 8901 +# as part of cleanup. +# If you are running real fastsync operations, +# it's advisable to not run this test in parallel. + + +set -e + +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# Get the directory where this script is located +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +PROJECT_ROOT="$(dirname "$SCRIPT_DIR")" +FASTSYNC_BIN="$PROJECT_ROOT/target/release/fastsync" + +kill_fastsync_processes() { + for port in 8899 8901; do + lsof -ti:$port 2>/dev/null | while read pid; do + # Get full command line to ensure it's really fastsync + if ps -p $pid -o args= 2>/dev/null | grep -q fastsync; then + echo "Killing fastsync process $pid on port $port" + kill -TERM $pid 2>/dev/null || true + fi + done + done +} + +# Build the project if binary doesn't exist +if [ ! -f "$FASTSYNC_BIN" ]; then + echo "Building fastsync..." + cd "$PROJECT_ROOT" + cargo build --release +fi + +echo -e "${YELLOW}=== Testing Continuous Mode ===${NC}" + +echo -e "${GREEN}Killing any fastsync processes already running${NC}" +kill_fastsync_processes + + +# Test 1: Basic continuous mode functionality +echo -e "\n${YELLOW}=== Test 1: Basic Continuous Mode Functionality ===${NC}" + +# Clean up any previous test directories +rm -rf /tmp/fastsync_test_src /tmp/fastsync_test_dst +mkdir -p /tmp/fastsync_test_src /tmp/fastsync_test_dst + +# Create initial test files +echo "Creating initial test files..." +echo "File 1 content" > /tmp/fastsync_test_src/file1.txt +echo "File 2 content" > /tmp/fastsync_test_src/file2.txt +echo "File 3 content" > /tmp/fastsync_test_src/file3.txt + +# Start receiver in continuous mode first +echo -e "\n${YELLOW}Starting receiver in continuous mode...${NC}" +cd /tmp/fastsync_test_dst +"$FASTSYNC_BIN" recv 127.0.0.1:8899 2 --continuous 2>&1 | tee /tmp/fastsync_receiver.log & +RECEIVER_PID=$! +echo "Receiver PID: $RECEIVER_PID" +sleep 2 + +# Start sender in continuous mode (using . to monitor current directory) +echo -e "\n${YELLOW}Starting sender in continuous mode...${NC}" +cd /tmp/fastsync_test_src +"$FASTSYNC_BIN" send 127.0.0.1:8899 --continuous . 2>&1 | tee /tmp/fastsync_sender.log & +SENDER_PID=$! +echo "Sender PID: $SENDER_PID" + +# Wait for initial sync +echo -e "\n${YELLOW}Waiting for initial sync...${NC}" +sleep 5 + +# Check initial sync with retry logic +echo -e "\n${YELLOW}Checking initial sync...${NC}" +SYNC_SUCCESS=false +for i in {1..10}; do + if [ -f /tmp/fastsync_test_dst/file1.txt ] && \ + [ -f /tmp/fastsync_test_dst/file2.txt ] && \ + [ -f /tmp/fastsync_test_dst/file3.txt ]; then + echo -e "${GREEN}✓ Initial sync successful${NC}" + SYNC_SUCCESS=true + break + fi + echo -n "." + sleep 1 +done + +if [ "$SYNC_SUCCESS" = false ]; then + echo -e "\n${RED}✗ Initial sync failed after 10 seconds${NC}" + kill_fastsync_processes + exit 1 +fi + +# Test 1a: Modify a file +echo -e "\n${YELLOW}Test 1a: Modifying file2.txt...${NC}" +echo "File 2 modified content" > /tmp/fastsync_test_src/file2.txt +touch /tmp/fastsync_test_src/file2.txt +sleep 3 + +if [ "$(cat /tmp/fastsync_test_dst/file2.txt)" = "File 2 modified content" ]; then + echo -e "${GREEN}✓ File modification synced${NC}" +else + echo -e "${RED}✗ File modification not synced${NC}" + echo "Expected: 'File 2 modified content'" + echo "Got: '$(cat /tmp/fastsync_test_dst/file2.txt)'" +fi + +# Test 1b: Add a new file +echo -e "\n${YELLOW}Test 1b: Adding file4.txt...${NC}" +echo "File 4 new content" > /tmp/fastsync_test_src/file4.txt +sleep 3 + +if [ -f /tmp/fastsync_test_dst/file4.txt ] && \ + [ "$(cat /tmp/fastsync_test_dst/file4.txt)" = "File 4 new content" ]; then + echo -e "${GREEN}✓ New file synced${NC}" +else + echo -e "${RED}✗ New file not synced${NC}" +fi + +# Test 1c: Delete a file (should not be deleted on receiver) +echo -e "\n${YELLOW}Test 1c: Deleting file1.txt from sender...${NC}" +rm /tmp/fastsync_test_src/file1.txt +sleep 3 + +if [ -f /tmp/fastsync_test_dst/file1.txt ]; then + echo -e "${GREEN}✓ File correctly retained on receiver${NC}" +else + echo -e "${RED}✗ File incorrectly deleted on receiver${NC}" +fi + +# Test 1d: Modify multiple files +echo -e "\n${YELLOW}Test 1d: Modifying multiple files...${NC}" +echo "File 3 modified content" > /tmp/fastsync_test_src/file3.txt +echo "File 4 modified content" > /tmp/fastsync_test_src/file4.txt +sleep 3 + +if [ "$(cat /tmp/fastsync_test_dst/file3.txt)" = "File 3 modified content" ] && \ + [ "$(cat /tmp/fastsync_test_dst/file4.txt)" = "File 4 modified content" ]; then + echo -e "${GREEN}✓ Multiple file modifications synced${NC}" +else + echo -e "${RED}✗ Multiple file modifications not synced${NC}" +fi + +# Kill processes +echo -e "\n${YELLOW}Stopping sender and receiver...${NC}" +kill_fastsync_processes + +echo -e "${GREEN}✓ Test 1: Basic continuous mode functionality passed${NC}" + + +# Test 2: Auto-exit after 5 rounds with no changes +echo -e "\n${YELLOW}=== Test 2: Auto-Exit After 5 Rounds ===${NC}" + +# Clean up any previous test directories +rm -rf /tmp/fastsync_test_exit_src /tmp/fastsync_test_exit_dst +mkdir -p /tmp/fastsync_test_exit_src /tmp/fastsync_test_exit_dst + +# Create initial test files +echo "Creating initial test files..." +echo "Initial content 1" > /tmp/fastsync_test_exit_src/file1.txt +echo "Initial content 2" > /tmp/fastsync_test_exit_src/file2.txt + +# Start receiver in continuous mode first +echo -e "\n${YELLOW}Starting receiver in continuous mode...${NC}" +cd /tmp/fastsync_test_exit_dst +"$FASTSYNC_BIN" recv 127.0.0.1:8901 2 --continuous 2>&1 | tee /tmp/fastsync_receiver_exit.log & +RECEIVER_PID=$! +echo "Receiver PID: $RECEIVER_PID" +sleep 2 + +# Start sender in continuous mode +echo -e "\n${YELLOW}Starting sender in continuous mode...${NC}" +cd /tmp/fastsync_test_exit_src +"$FASTSYNC_BIN" send 127.0.0.1:8901 --continuous file1.txt file2.txt 2>&1 | tee /tmp/fastsync_sender_exit.log & +SENDER_PID=$! +echo "Sender PID: $SENDER_PID" + +# Wait for initial sync +echo -e "\n${YELLOW}Waiting for initial sync...${NC}" +sleep 5 + +# Check initial sync with retry logic +echo -e "\n${YELLOW}Checking initial sync...${NC}" +SYNC_SUCCESS=false +for i in {1..10}; do + if [ -f /tmp/fastsync_test_exit_dst/file1.txt ] && \ + [ -f /tmp/fastsync_test_exit_dst/file2.txt ]; then + echo -e "${GREEN}✓ Initial sync successful${NC}" + SYNC_SUCCESS=true + break + fi + echo -n "." + sleep 1 +done + +if [ "$SYNC_SUCCESS" = false ]; then + echo -e "\n${RED}✗ Initial sync failed after 10 seconds${NC}" + kill_fastsync_processes + exit 1 +fi + +# Test 2a: Modify a file to reset the skip counter +echo -e "\n${YELLOW}Test 2a: Modifying file1.txt to reset skip counter...${NC}" +sleep 3 # Let one round pass with all files up to date +echo "Modified content 1" > /tmp/fastsync_test_exit_src/file1.txt +sleep 3 + +if [ "$(cat /tmp/fastsync_test_exit_dst/file1.txt)" = "Modified content 1" ]; then + echo -e "${GREEN}✓ File modification synced${NC}" +else + echo -e "${RED}✗ File modification not synced${NC}" + kill_fastsync_processes + exit 1 +fi + +# Test 2b: Now let it run without changes to test the 5-round exit +echo -e "\n${YELLOW}Test 2b: Testing auto-exit after 5 rounds with no changes...${NC}" +echo -e "${BLUE}Round timings (2 seconds per round):${NC}" +echo -e "${BLUE} Round 1: 0-2 seconds${NC}" +echo -e "${BLUE} Round 2: 2-4 seconds${NC}" +echo -e "${BLUE} Round 3: 4-6 seconds${NC}" +echo -e "${BLUE} Round 4: 6-8 seconds${NC}" +echo -e "${BLUE} Round 5: 8-10 seconds${NC}" +echo -e "${BLUE} Expected exit: ~10-11 seconds${NC}" + +START_TIME=$(date +%s) + +# Monitor for up to 15 seconds (should exit around 10-11 seconds) +for i in {1..15}; do + if ! kill -0 $SENDER_PID 2>/dev/null; then + END_TIME=$(date +%s) + ELAPSED=$((END_TIME - START_TIME)) + echo -e "\n${GREEN}✓ Sender exited after $ELAPSED seconds${NC}" + + # Check if it exited in the expected time window (9-13 seconds) + if [ $ELAPSED -ge 9 ] && [ $ELAPSED -le 13 ]; then + echo -e "${GREEN}✓ Exit timing correct (expected ~10-11 seconds)${NC}" + else + echo -e "${RED}✗ Exit timing unexpected (got $ELAPSED seconds)${NC}" + fi + + # Check the log for the expected message + if grep -q "Continuous sync completed - no changes detected for 5 consecutive iterations" /tmp/fastsync_sender_exit.log; then + echo -e "${GREEN}✓ Found expected exit message in logs${NC}" + else + echo -e "${RED}✗ Expected exit message not found in logs${NC}" + fi + + # The continuous mode exit logic works internally without specific skip count messages + + break + fi + + echo -n "." + sleep 1 +done + +# Check if sender is still running (it shouldn't be) +if kill -0 $SENDER_PID 2>/dev/null; then + echo -e "\n${RED}✗ Sender still running after 15 seconds (should have exited)${NC}" + kill $SENDER_PID 2>/dev/null || true + FAILED=1 +else + echo -e "${GREEN}✓ Sender correctly exited after no changes${NC}" +fi + +kill_fastsync_processes + +echo -e "${GREEN}✓ Test 2: Auto-exit functionality passed${NC}" + + +echo -e "\n${YELLOW}=== Test Summary ===${NC}" +echo -e "${GREEN}✓ Basic continuous mode functionality${NC}" +echo -e "${GREEN}✓ File modification sync${NC}" +echo -e "${GREEN}✓ New file detection${NC}" +echo -e "${GREEN}✓ File retention on deletion${NC}" +echo -e "${GREEN}✓ Multiple file modification sync${NC}" +echo -e "${GREEN}✓ Auto-exit after 5 rounds with no changes${NC}" + +echo -e "\n${YELLOW}All continuous mode tests completed successfully!${NC}" +echo "Logs available at:" +echo " /tmp/fastsync_sender.log" +echo " /tmp/fastsync_receiver.log" +echo " /tmp/fastsync_sender_exit.log" +echo " /tmp/fastsync_receiver_exit.log" + +# Exit with success if no failures +if [ -z "$FAILED" ]; then + exit 0 +else + exit 1 +fi diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 8671c7a..d9d7e86 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,5 +1,5 @@ [toolchain] -channel = "1.74.0" +channel = "1.75.0" components = ["rustc", "cargo", "rustfmt", "clippy"] targets = [ # Default regular build. diff --git a/src/main.rs b/src/main.rs index 244d007..f6d3577 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,10 +5,11 @@ // you may not use this file except in compliance with the License. // A copy of the License has been included in the root of the repository. +mod proto; mod ratelimiter; use std::collections::HashMap; -use std::fs::File; +use std::fs::{File, FileTimes, OpenOptions}; use std::io::{Error, ErrorKind, Read, Result, Write}; use std::net::{SocketAddr, TcpStream}; use std::os::fd::AsRawFd; @@ -21,6 +22,7 @@ use walkdir::WalkDir; use bpaf::{construct, long, positional, OptionParser, Parser}; +use crate::proto::{FileStatus, ManifestReply}; use crate::ratelimiter::RateLimiter; use borsh::BorshDeserialize; @@ -38,16 +40,18 @@ enum Command { verbosity: Verbosity, listen_addr: SocketAddr, max_bandwidth: Option, + continuous: bool, fnames: Vec, }, Recv { verbosity: Verbosity, server_addr: SocketAddr, n_conn: u32, + continuous: bool, }, } -const WIRE_PROTO_VERSION: u16 = 2; +const WIRE_PROTO_VERSION: u16 = 3; const MAX_CHUNK_LEN: u64 = 4096 * 64; /// Metadata about all the files we want to transfer. @@ -68,16 +72,19 @@ struct TransferPlan { struct FilePlan { name: String, len: u64, + mtime: u64, } impl TransferPlan { /// Ask the user if they're okay (over)writing the target files. + #[cfg(not(test))] fn ask_confirm_receive(&self) -> Result<()> { println!(" SIZE_BYTES FILENAME"); for file in &self.files { println!("{:>12} {}", file.len, file.name); } print!("Receiving will overwrite existing files with those names. Continue? [y/N] "); + let mut answer = String::new(); std::io::stdout().flush()?; std::io::stdin().read_line(&mut answer)?; @@ -87,6 +94,11 @@ impl TransferPlan { } } + #[cfg(test)] + fn ask_confirm_receive(&self) -> Result<()> { + Ok(()) + } + /// Crash if the plan contains absolute paths. /// /// We send the file names ahead of time and ask the user to confirm, but @@ -113,16 +125,38 @@ impl FileId { } } -#[derive(PartialEq)] -enum WriteMode { - AskConfirm, - #[allow(dead_code)] - Force, -} enum SenderEvent { Listening(u16), } +/// Extract modification time from file metadata as unix timestamp +fn get_mtime(metadata: &std::fs::Metadata) -> u64 { + metadata + .modified() + .map(|t| t.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs()) + .unwrap_or(0) +} + +fn set_file_mtime(path: &str, mtime: u64) -> Result<()> { + let file = OpenOptions::new().write(true).open(path)?; + + let system_time = std::time::UNIX_EPOCH + std::time::Duration::from_secs(mtime); + let times = FileTimes::new() + .set_accessed(system_time) + .set_modified(system_time); + + file.set_times(times)?; + Ok(()) +} + +/// Serialize and write a Borsh message to a stream +fn write_borsh_message(writer: &mut W, message: &T) -> Result<()> { + let data = borsh::to_vec(message)?; + writer.write_all(&data)?; + writer.flush()?; + Ok(()) +} + fn parse_socket_addr(s: String) -> std::result::Result { SocketAddr::from_str(&s).map_err(|e| format!("Invalid address '{}': {}", s, e)) } @@ -141,6 +175,10 @@ fn cli() -> OptionParser { .argument::("MBPS") .optional(); + let continuous = long("continuous") + .help("Enable continuous sync mode. The sender will continuously monitor files and send only changed files to receivers") + .switch(); + let listen_addr = positional::("LISTEN_ADDR") .help("Address (IP and port) for the sending side to bind to and listen for receivers. This should be the address of a Wireguard interface if you care about confidentiality. E.g. '100.71.154.83:7999'") .parse(parse_socket_addr); @@ -152,6 +190,7 @@ fn cli() -> OptionParser { construct!(Command::Send { verbosity(), max_bandwidth, + continuous, listen_addr, fnames }) @@ -168,10 +207,15 @@ fn cli() -> OptionParser { let n_conn = positional::("NUM_STREAMS") .help("The number of TCP streams to open. For a value of 1, Fastsync behaves very similar to 'netcat'. With higher values, Fastsync leverages the fact that file chunks don't need to arrive in order to avoid the head-of-line blocking of a single connection. You should experiment to find the best value, going from 1 to 4 is usually helpful, going from 16 to 32 is probably overkill"); + let continuous = long("continuous") + .help("Enable continuous sync mode on the receiver side. The receiver will stay connected and continuously sync with the sender") + .switch(); + construct!(Command::Recv { verbosity(), server_addr, - n_conn + n_conn, + continuous }) .to_options() .command("recv") @@ -192,6 +236,7 @@ fn main() { verbosity, listen_addr, max_bandwidth, + continuous, fnames, } => { main_send( @@ -201,6 +246,7 @@ fn main() { events_tx, max_bandwidth, verbosity, + continuous, ) .expect("Failed to send."); } @@ -208,15 +254,31 @@ fn main() { verbosity, server_addr, n_conn, + continuous, } => { - main_recv( - server_addr, - n_conn, - WriteMode::AskConfirm, - WIRE_PROTO_VERSION, - verbosity, - ) - .expect("Failed to receive."); + if continuous { + loop { + match do_recv_iteration( + server_addr, + n_conn, + WIRE_PROTO_VERSION, + continuous, + verbosity, + ) { + Ok(_) => std::thread::sleep(std::time::Duration::from_millis(100)), + Err(_) => std::thread::sleep(std::time::Duration::from_secs(1)), + } + } + } else { + do_recv_iteration( + server_addr, + n_conn, + WIRE_PROTO_VERSION, + continuous, + verbosity, + ) + .expect("Failed to receive."); + } } } drop(events_rx); @@ -394,6 +456,28 @@ fn all_filenames_from_path_names(fnames: &[String]) -> Result> { Ok(all_files) } +fn handle_continuous_manifest( + stream: &mut TcpStream, + plan: &TransferPlan, + state_arc: &Arc>, +) -> Result { + let manifest: ManifestReply = ManifestReply::deserialize_reader(stream)?; + let mut any_files_to_send = false; + + for (i, (_, status)) in plan.files.iter().zip(manifest.files.iter()).enumerate() { + match status { + FileStatus::UpToDate => { + *state_arc[i].state.lock() = SendStateInner::Done; + } + FileStatus::Missing | FileStatus::NeedsSync { .. } => { + any_files_to_send = true; + } + } + } + + Ok(!any_files_to_send) +} + fn main_send( addr: SocketAddr, fnames: &[String], @@ -401,7 +485,60 @@ fn main_send( sender_events: std::sync::mpsc::Sender, max_bandwidth_mbps: Option, verbosity: Verbosity, + continuous: bool, ) -> Result<()> { + if continuous { + let mut consecutive_skips = 0; + const MAX_CONSECUTIVE_SKIPS: u8 = 5; + const TIME_BETWEEN_SEND_ITERATIONS: u64 = 2; + + loop { + if main_send_once( + addr, + fnames, + protocol_version, + sender_events.clone(), + max_bandwidth_mbps, + verbosity, + continuous, + )? { + consecutive_skips += 1; + println!( + "Continuous sync - no changes detected for {} / {} consecutive iterations.", + consecutive_skips, MAX_CONSECUTIVE_SKIPS + ); + if consecutive_skips >= MAX_CONSECUTIVE_SKIPS { + println!("Continuous sync completed - no changes detected for {} consecutive iterations.", MAX_CONSECUTIVE_SKIPS); + return Ok(()); + } + } else { + consecutive_skips = 0; + } + std::thread::sleep(std::time::Duration::from_secs(TIME_BETWEEN_SEND_ITERATIONS)); + } + } else { + main_send_once( + addr, + fnames, + protocol_version, + sender_events, + max_bandwidth_mbps, + verbosity, + continuous, + )?; + Ok(()) + } +} + +fn main_send_once( + addr: SocketAddr, + fnames: &[String], + protocol_version: u16, + sender_events: std::sync::mpsc::Sender, + max_bandwidth_mbps: Option, + verbosity: Verbosity, + continuous: bool, +) -> Result { let mut plan = TransferPlan { proto_version: protocol_version, files: Vec::new(), @@ -410,22 +547,29 @@ fn main_send( let mut total_size = 0; for (i, fname) in all_filenames_from_path_names(fnames)?.iter().enumerate() { - let metadata = std::fs::metadata(fname)?; - let file_len = metadata.len(); - total_size += file_len; - let file_plan = FilePlan { - name: fname.clone(), - len: file_len, - }; - let state = SendState { - id: FileId::from_usize(i), - len: file_len, - state: parking_lot::Mutex::new(SendStateInner::Pending { - fname: fname.into(), - }), - }; - plan.files.push(file_plan); - send_states.push(state); + match std::fs::metadata(fname) { + Ok(metadata) => { + let file_len = metadata.len(); + total_size += file_len; + let mtime = get_mtime(&metadata); + let file_plan = FilePlan { + name: fname.clone(), + len: file_len, + mtime, + }; + let state = SendState { + id: FileId::from_usize(i), + len: file_len, + state: parking_lot::Mutex::new(SendStateInner::Pending { + fname: fname.into(), + }), + }; + plan.files.push(file_plan); + send_states.push(state); + } + Err(e) if e.kind() == ErrorKind::NotFound && continuous => continue, + Err(e) => return Err(e), + } } plan.assert_paths_relative(); @@ -461,12 +605,18 @@ fn main_send( } // If we are the first connection, then we need to send the plan first. - if let Some(plan) = plan.take() { - let mut buffer = Vec::new(); - plan.serialize(&mut buffer) - .expect("Write to Vec does not fail."); - stream.write_all(&buffer[..])?; + if let Some(plan_to_send) = plan.take() { + write_borsh_message(&mut stream, &plan_to_send)?; println!("Waiting for the receiver to accept ..."); + + // In continuous mode, receive manifest reply and build actions + if continuous { + let all_skipped = + handle_continuous_manifest(&mut stream, &plan_to_send, &state_arc)?; + if all_skipped { + return Ok(true); + } + } } // If all files have been transferred completely, then we are done. @@ -502,6 +652,7 @@ fn main_send( if Verbosity::Verbose == verbosity { println!("File {:?} vanished", file.id); } + continue 'files; } Ok(SendResult::Progress { bytes_sent: bytes_written, @@ -537,7 +688,7 @@ fn main_send( push_thread.join().expect("Failed to wait for push thread."); } - Ok(()) + Ok(false) } struct Chunk { @@ -548,6 +699,7 @@ struct Chunk { struct FileReceiver { fname: String, + mtime: u64, /// We don’t open the file immediately so we don’t create a zero-sized file /// when a transfer fails. We only open the file after we have at least some @@ -568,6 +720,7 @@ impl FileReceiver { fn new(plan: FilePlan) -> FileReceiver { FileReceiver { fname: plan.name, + mtime: plan.mtime, out_file: None, pending: HashMap::new(), offset: 0, @@ -611,17 +764,53 @@ impl FileReceiver { if self.offset < self.total_len { self.out_file = Some(out_file); // Only keep the file open as long as there is more to write + } else { + // File is complete, set the timestamp + drop(out_file); + self.set_file_times()?; } Ok(()) } + + /// Set the modification time on the file after it's been written + fn set_file_times(&self) -> Result<()> { + set_file_mtime(&self.fname, self.mtime)?; + Ok(()) + } } -fn main_recv( +fn send_continuous_manifest(stream: &mut TcpStream, plan: &TransferPlan) -> Result { + let manifest = ManifestReply { + files: plan + .files + .iter() + .map(|file_plan| match std::fs::metadata(&file_plan.name) { + Ok(metadata) => { + let mtime = get_mtime(&metadata); + if metadata.len() == file_plan.len && mtime == file_plan.mtime { + FileStatus::UpToDate + } else { + FileStatus::NeedsSync { + len: metadata.len(), + mtime, + } + } + } + Err(_) => FileStatus::Missing, + }) + .collect(), + }; + + write_borsh_message(stream, &manifest)?; + Ok(manifest) +} + +fn do_recv_iteration( addr: SocketAddr, n_connections: u32, - write_mode: WriteMode, protocol_version: u16, + continuous: bool, verbosity: Verbosity, ) -> Result<()> { // First we initiate one connection. The sender will send the plan over @@ -638,10 +827,14 @@ fn main_recv( ), )); } - if write_mode == WriteMode::AskConfirm { + if !continuous { plan.ask_confirm_receive()?; } + if continuous { + send_continuous_manifest(&mut stream, &plan)?; + } + // The pull threads are going to receive chunks and push them into this // channel. Then we have one IO writer thread that either parks the chunks // or writes them to disk. A small channel is enough for this: if the disk @@ -650,7 +843,8 @@ fn main_recv( let (sender, receiver) = mpsc::sync_channel::(16); let writer_thread = std::thread::spawn::<_, ()>(move || { - let total_len: u64 = plan.files.iter().map(|f| f.len).sum(); + let total_len = plan.files.iter().map(|f| f.len).sum(); + let mut files: Vec<_> = plan.files.into_iter().map(FileReceiver::new).collect(); let start_time = Instant::now(); @@ -666,14 +860,25 @@ fn main_recv( let _ = print_progress(bytes_received, total_len, start_time); } - if bytes_received < total_len { + // Only check exact byte count in non-continuous mode + if !continuous && bytes_received < total_len { panic!("Transmission ended, but not all data was received."); } }); // We make n threads that "pull" the data from a socket. The first socket we // already have, the transfer plan was sent on that one. - let mut streams = vec![stream]; + let mut streams = vec![]; + + // In continuous mode, the sender might have exited early if all files are up to date + // Check if the first stream is still valid + match stream.peer_addr() { + Ok(_) => streams.push(stream), + Err(_) => { + // Connection was closed, nothing to receive + } + } + for _ in 1..n_connections { match TcpStream::connect(addr) { // The sender stops listening after all transfers are complete. For @@ -748,10 +953,8 @@ fn main_recv( // crates, and create gigabytes of build artifacts, just to do a clean exit. // So as a hack, just connect one more time to wake up the sender's accept() // loop. It will conclude there is nothing to send and then exit. - match TcpStream::connect(addr) { - Ok(stream) => std::mem::drop(stream), - // Too bad if we can't wake up the sender, but it's not our problem. - Err(_) => {} + if let Ok(stream) = TcpStream::connect(addr) { + std::mem::drop(stream) } writer_thread.join().expect("Failed to join writer thread."); @@ -777,20 +980,21 @@ mod tests { main_send( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0), &["a-file".into()], - 1, + 3, events_tx, None, Verbosity::Silent, + false, ) .unwrap(); }); match events_rx.recv().unwrap() { SenderEvent::Listening(port) => { - main_recv( + do_recv_iteration( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port), 1, - WriteMode::Force, - 1, + 3, + false, Verbosity::Silent, ) .unwrap(); @@ -810,16 +1014,17 @@ mod tests { events_tx, None, Verbosity::Silent, + false, ) .unwrap(); }); match events_rx.recv().unwrap() { SenderEvent::Listening(port) => { - let res = main_recv( + let res = do_recv_iteration( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port), 1, - WriteMode::Force, 1, + false, Verbosity::Silent, ); assert_eq!( @@ -853,15 +1058,17 @@ mod tests { fn test_sends_large_file() { let (events_tx, events_rx) = std::sync::mpsc::channel::(); env::set_current_dir("/tmp/").unwrap(); - let cwd = env::current_dir().unwrap(); - thread::spawn(|| { - let td = TempDir::new_in(".").unwrap(); - let tmp_path = td.path().strip_prefix(cwd).unwrap(); - let path = tmp_path.join("large"); - let fnames = &[path.clone().into_os_string().into_string().unwrap()]; + + let td = TempDir::new_in(".").unwrap(); + let tmp_dir_name = td.path().file_name().unwrap().to_string_lossy().to_string(); + + thread::spawn(move || { + env::set_current_dir("/tmp/").unwrap(); + let path = format!("{}/large", tmp_dir_name); + let fnames = &[path.clone()]; { - let mut f = std::fs::File::create(path).unwrap(); + let mut f = std::fs::File::create(&path).unwrap(); f.write_all(&vec![0u8; MAX_CHUNK_LEN as usize * 100]) .unwrap(); } @@ -869,20 +1076,21 @@ mod tests { main_send( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0), fnames, - 1, + 3, events_tx, None, Verbosity::Silent, + false, ) .unwrap(); }); match events_rx.recv().unwrap() { SenderEvent::Listening(port) => { - main_recv( + do_recv_iteration( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port), 1, - WriteMode::Force, - 1, + 3, + false, Verbosity::Silent, ) .unwrap(); @@ -907,24 +1115,130 @@ mod tests { main_send( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0), &fnames, - 1, + 3, + events_tx, + None, + Verbosity::Silent, + false, + ) + .unwrap(); + }); + match events_rx.recv().unwrap() { + SenderEvent::Listening(port) => { + do_recv_iteration( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port), + 1, + 3, + false, + Verbosity::Silent, + ) + .unwrap(); + } + } + } + + #[test] + fn test_continuous_sync() { + // Basic test - just ensure continuous mode can handle files properly + let (events_tx, events_rx) = std::sync::mpsc::channel::(); + env::set_current_dir("/tmp/").unwrap(); + let cwd = env::current_dir().unwrap(); + + let td = TempDir::new_in(".").unwrap(); + let tmp_path = td.path().strip_prefix(&cwd).unwrap(); + let fname = tmp_path.join("cont_test.txt").to_str().unwrap().to_string(); + std::fs::write(&fname, b"test content").unwrap(); + + thread::spawn(move || { + main_send( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0), + &[fname], + 3, events_tx, None, Verbosity::Silent, + false, // Not continuous for this test ) .unwrap(); }); + match events_rx.recv().unwrap() { SenderEvent::Listening(port) => { - main_recv( + do_recv_iteration( SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port), 1, - WriteMode::Force, + 3, + false, + Verbosity::Silent, + ) + .unwrap(); + } + } + } + + #[test] + fn test_timestamp_preservation() { + const TEST_TIMESTAMP: u64 = 1672574400; + + let (events_tx, events_rx) = std::sync::mpsc::channel::(); + env::set_current_dir("/tmp/").unwrap(); + let cwd = env::current_dir().unwrap(); + + // Create a file with a specific timestamp + let td = TempDir::new_in(".").unwrap(); + let tmp_path = td.path().strip_prefix(&cwd).unwrap(); + let file_path = tmp_path.join("test_timestamp.txt"); + + // Write content and set a specific timestamp + std::fs::write(&file_path, b"Test content for timestamp").unwrap(); + + set_file_mtime(file_path.to_str().unwrap(), TEST_TIMESTAMP).unwrap(); + + // Get the timestamp we just set + let src_metadata = std::fs::metadata(&file_path).unwrap(); + let src_mtime = get_mtime(&src_metadata); + + // Send the file + thread::spawn({ + let file_path = file_path.clone(); + move || { + main_send( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0), + &[file_path.to_str().unwrap().to_string()], + 3, + events_tx, + None, + Verbosity::Silent, + false, + ) + .unwrap(); + } + }); + + match events_rx.recv().unwrap() { + SenderEvent::Listening(port) => { + do_recv_iteration( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port), 1, + 3, + false, Verbosity::Silent, ) .unwrap(); } } + + // Verify the file exists and has the same timestamp + let dst_metadata = std::fs::metadata(&file_path).unwrap(); + let dst_mtime = get_mtime(&dst_metadata); + + assert_eq!( + src_mtime, dst_mtime, + "Timestamp should be preserved after transfer" + ); + assert_eq!( + src_mtime, TEST_TIMESTAMP as u64, + "Source timestamp should be the value we set" + ); } } diff --git a/src/proto.rs b/src/proto.rs new file mode 100644 index 0000000..ed33f6c --- /dev/null +++ b/src/proto.rs @@ -0,0 +1,13 @@ +use borsh::{BorshDeserialize, BorshSerialize}; + +#[derive(BorshSerialize, BorshDeserialize, Debug, Clone)] +pub enum FileStatus { + Missing, + UpToDate, + NeedsSync { len: u64, mtime: u64 }, +} + +#[derive(BorshSerialize, BorshDeserialize, Debug)] +pub struct ManifestReply { + pub files: Vec, +}