From d80724b24bee854752ceb1c37878b3072561984d Mon Sep 17 00:00:00 2001 From: Yash Kaushik Date: Sun, 22 Mar 2026 19:25:43 +0530 Subject: [PATCH 1/2] Add phase field to file_status JSON output (start/end lifecycle events) --- src/borg/archiver/__init__.py | 26 +- src/borg/archiver/create_cmd.py | 239 +++++++++--------- .../testsuite/archiver/create_cmd_test.py | 127 ++++++++++ 3 files changed, 276 insertions(+), 116 deletions(-) diff --git a/src/borg/archiver/__init__.py b/src/borg/archiver/__init__.py index dadb4c7d83..5c78331c93 100644 --- a/src/borg/archiver/__init__.py +++ b/src/borg/archiver/__init__.py @@ -155,14 +155,34 @@ def print_warning_instance(self, warning): msg, msgid, args, wc = cls.__doc__, cls.__qualname__, warning.args, warning.exit_code self.print_warning(msg, *args, wc=wc, wt="curly", msgid=msgid) - def print_file_status(self, status, path): - # if we get called with status == None, the final file status was already printed + def print_file_status(self, status, path, *, phase=None, error=None): + # START lifecycle event (JSON only) + if self.output_list and self.log_json and phase == "start" and status is None: + json_data = {"type": "file_status", "phase": "start"} + json_data |= text_to_json("path", path) + if error is not None: + json_data["error"] = error + print(json.dumps(json_data), file=sys.stderr) + return + # END lifecycle event (JSON only) + if self.output_list and self.log_json and phase == "end" and status is None: + json_data = {"type": "file_status", "phase": "end"} + json_data |= text_to_json("path", path) + if error is not None: + json_data["error"] = error + print(json.dumps(json_data), file=sys.stderr) + return + # regular status event (A, M, U, -, d, s, etc.) if self.output_list and status is not None and (self.output_filter is None or status in self.output_filter): if self.log_json: json_data = {"type": "file_status", "status": status} json_data |= text_to_json("path", path) + if phase is not None: + json_data["phase"] = phase + if error is not None: + json_data["error"] = error print(json.dumps(json_data), file=sys.stderr) - else: + elif not self.log_json and status is not None: logging.getLogger("borg.output.list").info("%1s %s", status, remove_surrogates(path)) def preprocess_args(self, args): diff --git a/src/borg/archiver/create_cmd.py b/src/borg/archiver/create_cmd.py index e7ed66d35b..847397748a 100644 --- a/src/borg/archiver/create_cmd.py +++ b/src/borg/archiver/create_cmd.py @@ -90,7 +90,7 @@ def create_inner(archive, cache, fso): raise Error(f"{path!r}: {e}") else: status = "+" # included - self.print_file_status(status, path) + self.print_file_status(status, path, phase="end") elif args.paths_from_command or args.paths_from_shell_command or args.paths_from_stdin: paths_sep = eval_escapes(args.paths_delimiter) if args.paths_delimiter is not None else "\n" if args.paths_from_command or args.paths_from_shell_command: @@ -139,7 +139,7 @@ def create_inner(archive, cache, fso): status = "E" if status == "C": self.print_warning_instance(FileChangedWarning(path)) - self.print_file_status(status, path) + self.print_file_status(status, path, phase="end") if not dry_run and status is not None: fso.stats.files_stats[status] += 1 if args.paths_from_command or args.paths_from_shell_command: @@ -167,7 +167,7 @@ def create_inner(archive, cache, fso): status = "E" else: status = "+" # included - self.print_file_status(status, path) + self.print_file_status(status, path, phase="end") if not dry_run and status is not None: fso.stats.files_stats[status] += 1 continue @@ -293,134 +293,147 @@ def _process_any(self, *, path, parent_fd, name, st, fso, cache, read_special, d """ Call the right method on the given FilesystemObjectProcessor. """ - if dry_run: return "+" # included - MAX_RETRIES = 10 # count includes the initial try (initial try == "retry 0") - for retry in range(MAX_RETRIES): - last_try = retry == MAX_RETRIES - 1 - try: - if stat.S_ISREG(st.st_mode): - return fso.process_file( - path=path, - parent_fd=parent_fd, - name=name, - st=st, - cache=cache, - last_try=last_try, - strip_prefix=strip_prefix, - ) - elif stat.S_ISDIR(st.st_mode): - return fso.process_dir(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix) - elif stat.S_ISLNK(st.st_mode): - if not read_special: - return fso.process_symlink( - path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix + + # Types not archived: no list start/end pair (matches prior behavior of no status line). + if stat.S_ISSOCK(st.st_mode): + return + elif stat.S_ISDOOR(st.st_mode): + return + elif stat.S_ISPORT(st.st_mode): + return + + m = st.st_mode + if not ( + stat.S_ISREG(m) + or stat.S_ISDIR(m) + or stat.S_ISLNK(m) + or stat.S_ISFIFO(m) + or stat.S_ISCHR(m) + or stat.S_ISBLK(m) + ): + self.print_warning("Unknown file type: %s", path) + return + + # Emit START once, before any processing, before the retry loop. + self.print_file_status(None, path, phase="start") + + try: + MAX_RETRIES = 10 # count includes the initial try (initial try == "retry 0") + for retry in range(MAX_RETRIES): + last_try = retry == MAX_RETRIES - 1 + try: + if stat.S_ISREG(st.st_mode): + return fso.process_file( + path=path, + parent_fd=parent_fd, + name=name, + st=st, + cache=cache, + last_try=last_try, + strip_prefix=strip_prefix, ) - else: - try: - st_target = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=True) - except OSError: - special = False + elif stat.S_ISDIR(st.st_mode): + return fso.process_dir(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix) + elif stat.S_ISLNK(st.st_mode): + if not read_special: + return fso.process_symlink( + path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix + ) + else: + try: + st_target = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=True) + except OSError: + special = False + else: + special = is_special(st_target.st_mode) + if special: + return fso.process_file( + path=path, + parent_fd=parent_fd, + name=name, + st=st_target, + cache=cache, + flags=flags_special_follow, + last_try=last_try, + strip_prefix=strip_prefix, + ) + else: + return fso.process_symlink( + path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix + ) + elif stat.S_ISFIFO(st.st_mode): + if not read_special: + return fso.process_fifo( + path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix + ) else: - special = is_special(st_target.st_mode) - if special: return fso.process_file( path=path, parent_fd=parent_fd, name=name, - st=st_target, + st=st, cache=cache, - flags=flags_special_follow, + flags=flags_special, last_try=last_try, strip_prefix=strip_prefix, ) + elif stat.S_ISCHR(st.st_mode): + if not read_special: + return fso.process_dev( + path=path, parent_fd=parent_fd, name=name, st=st, dev_type="c", strip_prefix=strip_prefix + ) else: - return fso.process_symlink( - path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix + return fso.process_file( + path=path, + parent_fd=parent_fd, + name=name, + st=st, + cache=cache, + flags=flags_special, + last_try=last_try, + strip_prefix=strip_prefix, + ) + elif stat.S_ISBLK(st.st_mode): + if not read_special: + return fso.process_dev( + path=path, parent_fd=parent_fd, name=name, st=st, dev_type="b", strip_prefix=strip_prefix + ) + else: + return fso.process_file( + path=path, + parent_fd=parent_fd, + name=name, + st=st, + cache=cache, + flags=flags_special, + last_try=last_try, + strip_prefix=strip_prefix, ) - elif stat.S_ISFIFO(st.st_mode): - if not read_special: - return fso.process_fifo( - path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix - ) - else: - return fso.process_file( - path=path, - parent_fd=parent_fd, - name=name, - st=st, - cache=cache, - flags=flags_special, - last_try=last_try, - strip_prefix=strip_prefix, - ) - elif stat.S_ISCHR(st.st_mode): - if not read_special: - return fso.process_dev( - path=path, parent_fd=parent_fd, name=name, st=st, dev_type="c", strip_prefix=strip_prefix - ) else: - return fso.process_file( - path=path, - parent_fd=parent_fd, - name=name, - st=st, - cache=cache, - flags=flags_special, - last_try=last_try, - strip_prefix=strip_prefix, - ) - elif stat.S_ISBLK(st.st_mode): - if not read_special: - return fso.process_dev( - path=path, parent_fd=parent_fd, name=name, st=st, dev_type="b", strip_prefix=strip_prefix + self.print_warning("Unknown file type: %s", path) + return + except BackupItemExcluded: + return "-" + except BackupError as err: + if isinstance(err, BackupOSError): + if err.errno in (errno.EPERM, errno.EACCES): + raise + sleep_s = 1000.0 / 1e6 * 10 ** (retry / 2) + time.sleep(sleep_s) + if retry < MAX_RETRIES - 1: + logger.warning( + f"{path}: {err}, slept {sleep_s:.3f}s, next: retry: {retry + 1} of {MAX_RETRIES - 1}..." ) else: - return fso.process_file( - path=path, - parent_fd=parent_fd, - name=name, - st=st, - cache=cache, - flags=flags_special, - last_try=last_try, - strip_prefix=strip_prefix, - ) - elif stat.S_ISSOCK(st.st_mode): - # Ignore unix sockets - return - elif stat.S_ISDOOR(st.st_mode): - # Ignore Solaris doors - return - elif stat.S_ISPORT(st.st_mode): - # Ignore Solaris event ports - return - else: - self.print_warning("Unknown file type: %s", path) - return - except BackupItemExcluded: - return "-" - except BackupError as err: - if isinstance(err, BackupOSError): - if err.errno in (errno.EPERM, errno.EACCES): - # Do not try again, such errors can not be fixed by retrying. raise - # sleep a bit, so temporary problems might go away... - sleep_s = 1000.0 / 1e6 * 10 ** (retry / 2) # retry 0: 1ms, retry 6: 1s, ... - time.sleep(sleep_s) - if retry < MAX_RETRIES - 1: - logger.warning( - f"{path}: {err}, slept {sleep_s:.3f}s, next: retry: {retry + 1} of {MAX_RETRIES - 1}..." - ) - else: - # giving up with retries, error will be dealt with (logged) by upper error handler - raise - # we better do a fresh stat on the file, just to make sure to get the current file - # mode right (which could have changed due to a race condition and is important for - # dispatching) and also to get current inode number of that file. - with backup_io("stat"): - st = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=False) + with backup_io("stat"): + st = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=False) + finally: + # END is always emitted here — after ALL processing including chunked I/O, + # even on exception, even on retry exhaustion. + self.print_file_status(None, path, phase="end") def _rec_walk( self, diff --git a/src/borg/testsuite/archiver/create_cmd_test.py b/src/borg/testsuite/archiver/create_cmd_test.py index 2562aa3779..963244e4d1 100644 --- a/src/borg/testsuite/archiver/create_cmd_test.py +++ b/src/borg/testsuite/archiver/create_cmd_test.py @@ -1155,6 +1155,133 @@ def test_create_with_compression_algorithms(archivers, request): assert_dirs_equal(archiver.input_path, os.path.join(extract_path, "input")) +def test_file_status_phase_regular_file(archivers, request): + """Test that start/end lifecycle events are emitted for regular files.""" + archiver = request.getfixturevalue(archivers) + create_regular_file(archiver.input_path, "file1", size=1024 * 80) + cmd(archiver, "repo-create", RK_ENCRYPTION) + log = cmd(archiver, "create", "test", "input", "--log-json", "--list") + events = [json.loads(line) for line in log.splitlines() if line.startswith("{")] + file_events = [ + e for e in events + if e.get("type") == "file_status" and e.get("path", "").endswith("file1") + ] + phases = [e.get("phase") for e in file_events] + # start must come before end, and both must be present + assert "start" in phases + assert "end" in phases + assert phases.index("start") < phases.index("end") + # end must be the last event for this file + assert phases[-1] == "end" + + +def test_file_status_phase_symlink(archivers, request): + """Test that start/end lifecycle events are emitted for symlinks.""" + if not are_symlinks_supported(): + pytest.skip("symlinks not supported") + archiver = request.getfixturevalue(archivers) + os.symlink("file1", os.path.join(archiver.input_path, "link1")) + create_regular_file(archiver.input_path, "file1", size=1024) + cmd(archiver, "repo-create", RK_ENCRYPTION) + log = cmd(archiver, "create", "test", "input", "--log-json", "--list") + events = [json.loads(line) for line in log.splitlines() if line.startswith("{")] + link_events = [ + e for e in events + if e.get("type") == "file_status" and e.get("path", "").endswith("link1") + ] + phases = [e.get("phase") for e in link_events] + assert "start" in phases + assert "end" in phases + assert phases.index("start") < phases.index("end") + + +def test_file_status_phase_read_special(archivers, request): + """Test that start/end lifecycle events are emitted for --read-special paths. + + This is the critical regression test: previously --read-special file types + (symlinks, FIFOs, char/block devices) did NOT emit start/end events. + """ + if not are_symlinks_supported(): + pytest.skip("symlinks not supported") + archiver = request.getfixturevalue(archivers) + # create a regular file and a symlink pointing to it + create_regular_file(archiver.input_path, "target", size=1024) + os.symlink( + os.path.join(archiver.input_path, "target"), + os.path.join(archiver.input_path, "link_to_target") + ) + cmd(archiver, "repo-create", RK_ENCRYPTION) + log = cmd(archiver, "create", "test", "input", "--read-special", "--log-json", "--list") + events = [json.loads(line) for line in log.splitlines() if line.startswith("{")] + link_events = [ + e for e in events + if e.get("type") == "file_status" and e.get("path", "").endswith("link_to_target") + ] + phases = [e.get("phase") for e in link_events] + # This would fail before the fix: --read-special symlinks got no start event + assert "start" in phases, "start event missing for --read-special symlink" + assert "end" in phases, "end event missing for --read-special symlink" + assert phases.index("start") < phases.index("end") + + +def test_file_status_phase_no_orphan_events(archivers, request): + """Test that every start event has a matching end event and vice versa. + + No file should have an end without a start (orphan end), + or a start without an end (orphan start). + """ + archiver = request.getfixturevalue(archivers) + create_regular_file(archiver.input_path, "file1", size=1024 * 80) + create_regular_file(archiver.input_path, "file2", size=1024 * 80) + create_regular_file(archiver.input_path, "dir1/file3", size=1024) + cmd(archiver, "repo-create", RK_ENCRYPTION) + log = cmd(archiver, "create", "test", "input", "--log-json", "--list") + events = [json.loads(line) for line in log.splitlines() if line.startswith("{")] + file_events = [e for e in events if e.get("type") == "file_status"] + + # Group events by path + from collections import defaultdict + events_by_path = defaultdict(list) + for e in file_events: + path = e.get("path") + phase = e.get("phase") + if phase in ("start", "end"): + events_by_path[path].append(phase) + + for path, phases in events_by_path.items(): + starts = phases.count("start") + ends = phases.count("end") + assert starts == ends, ( + f"{path}: mismatched lifecycle events — {starts} start(s), {ends} end(s)" + ) + assert starts == 1, ( + f"{path}: expected exactly 1 start/end pair, got {starts}" + ) + assert phases[0] == "start", f"{path}: first event is not 'start'" + assert phases[-1] == "end", f"{path}: last event is not 'end'" + + +def test_file_status_phase_excluded_no_lifecycle(archivers, request): + """Test that excluded files do not emit start/end lifecycle events.""" + archiver = request.getfixturevalue(archivers) + create_regular_file(archiver.input_path, "file1", size=1024 * 80) + create_regular_file(archiver.input_path, "excluded_file", size=1024 * 80) + cmd(archiver, "repo-create", RK_ENCRYPTION) + log = cmd( + archiver, "create", "test", "input", + "--log-json", "--list", "--exclude", "*/excluded_file" + ) + events = [json.loads(line) for line in log.splitlines() if line.startswith("{")] + excluded_events = [ + e for e in events + if e.get("type") == "file_status" and e.get("path", "").endswith("excluded_file") + ] + phases = [e.get("phase") for e in excluded_events] + # excluded files must not have lifecycle events + assert "start" not in phases, "excluded file should not emit start event" + assert "end" not in phases, "excluded file should not emit end event" + + def test_exclude_nodump_dir_with_file(archivers, request): """A directory flagged NODUMP and its contents must not be archived.""" archiver = request.getfixturevalue(archivers) From e6d6ab66e7f0cbad1b0ca573ec8df6de8d011458 Mon Sep 17 00:00:00 2001 From: Yash Kaushik Date: Tue, 24 Mar 2026 01:13:58 +0530 Subject: [PATCH 2/2] Add phase field to file_status JSON output with correct lifecycle emission --- src/borg/archive.py | 34 ++++++++++++++++++++++------------ src/borg/archiver/__init__.py | 8 +++++--- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index dcc85deef1..2204250054 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -1408,7 +1408,13 @@ def process_pipe(self, *, path, cache, fd, mode, user=None, group=None): item.uid = uid if gid is not None: item.gid = gid - self.process_file_chunks(item, cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd))) + self.print_file_status(None, path, phase="start") + try: + self.process_file_chunks( + item, cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd)) + ) + finally: + self.print_file_status(None, path, phase="end") item.get_size(memorize=True) self.stats.nfiles += 1 self.add_item(item, stats=self.stats) @@ -1475,17 +1481,21 @@ def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal, # Only chunkify the file if needed changed_while_backup = False if "chunks" not in item: - start_reading = time.time_ns() - with backup_io("read"): - self.process_file_chunks( - item, - cache, - self.stats, - self.show_progress, - backup_io_iter(self.chunker.chunkify(None, fd)), - ) - self.stats.chunking_time = self.chunker.chunking_time - end_reading = time.time_ns() + self.print_file_status(None, path, phase="start") + try: + start_reading = time.time_ns() + with backup_io("read"): + self.process_file_chunks( + item, + cache, + self.stats, + self.show_progress, + backup_io_iter(self.chunker.chunkify(None, fd)), + ) + self.stats.chunking_time = self.chunker.chunking_time + end_reading = time.time_ns() + finally: + self.print_file_status(None, path, phase="end") with backup_io("fstat2"): st2 = os.fstat(fd) if self.files_changed == "disabled" or is_special_file: diff --git a/src/borg/archiver/__init__.py b/src/borg/archiver/__init__.py index 5c78331c93..92c85e0e8a 100644 --- a/src/borg/archiver/__init__.py +++ b/src/borg/archiver/__init__.py @@ -156,7 +156,7 @@ def print_warning_instance(self, warning): self.print_warning(msg, *args, wc=wc, wt="curly", msgid=msgid) def print_file_status(self, status, path, *, phase=None, error=None): - # START lifecycle event (JSON only) + # START lifecycle event if self.output_list and self.log_json and phase == "start" and status is None: json_data = {"type": "file_status", "phase": "start"} json_data |= text_to_json("path", path) @@ -164,7 +164,8 @@ def print_file_status(self, status, path, *, phase=None, error=None): json_data["error"] = error print(json.dumps(json_data), file=sys.stderr) return - # END lifecycle event (JSON only) + + # END lifecycle event if self.output_list and self.log_json and phase == "end" and status is None: json_data = {"type": "file_status", "phase": "end"} json_data |= text_to_json("path", path) @@ -172,6 +173,7 @@ def print_file_status(self, status, path, *, phase=None, error=None): json_data["error"] = error print(json.dumps(json_data), file=sys.stderr) return + # regular status event (A, M, U, -, d, s, etc.) if self.output_list and status is not None and (self.output_filter is None or status in self.output_filter): if self.log_json: @@ -182,7 +184,7 @@ def print_file_status(self, status, path, *, phase=None, error=None): if error is not None: json_data["error"] = error print(json.dumps(json_data), file=sys.stderr) - elif not self.log_json and status is not None: + else: logging.getLogger("borg.output.list").info("%1s %s", status, remove_surrogates(path)) def preprocess_args(self, args):