Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@ def _try_libev_import():
except DependencyException as e:
return (None, e)

def _try_asyncio_import():
try:
from cassandra.io.asyncioreactor import AsyncioConnection
return (AsyncioConnection, None)
except (ImportError, DependencyException) as e:
return (None, e)

def _try_asyncore_import():
try:
from cassandra.io.asyncorereactor import AsyncoreConnection
Expand All @@ -168,7 +175,7 @@ def _connection_reduce_fn(val,import_fn):

log = logging.getLogger(__name__)

conn_fns = (_try_gevent_import, _try_eventlet_import, _try_libev_import, _try_asyncore_import)
conn_fns = (_try_gevent_import, _try_eventlet_import, _try_libev_import, _try_asyncio_import, _try_asyncore_import)
(conn_class, excs) = reduce(_connection_reduce_fn, conn_fns, (None,[]))
if not conn_class:
raise DependencyException("Unable to load a default connection class", excs)
Expand Down Expand Up @@ -876,25 +883,21 @@ def default_retry_policy(self, policy):
This determines what event loop system will be used for managing
I/O with Cassandra. These are the current options:

* :class:`cassandra.io.asyncorereactor.AsyncoreConnection`
* :class:`cassandra.io.libevreactor.LibevConnection`
* :class:`cassandra.io.asyncioreactor.AsyncioConnection`
* :class:`cassandra.io.asyncorereactor.AsyncoreConnection` (Python < 3.12 only)
* :class:`cassandra.io.eventletreactor.EventletConnection` (requires monkey-patching - see doc for details)
* :class:`cassandra.io.geventreactor.GeventConnection` (requires monkey-patching - see doc for details)
* :class:`cassandra.io.twistedreactor.TwistedConnection`
* EXPERIMENTAL: :class:`cassandra.io.asyncioreactor.AsyncioConnection`

By default, ``AsyncoreConnection`` will be used, which uses
the ``asyncore`` module in the Python standard library.

If ``libev`` is installed, ``LibevConnection`` will be used instead.

If ``gevent`` or ``eventlet`` monkey-patching is detected, the corresponding
connection class will be used automatically.
The default is selected automatically using the following priority:

``AsyncioConnection``, which uses the ``asyncio`` module in the Python
standard library, is also available, but currently experimental. Note that
it requires ``asyncio`` features that were only introduced in the 3.4 line
in 3.4.6, and in the 3.5 line in 3.5.1.
1. If ``gevent`` or ``eventlet`` monkey-patching is detected, the
corresponding connection class will be used.
2. If the ``libev`` C extension is available, ``LibevConnection`` is used.
3. ``AsyncioConnection`` is used as the standard-library fallback. This is
the preferred default on Python 3.12+ where ``asyncore`` was removed.
4. On Python < 3.12, ``AsyncoreConnection`` is used as a last resort.
"""

control_connection_timeout = 2.0
Expand Down
27 changes: 6 additions & 21 deletions cassandra/io/asyncioreactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,6 @@
log = logging.getLogger(__name__)


# This module uses ``yield from`` and ``@asyncio.coroutine`` over ``await`` and
# ``async def`` for pre-Python-3.5 compatibility, so keep in mind that the
# managed coroutines are generator-based, not native coroutines. See PEP 492:
# https://www.python.org/dev/peps/pep-0492/#coroutine-objects


try:
asyncio.run_coroutine_threadsafe
except AttributeError:
raise ImportError(
'Cannot use asyncioreactor without access to '
'asyncio.run_coroutine_threadsafe (added in 3.4.6 and 3.5.1)'
)


class AsyncioTimer(object):
"""
An ``asyncioreactor``-specific Timer. Similar to :class:`.connection.Timer,
Expand Down Expand Up @@ -67,11 +52,12 @@ def finish(self):

class AsyncioConnection(Connection):
"""
An experimental implementation of :class:`.Connection` that uses the
``asyncio`` module in the Python standard library for its event loop.
An implementation of :class:`.Connection` that uses the ``asyncio``
module in the Python standard library for its event loop.

Note that it requires ``asyncio`` features that were only introduced in the
3.4 line in 3.4.6, and in the 3.5 line in 3.5.1.
This is the preferred connection class on Python 3.12+ where the
``asyncore`` module has been removed. It is also used as a fallback
when the libev C extension is not available.
"""

_loop = None
Expand Down Expand Up @@ -109,7 +95,6 @@ def initialize_reactor(cls):
cls._loop = None
if cls._loop is None:
cls._loop = asyncio.new_event_loop()
asyncio.set_event_loop(cls._loop)

if not cls._loop_thread:
# daemonize so the loop will be shut down on interpreter
Expand Down Expand Up @@ -173,7 +158,7 @@ def push(self, data):

async def _push_msg(self, chunks):
# This lock ensures all chunks of a message are sequential in the Queue
with await self._write_queue_lock:
async with self._write_queue_lock:
for chunk in chunks:
self._write_queue.put_nowait(chunk)

Expand Down
2 changes: 1 addition & 1 deletion tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def is_monkey_patched():
return is_gevent_monkey_patched() or is_eventlet_monkey_patched()

MONKEY_PATCH_LOOP = bool(os.getenv('MONKEY_PATCH_LOOP', False))
EVENT_LOOP_MANAGER = os.getenv('EVENT_LOOP_MANAGER', "libev")
EVENT_LOOP_MANAGER = os.getenv('EVENT_LOOP_MANAGER', "asyncio")


# If set to to true this will force the Cython tests to run regardless of whether they are installed
Expand Down
32 changes: 28 additions & 4 deletions tests/unit/io/test_asyncioreactor.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
AsyncioConnection, ASYNCIO_AVAILABLE = None, False
try:
from cassandra.io.asyncioreactor import AsyncioConnection
import asynctest
ASYNCIO_AVAILABLE = True
except (ImportError, SyntaxError):
AsyncioConnection = None
ASYNCIO_AVAILABLE = False

from tests import is_monkey_patched, connection_class
from tests.unit.io.utils import TimerCallback, TimerTestMixin
from tests.unit.io.utils import TimerCallback, TimerTestMixin, submit_and_wait_for_completion

from unittest.mock import patch
from unittest.mock import patch, MagicMock

import unittest
import time
Expand Down Expand Up @@ -56,7 +55,7 @@ def setUp(self):
socket_patcher.start()

old_selector = AsyncioConnection._loop._selector
AsyncioConnection._loop._selector = asynctest.TestSelector()
AsyncioConnection._loop._selector = MagicMock()

def reset_selector():
AsyncioConnection._loop._selector = old_selector
Expand All @@ -65,6 +64,31 @@ def reset_selector():

super(AsyncioTimerTests, self).setUp()

def test_multi_timer_validation(self):
"""
Override with a wider tolerance for asyncio's thread-based scheduling,
which has inherently more jitter than libev's native event loop.
"""
from tests.unit.io.utils import get_timeout
pending_callbacks = []
completed_callbacks = []

for gross_time in range(0, 100, 1):
timeout = get_timeout(gross_time, 0, 100, 100, False)
callback = TimerCallback(timeout)
self.create_timer(timeout, callback.invoke)
pending_callbacks.append(callback)

while len(pending_callbacks) != 0:
for callback in pending_callbacks:
if callback.was_invoked():
pending_callbacks.remove(callback)
completed_callbacks.append(callback)
time.sleep(.1)

for callback in completed_callbacks:
self.assertAlmostEqual(callback.expected_wait, callback.get_wait_time(), delta=.25)

def test_timer_cancellation(self):
# Various lists for tracking callback stage
timeout = .1
Expand Down