Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bumble/avrcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
Awaitable,
Callable,
Iterable,
Sequence,
Mapping,
Sequence,
)
from dataclasses import dataclass, field
from typing import ClassVar, SupportsBytes, TypeVar
Expand Down
153 changes: 62 additions & 91 deletions bumble/hfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import re
import traceback
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, ClassVar
from typing import Any, ClassVar, Literal, overload

from typing_extensions import Self

Expand Down Expand Up @@ -420,61 +420,6 @@ class CmeError(enum.IntEnum):
# Hands-Free Control Interoperability Requirements
# -----------------------------------------------------------------------------

# Response codes.
RESPONSE_CODES = {
"+APLSIRI",
"+BAC",
"+BCC",
"+BCS",
"+BIA",
"+BIEV",
"+BIND",
"+BINP",
"+BLDN",
"+BRSF",
"+BTRH",
"+BVRA",
"+CCWA",
"+CHLD",
"+CHUP",
"+CIND",
"+CLCC",
"+CLIP",
"+CMEE",
"+CMER",
"+CNUM",
"+COPS",
"+IPHONEACCEV",
"+NREC",
"+VGM",
"+VGS",
"+VTS",
"+XAPL",
"A",
"D",
}

# Unsolicited responses and statuses.
UNSOLICITED_CODES = {
"+APLSIRI",
"+BCS",
"+BIND",
"+BSIR",
"+BTRH",
"+BVRA",
"+CCWA",
"+CIEV",
"+CLIP",
"+VGM",
"+VGS",
"BLACKLISTED",
"BUSY",
"DELAYED",
"NO ANSWER",
"NO CARRIER",
"RING",
}

# Status codes
STATUS_CODES = {
"+CME ERROR",
Expand Down Expand Up @@ -727,12 +672,9 @@ class HfLoopTermination(HfpProtocolError):

dlc: rfcomm.DLC
command_lock: asyncio.Lock
if TYPE_CHECKING:
response_queue: asyncio.Queue[AtResponse]
unsolicited_queue: asyncio.Queue[AtResponse | None]
else:
response_queue: asyncio.Queue
unsolicited_queue: asyncio.Queue
pending_command: str | None = None
response_queue: asyncio.Queue[AtResponse]
unsolicited_queue: asyncio.Queue[AtResponse | None]
read_buffer: bytearray
active_codec: AudioCodec

Expand Down Expand Up @@ -805,16 +747,39 @@ def _read_at(self, data: bytes):
self.read_buffer = self.read_buffer[trailer + 2 :]

# Forward the received code to the correct queue.
if self.command_lock.locked() and (
response.code in STATUS_CODES or response.code in RESPONSE_CODES
if self.pending_command and (
response.code in STATUS_CODES or response.code in self.pending_command
):
self.response_queue.put_nowait(response)
elif response.code in UNSOLICITED_CODES:
self.unsolicited_queue.put_nowait(response)
else:
logger.warning(
f"dropping unexpected response with code '{response.code}'"
)
self.unsolicited_queue.put_nowait(response)

@overload
async def execute_command(
self,
cmd: str,
timeout: float = 1.0,
*,
response_type: Literal[AtResponseType.NONE] = AtResponseType.NONE,
) -> None: ...

@overload
async def execute_command(
self,
cmd: str,
timeout: float = 1.0,
*,
response_type: Literal[AtResponseType.SINGLE],
) -> AtResponse: ...

@overload
async def execute_command(
self,
cmd: str,
timeout: float = 1.0,
*,
response_type: Literal[AtResponseType.MULTIPLE],
) -> list[AtResponse]: ...

async def execute_command(
self,
Expand All @@ -835,27 +800,34 @@ async def execute_command(
asyncio.TimeoutError: the status is not received after a timeout (default 1 second).
ProtocolError: the status is not OK.
"""
async with self.command_lock:
logger.debug(f">>> {cmd}")
self.dlc.write(cmd + '\r')
responses: list[AtResponse] = []

while True:
result = await asyncio.wait_for(
self.response_queue.get(), timeout=timeout
)
if result.code == 'OK':
if response_type == AtResponseType.SINGLE and len(responses) != 1:
raise HfpProtocolError("NO ANSWER")

if response_type == AtResponseType.MULTIPLE:
return responses
if response_type == AtResponseType.SINGLE:
return responses[0]
return None
if result.code in STATUS_CODES:
raise HfpProtocolError(result.code)
responses.append(result)
try:
async with self.command_lock:
self.pending_command = cmd
logger.debug(f">>> {cmd}")
self.dlc.write(cmd + '\r')
responses: list[AtResponse] = []

while True:
result = await asyncio.wait_for(
self.response_queue.get(), timeout=timeout
)
if result.code == 'OK':
if (
response_type == AtResponseType.SINGLE
and len(responses) != 1
):
raise HfpProtocolError("NO ANSWER")

if response_type == AtResponseType.MULTIPLE:
return responses
if response_type == AtResponseType.SINGLE:
return responses[0]
return None
if result.code in STATUS_CODES:
raise HfpProtocolError(result.code)
responses.append(result)
finally:
self.pending_command = None

async def initiate_slc(self):
"""4.2.1 Service Level Connection Initialization."""
Expand Down Expand Up @@ -1067,7 +1039,6 @@ async def query_current_calls(self) -> list[CallInfo]:
responses = await self.execute_command(
"AT+CLCC", response_type=AtResponseType.MULTIPLE
)
assert isinstance(responses, list)

calls = []
for response in responses:
Expand Down
Loading