Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions src/borg/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -1408,7 +1408,13 @@ def process_pipe(self, *, path, cache, fd, mode, user=None, group=None):
item.uid = uid
if gid is not None:
item.gid = gid
self.process_file_chunks(item, cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd)))
self.print_file_status(None, path, phase="start")
try:
self.process_file_chunks(
item, cache, self.stats, self.show_progress, backup_io_iter(self.chunker.chunkify(fd))
)
finally:
self.print_file_status(None, path, phase="end")
item.get_size(memorize=True)
self.stats.nfiles += 1
self.add_item(item, stats=self.stats)
Expand Down Expand Up @@ -1475,17 +1481,21 @@ def process_file(self, *, path, parent_fd, name, st, cache, flags=flags_normal,
# Only chunkify the file if needed
changed_while_backup = False
if "chunks" not in item:
start_reading = time.time_ns()
with backup_io("read"):
self.process_file_chunks(
item,
cache,
self.stats,
self.show_progress,
backup_io_iter(self.chunker.chunkify(None, fd)),
)
self.stats.chunking_time = self.chunker.chunking_time
end_reading = time.time_ns()
self.print_file_status(None, path, phase="start")
try:
start_reading = time.time_ns()
with backup_io("read"):
self.process_file_chunks(
item,
cache,
self.stats,
self.show_progress,
backup_io_iter(self.chunker.chunkify(None, fd)),
)
self.stats.chunking_time = self.chunker.chunking_time
end_reading = time.time_ns()
finally:
self.print_file_status(None, path, phase="end")
with backup_io("fstat2"):
st2 = os.fstat(fd)
if self.files_changed == "disabled" or is_special_file:
Expand Down
26 changes: 24 additions & 2 deletions src/borg/archiver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,34 @@ def print_warning_instance(self, warning):
msg, msgid, args, wc = cls.__doc__, cls.__qualname__, warning.args, warning.exit_code
self.print_warning(msg, *args, wc=wc, wt="curly", msgid=msgid)

def print_file_status(self, status, path):
# if we get called with status == None, the final file status was already printed
def print_file_status(self, status, path, *, phase=None, error=None):
    """Emit a per-file status event, as JSON (if enabled) or as a plain text log line.

    :param status: one-letter status code ("A", "M", "U", "-", "d", "s", "E", ...),
                   or None for a pure lifecycle event (start/end of processing a file).
    :param path: path of the file the event refers to.
    :param phase: optional lifecycle marker, "start" or "end".
    :param error: optional error text attached to JSON events.
    """
    # Lifecycle events (status is None, phase "start"/"end") are only emitted in
    # JSON list mode; in plain text mode they are silently dropped, matching the
    # previous behavior of printing nothing for status == None.
    # The two phases share one branch here — they only differ in the phase string.
    if status is None and phase in ("start", "end"):
        if self.output_list and self.log_json:
            json_data = {"type": "file_status", "phase": phase}
            json_data |= text_to_json("path", path)
            if error is not None:
                json_data["error"] = error
            print(json.dumps(json_data), file=sys.stderr)
        return

    # regular status event (A, M, U, -, d, s, etc.), honoring the status filter
    if self.output_list and status is not None and (self.output_filter is None or status in self.output_filter):
        if self.log_json:
            json_data = {"type": "file_status", "status": status}
            json_data |= text_to_json("path", path)
            if phase is not None:
                json_data["phase"] = phase
            if error is not None:
                json_data["error"] = error
            print(json.dumps(json_data), file=sys.stderr)
        else:
            logging.getLogger("borg.output.list").info("%1s %s", status, remove_surrogates(path))
Expand Down
239 changes: 126 additions & 113 deletions src/borg/archiver/create_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
raise Error(f"{path!r}: {e}")
else:
status = "+" # included
self.print_file_status(status, path)
self.print_file_status(status, path, phase="end")
elif args.paths_from_command or args.paths_from_shell_command or args.paths_from_stdin:
paths_sep = eval_escapes(args.paths_delimiter) if args.paths_delimiter is not None else "\n"
if args.paths_from_command or args.paths_from_shell_command:
Expand Down Expand Up @@ -139,7 +139,7 @@
status = "E"
if status == "C":
self.print_warning_instance(FileChangedWarning(path))
self.print_file_status(status, path)
self.print_file_status(status, path, phase="end")
if not dry_run and status is not None:
fso.stats.files_stats[status] += 1
if args.paths_from_command or args.paths_from_shell_command:
Expand Down Expand Up @@ -167,7 +167,7 @@
status = "E"
else:
status = "+" # included
self.print_file_status(status, path)
self.print_file_status(status, path, phase="end")
if not dry_run and status is not None:
fso.stats.files_stats[status] += 1
continue
Expand Down Expand Up @@ -293,134 +293,147 @@
"""
Call the right method on the given FilesystemObjectProcessor.
"""

if dry_run:
return "+" # included
MAX_RETRIES = 10 # count includes the initial try (initial try == "retry 0")
for retry in range(MAX_RETRIES):
last_try = retry == MAX_RETRIES - 1
try:
if stat.S_ISREG(st.st_mode):
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st,
cache=cache,
last_try=last_try,
strip_prefix=strip_prefix,
)
elif stat.S_ISDIR(st.st_mode):
return fso.process_dir(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix)
elif stat.S_ISLNK(st.st_mode):
if not read_special:
return fso.process_symlink(
path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix

# Types not archived: no list start/end pair (matches prior behavior of no status line).
if stat.S_ISSOCK(st.st_mode):
return
elif stat.S_ISDOOR(st.st_mode):
return
elif stat.S_ISPORT(st.st_mode):
return

m = st.st_mode
if not (
stat.S_ISREG(m)
or stat.S_ISDIR(m)
or stat.S_ISLNK(m)
or stat.S_ISFIFO(m)
or stat.S_ISCHR(m)
or stat.S_ISBLK(m)
):
self.print_warning("Unknown file type: %s", path)
return

# Emit START once, before any processing, before the retry loop.
self.print_file_status(None, path, phase="start")

try:
MAX_RETRIES = 10 # count includes the initial try (initial try == "retry 0")
for retry in range(MAX_RETRIES):
last_try = retry == MAX_RETRIES - 1
try:
if stat.S_ISREG(st.st_mode):
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st,
cache=cache,
last_try=last_try,
strip_prefix=strip_prefix,
)
else:
try:
st_target = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=True)
except OSError:
special = False
elif stat.S_ISDIR(st.st_mode):
return fso.process_dir(path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix)

Check failure on line 338 in src/borg/archiver/create_cmd.py

View workflow job for this annotation

GitHub Actions / lint

ruff (E501)

src/borg/archiver/create_cmd.py:338:121: E501 Line too long (123 > 120)
elif stat.S_ISLNK(st.st_mode):
if not read_special:
return fso.process_symlink(
path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix
)
else:
try:
st_target = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=True)
except OSError:
special = False
else:
special = is_special(st_target.st_mode)
if special:
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st_target,
cache=cache,
flags=flags_special_follow,
last_try=last_try,
strip_prefix=strip_prefix,
)
else:
return fso.process_symlink(
path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix
)
elif stat.S_ISFIFO(st.st_mode):
if not read_special:
return fso.process_fifo(
path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix
)
else:
special = is_special(st_target.st_mode)
if special:
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st_target,
st=st,
cache=cache,
flags=flags_special_follow,
flags=flags_special,
last_try=last_try,
strip_prefix=strip_prefix,
)
elif stat.S_ISCHR(st.st_mode):
if not read_special:
return fso.process_dev(
path=path, parent_fd=parent_fd, name=name, st=st, dev_type="c", strip_prefix=strip_prefix

Check failure on line 385 in src/borg/archiver/create_cmd.py

View workflow job for this annotation

GitHub Actions / lint

ruff (E501)

src/borg/archiver/create_cmd.py:385:121: E501 Line too long (121 > 120)
)
else:
return fso.process_symlink(
path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st,
cache=cache,
flags=flags_special,
last_try=last_try,
strip_prefix=strip_prefix,
)
elif stat.S_ISBLK(st.st_mode):
if not read_special:
return fso.process_dev(
path=path, parent_fd=parent_fd, name=name, st=st, dev_type="b", strip_prefix=strip_prefix

Check failure on line 401 in src/borg/archiver/create_cmd.py

View workflow job for this annotation

GitHub Actions / lint

ruff (E501)

src/borg/archiver/create_cmd.py:401:121: E501 Line too long (121 > 120)
)
else:
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st,
cache=cache,
flags=flags_special,
last_try=last_try,
strip_prefix=strip_prefix,
)
elif stat.S_ISFIFO(st.st_mode):
if not read_special:
return fso.process_fifo(
path=path, parent_fd=parent_fd, name=name, st=st, strip_prefix=strip_prefix
)
else:
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st,
cache=cache,
flags=flags_special,
last_try=last_try,
strip_prefix=strip_prefix,
)
elif stat.S_ISCHR(st.st_mode):
if not read_special:
return fso.process_dev(
path=path, parent_fd=parent_fd, name=name, st=st, dev_type="c", strip_prefix=strip_prefix
)
else:
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st,
cache=cache,
flags=flags_special,
last_try=last_try,
strip_prefix=strip_prefix,
)
elif stat.S_ISBLK(st.st_mode):
if not read_special:
return fso.process_dev(
path=path, parent_fd=parent_fd, name=name, st=st, dev_type="b", strip_prefix=strip_prefix
self.print_warning("Unknown file type: %s", path)
return
except BackupItemExcluded:
return "-"
except BackupError as err:
if isinstance(err, BackupOSError):
if err.errno in (errno.EPERM, errno.EACCES):
raise
sleep_s = 1000.0 / 1e6 * 10 ** (retry / 2)
time.sleep(sleep_s)
if retry < MAX_RETRIES - 1:
logger.warning(
f"{path}: {err}, slept {sleep_s:.3f}s, next: retry: {retry + 1} of {MAX_RETRIES - 1}..."
)
else:
return fso.process_file(
path=path,
parent_fd=parent_fd,
name=name,
st=st,
cache=cache,
flags=flags_special,
last_try=last_try,
strip_prefix=strip_prefix,
)
elif stat.S_ISSOCK(st.st_mode):
# Ignore unix sockets
return
elif stat.S_ISDOOR(st.st_mode):
# Ignore Solaris doors
return
elif stat.S_ISPORT(st.st_mode):
# Ignore Solaris event ports
return
else:
self.print_warning("Unknown file type: %s", path)
return
except BackupItemExcluded:
return "-"
except BackupError as err:
if isinstance(err, BackupOSError):
if err.errno in (errno.EPERM, errno.EACCES):
# Do not try again, such errors can not be fixed by retrying.
raise
# sleep a bit, so temporary problems might go away...
sleep_s = 1000.0 / 1e6 * 10 ** (retry / 2) # retry 0: 1ms, retry 6: 1s, ...
time.sleep(sleep_s)
if retry < MAX_RETRIES - 1:
logger.warning(
f"{path}: {err}, slept {sleep_s:.3f}s, next: retry: {retry + 1} of {MAX_RETRIES - 1}..."
)
else:
# giving up with retries, error will be dealt with (logged) by upper error handler
raise
# we better do a fresh stat on the file, just to make sure to get the current file
# mode right (which could have changed due to a race condition and is important for
# dispatching) and also to get current inode number of that file.
with backup_io("stat"):
st = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=False)
with backup_io("stat"):
st = os_stat(path=path, parent_fd=parent_fd, name=name, follow_symlinks=False)
finally:
# END is always emitted here — after ALL processing including chunked I/O,
# even on exception, even on retry exhaustion.
self.print_file_status(None, path, phase="end")

def _rec_walk(
self,
Expand Down
Loading
Loading