Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/asyncpp/io/detail/io_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ namespace asyncpp::io::detail {
virtual void socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) = 0;
virtual void socket_multicast_set_loopback(socket_handle_t socket, bool enabled) = 0;
virtual void socket_allow_reuse_address(socket_handle_t socket, bool enabled) = 0;
virtual void socket_enable_nagles_algorithm(socket_handle_t socket, bool enabled) = 0;
virtual void socket_shutdown(socket_handle_t socket, bool receive, bool send) = 0;
virtual bool enqueue_connect(socket_handle_t socket, endpoint ep, completion_data* cd) = 0;
virtual bool enqueue_accept(socket_handle_t socket, completion_data* cd) = 0;
Expand Down
7 changes: 5 additions & 2 deletions include/asyncpp/io/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ namespace asyncpp::io {
void multicast_set_loopback(bool enabled);

void allow_reuse_address(bool enabled);
void enable_nagles_algorithm(bool enabled);

[[nodiscard]] detail::io_engine::socket_handle_t native_handle() const noexcept { return m_fd; }
[[nodiscard]] detail::io_engine::socket_handle_t release() noexcept {
Expand Down Expand Up @@ -177,7 +178,8 @@ namespace asyncpp::io {
requires(std::is_invocable_v<FN, std::error_code>)
void connect(const endpoint& ep, FN&& cb, asyncpp::stop_token st = {});
template<typename FN>
requires(std::is_invocable_v<FN, std::variant<socket, std::error_code>>)
requires(std::is_invocable_v<FN, std::variant<socket, std::error_code>> ||
(std::is_invocable_v<FN, socket> && std::is_invocable_v<FN, std::error_code>))
void accept(FN&& cb, asyncpp::stop_token st = {});
template<typename FN>
requires(std::is_invocable_v<FN, size_t, std::error_code>)
Expand Down Expand Up @@ -700,7 +702,8 @@ namespace asyncpp::io {
}

template<typename FN>
requires(std::is_invocable_v<FN, std::variant<socket, std::error_code>>)
requires(std::is_invocable_v<FN, std::variant<socket, std::error_code>> ||
(std::is_invocable_v<FN, socket> && std::is_invocable_v<FN, std::error_code>))
inline void socket::accept(FN&& cb, asyncpp::stop_token st) {
struct data : detail::io_engine::completion_data {
FN real_cb;
Expand Down
8 changes: 8 additions & 0 deletions src/io_engine_generic_unix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
Expand Down Expand Up @@ -240,6 +241,13 @@ namespace asyncpp::io::detail {
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
}

void io_engine_generic_unix::socket_enable_nagles_algorithm(socket_handle_t socket, bool enabled) {
// Note: The convention of asyncpp-io is inverted to the default socket one (because honestly TCP_NODELAY should be the default).
int val = enabled ? 0 : 1;
auto res = setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char*>(&val), sizeof(val));
if (res < 0) throw std::system_error(errno, std::system_category(), "setsockopt failed");
}

void io_engine_generic_unix::socket_shutdown(socket_handle_t socket, bool receive, bool send) {
int mode = 0;
if (receive && send)
Expand Down
1 change: 1 addition & 0 deletions src/io_engine_generic_unix.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace asyncpp::io::detail {
void socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) override;
void socket_multicast_set_loopback(socket_handle_t socket, bool enabled) override;
void socket_allow_reuse_address(socket_handle_t socket, bool enabled) override;
void socket_enable_nagles_algorithm(socket_handle_t socket, bool enabled) override;
void socket_shutdown(socket_handle_t socket, bool receive, bool send) override;

file_handle_t file_open(const char* filename, std::ios_base::openmode mode) override;
Expand Down
14 changes: 14 additions & 0 deletions src/io_engine_iocp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ namespace asyncpp::io::detail {
void socket_multicast_set_ttl(socket_handle_t socket, size_t ttl) override;
void socket_multicast_set_loopback(socket_handle_t socket, bool enabled) override;
void socket_allow_reuse_address(socket_handle_t socket, bool enabled) override;
void socket_enable_nagles_algorithm(socket_handle_t socket, bool enabled) override;
void socket_shutdown(socket_handle_t socket, bool receive, bool send) override;
bool enqueue_connect(socket_handle_t socket, endpoint ep, completion_data* cd) override;
bool enqueue_accept(socket_handle_t socket, completion_data* cd) override;
Expand Down Expand Up @@ -166,6 +167,12 @@ namespace asyncpp::io::detail {
cd->result = std::error_code(GetLastError(), std::system_category());
return true;
}
if (int opt = 1; setsockopt(state->accept_sock, IPPROTO_TCP, TCP_NODELAY,
reinterpret_cast<const char*>(&opt), sizeof(opt)) == SOCKET_ERROR) {
closesocket(state->accept_sock);
cd->result = std::error_code(GetLastError(), std::system_category());
return true;
}
cd->result_handle = state->accept_sock;
} else {
cd->result_size = num_bytes;
Expand Down Expand Up @@ -459,6 +466,13 @@ namespace asyncpp::io::detail {
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
}

void io_engine_iocp::socket_enable_nagles_algorithm(socket_handle_t socket, bool enabled) {
// Note: The convention of asyncpp-io is inverted to the default socket one (because honestly TCP_NODELAY should be the default).
int val = enabled ? 0 : 1;
auto res = setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<const char*>(&val), sizeof(val));
if (res < 0) throw std::system_error(WSAGetLastError(), std::system_category(), "setsockopt failed");
}

void io_engine_iocp::socket_shutdown(socket_handle_t socket, bool receive, bool send) {
int mode = 0;
if (receive && send)
Expand Down
12 changes: 12 additions & 0 deletions src/io_engine_select.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ namespace asyncpp::io::detail {
#else
#include "io_engine_generic_unix.h"

#include <csignal>
#include <cstring>
#include <mutex>
#include <vector>

#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
Expand Down Expand Up @@ -105,6 +107,7 @@ namespace asyncpp::io::detail {
std::unique_ptr<io_engine> create_io_engine_select() { return std::make_unique<io_engine_select>(); }

io_engine_select::io_engine_select() {
signal(SIGPIPE, SIG_IGN);
#ifdef USE_EVENTFD
m_wake_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (m_wake_fd < 0) throw std::system_error(errno, std::system_category(), "eventfd failed");
Expand Down Expand Up @@ -204,6 +207,7 @@ namespace asyncpp::io::detail {
if (res >= 0) {
e.state.send.len -= res;
e.state.send.buf = static_cast<const uint8_t*>(e.state.send.buf) + res;
e.done->result_size += res;
if (e.state.send.len == 0) {
e.done->result.clear();
m_done_callbacks.push_back(e.done);
Expand All @@ -222,6 +226,13 @@ namespace asyncpp::io::detail {
if (res >= 0) {
e.done->result.clear();
e.done->result_handle = res;

if (int opt = 1;
setsockopt(res, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) {
e.done->result = std::error_code(errno, std::system_category());
close(e.done->result_handle);
e.done->result_handle = -1;
}
} else if (errno != EAGAIN) {
e.done->result = std::error_code(errno, std::system_category());
} else
Expand Down Expand Up @@ -364,6 +375,7 @@ namespace asyncpp::io::detail {
if (res >= 0) {
len -= res;
buf = static_cast<const uint8_t*>(buf) + res;
cd->result_size += res;
} else if (errno != EAGAIN) {
cd->result = std::error_code(errno, std::system_category());
return true;
Expand Down
11 changes: 10 additions & 1 deletion src/io_engine_uring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace asyncpp::io::detail {
#include <fcntl.h>
#include <liburing.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>
Expand Down Expand Up @@ -134,7 +135,15 @@ namespace asyncpp::io::detail {
auto state = info->es_get<uring_engine_state>();
info->result = std::error_code(opres < 0 ? -opres : 0, std::system_category());
switch (state->op) {
case uring_op::accept: info->result_handle = opres; break;
case uring_op::accept:
info->result_handle = opres;
if (int opt = 1; !info->result &&
setsockopt(opres, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) {
info->result = std::error_code(errno, std::system_category());
close(info->result_handle);
info->result_handle = -1;
}
break;
default: info->result_size = static_cast<size_t>(opres); break;
}

Expand Down
6 changes: 6 additions & 0 deletions src/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace asyncpp::io {

socket socket::create_tcp(io_service& io, address_type addrtype) {
auto fd = io.engine()->socket_create(addrtype, detail::io_engine::socket_type::stream);
io.engine()->socket_enable_nagles_algorithm(fd, false);
return socket(&io, fd);
}

Expand Down Expand Up @@ -152,6 +153,11 @@ namespace asyncpp::io {
m_io->engine()->socket_allow_reuse_address(m_fd, enabled);
}

void socket::enable_nagles_algorithm(bool enabled) {
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
m_io->engine()->socket_enable_nagles_algorithm(m_fd, enabled);
}

void socket::close_send() {
if (m_fd == detail::io_engine::invalid_socket_handle) throw std::logic_error("invalid socket");
m_io->engine()->socket_shutdown(m_fd, false, true);
Expand Down
Loading