Skip to content

Commit 463c2c6

Browse files
authored
Merge pull request #675 from ably/release/2.1.4
Release/2.1.4
2 parents 70dd615 + 536d2b0 commit 463c2c6

6 files changed

Lines changed: 125 additions & 6 deletions

File tree

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
# Change Log
22

3+
## [v2.1.4](https://github.com/ably/ably-python/tree/v2.1.4)
4+
5+
[Full Changelog](https://github.com/ably/ably-python/compare/v2.1.3...v2.1.4)
6+
7+
### What's Changed
8+
9+
- Fixed handling of normal WebSocket close frames and improved reconnection logic [#672](https://github.com/ably/ably-python/pull/672)
10+
311
## [v2.1.3](https://github.com/ably/ably-python/tree/v2.1.3)
412

513
[Full Changelog](https://github.com/ably/ably-python/compare/v2.1.2...v2.1.3)

ably/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@
1616
logger.addHandler(logging.NullHandler())
1717

1818
api_version = '3'
19-
lib_version = '2.1.3'
19+
lib_version = '2.1.4'

ably/transport/websockettransport.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ def __init__(self, connection_manager: ConnectionManager, host: str, params: dic
6666
def connect(self):
6767
headers = HttpUtils.default_headers()
6868
query_params = urllib.parse.urlencode(self.params)
69-
ws_url = (f'wss://{self.host}?{query_params}')
69+
scheme = 'wss' if self.options.tls else 'ws'
70+
ws_url = f'{scheme}://{self.host}?{query_params}'
7071
log.info(f'connect(): attempting to connect to {ws_url}')
7172
self.ws_connect_task = asyncio.create_task(self.ws_connect(ws_url, headers))
7273
self.ws_connect_task.add_done_callback(self.on_ws_connect_done)
@@ -110,6 +111,11 @@ async def _handle_websocket_connection(self, ws_url, websocket):
110111
if not self.is_disposed:
111112
await self.dispose()
112113
self.connection_manager.deactivate_transport(err)
114+
else:
115+
# Read loop exited normally (e.g., server sent normal WS close frame)
116+
if not self.is_disposed:
117+
await self.dispose()
118+
self.connection_manager.deactivate_transport()
113119

114120
async def on_protocol_message(self, msg):
115121
self.on_activity()
@@ -214,8 +220,9 @@ async def send(self, message: dict):
214220
await self.websocket.send(raw_msg)
215221

216222
def set_idle_timer(self, timeout: float):
217-
if not self.idle_timer:
218-
self.idle_timer = Timer(timeout, self.on_idle_timer_expire)
223+
if self.idle_timer:
224+
self.idle_timer.cancel()
225+
self.idle_timer = Timer(timeout, self.on_idle_timer_expire)
219226

220227
async def on_idle_timer_expire(self):
221228
self.idle_timer = None

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "ably"
3-
version = "2.1.3"
3+
version = "2.1.4"
44
description = "Python REST and Realtime client library SDK for Ably realtime messaging service"
55
readme = "LONG_DESCRIPTION.rst"
66
requires-python = ">=3.7"

test/ably/realtime/realtimeconnection_test.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
import asyncio
22

33
import pytest
4+
from websockets import connect as _ws_connect
5+
6+
try:
7+
# websockets 15+ preferred import
8+
from websockets.asyncio.server import serve as ws_serve
9+
except ImportError:
10+
# websockets 14 and earlier fallback
11+
from websockets.server import serve as ws_serve
412

513
from ably.realtime.connection import ConnectionEvent, ConnectionState
614
from ably.transport.defaults import Defaults
@@ -10,6 +18,68 @@
1018
from test.ably.utils import BaseAsyncTestCase
1119

1220

21+
async def _relay(src, dst):
22+
try:
23+
async for msg in src:
24+
await dst.send(msg)
25+
except Exception:
26+
pass
27+
28+
29+
class WsProxy:
30+
"""Local WS proxy that forwards to real Ably and lets tests trigger a normal close."""
31+
32+
def __init__(self, target_host: str):
33+
self.target_host = target_host
34+
self.server = None
35+
self.port: int | None = None
36+
self._close_event: asyncio.Event | None = None
37+
38+
async def _handler(self, client_ws):
39+
# Create a fresh event for this connection; signal to drop the connection cleanly
40+
self._close_event = asyncio.Event()
41+
path = client_ws.request.path # e.g. "/?key=...&format=json"
42+
target_url = f"wss://{self.target_host}{path}"
43+
try:
44+
async with _ws_connect(target_url, ping_interval=None) as server_ws:
45+
c2s = asyncio.create_task(_relay(client_ws, server_ws))
46+
s2c = asyncio.create_task(_relay(server_ws, client_ws))
47+
close_task = asyncio.create_task(self._close_event.wait())
48+
try:
49+
await asyncio.wait([c2s, s2c, close_task], return_when=asyncio.FIRST_COMPLETED)
50+
finally:
51+
c2s.cancel()
52+
s2c.cancel()
53+
close_task.cancel()
54+
except Exception:
55+
pass
56+
# After _handler returns the websockets server sends a normal close frame (1000)
57+
58+
async def close_active_connection(self):
59+
"""Trigger a normal WS close (code 1000) on the currently active client connection.
60+
61+
Signals the handler to exit; the websockets server framework then sends the
62+
close frame automatically when the handler coroutine returns.
63+
"""
64+
if self._close_event:
65+
self._close_event.set()
66+
67+
@property
68+
def endpoint(self) -> str:
69+
"""Endpoint string to pass to AblyRealtime (combine with tls=False)."""
70+
return f"127.0.0.1:{self.port}"
71+
72+
async def __aenter__(self):
73+
self.server = await ws_serve(self._handler, "127.0.0.1", 0, ping_interval=None)
74+
self.port = self.server.sockets[0].getsockname()[1]
75+
return self
76+
77+
async def __aexit__(self, *args):
78+
if self.server:
79+
self.server.close()
80+
await self.server.wait_closed()
81+
82+
1383
class TestRealtimeConnection(BaseAsyncTestCase):
1484
async def asyncSetUp(self):
1585
self.test_vars = await TestApp.get_test_vars()
@@ -399,3 +469,37 @@ async def on_protocol_message(msg):
399469
await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5)
400470

401471
await ably.close()
472+
473+
async def test_normal_ws_close_triggers_immediate_reconnection(self):
474+
"""Server normal WS close (code 1000) must trigger immediate reconnection.
475+
476+
Regression test: ConnectionClosedOK was silently swallowed and deactivate_transport
477+
was never called, leaving the client disconnected until the idle timer fired.
478+
"""
479+
async with WsProxy(self.test_vars["host"]) as proxy:
480+
ably = await TestApp.get_ably_realtime(
481+
disconnected_retry_timeout=500_000,
482+
suspended_retry_timeout=500_000,
483+
tls=False,
484+
realtime_host=proxy.endpoint,
485+
)
486+
487+
try:
488+
await asyncio.wait_for(
489+
ably.connection.once_async(ConnectionState.CONNECTED), timeout=10
490+
)
491+
492+
# Simulate server sending a normal WS close frame
493+
await proxy.close_active_connection()
494+
495+
# Must go CONNECTING quickly — not after the 25 s idle timer
496+
await asyncio.wait_for(
497+
ably.connection.once_async(ConnectionState.CONNECTING), timeout=1
498+
)
499+
500+
# Must reconnect immediately — not after the 500 s retry timer
501+
await asyncio.wait_for(
502+
ably.connection.once_async(ConnectionState.CONNECTED), timeout=10
503+
)
504+
finally:
505+
await ably.close()

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)