Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions src/AsyncEventSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ static String generateEventMessage(const char *message, const char *event, uint3

if (!str.reserve(len)) {
async_ws_log_e("Failed to allocate");
return emptyString;
return _emptyString;
}

if (reconnect) {
Expand Down Expand Up @@ -191,7 +191,7 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A
}

AsyncEventSourceClient::~AsyncEventSourceClient() {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
// Protect message queue access (size checks and modifications) which is not thread-safe.
std::lock_guard<std::recursive_mutex> lock(_lockmq);
#endif
Expand All @@ -200,8 +200,12 @@ AsyncEventSourceClient::~AsyncEventSourceClient() {
}

bool AsyncEventSourceClient::_queueMessage(const char *message, size_t len) {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
// Protect message queue access (size checks and modifications) which is not thread-safe.
if (_messageQueue.size() >= SSE_MAX_QUEUED_MESSAGES) {
async_ws_log_e("Event message queue overflow: discard message");
return false;
}
Comment on lines +205 to +208
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_queueMessage() checks _messageQueue.size() before acquiring _lockmq under the ESP32/HOST guard, then checks again after the lock. The pre-lock access is a data race on platforms where the mutex is needed, and the duplicated overflow checks also produce inconsistent logging levels (error vs warning). Acquire the lock before any size check/mutation and keep a single overflow check/log path.

Suggested change
if (_messageQueue.size() >= SSE_MAX_QUEUED_MESSAGES) {
async_ws_log_e("Event message queue overflow: discard message");
return false;
}

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MitchBradley : please fix. Copilot is right here. The queue has to be accessed AFTER the lock. The check can also be removed this it is already done later.

std::lock_guard<std::recursive_mutex> lock(_lockmq);
#endif

Expand Down Expand Up @@ -231,7 +235,7 @@ bool AsyncEventSourceClient::_queueMessage(const char *message, size_t len) {
}

bool AsyncEventSourceClient::_queueMessage(AsyncEvent_SharedData_t &&msg) {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
// Protect message queue access (size checks and modifications) which is not thread-safe.
std::lock_guard<std::recursive_mutex> lock(_lockmq);
#endif
Expand Down Expand Up @@ -261,7 +265,7 @@ bool AsyncEventSourceClient::_queueMessage(AsyncEvent_SharedData_t &&msg) {
}

void AsyncEventSourceClient::_onAck(size_t len __attribute__((unused)), uint32_t time __attribute__((unused))) {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
// Protect message queue access (size checks and modifications) which is not thread-safe.
std::lock_guard<std::recursive_mutex> lock(_lockmq);
#endif
Expand Down Expand Up @@ -289,7 +293,7 @@ void AsyncEventSourceClient::_onAck(size_t len __attribute__((unused)), uint32_t
}

void AsyncEventSourceClient::_onPoll() {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
// Protect message queue access (size checks and modifications) which is not thread-safe.
std::lock_guard<std::recursive_mutex> lock(_lockmq);
#endif
Expand Down Expand Up @@ -373,7 +377,7 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient *client) {
_connectcb(client);
}

#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
#endif

Expand All @@ -386,7 +390,7 @@ void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient *client) {
if (_disconnectcb) {
_disconnectcb(client);
}
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
#endif
for (auto i = _clients.begin(); i != _clients.end(); ++i) {
Expand All @@ -402,7 +406,7 @@ void AsyncEventSource::close() {
// While the whole loop is not done, the linked list is locked and so the
// iterator should remain valid even when AsyncEventSource::_handleDisconnect()
// is called very early
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
#endif
for (const auto &c : _clients) {
Expand All @@ -421,7 +425,7 @@ void AsyncEventSource::close() {
size_t AsyncEventSource::avgPacketsWaiting() const {
size_t aql = 0;
uint32_t nConnectedClients = 0;
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
#endif
for (const auto &c : _clients) {
Expand All @@ -435,7 +439,7 @@ size_t AsyncEventSource::avgPacketsWaiting() const {

AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect) {
AsyncEvent_SharedData_t shared_msg = std::make_shared<String>(generateEventMessage(message, event, id, reconnect));
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
#endif
size_t hits = 0;
Expand All @@ -453,7 +457,7 @@ AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const c
}

size_t AsyncEventSource::count() const {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
#endif
size_t n_clients{0};
Expand Down
12 changes: 6 additions & 6 deletions src/AsyncEventSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

#include <Arduino.h>

#if defined(ESP32) || defined(LIBRETINY)
#if defined(ESP32) || defined(LIBRETINY) || defined(HOST)
#include <AsyncTCP.h>
#ifdef LIBRETINY
#ifdef round
Expand Down Expand Up @@ -136,7 +136,7 @@ class AsyncEventSourceClient {
size_t _inflight{0}; // num of unacknowledged bytes that has been written to socket buffer
size_t _max_inflight{SSE_MAX_INFLIGH}; // max num of unacknowledged bytes that could be written to socket buffer
std::list<AsyncEventSourceMessage> _messageQueue;
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
mutable std::recursive_mutex _lockmq;
#endif
bool _queueMessage(const char *message, size_t len);
Expand Down Expand Up @@ -205,7 +205,7 @@ class AsyncEventSourceClient {
return _lastId;
}
size_t packetsWaiting() const {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_lockmq);
#endif
return _messageQueue.size();
Expand Down Expand Up @@ -245,7 +245,7 @@ class AsyncEventSource : public AsyncWebHandler {
private:
String _url;
std::list<std::unique_ptr<AsyncEventSourceClient>> _clients;
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
// Same as for individual messages, protect mutations of _clients list
// since simultaneous access from different tasks is possible
mutable std::recursive_mutex _client_queue_lock;
Expand Down Expand Up @@ -331,11 +331,11 @@ class AsyncEventSourceResponse : public AsyncWebServerResponse {

public:
AsyncEventSourceResponse(AsyncEventSource *server);
void _respond(AsyncWebServerRequest *request);
void _respond(AsyncWebServerRequest *request) override;
size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time) override {
return 0;
};
bool _sourceValid() const {
bool _sourceValid() const override {
return true;
}
};
7 changes: 7 additions & 0 deletions src/AsyncWebServerLogging.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@
#define async_ws_log_d(format, ...) log_d(format, ##__VA_ARGS__)
#define async_ws_log_v(format, ...) log_v(format, ##__VA_ARGS__)

#elif defined(HOST)
#define async_ws_log_e(format, ...)
#define async_ws_log_w(format, ...)
#define async_ws_log_i(format, ...)
#define async_ws_log_d(format, ...)
#define async_ws_log_v(format, ...)
Comment on lines +35 to +39
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would need to be properly implemented. Can't you do something like 8266 ?


/**
* Raspberry Pi Pico specific configurations
*/
Expand Down
47 changes: 25 additions & 22 deletions src/AsyncWebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
#include <Hash.h>
#elif defined(LIBRETINY)
#include <mbedtls/sha1.h>
#elif defined(HOST)
#include "BackPort_SHA1Builder.h"
#endif

#include <algorithm>
#include <cstdio>
#include <cstring>
#include <memory>
#include <utility>
#include <cstdarg>

#define STATE_FRAME_START 0
#define STATE_FRAME_MASK 1
Expand Down Expand Up @@ -291,7 +294,7 @@ AsyncWebSocketClient::AsyncWebSocketClient(AsyncClient *client, AsyncWebSocket *

AsyncWebSocketClient::~AsyncWebSocketClient() {
{
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
_messageQueue.clear();
Expand All @@ -309,7 +312,7 @@ void AsyncWebSocketClient::_clearQueue() {
void AsyncWebSocketClient::_onAck(size_t len, uint32_t time) {
_lastMessageTime = millis();

#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::unique_lock<std::recursive_mutex> lock(_lock);
#endif

Expand All @@ -324,7 +327,7 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time) {
_status = WS_DISCONNECTED;
async_ws_log_v("[%s][%" PRIu32 "] ACK WS_DISCONNECTED", _server->url(), _clientId);
if (_client) {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
/*
Unlocking has to be called before return execution otherwise std::unique_lock ::~unique_lock() will get an exception pthread_mutex_unlock.
Due to _client->close() shall call the callback function _onDisconnect()
Expand Down Expand Up @@ -361,13 +364,13 @@ void AsyncWebSocketClient::_onPoll() {
return;
}

#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::unique_lock<std::recursive_mutex> lock(_lock);
#endif
if (_client && _client->canSend() && (!_controlQueue.empty() || !_messageQueue.empty())) {
_runQueue();
} else if (_keepAlivePeriod > 0 && (millis() - _lastMessageTime) >= _keepAlivePeriod && (_controlQueue.empty() && _messageQueue.empty())) {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
lock.unlock();
#endif
ping((uint8_t *)AWSC_PING_PAYLOAD, AWSC_PING_PAYLOAD_LEN);
Expand Down Expand Up @@ -433,21 +436,21 @@ void AsyncWebSocketClient::_runQueue() {
}

bool AsyncWebSocketClient::queueIsFull() const {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
return (_messageQueue.size() >= WS_MAX_QUEUED_MESSAGES) || (_status != WS_CONNECTED);
}

size_t AsyncWebSocketClient::queueLen() const {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
return _messageQueue.size();
}

bool AsyncWebSocketClient::canSend() const {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
return _messageQueue.size() < WS_MAX_QUEUED_MESSAGES;
Expand All @@ -458,7 +461,7 @@ bool AsyncWebSocketClient::_queueControl(uint8_t opcode, const uint8_t *data, si
return false;
}

#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif

Expand All @@ -477,7 +480,7 @@ bool AsyncWebSocketClient::_queueMessage(AsyncWebSocketSharedBuffer buffer, uint
return false;
}

#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::unique_lock<std::recursive_mutex> lock(_lock);
#endif

Expand All @@ -486,7 +489,7 @@ bool AsyncWebSocketClient::_queueMessage(AsyncWebSocketSharedBuffer buffer, uint
_status = WS_DISCONNECTED;

if (_client) {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
/*
Unlocking has to be called before return execution otherwise std::unique_lock ::~unique_lock() will get an exception pthread_mutex_unlock.
Due to _client->close() shall call the callback function _onDisconnect()
Expand Down Expand Up @@ -988,7 +991,7 @@ void AsyncWebSocket::_handleEvent(AsyncWebSocketClient *client, AwsEventType typ
}

AsyncWebSocketClient *AsyncWebSocket::_newClient(AsyncWebServerRequest *request) {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
_clients.emplace_back(request, this);
Expand All @@ -1000,7 +1003,7 @@ AsyncWebSocketClient *AsyncWebSocket::_newClient(AsyncWebServerRequest *request)
}

void AsyncWebSocket::_handleDisconnect(AsyncWebSocketClient *client) {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
const auto client_id = client->id();
Expand All @@ -1013,7 +1016,7 @@ void AsyncWebSocket::_handleDisconnect(AsyncWebSocketClient *client) {
}

bool AsyncWebSocket::availableForWriteAll() {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
return std::none_of(std::begin(_clients), std::end(_clients), [](const AsyncWebSocketClient &c) {
Expand All @@ -1022,7 +1025,7 @@ bool AsyncWebSocket::availableForWriteAll() {
}

bool AsyncWebSocket::availableForWrite(uint32_t id) {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
const auto iter = std::find_if(std::begin(_clients), std::end(_clients), [id](const AsyncWebSocketClient &c) {
Expand All @@ -1035,7 +1038,7 @@ bool AsyncWebSocket::availableForWrite(uint32_t id) {
}

size_t AsyncWebSocket::count() const {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
return std::count_if(std::begin(_clients), std::end(_clients), [](const AsyncWebSocketClient &c) {
Expand All @@ -1044,7 +1047,7 @@ size_t AsyncWebSocket::count() const {
}

AsyncWebSocketClient *AsyncWebSocket::client(uint32_t id) {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
const auto iter = std::find_if(_clients.begin(), _clients.end(), [id](const AsyncWebSocketClient &c) {
Expand All @@ -1064,7 +1067,7 @@ void AsyncWebSocket::close(uint32_t id, uint16_t code, const char *message) {
}

void AsyncWebSocket::closeAll(uint16_t code, const char *message) {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
for (auto &c : _clients) {
Expand All @@ -1075,7 +1078,7 @@ void AsyncWebSocket::closeAll(uint16_t code, const char *message) {
}

void AsyncWebSocket::cleanupClients(uint16_t maxClients) {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
const size_t c = count();
Expand All @@ -1098,7 +1101,7 @@ bool AsyncWebSocket::ping(uint32_t id, const uint8_t *data, size_t len) {
}

AsyncWebSocket::SendStatus AsyncWebSocket::pingAll(const uint8_t *data, size_t len) {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
size_t hit = 0;
Expand Down Expand Up @@ -1209,7 +1212,7 @@ AsyncWebSocket::SendStatus AsyncWebSocket::textAll(AsyncWebSocketMessageBuffer *
}

AsyncWebSocket::SendStatus AsyncWebSocket::textAll(AsyncWebSocketSharedBuffer buffer) {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
size_t hit = 0;
Expand Down Expand Up @@ -1301,7 +1304,7 @@ AsyncWebSocket::SendStatus AsyncWebSocket::binaryAll(AsyncWebSocketMessageBuffer
return status;
}
AsyncWebSocket::SendStatus AsyncWebSocket::binaryAll(AsyncWebSocketSharedBuffer buffer) {
#ifdef ESP32
#if defined(ESP32) || defined(HOST)
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
size_t hit = 0;
Expand Down
Loading
Loading