Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
- name: Install pip dependencies
run: |
python -m pip install --upgrade pip wheel setuptools
pip install -e .[server]
pip install -e .[server,tests]
pip install pytest

- name: Run tests
Expand Down
51 changes: 40 additions & 11 deletions filetracker/servers/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

_LOCK_RETRIES = 20
_LOCK_SLEEP_TIME_S = 1
_DB_DEADLOCK_RETRIES = 50


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -81,6 +82,8 @@ def __init__(self, base_dir):

# https://docs.oracle.com/cd/E17076_05/html/programmer_reference/transapp_env_open.html
self.db_env = bsddb3.db.DBEnv()
# Enable running the deadlock detector when lock conflicts are automatically detected.
self.db_env.set_lk_detect(bsddb3.db.DB_LOCK_DEFAULT)
try:
self.db_env.open(
self.db_dir,
Expand Down Expand Up @@ -182,9 +185,8 @@ def store(
logger.debug('Acquired lock for blob %s.', digest)
digest_bytes = digest.encode()

with self._db_transaction() as txn:
logger.debug('Started DB transaction (adding link).')
link_count = int(self.db.get(digest_bytes, 0, txn=txn))
def transaction_contents(txn):
link_count = int(self.db.get(digest_bytes, 0, txn=txn, flags=bsddb3.db.DB_RMW))
new_count = str(link_count + 1).encode()
self.db.put(digest_bytes, new_count, txn=txn)

Expand All @@ -194,9 +196,11 @@ def store(
str(logical_size).encode(),
txn=txn,
)
logger.debug('Commiting DB transaction (adding link).')
return link_count

logger.debug('Committed DB transaction (adding link).')
link_count = self._call_in_transaction_with_retries(
transaction_contents, "adding link"
)

# Create a new blob if this isn't a duplicate.
if link_count == 0:
Expand Down Expand Up @@ -268,12 +272,11 @@ def delete(self, name, version, _lock=True):

with _exclusive_lock(self._lock_path('blobs', digest)):
logger.debug('Acquired lock for blob %s.', digest)
should_delete_blob = False

with self._db_transaction() as txn:
logger.debug('Started DB transaction (deleting link).')
def transaction_contents(txn):
should_delete_blob = False
digest_bytes = digest.encode()
link_count = self.db.get(digest_bytes, txn=txn)
link_count = self.db.get(digest_bytes, txn=txn, flags=bsddb3.db.DB_RMW)
if link_count is None:
raise RuntimeError("File exists but has no key in db")

Expand All @@ -288,9 +291,11 @@ def delete(self, name, version, _lock=True):
else:
new_count = str(link_count - 1).encode()
self.db.put(digest_bytes, new_count, txn=txn)
logger.debug('Committing DB transaction (deleting link).')
return should_delete_blob

logger.debug('Committed DB transaction (deleting link).')
should_delete_blob = self._call_in_transaction_with_retries(
transaction_contents, "deleting link"
)

os.unlink(link_path)
logger.debug('Deleted link %s.', name)
Expand Down Expand Up @@ -339,6 +344,30 @@ def _db_transaction(self):
else:
txn.commit()

def _call_in_transaction_with_retries(self, func, description):
    """Run ``func(txn)`` inside a DB transaction, retrying on deadlock.

    ``func`` receives the open BerkeleyDB transaction as its only
    argument; its return value is passed through to the caller.  When
    BerkeleyDB raises ``DBLockDeadlockError`` the transaction is aborted
    (by ``_db_transaction``'s context manager) and the whole transaction
    body is re-run, up to ``_DB_DEADLOCK_RETRIES`` times; past that limit
    the deadlock error is logged and re-raised.

    Args:
        func: callable taking a single ``txn`` argument.  It may be
            invoked multiple times, so it must be safe to re-run.
        description: short human-readable label used only in log messages.

    Returns:
        Whatever ``func`` returned on the successful attempt.
    """
    retries = 0
    result = None
    while True:
        try:
            with self._db_transaction() as txn:
                logger.debug('Started DB transaction (%s).', description)
                result = func(txn)
                # Fixed typo ("Commiting") and switched to lazy %-style
                # logging args so formatting is skipped when DEBUG is off.
                logger.debug('Committing DB transaction (%s).', description)
        except bsddb3.db.DBLockDeadlockError:
            retries += 1
            if retries > _DB_DEADLOCK_RETRIES:
                logger.error(
                    'BSDDB deadlock detected, retry limit exceeded (%s).',
                    description,
                )
                raise
            logger.warning(
                'BSDDB deadlock detected in transaction (%s), retry no. %d.',
                description, retries,
            )
            continue
        break
    logger.debug('Committed DB transaction (%s).', description)
    return result

def _digest_for_link(self, name):
link = self._link_path(name)
blob_path = os.readlink(link)
Expand Down
66 changes: 59 additions & 7 deletions filetracker/tests/parallel_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
from __future__ import division
from __future__ import print_function

from multiprocessing import Process
from multiprocessing import Barrier, Process
import os
import psutil
import shutil
import tempfile
import time
Expand All @@ -25,6 +26,17 @@
_FILE_SIZE = 6 * 1024 * 1024
_PARALLEL_CLIENTS = 5
_TEST_PORT_NUMBER = 45745
_COPIES_TO_UPLOAD = 500
_SUBPROCESS_TIMEOUT_S = 20


def kill_process_tree(pid):
    """Forcefully kill the process ``pid`` together with all its descendants."""
    root = psutil.Process(pid)
    victims = root.children(recursive=True)
    victims.append(root)
    for victim in victims:
        try:
            victim.kill()
        except psutil.NoSuchProcess:
            # The process already exited on its own - nothing to do.
            pass


class ParallelTest(unittest.TestCase):
Expand Down Expand Up @@ -52,19 +64,23 @@ def setUpClass(cls):

@classmethod
def tearDownClass(cls):
cls.server_process.terminate()
kill_process_tree(cls.server_process.pid)
cls.server_process.join()
shutil.rmtree(cls.server_dir)
shutil.rmtree(cls.temp_dir)

def setUp(self):
# Shortcuts for convenience
self.server_dir = ParallelTest.server_dir
self.temp_dir = ParallelTest.temp_dir
self.clients = ParallelTest.clients
cls = self.__class__
self.server_dir = cls.server_dir
self.temp_dir = cls.temp_dir
self.clients = cls.clients

# For non-parallel requests
self.client = ParallelTest.clients[0]
self.client = cls.clients[0]


class ParallelTestSameFile(ParallelTest):
def test_only_last_parallel_upload_of_same_file_should_succeed(self):
processes = []

Expand Down Expand Up @@ -98,7 +114,43 @@ def test_only_last_parallel_upload_of_same_file_should_succeed(self):
self.assertEqual(f.read(), lf.read())


class ParallelTestDeadlocks(ParallelTest):
    """Stress test: many clients uploading in lockstep to provoke BSDDB deadlocks."""

    def test_bsddb_deadlocks(self):
        processes = []

        # Initialize a different source file for every client.
        for i in range(len(self.clients)):
            temp_file = os.path.join(self.temp_dir, 'foo{}.txt'.format(i))
            with open(temp_file, 'wb') as tf:
                tf.write(str(i).encode())

        # Synchronize the clients so their uploads hit the server at the
        # same time.  The deadlocks are visible even without this barrier.
        barrier = Barrier(len(self.clients))

        def job(client_id, client, barrier):
            temp_file = os.path.join(
                self.temp_dir, 'foo{}.txt'.format(client_id)
            )
            for i in range(_COPIES_TO_UPLOAD):
                ft_name = '/foo{}.{}.txt'.format(client_id, i)
                barrier.wait()
                client.put_file(ft_name, temp_file, compress_hint=False)

        for i, client in enumerate(self.clients):
            # Pass loop variables through `args` instead of a late-binding
            # lambda closure: the lambda only worked because fork() happens
            # to snapshot `i`/`client` inside this iteration, and it is not
            # picklable under the spawn start method.
            process = Process(target=job, args=(i, client, barrier))
            process.start()
            processes.append(process)

        for process in processes:
            process.join(timeout=_SUBPROCESS_TIMEOUT_S)
            self.assertFalse(process.is_alive())
            self.assertEqual(process.exitcode, 0)

        # Put one final file (via the designated non-parallel client rather
        # than a leaked loop variable) to check for e.g. store corruption.
        # `temp_file` still points at the last file created above.
        self.client.put_file('/foo_last', temp_file, compress_hint=False)


def _start_server(server_dir):
server_main(
['-p', str(_TEST_PORT_NUMBER), '-d', server_dir, '-D', '--workers', '6']
['-p', str(_TEST_PORT_NUMBER), '-d', server_dir, '-D', '--workers', '3']
)
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ server = [
]
tests = [
"pytest",
"psutil",
]

[project.scripts]
filetracker = "filetracker.client.shell:main"
filetracker-server = "filetracker.servers.run:main"
filetracker-cache-cleaner = "filetracker.scripts.cachecleaner:main"
filetracker-migrate = "filetracker.scripts.migrate:main"
filetracker-recover = "filetracker.scripts.recover:main"
filetracker-recover = "filetracker.scripts.recover:main"
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ envlist = py313

[testenv]
extras = server
tests
deps = pytest
pytest-cov
setenv =
Expand Down