Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions pymongo/asynchronous/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ async def inner(*args: Any, **kwargs: Any) -> Any:
_MAX_RETRIES = 5
_BACKOFF_INITIAL = 0.1
_BACKOFF_MAX = 10
# DRIVERS-3240 will determine these defaults.
DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0
DEFAULT_RETRY_TOKEN_RETURN = 0.1

Expand All @@ -101,7 +100,6 @@ def __init__(
):
self.lock = _async_create_lock()
self.capacity = capacity
# DRIVERS-3240 will determine how full the bucket should start.
self.tokens = capacity
self.return_rate = return_rate

Expand All @@ -123,7 +121,7 @@ async def deposit(self, retry: bool = False) -> None:
class _RetryPolicy:
"""A retry limiter that performs exponential backoff with jitter.

Retry attempts are limited by a token bucket to prevent overwhelming the server during
When adaptive retries are enabled, retry attempts are limited by a token bucket to prevent overwhelming the server during
a prolonged outage or high load.
"""

Expand All @@ -133,15 +131,18 @@ def __init__(
attempts: int = _MAX_RETRIES,
backoff_initial: float = _BACKOFF_INITIAL,
backoff_max: float = _BACKOFF_MAX,
adaptive_retry: bool = False,
):
self.token_bucket = token_bucket
self.attempts = attempts
self.backoff_initial = backoff_initial
self.backoff_max = backoff_max
self.adaptive_retry = adaptive_retry

async def record_success(self, retry: bool) -> None:
    """Record that an operation completed successfully.

    When adaptive retries are enabled, this returns tokens to the bucket
    so that future retry attempts are permitted again. With adaptive
    retries disabled it is a no-op.
    """
    if not self.adaptive_retry:
        return
    await self.token_bucket.deposit(retry)

def backoff(self, attempt: int) -> float:
"""Return the backoff duration for the given ."""
Expand All @@ -158,7 +159,7 @@ async def should_retry(self, attempt: int, delay: float) -> bool:
return False

# Check token bucket last since we only want to consume a token if we actually retry.
if not await self.token_bucket.consume():
if self.adaptive_retry and not await self.token_bucket.consume():
# DRIVERS-3246 Improve diagnostics when this case happens.
# We could add info to the exception and log.
return False
Expand Down
15 changes: 14 additions & 1 deletion pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,8 +615,18 @@ def __init__(
client to use Stable API. See `versioned API <https://www.mongodb.com/docs/manual/reference/stable-api/#what-is-the-stable-api--and-should-you-use-it->`_ for
details.

| **Adaptive retry options:**
| (If not enabled explicitly, adaptive retries will not be enabled.)

- `adaptive_retries`: (boolean) Whether the adaptive retry mechanism is enabled for this client.
If enabled, server overload errors will use a token-bucket based system to mitigate further overload.
Defaults to ``False``.

.. seealso:: The MongoDB documentation on `connections <https://dochub.mongodb.org/core/connections>`_.

.. versionchanged:: 4.17
Added the ``adaptive_retries`` URI and keyword argument.

.. versionchanged:: 4.5
Added the ``serverMonitoringMode`` keyword argument.

Expand Down Expand Up @@ -778,7 +788,6 @@ def __init__(
self._timeout: float | None = None
self._topology_settings: TopologySettings = None # type: ignore[assignment]
self._event_listeners: _EventListeners | None = None
self._retry_policy = _RetryPolicy(_TokenBucket())

# _pool_class, _monitor_class, and _condition_class are for deep
# customization of PyMongo, e.g. Motor.
Expand Down Expand Up @@ -890,6 +899,10 @@ def __init__(
self._opened = False
self._closed = False
self._loop: Optional[asyncio.AbstractEventLoop] = None

self._retry_policy = _RetryPolicy(
_TokenBucket(), adaptive_retry=self._options.adaptive_retries
)
if not is_srv:
self._init_background()

Expand Down
13 changes: 13 additions & 0 deletions pymongo/client_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ def __init__(
self.__server_monitoring_mode = options.get(
"servermonitoringmode", common.SERVER_MONITORING_MODE
)
self.__adaptive_retries = (
options.get("adaptive_retries", common.ADAPTIVE_RETRIES)
if "adaptive_retries" in options
else options.get("adaptiveretries", common.ADAPTIVE_RETRIES)
)

@property
def _options(self) -> Mapping[str, Any]:
Expand Down Expand Up @@ -346,3 +351,11 @@ def server_monitoring_mode(self) -> str:
.. versionadded:: 4.5
"""
return self.__server_monitoring_mode

@property
def adaptive_retries(self) -> bool:
    """The configured adaptiveRetries option.

    ``True`` when adaptive retries were enabled via the
    ``adaptive_retries`` keyword argument or the ``adaptiveRetries``
    URI option; ``False`` (the default) otherwise.

    .. versionadded:: 4.17
    """
    return self.__adaptive_retries
5 changes: 5 additions & 0 deletions pymongo/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@
# Default value for serverMonitoringMode
SERVER_MONITORING_MODE = "auto" # poll/stream/auto

# Default value for adaptiveRetries
ADAPTIVE_RETRIES = False

# Auth mechanism properties that must raise an error instead of warning if they invalidate.
_MECH_PROP_MUST_RAISE = ["CANONICALIZE_HOST_NAME"]

Expand Down Expand Up @@ -738,6 +741,7 @@ def validate_server_monitoring_mode(option: str, value: str) -> str:
"srvmaxhosts": validate_non_negative_integer,
"timeoutms": validate_timeoutms,
"servermonitoringmode": validate_server_monitoring_mode,
"adaptiveretries": validate_boolean_or_string,
}

# Dictionary where keys are the names of URI options specific to pymongo,
Expand Down Expand Up @@ -771,6 +775,7 @@ def validate_server_monitoring_mode(option: str, value: str) -> str:
"server_selector": validate_is_callable_or_none,
"auto_encryption_opts": validate_auto_encryption_opts_or_none,
"authoidcallowedhosts": validate_list,
"adaptive_retries": validate_boolean_or_string,
}

# Dictionary where keys are any URI option name, and values are the
Expand Down
11 changes: 6 additions & 5 deletions pymongo/synchronous/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ def inner(*args: Any, **kwargs: Any) -> Any:
_MAX_RETRIES = 5
_BACKOFF_INITIAL = 0.1
_BACKOFF_MAX = 10
# DRIVERS-3240 will determine these defaults.
DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0
DEFAULT_RETRY_TOKEN_RETURN = 0.1

Expand All @@ -101,7 +100,6 @@ def __init__(
):
self.lock = _create_lock()
self.capacity = capacity
# DRIVERS-3240 will determine how full the bucket should start.
self.tokens = capacity
self.return_rate = return_rate

Expand All @@ -123,7 +121,7 @@ def deposit(self, retry: bool = False) -> None:
class _RetryPolicy:
"""A retry limiter that performs exponential backoff with jitter.

Retry attempts are limited by a token bucket to prevent overwhelming the server during
When adaptive retries are enabled, retry attempts are limited by a token bucket to prevent overwhelming the server during
a prolonged outage or high load.
"""

Expand All @@ -133,15 +131,18 @@ def __init__(
attempts: int = _MAX_RETRIES,
backoff_initial: float = _BACKOFF_INITIAL,
backoff_max: float = _BACKOFF_MAX,
adaptive_retry: bool = False,
):
self.token_bucket = token_bucket
self.attempts = attempts
self.backoff_initial = backoff_initial
self.backoff_max = backoff_max
self.adaptive_retry = adaptive_retry

def record_success(self, retry: bool) -> None:
    """Record that an operation completed successfully.

    When adaptive retries are enabled, this returns tokens to the bucket
    so that future retry attempts are permitted again. With adaptive
    retries disabled it is a no-op.
    """
    if not self.adaptive_retry:
        return
    self.token_bucket.deposit(retry)

def backoff(self, attempt: int) -> float:
"""Return the backoff duration for the given ."""
Expand All @@ -158,7 +159,7 @@ def should_retry(self, attempt: int, delay: float) -> bool:
return False

# Check token bucket last since we only want to consume a token if we actually retry.
if not self.token_bucket.consume():
if self.adaptive_retry and not self.token_bucket.consume():
# DRIVERS-3246 Improve diagnostics when this case happens.
# We could add info to the exception and log.
return False
Expand Down
15 changes: 14 additions & 1 deletion pymongo/synchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,8 +615,18 @@ def __init__(
client to use Stable API. See `versioned API <https://www.mongodb.com/docs/manual/reference/stable-api/#what-is-the-stable-api--and-should-you-use-it->`_ for
details.

| **Adaptive retry options:**
| (If not enabled explicitly, adaptive retries will not be enabled.)

- `adaptive_retries`: (boolean) Whether the adaptive retry mechanism is enabled for this client.
If enabled, server overload errors will use a token-bucket based system to mitigate further overload.
Defaults to ``False``.

.. seealso:: The MongoDB documentation on `connections <https://dochub.mongodb.org/core/connections>`_.

.. versionchanged:: 4.17
Added the ``adaptive_retries`` URI and keyword argument.

.. versionchanged:: 4.5
Added the ``serverMonitoringMode`` keyword argument.

Expand Down Expand Up @@ -778,7 +788,6 @@ def __init__(
self._timeout: float | None = None
self._topology_settings: TopologySettings = None # type: ignore[assignment]
self._event_listeners: _EventListeners | None = None
self._retry_policy = _RetryPolicy(_TokenBucket())

# _pool_class, _monitor_class, and _condition_class are for deep
# customization of PyMongo, e.g. Motor.
Expand Down Expand Up @@ -890,6 +899,10 @@ def __init__(
self._opened = False
self._closed = False
self._loop: Optional[asyncio.AbstractEventLoop] = None

self._retry_policy = _RetryPolicy(
_TokenBucket(), adaptive_retry=self._options.adaptive_retries
)
if not is_srv:
self._init_background()

Expand Down
15 changes: 15 additions & 0 deletions test/asynchronous/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,21 @@ async def test_detected_environment_warning(self, mock_get_hosts):
with self.assertWarns(UserWarning):
self.simple_client(multi_host)

async def test_adaptive_retries(self):
    # Adaptive retries must default to off.
    client = self.simple_client(connect=False)
    self.assertFalse(client.options.adaptive_retries)

    # Every accepted spelling of the option must enable the feature:
    # snake_case kwarg, camelCase kwarg, and the URI query parameter.
    for client in (
        self.simple_client(connect=False, adaptive_retries=True),
        self.simple_client(connect=False, adaptiveRetries=True),
        self.simple_client(host="mongodb://localhost/?adaptiveretries=true", connect=False),
    ):
        self.assertTrue(client.options.adaptive_retries)


class TestClient(AsyncIntegrationTest):
def test_multiple_uris(self):
Expand Down
93 changes: 69 additions & 24 deletions test/asynchronous/test_client_backpressure.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,34 +168,11 @@ async def test_retry_overload_error_getMore(self):
self.assertIn("RetryableError", str(error.exception))
self.assertIn("SystemOverloadedError", str(error.exception))

@async_client_context.require_failCommand_appName
async def test_limit_retry_command(self):
client = await self.async_rs_or_single_client()
client._retry_policy.token_bucket.tokens = 1
db = client.pymongo_test
await db.t.insert_one({"x": 1})

# Ensure command is retried once overload error.
fail_many = mock_overload_error.copy()
fail_many["mode"] = {"times": 1}
async with self.fail_point(fail_many):
await db.command("find", "t")

# Ensure command stops retrying when there are no tokens left.
fail_too_many = mock_overload_error.copy()
fail_too_many["mode"] = {"times": 2}
async with self.fail_point(fail_too_many):
with self.assertRaises(PyMongoError) as error:
await db.command("find", "t")

self.assertIn("RetryableError", str(error.exception))
self.assertIn("SystemOverloadedError", str(error.exception))


class TestRetryPolicy(AsyncPyMongoTestCase):
async def test_retry_policy(self):
capacity = 10
retry_policy = _RetryPolicy(_TokenBucket(capacity=capacity))
retry_policy = _RetryPolicy(_TokenBucket(capacity=capacity), adaptive_retry=True)
self.assertEqual(retry_policy.attempts, helpers._MAX_RETRIES)
self.assertEqual(retry_policy.backoff_initial, helpers._BACKOFF_INITIAL)
self.assertEqual(retry_policy.backoff_max, helpers._BACKOFF_MAX)
Expand Down Expand Up @@ -300,6 +277,74 @@ async def test_01_operation_retry_uses_exponential_backoff(self, random_func):
# runs.
self.assertTrue(abs((end1 - start1) - (end0 - start0 + 3.1)) < 1)

@async_client_context.require_failCommand_appName
async def test_03_overload_retries_limited(self):
    # Drivers should test that without adaptive retries enabled, overload errors
    # are retried a maximum of five times (_MAX_RETRIES).

    # 1. Let `client` be a `MongoClient` (adaptive retries are off by default).
    client = self.client
    # 2. Let `coll` be a collection.
    coll = client.pymongo_test.coll

    # 3. Configure the following failpoint (alwaysOn: every find fails with an
    # overload error, so the operation exhausts its retry budget):
    failpoint = {
    "configureFailPoint": "failCommand",
    "mode": "alwaysOn",
    "data": {
    "failCommands": ["find"],
    "errorCode": 462,  # IngressRequestRateLimitExceeded
    "errorLabels": ["RetryableError", "SystemOverloadedError"],
    },
    }

    # 4. Perform a find operation with `coll` that fails.
    async with self.fail_point(failpoint):
    with self.assertRaises(PyMongoError) as error:
    await coll.find_one({})

    # 5. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels.
    self.assertIn("RetryableError", str(error.exception))
    self.assertIn("SystemOverloadedError", str(error.exception))

    # 6. Assert that the total number of started commands is _MAX_RETRIES + 1
    # (one initial attempt plus the maximum number of retries).
    self.assertEqual(len(self.listener.started_events), _MAX_RETRIES + 1)

@async_client_context.require_failCommand_appName
async def test_04_adaptive_retries_limited_by_tokens(self):
    # Drivers should test that when enabled, adaptive retries are limited by the
    # number of tokens in the bucket (each retry consumes one token).

    # 1. Let `client` be a `MongoClient` with adaptiveRetries=True.
    client = await self.async_rs_or_single_client(
    adaptive_retries=True, event_listeners=[self.listener]
    )
    # 2. Set `client`'s retry token bucket to have 2 tokens, so only two
    # retries are possible even though the failpoint fires three times.
    client._retry_policy.token_bucket.tokens = 2
    # 3. Let `coll` be a collection.
    coll = client.pymongo_test.coll

    # 4. Configure the following failpoint (fails the first 3 find commands):
    failpoint = {
    "configureFailPoint": "failCommand",
    "mode": {"times": 3},
    "data": {
    "failCommands": ["find"],
    "errorCode": 462,  # IngressRequestRateLimitExceeded
    "errorLabels": ["RetryableError", "SystemOverloadedError"],
    },
    }

    # 5. Perform a find operation with `coll` that fails.
    async with self.fail_point(failpoint):
    with self.assertRaises(PyMongoError) as error:
    await coll.find_one({})

    # 6. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels.
    self.assertIn("RetryableError", str(error.exception))
    self.assertIn("SystemOverloadedError", str(error.exception))

    # 7. Assert that the total number of started commands is 3: one for the initial attempt and two for the retries.
    self.assertEqual(len(self.listener.started_events), 3)


# Location of JSON test specifications.
if _IS_SYNC:
Expand Down
15 changes: 15 additions & 0 deletions test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,21 @@ def test_detected_environment_warning(self, mock_get_hosts):
with self.assertWarns(UserWarning):
self.simple_client(multi_host)

def test_adaptive_retries(self):
    # Adaptive retries must default to off.
    client = self.simple_client(connect=False)
    self.assertFalse(client.options.adaptive_retries)

    # Every accepted spelling of the option must enable the feature:
    # snake_case kwarg, camelCase kwarg, and the URI query parameter.
    for client in (
        self.simple_client(connect=False, adaptive_retries=True),
        self.simple_client(connect=False, adaptiveRetries=True),
        self.simple_client(host="mongodb://localhost/?adaptiveretries=true", connect=False),
    ):
        self.assertTrue(client.options.adaptive_retries)


class TestClient(IntegrationTest):
def test_multiple_uris(self):
Expand Down
Loading
Loading