From e666b895d6aada5b57dbe50f2d6dc07c39f7e49e Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Fri, 20 Feb 2026 12:48:47 -0500 Subject: [PATCH 1/5] PYTHON-5528 - Token buckets are opt-in only --- pymongo/asynchronous/helpers.py | 11 +-- pymongo/asynchronous/mongo_client.py | 15 +++- pymongo/client_options.py | 13 +++ pymongo/common.py | 5 ++ pymongo/synchronous/helpers.py | 11 +-- pymongo/synchronous/mongo_client.py | 15 +++- test/asynchronous/test_client.py | 15 ++++ test/asynchronous/test_client_backpressure.py | 88 ++++++++++++++----- test/test_client.py | 15 ++++ test/test_client_backpressure.py | 88 ++++++++++++++----- 10 files changed, 216 insertions(+), 60 deletions(-) diff --git a/pymongo/asynchronous/helpers.py b/pymongo/asynchronous/helpers.py index cc9fbfb2fc..2c01c19b7a 100644 --- a/pymongo/asynchronous/helpers.py +++ b/pymongo/asynchronous/helpers.py @@ -79,7 +79,6 @@ async def inner(*args: Any, **kwargs: Any) -> Any: _MAX_RETRIES = 5 _BACKOFF_INITIAL = 0.1 _BACKOFF_MAX = 10 -# DRIVERS-3240 will determine these defaults. DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0 DEFAULT_RETRY_TOKEN_RETURN = 0.1 @@ -101,7 +100,6 @@ def __init__( ): self.lock = _async_create_lock() self.capacity = capacity - # DRIVERS-3240 will determine how full the bucket should start. self.tokens = capacity self.return_rate = return_rate @@ -123,7 +121,7 @@ async def deposit(self, retry: bool = False) -> None: class _RetryPolicy: """A retry limiter that performs exponential backoff with jitter. - Retry attempts are limited by a token bucket to prevent overwhelming the server during + When adaptive retries are enabled, retry attempts are limited by a token bucket to prevent overwhelming the server during a prolonged outage or high load. """ @@ -133,15 +131,18 @@ def __init__( attempts: int = _MAX_RETRIES, backoff_initial: float = _BACKOFF_INITIAL, backoff_max: float = _BACKOFF_MAX, + adaptive_retry: bool = False, ): self.token_bucket = token_bucket self.attempts = attempts self.backoff_initial = backoff_initial self.backoff_max = backoff_max + self.adaptive_retry = adaptive_retry async def record_success(self, retry: bool) -> None: """Record a successful operation.""" - await self.token_bucket.deposit(retry) + if self.adaptive_retry: + await self.token_bucket.deposit(retry) def backoff(self, attempt: int) -> float: """Return the backoff duration for the given .""" @@ -158,7 +159,7 @@ async def should_retry(self, attempt: int, delay: float) -> bool: return False # Check token bucket last since we only want to consume a token if we actually retry. - if not await self.token_bucket.consume(): + if self.adaptive_retry and not await self.token_bucket.consume(): # DRIVERS-3246 Improve diagnostics when this case happens. # We could add info to the exception and log. return False diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index adab640fd2..17c8fb623e 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -615,8 +615,18 @@ def __init__( client to use Stable API. See `versioned API `_ for details. + | **Adaptive retry options:** + | (If not enabled explicitly, adaptive retries will not be enabled.) + + - `adaptive_retries`: (boolean) Whether the adaptive retry mechanism is enabled for this client. + If enabled, server overload errors will use a token-bucket based system to mitigate further overload. + Defaults to ``False``. + .. seealso:: The MongoDB documentation on `connections `_. + .. versionchanged:: 4.XX + Added the ``adaptive_retries`` URI and keyword argument. + .. versionchanged:: 4.5 Added the ``serverMonitoringMode`` keyword argument. @@ -778,7 +788,6 @@ def __init__( self._timeout: float | None = None self._topology_settings: TopologySettings = None # type: ignore[assignment] self._event_listeners: _EventListeners | None = None - self._retry_policy = _RetryPolicy(_TokenBucket()) # _pool_class, _monitor_class, and _condition_class are for deep # customization of PyMongo, e.g. Motor. @@ -890,6 +899,10 @@ def __init__( self._opened = False self._closed = False self._loop: Optional[asyncio.AbstractEventLoop] = None + + self._retry_policy = _RetryPolicy( + _TokenBucket(), adaptive_retry=self._options.adaptive_retries + ) if not is_srv: self._init_background() diff --git a/pymongo/client_options.py b/pymongo/client_options.py index 8b4eea7e65..1e488c2b8f 100644 --- a/pymongo/client_options.py +++ b/pymongo/client_options.py @@ -235,6 +235,11 @@ def __init__( self.__server_monitoring_mode = options.get( "servermonitoringmode", common.SERVER_MONITORING_MODE ) + self.__adaptive_retries = ( + options.get("adaptive_retries", common.ADAPTIVE_RETRIES) + if "adaptive_retries" in options + else options.get("adaptiveretries", common.ADAPTIVE_RETRIES) + ) @property def _options(self) -> Mapping[str, Any]: @@ -346,3 +351,11 @@ def server_monitoring_mode(self) -> str: .. versionadded:: 4.5 """ return self.__server_monitoring_mode + + @property + def adaptive_retries(self) -> bool: + """The configured adaptiveRetries option. + + .. versionadded:: 4.XX + """ + return self.__adaptive_retries diff --git a/pymongo/common.py b/pymongo/common.py index e23adac426..8b9797682f 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -140,6 +140,9 @@ # Default value for serverMonitoringMode SERVER_MONITORING_MODE = "auto" # poll/stream/auto +# Default value for adaptiveRetries +ADAPTIVE_RETRIES = False + # Auth mechanism properties that must raise an error instead of warning if they invalidate. _MECH_PROP_MUST_RAISE = ["CANONICALIZE_HOST_NAME"] @@ -738,6 +741,7 @@ def validate_server_monitoring_mode(option: str, value: str) -> str: "srvmaxhosts": validate_non_negative_integer, "timeoutms": validate_timeoutms, "servermonitoringmode": validate_server_monitoring_mode, + "adaptiveretries": validate_boolean_or_string, } # Dictionary where keys are the names of URI options specific to pymongo, @@ -771,6 +775,7 @@ def validate_server_monitoring_mode(option: str, value: str) -> str: "server_selector": validate_is_callable_or_none, "auto_encryption_opts": validate_auto_encryption_opts_or_none, "authoidcallowedhosts": validate_list, + "adaptive_retries": validate_boolean_or_string, } # Dictionary where keys are any URI option name, and values are the diff --git a/pymongo/synchronous/helpers.py b/pymongo/synchronous/helpers.py index 9d93f9c47f..1a27fc11a5 100644 --- a/pymongo/synchronous/helpers.py +++ b/pymongo/synchronous/helpers.py @@ -79,7 +79,6 @@ def inner(*args: Any, **kwargs: Any) -> Any: _MAX_RETRIES = 5 _BACKOFF_INITIAL = 0.1 _BACKOFF_MAX = 10 -# DRIVERS-3240 will determine these defaults. DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0 DEFAULT_RETRY_TOKEN_RETURN = 0.1 @@ -101,7 +100,6 @@ def __init__( ): self.lock = _create_lock() self.capacity = capacity - # DRIVERS-3240 will determine how full the bucket should start. self.tokens = capacity self.return_rate = return_rate @@ -123,7 +121,7 @@ def deposit(self, retry: bool = False) -> None: class _RetryPolicy: """A retry limiter that performs exponential backoff with jitter. - Retry attempts are limited by a token bucket to prevent overwhelming the server during + When adaptive retries are enabled, retry attempts are limited by a token bucket to prevent overwhelming the server during a prolonged outage or high load. """ @@ -133,15 +131,18 @@ def __init__( attempts: int = _MAX_RETRIES, backoff_initial: float = _BACKOFF_INITIAL, backoff_max: float = _BACKOFF_MAX, + adaptive_retry: bool = False, ): self.token_bucket = token_bucket self.attempts = attempts self.backoff_initial = backoff_initial self.backoff_max = backoff_max + self.adaptive_retry = adaptive_retry def record_success(self, retry: bool) -> None: """Record a successful operation.""" - self.token_bucket.deposit(retry) + if self.adaptive_retry: + self.token_bucket.deposit(retry) def backoff(self, attempt: int) -> float: """Return the backoff duration for the given .""" @@ -158,7 +159,7 @@ def should_retry(self, attempt: int, delay: float) -> bool: return False # Check token bucket last since we only want to consume a token if we actually retry. - if not self.token_bucket.consume(): + if self.adaptive_retry and not self.token_bucket.consume(): # DRIVERS-3246 Improve diagnostics when this case happens. # We could add info to the exception and log. return False diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index 4e3d178f89..55151629cd 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -615,8 +615,18 @@ def __init__( client to use Stable API. See `versioned API `_ for details. + | **Adaptive retry options:** + | (If not enabled explicitly, adaptive retries will not be enabled.) + + - `adaptive_retries`: (boolean) Whether the adaptive retry mechanism is enabled for this client. + If enabled, server overload errors will use a token-bucket based system to mitigate further overload. + Defaults to ``False``. + .. seealso:: The MongoDB documentation on `connections `_. + .. versionchanged:: 4.XX + Added the ``adaptive_retries`` URI and keyword argument. + .. versionchanged:: 4.5 Added the ``serverMonitoringMode`` keyword argument. @@ -778,7 +788,6 @@ def __init__( self._timeout: float | None = None self._topology_settings: TopologySettings = None # type: ignore[assignment] self._event_listeners: _EventListeners | None = None - self._retry_policy = _RetryPolicy(_TokenBucket()) # _pool_class, _monitor_class, and _condition_class are for deep # customization of PyMongo, e.g. Motor. @@ -890,6 +899,10 @@ def __init__( self._opened = False self._closed = False self._loop: Optional[asyncio.AbstractEventLoop] = None + + self._retry_policy = _RetryPolicy( + _TokenBucket(), adaptive_retry=self._options.adaptive_retries + ) if not is_srv: self._init_background() diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index 5511765bae..f3cee70e15 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -652,6 +652,21 @@ async def test_detected_environment_warning(self, mock_get_hosts): with self.assertWarns(UserWarning): self.simple_client(multi_host) + async def test_adaptive_retries(self): + # Assert that adaptive retries are disabled by default. + c = self.simple_client(connect=False) + self.assertFalse(c.options.adaptive_retries) + + # Assert that adaptive retries can be enabled through connection or client options. + c = self.simple_client(connect=False, adaptive_retries=True) + self.assertTrue(c.options.adaptive_retries) + + c = self.simple_client(connect=False, adaptiveRetries=True) + self.assertTrue(c.options.adaptive_retries) + + c = self.simple_client(host="mongodb://localhost/?adaptiveretries=true", connect=False) + self.assertTrue(c.options.adaptive_retries) + class TestClient(AsyncIntegrationTest): def test_multiple_uris(self): diff --git a/test/asynchronous/test_client_backpressure.py b/test/asynchronous/test_client_backpressure.py index c82d84e181..12084d9995 100644 --- a/test/asynchronous/test_client_backpressure.py +++ b/test/asynchronous/test_client_backpressure.py @@ -168,34 +168,11 @@ async def test_retry_overload_error_getMore(self): self.assertIn("RetryableError", str(error.exception)) self.assertIn("SystemOverloadedError", str(error.exception)) - @async_client_context.require_failCommand_appName - async def test_limit_retry_command(self): - client = await self.async_rs_or_single_client() - client._retry_policy.token_bucket.tokens = 1 - db = client.pymongo_test - await db.t.insert_one({"x": 1}) - - # Ensure command is retried once overload error. - fail_many = mock_overload_error.copy() - fail_many["mode"] = {"times": 1} - async with self.fail_point(fail_many): - await db.command("find", "t") - - # Ensure command stops retrying when there are no tokens left. - fail_too_many = mock_overload_error.copy() - fail_too_many["mode"] = {"times": 2} - async with self.fail_point(fail_too_many): - with self.assertRaises(PyMongoError) as error: - await db.command("find", "t") - - self.assertIn("RetryableError", str(error.exception)) - self.assertIn("SystemOverloadedError", str(error.exception)) - class TestRetryPolicy(AsyncPyMongoTestCase): async def test_retry_policy(self): capacity = 10 - retry_policy = _RetryPolicy(_TokenBucket(capacity=capacity)) + retry_policy = _RetryPolicy(_TokenBucket(capacity=capacity), adaptive_retry=True) self.assertEqual(retry_policy.attempts, helpers._MAX_RETRIES) self.assertEqual(retry_policy.backoff_initial, helpers._BACKOFF_INITIAL) self.assertEqual(retry_policy.backoff_max, helpers._BACKOFF_MAX) @@ -300,6 +277,69 @@ async def test_01_operation_retry_uses_exponential_backoff(self, random_func): # runs. self.assertTrue(abs((end1 - start1) - (end0 - start0 + 3.1)) < 1) + @async_client_context.require_failCommand_appName + async def test_02_overload_retries_limited(self): + # Drivers should test that without adaptive retries enabled, overload errors are retried a maximum of five times. + + # 1. Let `client` be a `MongoClient`. + client = self.client + # 3. Let `coll` be a collection. + coll = client.pymongo_test.coll + + # 4. Configure the following failpoint: + failpoint = { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": ["find"], + "errorCode": 462, # IngressRequestRateLimitExceeded + "errorLabels": ["RetryableError", "SystemOverloadedError"], + }, + } + + # 5. Perform a find operation with `coll` that fails. + async with self.fail_point(failpoint): + with self.assertRaises(PyMongoError) as error: + await coll.find_one({}) + + # 6. Assert that the raised error contains both the `RetryableError` and `SystemOverLoadedError` error labels. + self.assertIn("RetryableError", str(error.exception)) + self.assertIn("SystemOverloadedError", str(error.exception)) + + # 7. Assert that the total number of started commands is MAX_RETRIES + 1. + self.assertEqual(len(self.listener.started_events), _MAX_RETRIES + 1) + + @async_client_context.require_failCommand_appName + async def test_03_adaptive_retries_limited_by_tokens(self): + # Drivers should test that when enabled, adaptive retries are limited by the number of tokens in the bucket. + + # 1. Let `client` be a `MongoClient` with adaptiveRetries=True. + client = await self.async_rs_or_single_client(adaptive_retries=True) + # 2. Set `client`'s retry token bucket to have 1 token. + client._retry_policy.token_bucket.tokens = 1 + # 3. Let `coll` be a collection. + coll = client.pymongo_test.coll + + # 4. Configure the following failpoint: + failpoint = { + "configureFailPoint": "failCommand", + "mode": {"times": 2}, + "data": { + "failCommands": ["find", "insert", "update"], + "errorCode": 462, # IngressRequestRateLimitExceeded + "errorLabels": ["RetryableError", "SystemOverloadedError"], + }, + } + + # 5. Perform a find operation with `coll` that fails. + async with self.fail_point(failpoint): + with self.assertRaises(PyMongoError) as error: + await coll.find_one({}) + + # 6. Assert that the raised error contains both the `RetryableError` and `SystemOverLoadedError` error labels. + self.assertIn("RetryableError", str(error.exception)) + self.assertIn("SystemOverloadedError", str(error.exception)) + # Location of JSON test specifications. if _IS_SYNC: diff --git a/test/test_client.py b/test/test_client.py index 737b3afe60..9cfa36b2cd 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -645,6 +645,21 @@ def test_detected_environment_warning(self, mock_get_hosts): with self.assertWarns(UserWarning): self.simple_client(multi_host) + def test_adaptive_retries(self): + # Assert that adaptive retries are disabled by default. + c = self.simple_client(connect=False) + self.assertFalse(c.options.adaptive_retries) + + # Assert that adaptive retries can be enabled through connection or client options. + c = self.simple_client(connect=False, adaptive_retries=True) + self.assertTrue(c.options.adaptive_retries) + + c = self.simple_client(connect=False, adaptiveRetries=True) + self.assertTrue(c.options.adaptive_retries) + + c = self.simple_client(host="mongodb://localhost/?adaptiveretries=true", connect=False) + self.assertTrue(c.options.adaptive_retries) + class TestClient(IntegrationTest): def test_multiple_uris(self): diff --git a/test/test_client_backpressure.py b/test/test_client_backpressure.py index 40ea5eb8e1..347a5cd08d 100644 --- a/test/test_client_backpressure.py +++ b/test/test_client_backpressure.py @@ -168,34 +168,11 @@ def test_retry_overload_error_getMore(self): self.assertIn("RetryableError", str(error.exception)) self.assertIn("SystemOverloadedError", str(error.exception)) - @client_context.require_failCommand_appName - def test_limit_retry_command(self): - client = self.rs_or_single_client() - client._retry_policy.token_bucket.tokens = 1 - db = client.pymongo_test - db.t.insert_one({"x": 1}) - - # Ensure command is retried once overload error. - fail_many = mock_overload_error.copy() - fail_many["mode"] = {"times": 1} - with self.fail_point(fail_many): - db.command("find", "t") - - # Ensure command stops retrying when there are no tokens left. - fail_too_many = mock_overload_error.copy() - fail_too_many["mode"] = {"times": 2} - with self.fail_point(fail_too_many): - with self.assertRaises(PyMongoError) as error: - db.command("find", "t") - - self.assertIn("RetryableError", str(error.exception)) - self.assertIn("SystemOverloadedError", str(error.exception)) - class TestRetryPolicy(PyMongoTestCase): def test_retry_policy(self): capacity = 10 - retry_policy = _RetryPolicy(_TokenBucket(capacity=capacity)) + retry_policy = _RetryPolicy(_TokenBucket(capacity=capacity), adaptive_retry=True) self.assertEqual(retry_policy.attempts, helpers._MAX_RETRIES) self.assertEqual(retry_policy.backoff_initial, helpers._BACKOFF_INITIAL) self.assertEqual(retry_policy.backoff_max, helpers._BACKOFF_MAX) @@ -300,6 +277,69 @@ def test_01_operation_retry_uses_exponential_backoff(self, random_func): # runs. self.assertTrue(abs((end1 - start1) - (end0 - start0 + 3.1)) < 1) + @client_context.require_failCommand_appName + def test_02_overload_retries_limited(self): + # Drivers should test that without adaptive retries enabled, overload errors are retried a maximum of five times. + + # 1. Let `client` be a `MongoClient`. + client = self.client + # 3. Let `coll` be a collection. + coll = client.pymongo_test.coll + + # 4. Configure the following failpoint: + failpoint = { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": ["find"], + "errorCode": 462, # IngressRequestRateLimitExceeded + "errorLabels": ["RetryableError", "SystemOverloadedError"], + }, + } + + # 5. Perform a find operation with `coll` that fails. + with self.fail_point(failpoint): + with self.assertRaises(PyMongoError) as error: + coll.find_one({}) + + # 6. Assert that the raised error contains both the `RetryableError` and `SystemOverLoadedError` error labels. + self.assertIn("RetryableError", str(error.exception)) + self.assertIn("SystemOverloadedError", str(error.exception)) + + # 7. Assert that the total number of started commands is MAX_RETRIES + 1. + self.assertEqual(len(self.listener.started_events), _MAX_RETRIES + 1) + + @client_context.require_failCommand_appName + def test_03_adaptive_retries_limited_by_tokens(self): + # Drivers should test that when enabled, adaptive retries are limited by the number of tokens in the bucket. + + # 1. Let `client` be a `MongoClient` with adaptiveRetries=True. + client = self.rs_or_single_client(adaptive_retries=True) + # 2. Set `client`'s retry token bucket to have 1 token. + client._retry_policy.token_bucket.tokens = 1 + # 3. Let `coll` be a collection. + coll = client.pymongo_test.coll + + # 4. Configure the following failpoint: + failpoint = { + "configureFailPoint": "failCommand", + "mode": {"times": 2}, + "data": { + "failCommands": ["find", "insert", "update"], + "errorCode": 462, # IngressRequestRateLimitExceeded + "errorLabels": ["RetryableError", "SystemOverloadedError"], + }, + } + + # 5. Perform a find operation with `coll` that fails. + with self.fail_point(failpoint): + with self.assertRaises(PyMongoError) as error: + coll.find_one({}) + + # 6. Assert that the raised error contains both the `RetryableError` and `SystemOverLoadedError` error labels. + self.assertIn("RetryableError", str(error.exception)) + self.assertIn("SystemOverloadedError", str(error.exception)) + # Location of JSON test specifications. if _IS_SYNC: From e0d1267c005e394d2b1544da2f32ac30db4309cd Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Fri, 20 Feb 2026 13:48:22 -0500 Subject: [PATCH 2/5] Add prose tests --- test/asynchronous/test_client_backpressure.py | 27 +++++++++++-------- test/test_client_backpressure.py | 25 +++++++++-------- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/test/asynchronous/test_client_backpressure.py b/test/asynchronous/test_client_backpressure.py index 12084d9995..bf139105d9 100644 --- a/test/asynchronous/test_client_backpressure.py +++ b/test/asynchronous/test_client_backpressure.py @@ -278,15 +278,15 @@ async def test_01_operation_retry_uses_exponential_backoff(self, random_func): self.assertTrue(abs((end1 - start1) - (end0 - start0 + 3.1)) < 1) @async_client_context.require_failCommand_appName - async def test_02_overload_retries_limited(self): + async def test_03_overload_retries_limited(self): # Drivers should test that without adaptive retries enabled, overload errors are retried a maximum of five times. # 1. Let `client` be a `MongoClient`. client = self.client - # 3. Let `coll` be a collection. + # 2. Let `coll` be a collection. coll = client.pymongo_test.coll - # 4. Configure the following failpoint: + # 3. Configure the following failpoint: failpoint = { "configureFailPoint": "failCommand", "mode": "alwaysOn", @@ -297,16 +297,16 @@ async def test_02_overload_retries_limited(self): }, } - # 5. Perform a find operation with `coll` that fails. + # 4. Perform a find operation with `coll` that fails. async with self.fail_point(failpoint): with self.assertRaises(PyMongoError) as error: await coll.find_one({}) - # 6. Assert that the raised error contains both the `RetryableError` and `SystemOverLoadedError` error labels. + # 5. Assert that the raised error contains both the `RetryableError` and `SystemOverLoadedError` error labels. self.assertIn("RetryableError", str(error.exception)) self.assertIn("SystemOverloadedError", str(error.exception)) - # 7. Assert that the total number of started commands is MAX_RETRIES + 1. + # 6. Assert that the total number of started commands is MAX_RETRIES + 1. self.assertEqual(len(self.listener.started_events), _MAX_RETRIES + 1) @async_client_context.require_failCommand_appName @@ -314,18 +314,20 @@ async def test_03_adaptive_retries_limited_by_tokens(self): # Drivers should test that when enabled, adaptive retries are limited by the number of tokens in the bucket. # 1. Let `client` be a `MongoClient` with adaptiveRetries=True. - client = await self.async_rs_or_single_client(adaptive_retries=True) - # 2. Set `client`'s retry token bucket to have 1 token. - client._retry_policy.token_bucket.tokens = 1 + client = await self.async_rs_or_single_client( + adaptive_retries=True, event_listeners=[self.listener] + ) + # 2. Set `client`'s retry token bucket to have 2 tokens. + client._retry_policy.token_bucket.tokens = 2 # 3. Let `coll` be a collection. coll = client.pymongo_test.coll # 4. Configure the following failpoint: failpoint = { "configureFailPoint": "failCommand", - "mode": {"times": 2}, + "mode": {"times": 3}, "data": { - "failCommands": ["find", "insert", "update"], + "failCommands": ["find"], "errorCode": 462, # IngressRequestRateLimitExceeded "errorLabels": ["RetryableError", "SystemOverloadedError"], }, @@ -340,6 +342,9 @@ async def test_03_adaptive_retries_limited_by_tokens(self): self.assertIn("RetryableError", str(error.exception)) self.assertIn("SystemOverloadedError", str(error.exception)) + # 7. Assert that the total number of started commands is 3: one for the initial attempt and two for the retries. + self.assertEqual(len(self.listener.started_events), 3) + # Location of JSON test specifications. if _IS_SYNC: diff --git a/test/test_client_backpressure.py b/test/test_client_backpressure.py index 347a5cd08d..2219b4f4b0 100644 --- a/test/test_client_backpressure.py +++ b/test/test_client_backpressure.py @@ -278,15 +278,15 @@ def test_01_operation_retry_uses_exponential_backoff(self, random_func): self.assertTrue(abs((end1 - start1) - (end0 - start0 + 3.1)) < 1) @client_context.require_failCommand_appName - def test_02_overload_retries_limited(self): + def test_03_overload_retries_limited(self): # Drivers should test that without adaptive retries enabled, overload errors are retried a maximum of five times. # 1. Let `client` be a `MongoClient`. client = self.client - # 3. Let `coll` be a collection. + # 2. Let `coll` be a collection. coll = client.pymongo_test.coll - # 4. Configure the following failpoint: + # 3. Configure the following failpoint: failpoint = { "configureFailPoint": "failCommand", "mode": "alwaysOn", @@ -297,16 +297,16 @@ def test_02_overload_retries_limited(self): }, } - # 5. Perform a find operation with `coll` that fails. + # 4. Perform a find operation with `coll` that fails. with self.fail_point(failpoint): with self.assertRaises(PyMongoError) as error: coll.find_one({}) - # 6. Assert that the raised error contains both the `RetryableError` and `SystemOverLoadedError` error labels. + # 5. Assert that the raised error contains both the `RetryableError` and `SystemOverLoadedError` error labels. self.assertIn("RetryableError", str(error.exception)) self.assertIn("SystemOverloadedError", str(error.exception)) - # 7. Assert that the total number of started commands is MAX_RETRIES + 1. + # 6. Assert that the total number of started commands is MAX_RETRIES + 1. self.assertEqual(len(self.listener.started_events), _MAX_RETRIES + 1) @client_context.require_failCommand_appName @@ -314,18 +314,18 @@ def test_03_adaptive_retries_limited_by_tokens(self): # Drivers should test that when enabled, adaptive retries are limited by the number of tokens in the bucket. # 1. Let `client` be a `MongoClient` with adaptiveRetries=True. - client = self.rs_or_single_client(adaptive_retries=True) - # 2. Set `client`'s retry token bucket to have 1 token. - client._retry_policy.token_bucket.tokens = 1 + client = self.rs_or_single_client(adaptive_retries=True, event_listeners=[self.listener]) + # 2. Set `client`'s retry token bucket to have 2 tokens. + client._retry_policy.token_bucket.tokens = 2 # 3. Let `coll` be a collection. coll = client.pymongo_test.coll # 4. Configure the following failpoint: failpoint = { "configureFailPoint": "failCommand", - "mode": {"times": 2}, + "mode": {"times": 3}, "data": { - "failCommands": ["find", "insert", "update"], + "failCommands": ["find"], "errorCode": 462, # IngressRequestRateLimitExceeded "errorLabels": ["RetryableError", "SystemOverloadedError"], }, @@ -340,6 +340,9 @@ def test_03_adaptive_retries_limited_by_tokens(self): self.assertIn("RetryableError", str(error.exception)) self.assertIn("SystemOverloadedError", str(error.exception)) + # 7. Assert that the total number of started commands is 3: one for the initial attempt and two for the retries. + self.assertEqual(len(self.listener.started_events), 3) + # Location of JSON test specifications. if _IS_SYNC: From f2d4be7f7da170ace39af4e0f77e284e48768b13 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Tue, 24 Feb 2026 08:14:01 -0500 Subject: [PATCH 3/5] Update test/asynchronous/test_client_backpressure.py Co-authored-by: Sergey Zelenov --- test/asynchronous/test_client_backpressure.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/asynchronous/test_client_backpressure.py b/test/asynchronous/test_client_backpressure.py index bf139105d9..3100d5064e 100644 --- a/test/asynchronous/test_client_backpressure.py +++ b/test/asynchronous/test_client_backpressure.py @@ -310,7 +310,7 @@ async def test_03_overload_retries_limited(self): self.assertEqual(len(self.listener.started_events), _MAX_RETRIES + 1) @async_client_context.require_failCommand_appName - async def test_03_adaptive_retries_limited_by_tokens(self): + async def test_04_adaptive_retries_limited_by_tokens(self): # Drivers should test that when enabled, adaptive retries are limited by the number of tokens in the bucket. # 1. Let `client` be a `MongoClient` with adaptiveRetries=True. From f5cfe0a96ef7b0a8587cdbc3892a4a07b55e38b5 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Tue, 24 Feb 2026 08:14:08 -0500 Subject: [PATCH 4/5] Update test/test_client_backpressure.py Co-authored-by: Sergey Zelenov --- test/test_client_backpressure.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_client_backpressure.py b/test/test_client_backpressure.py index 2219b4f4b0..f3146c9450 100644 --- a/test/test_client_backpressure.py +++ b/test/test_client_backpressure.py @@ -310,7 +310,7 @@ def test_03_overload_retries_limited(self): self.assertEqual(len(self.listener.started_events), _MAX_RETRIES + 1) @client_context.require_failCommand_appName - def test_03_adaptive_retries_limited_by_tokens(self): + def test_04_adaptive_retries_limited_by_tokens(self): # Drivers should test that when enabled, adaptive retries are limited by the number of tokens in the bucket. # 1. Let `client` be a `MongoClient` with adaptiveRetries=True. From a5fe7570017ed988fe95c7e766daae2e02b5f23b Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 25 Feb 2026 14:53:22 -0800 Subject: [PATCH 5/5] Fix adaptiveRetries versionchanged --- pymongo/asynchronous/mongo_client.py | 2 +- pymongo/synchronous/mongo_client.py | 2 +- .../client-backpressure-options.json | 35 +++++++++++++++++++ 3 files changed, 37 insertions(+), 2 deletions(-) create mode 100644 test/uri_options/client-backpressure-options.json diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 17c8fb623e..a7ca7b0144 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -624,7 +624,7 @@ def __init__( .. seealso:: The MongoDB documentation on `connections `_. - .. versionchanged:: 4.XX + .. versionchanged:: 4.17 Added the ``adaptive_retries`` URI and keyword argument. .. versionchanged:: 4.5 diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index 55151629cd..36f432c67d 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -624,7 +624,7 @@ def __init__( .. seealso:: The MongoDB documentation on `connections `_. - .. versionchanged:: 4.XX + .. versionchanged:: 4.17 Added the ``adaptive_retries`` URI and keyword argument. .. versionchanged:: 4.5 diff --git a/test/uri_options/client-backpressure-options.json b/test/uri_options/client-backpressure-options.json new file mode 100644 index 0000000000..3fcf2c86b0 --- /dev/null +++ b/test/uri_options/client-backpressure-options.json @@ -0,0 +1,35 @@ +{ + "tests": [ + { + "description": "adaptiveRetries=true is parsed correctly", + "uri": "mongodb://example.com/?adaptiveRetries=true", + "valid": true, + "warning": false, + "hosts": null, + "auth": null, + "options": { + "adaptiveRetries": true + } + }, + { + "description": "adaptiveRetries=false is parsed correctly", + "uri": "mongodb://example.com/?adaptiveRetries=false", + "valid": true, + "warning": false, + "hosts": null, + "auth": null, + "options": { + "adaptiveRetries": false + } + }, + { + "description": "adaptiveRetries with invalid value causes a warning", + "uri": "mongodb://example.com/?adaptiveRetries=invalid", + "valid": true, + "warning": true, + "hosts": null, + "auth": null, + "options": null + } + ] +}