// net_server.cc
#include "net_server.h"
#include "tls/tls_context.h"
#include "tls/tls_connection.h"
#include "log/logger.h"
#include <csignal>
#include <future>
// Process-global SIGPIPE suppression. Checked on every NetServer construction
// (not call_once) so it works correctly after Cleanup(RESTORE) restores the
// original disposition.
static void SigpipeGuardAcquire() {
struct sigaction sa_cur{};
sigaction(SIGPIPE, nullptr, &sa_cur);
if (sa_cur.sa_handler == SIG_DFL) {
struct sigaction sa_ign{};
sa_ign.sa_handler = SIG_IGN;
sigemptyset(&sa_ign.sa_mask);
sigaction(SIGPIPE, &sa_ign, nullptr);
}
}
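// Three-phase lifecycle (cf. the §5.4a reference in Start()'s error message):
// construct, then StartListening(), then Start(). A minimal usage sketch, for
// orientation only — argument values are illustrative, and the InetAddr
// construction is elided because its ctor is not visible from this file:
//
//   NetServer server(/*timer_interval=*/1, std::chrono::seconds(60),
//                    /*worker_threads=*/4);     // Phase 1: ctor only
//   server.StartListening(/* InetAddr ... */);  // Phase 2: bind + listen
//   server.Start();                             // Phase 3: blocks in
//                                               // conn_dispatcher_'s loop
//   server.Stop();                              // from another thread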
static constexpr int STOP_BARRIER_TIMEOUT_SEC = 5;
NetServer::NetServer(int timer_interval,
std::chrono::seconds connection_timeout,
int worker_threads)
: conn_dispatcher_(std::make_shared<Dispatcher>()),
acceptor_(nullptr),
timer_interval_(timer_interval),
connection_timeout_sec_(static_cast<int>(connection_timeout.count()))
{
    // Suppress SIGPIPE if the current handler is the default (which kills
    // the process). OpenSSL's SSL_write/SSL_shutdown issue plain write()
    // calls on the underlying socket, and write() cannot pass MSG_NOSIGNAL.
    // Without suppression, a single peer reset on a TLS connection kills
    // the entire process. Only override SIG_DFL — if the embedder has
    // installed their own handler, leave it alone to avoid breaking their
    // signal handling.
SigpipeGuardAcquire();
conn_dispatcher_->Init();
conn_dispatcher_->SetTimeOutTriggerCB(
std::bind(&NetServer::Timeout, this, std::placeholders::_1));
// Route thread pool errors through spdlog so they reach the log file
// in daemon mode (where stderr is /dev/null).
sock_workers_.SetErrorLogger([](const std::string& msg) {
logging::Get()->error("{}", msg);
});
// Init allocates worker bookkeeping but does NOT spawn threads —
// those start in `Start()` (Phase 3) so the ctor-only partial
// state has no live threads. Without this ordering, `Stop()` in
// the ctor-only state would have to drain a half-idle pool.
if (worker_threads > 0) {
sock_workers_.Init(worker_threads);
} else {
sock_workers_.Init();
}
}
void NetServer::StartListening(const InetAddr& resolved) {
if (acceptor_) {
throw std::runtime_error(
"NetServer::StartListening called twice — listen socket is "
"already open on " + resolved.Ip() + ":" +
std::to_string(resolved.Port()));
}
// Acceptor ctor parses the literal internally, creates the dual-
// family listen socket, applies IPV6_V6ONLY on v6, and binds+listens.
// It synchronously calls UpdateChannelInLoop() on conn_dispatcher_,
// which is already Init()-complete from Phase 1. Bind/listen failures
// or IPV6_V6ONLY setsockopt failures throw runtime_error from
// Acceptor; the exception unwinds cleanly with acceptor_ remaining
// null (ctor-only state preserved).
//
// Using the string-form ctor until step 9 adds an InetAddr-taking
// overload on Acceptor. `InetAddr::Ip()` is the bare literal
// (no brackets), which the string ctor parses back via its own
// InetAddr construction inside acceptor.cc — a no-op round-trip for
// IP literals.
acceptor_ = std::unique_ptr<Acceptor>(
new Acceptor(conn_dispatcher_, resolved.Ip(),
static_cast<size_t>(resolved.Port())));
acceptor_->SetNewConnCb(
std::bind(&NetServer::HandleNewConnection, this, std::placeholders::_1));
}
NetServer::~NetServer(){
Stop();
    // Ensure worker threads are stopped even if Stop() took the early-return
    // path (dispatchers_ready_ == false). Also covers the ctor-only state,
    // where sock_workers_ was initialized but Start() was never called.
    // ThreadPool::Stop() is idempotent.
sock_workers_.Stop();
socket_dispatchers_.clear();
connections_.clear();
}
// start event loop
void NetServer::Start(){
if (!acceptor_) {
throw std::runtime_error(
"NetServer::Start called before StartListening — no listen "
"socket. Call StartListening(InetAddr) first (§5.4a three-"
"phase lifecycle).");
}
start_called_.store(true, std::memory_order_release);
// ThreadPool::Start() is idempotent — re-calling it is
// safe if a caller constructed + StartListening'd + Start'd, was
// Stopped, and then somehow re-entered (not supported, but harmless).
sock_workers_.Start();
auto cleanup_partial_startup = [this]() {
for (auto& d : socket_dispatchers_)
d->StopEventLoop();
sock_workers_.Stop();
dispatchers_ready_.store(true, std::memory_order_release);
};
try {
socket_dispatchers_.reserve(sock_workers_.GetThreadWorkerNum());
        for (int idx = 0; idx < sock_workers_.GetThreadWorkerNum(); ++idx) {
if (conn_dispatcher_->was_stopped()) break;
std::shared_ptr<Dispatcher> task = std::make_shared<Dispatcher>(
true, timer_interval_,
std::chrono::seconds(connection_timeout_sec_.load(std::memory_order_relaxed)));
task->Init();
socket_dispatchers_.emplace_back(task);
task->SetTimeOutTriggerCB(std::bind(&NetServer::Timeout, this, std::placeholders::_1));
task->SetTimerCB(std::bind(&NetServer::RemoveConnection, this, std::placeholders::_1));
std::shared_ptr<SocketWorker> work_task = std::shared_ptr<SocketWorker>(
new SocketWorker([task]() {
task->RunEventLoop();
}));
sock_workers_.AddTask(work_task);
}
if (conn_dispatcher_->was_stopped()) {
cleanup_partial_startup();
return;
}
{
static constexpr int DISPATCHER_START_TIMEOUT_SEC = 5;
for (auto& disp : socket_dispatchers_) {
auto deadline = std::chrono::steady_clock::now()
+ std::chrono::seconds(DISPATCHER_START_TIMEOUT_SEC);
while (!disp->is_running() && !disp->was_stopped()) {
if (std::chrono::steady_clock::now() > deadline) {
logging::Get()->error(
"Socket dispatcher failed to start within {} seconds",
DISPATCHER_START_TIMEOUT_SEC);
cleanup_partial_startup();
throw std::runtime_error(
"Socket dispatcher failed to start within timeout");
}
std::this_thread::yield();
}
}
}
if (conn_dispatcher_->was_stopped()) {
cleanup_partial_startup();
return;
}
} catch (...) {
cleanup_partial_startup();
throw;
}
dispatchers_ready_.store(true, std::memory_order_release);
// Fire ready callback synchronously BEFORE RunEventLoop(). This ensures
// server_ready_ is set before any accept events are processed — otherwise
// early requests hit the shutdown path (Connection: close, WS 503, H2
// GOAWAY) on an otherwise healthy server. The stop_requested_ guard
// suppresses the callback if Stop() raced in during startup.
if (ready_callback_ && !stop_requested_.load(std::memory_order_acquire)) {
ready_callback_();
ready_callback_ = nullptr;
}
conn_dispatcher_->RunEventLoop();
}
// stop event loop
void NetServer::StopAccepting() {
// Suppress the ready callback before any dispatcher interaction.
// This is the common entry point for all shutdown paths (NetServer::Stop,
// HttpServer::Stop, direct callers). Setting it here instead of only in
// NetServer::Stop() ensures the flag is set even when HttpServer::Stop()
// calls StopAccepting() directly.
stop_requested_.store(true, std::memory_order_release);
if (conn_dispatcher_->was_stopped()) return; // already stopped
if (conn_dispatcher_->is_running()) {
// Mark closing BEFORE enqueuing CloseListenSocket. Deferred accept
// retries already in the task queue check this flag and bail out,
// preventing new connections from being accepted after shutdown starts.
if (acceptor_) acceptor_->MarkClosing();
// Event loop is active: enqueue close + barrier to ensure any
// in-flight accept callback has finished before we return.
conn_dispatcher_->EnQueue([this]() {
if (acceptor_) acceptor_->CloseListenSocket();
});
if (!conn_dispatcher_->is_on_loop_thread()) {
auto barrier = std::make_shared<std::promise<void>>();
auto future = barrier->get_future();
conn_dispatcher_->EnQueue([barrier]() { barrier->set_value(); });
if (future.wait_for(std::chrono::seconds(STOP_BARRIER_TIMEOUT_SEC))
== std::future_status::timeout) {
logging::Get()->error(
"conn_dispatcher barrier timed out during StopAccepting");
}
}
} else {
// Event loop not started (Stop before Start, ready_callback shutdown).
// Set was_stopped BEFORE closing so that CloseChannel's off-loop
// check sees was_stopped_ == true and takes the inline path.
// Without this, CloseChannel enqueues the fd close to a dispatcher
// that will never run, leaving the listen socket bound.
conn_dispatcher_->StopEventLoop();
if (acceptor_) acceptor_->CloseListenSocket();
return; // StopEventLoop already called
}
conn_dispatcher_->StopEventLoop();
}
void NetServer::Stop(){
// First: stop accepting (may already be done by HttpServer::Stop()).
// StopAccepting() sets stop_requested_ to suppress the ready callback.
StopAccepting();
// If Start() hasn't finished building socket_dispatchers_, skip
// dispatcher iteration to avoid racing with the vector build.
if (!dispatchers_ready_.load(std::memory_order_acquire)) {
if (!start_called_.load(std::memory_order_acquire)) {
// Start() was never called — workers are idle in GetTask(),
// not in RunEventLoop(), so stopping the pool won't hang.
sock_workers_.Stop();
}
// If Start() IS running, it checks was_stopped() in its build
// loop and after the barrier, and handles dispatcher + worker
// cleanup itself.
return;
}
// Second (deferred): ClearConnections is done AFTER the drain wait so that
// dispatcher TimerHandler continues enforcing per-connection deadlines during
// HTTP/2 graceful drain. The clear is enqueued later, before StopEventLoop.
// Third: Gracefully close all active connections — CloseAfterWrite lets pending
// output (including WS close frames) drain via the still-running event loops.
// Connections with empty output buffers close immediately (ForceClose path).
std::vector<std::shared_ptr<ConnectionHandler>> conns_to_close;
{
std::lock_guard<std::mutex> lck(conn_mtx_);
for (auto& pair : connections_) {
if (pair.second) {
conns_to_close.push_back(pair.second);
}
}
connections_.clear();
}
for (auto& conn : conns_to_close) {
// Skip connections already marked by a higher layer (e.g., HttpServer
// sent a WS close frame and called CloseAfterWrite on them).
if (conn->IsCloseDeferred()) continue;
// Skip connections in the H2 graceful drain set — they close
// themselves after all active streams complete.
{
std::lock_guard<std::mutex> dlck(draining_conns_mtx_);
if (draining_conns_.count(conn.get())) continue;
}
// Live check: a request that just entered an async handler (via
// HttpConnectionHandler::BeginAsyncResponse) flips this flag on
// the transport. A pre-sweep snapshot in HttpServer::Stop cannot
// close the race — this check races only against the single
// dispatcher thread that owns the connection, which is the only
// place BeginAsyncResponse runs.
if (conn->IsShutdownExempt()) continue;
conn->CloseAfterWrite();
}
// Do NOT clear conns_to_close here. The shared_ptrs must keep
// ConnectionHandlers alive until the deferred CloseAfterWrite lambdas
// (which capture weak_ptr) have executed. Without this, ClearConnections
// + clear() can drop the last strong ref before the drain/close lambda
// runs, causing weak_ptr::lock() to fail and skipping graceful close.
    // Fourth: Wait for each socket dispatcher to process enqueued CloseAfterWrite
    // tasks. Without this barrier, StopEventLoop would exit the event loop before
    // EnableWriteMode (from CloseAfterWrite) triggers a write event, truncating
    // buffered output (WS close frames, in-flight HTTP responses) under
    // backpressure. The barrier ensures write mode is registered; StopEventLoop's
    // WakeUp then triggers one final WaitForEvent that includes the write-ready
    // channels. The barrier skips the self-dispatcher to avoid deadlock when
    // Stop() is called from a handler on a dispatcher thread.
auto wait_for_dispatcher_barrier = [this]() {
for (auto& disp : socket_dispatchers_) {
if (disp->was_stopped()) continue;
if (disp->is_on_loop_thread()) {
// Self-dispatcher: drain the task queue directly instead of
// going through HandleEventId (which reads the eventfd first
// and returns without touching task_que_ if the read fails —
// e.g., when EnQueue's WakeUp write was lost under pressure).
disp->ProcessPendingTasks();
continue;
}
auto barrier = std::make_shared<std::promise<void>>();
auto future = barrier->get_future();
disp->EnQueue([barrier]() { barrier->set_value(); });
if (future.wait_for(std::chrono::seconds(STOP_BARRIER_TIMEOUT_SEC))
== std::future_status::timeout) {
logging::Get()->error(
"Socket dispatcher barrier timed out during Stop(), "
"forcing StopEventLoop");
disp->StopEventLoop();
}
}
};
wait_for_dispatcher_barrier();
// Fourth-B: If H2 connections are draining, wait for them while event loops
// are still running. The pre_stop_drain_cb blocks until drain completes or timeout.
if (pre_stop_drain_cb_) {
pre_stop_drain_cb_();
pre_stop_drain_cb_ = nullptr; // one-shot
// Second barrier: covers CloseAfterWrite tasks enqueued by H2 handlers
// during the drain wait above.
wait_for_dispatcher_barrier();
}
{
std::lock_guard<std::mutex> dlck(draining_conns_mtx_);
draining_conns_.clear(); // one-shot cleanup
}
// Fourth-C: Now safe to release dispatcher-held connection references.
// Deferred from earlier so TimerHandler continues enforcing deadlines during drain.
for (auto& disp : socket_dispatchers_) {
disp->EnQueue([d = disp]() {
d->ClearConnections();
});
}
wait_for_dispatcher_barrier();
// Fifth: Stop socket dispatcher event loops (conn_dispatcher already stopped above).
for(auto& task : socket_dispatchers_) {
if (task->is_on_loop_thread()) {
// Self-dispatcher (Stop called from a handler on this thread):
// enqueue StopEventLoop so the loop does one more WaitForEvent
// iteration before exiting. This lets write-ready channels
// (armed by CloseAfterWrite/EnableWriteMode in the barrier)
// fire and flush buffered output. A direct StopEventLoop would
// set is_running_=false immediately, and the loop would exit
// as soon as the current handler returns — truncating output.
task->EnQueue([t = task]() { t->StopEventLoop(); });
} else {
task->StopEventLoop();
}
}
// Sixth: Now safe to join worker threads
sock_workers_.Stop();
// Seventh: Release shutdown connection references. All deferred work has
// completed — the event loops are stopped and workers are joined.
conns_to_close.clear();
}
void NetServer::HandleNewConnection(std::unique_ptr<SocketHandler> client_sock){
// Enforce max_connections limit
int max_conns = max_connections_.load(std::memory_order_relaxed);
if (max_conns > 0) {
std::lock_guard<std::mutex> lck(conn_mtx_);
if (static_cast<int>(connections_.size()) >= max_conns) {
logging::Get()->warn("Max connections ({}) reached, rejecting fd {}",
max_conns, cilent_sock->fd());
return; // SocketHandler destructor closes the fd
}
}
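    // Shard by fd: each connection is pinned to socket_dispatchers_[fd % N]
    // for its lifetime; HandleCloseConnection/HandleErrorConnection recompute
    // the same index to find the owning dispatcher.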
    int idx = client_sock->fd() % sock_workers_.GetThreadWorkerNum();
    std::shared_ptr<ConnectionHandler> conn(
        new ConnectionHandler(socket_dispatchers_[idx], std::move(client_sock)));
// Inject TLS BEFORE RegisterCallbacks to avoid race:
// RegisterCallbacks() enables epoll read, so data could arrive immediately.
// OnMessage() must know about TLS before the first read event.
if (tls_ctx_) {
try {
auto tls = std::make_unique<TlsConnection>(*tls_ctx_, conn->fd());
conn->SetTlsConnection(std::move(tls));
} catch (const std::exception& e) {
logging::Get()->error("TLS setup failed for fd {}: {}", conn->fd(), e.what());
// Close properly to avoid double-close: both Channel and SocketHandler
// hold the same fd. CallCloseCb closes the channel fd and releases from SocketHandler.
conn->CallCloseCb();
return;
}
}
// Set application callbacks and per-connection limits BEFORE RegisterCallbacks().
// RegisterCallbacks() enables epoll (EPOLL_CTL_ADD), after which data can arrive
// on the socket dispatcher thread. All connection state must be configured before
// this point — the epoll_ctl syscall provides the memory barrier ensuring writes
// here are visible to reads on the socket dispatcher thread.
    conn->SetCloseCb(std::bind(&NetServer::HandleCloseConnection, this, std::placeholders::_1));
    conn->SetErrorCb(std::bind(&NetServer::HandleErrorConnection, this, std::placeholders::_1));
    conn->SetOnMessageCb(std::bind(&NetServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));
    conn->SetCompletionCb(std::bind(&NetServer::HandleSendComplete, this, std::placeholders::_1));
    conn->SetWriteProgressCb(std::bind(&NetServer::HandleWriteProgress, this, std::placeholders::_1, std::placeholders::_2));
// Set input buffer cap BEFORE epoll registration to eliminate the race where
// the first read arrives uncapped before HttpServer::HandleNewConnection runs.
size_t max_input = max_input_size_.load(std::memory_order_relaxed);
if (max_input > 0) {
conn->SetMaxInputSize(max_input);
}
AddConnection(conn);
// Two-phase initialization: register channel callbacks and enable epoll.
// Uses weak_ptr captures so callbacks are safe if ConnectionHandler is destroyed.
// If epoll_ctl fails (ENOMEM/ENOSPC), clean up the half-initialized connection
// to prevent leaking a connection slot and fd.
try {
        conn->RegisterCallbacks();
} catch (const std::exception& e) {
logging::Get()->error("epoll registration failed for fd {}: {}", conn->fd(), e.what());
// CallCloseCb handles: close channel, fire close callback (removes from
// connections_ map), release fd from SocketHandler (prevents double-close).
conn->CallCloseCb();
return;
}
// Register with socket dispatcher's timer for idle timeout scanning.
// Must go through EnQueue so AddConnection runs on the dispatcher thread,
// avoiding lock inversion between timer_mtx_ and conn_mtx_.
{
std::weak_ptr<ConnectionHandler> weak_conn = conn;
auto dispatcher = socket_dispatchers_[idx];
dispatcher->EnQueue([weak_conn, dispatcher]() {
if (auto c = weak_conn.lock()) {
dispatcher->AddConnection(c);
}
});
}
logging::Get()->debug("New connection fd={} from {}:{}", conn->fd(), conn->ip_addr(), conn->port());
if(callbacks_.new_conn_callback)
callbacks_.new_conn_callback(conn);
}
void NetServer::HandleCloseConnection(std::shared_ptr<ConnectionHandler> conn){
// Capture fd BEFORE any cleanup that might invalidate it (CallCloseCb → ReleaseFd)
// Note: HandleCloseConnection is called FROM CallCloseCb, so fd() should still be
// valid at this point (ReleaseFd runs after the callback). But under fd-reuse the
// number could belong to a new connection, so we verify identity.
int close_fd = conn->fd();
if(callbacks_.close_conn_callback)
callbacks_.close_conn_callback(conn);
logging::Get()->debug("Client fd={} disconnected", close_fd);
// Remove from dispatcher's timer map with identity check to avoid fd-reuse race
int idx = close_fd % sock_workers_.GetThreadWorkerNum();
auto dispatcher = socket_dispatchers_[idx];
std::weak_ptr<ConnectionHandler> weak_conn = conn;
dispatcher->EnQueue([close_fd, weak_conn, dispatcher]() {
dispatcher->RemoveTimerConnectionIfMatch(close_fd, weak_conn.lock());
});
// Remove from connections_ map with identity check to avoid removing a reused fd
{
std::lock_guard<std::mutex> lck(conn_mtx_);
auto it = connections_.find(close_fd);
if (it != connections_.end() && it->second == conn) {
connections_.erase(it);
}
}
conn.reset();
}
void NetServer::HandleErrorConnection(std::shared_ptr<ConnectionHandler> conn){
int close_fd = conn->fd();
if(callbacks_.error_callback)
callbacks_.error_callback(conn);
logging::Get()->debug("Client fd={} error occurred, disconnect", close_fd);
// Remove from dispatcher's timer map with identity check to avoid fd-reuse race
int idx = close_fd % sock_workers_.GetThreadWorkerNum();
auto dispatcher = socket_dispatchers_[idx];
std::weak_ptr<ConnectionHandler> weak_conn = conn;
dispatcher->EnQueue([close_fd, weak_conn, dispatcher]() {
dispatcher->RemoveTimerConnectionIfMatch(close_fd, weak_conn.lock());
});
// Remove from connections_ map with identity check
{
std::lock_guard<std::mutex> lck(conn_mtx_);
auto it = connections_.find(close_fd);
if (it != connections_.end() && it->second == conn) {
connections_.erase(it);
}
}
conn.reset();
}
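// Thin forwarder: hands data received by a ConnectionHandler to the
// embedder's message callback.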
void NetServer::OnMessage(std::shared_ptr<ConnectionHandler> conn, std::string& message){
if(callbacks_.on_message_callback)
callbacks_.on_message_callback(conn, message);
}
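// fd -> connection map bookkeeping, guarded by conn_mtx_. RemoveConnection
// (the dispatcher timer path, see SetTimerCB in Start()) erases by fd alone;
// the close/error handlers above erase with an identity check instead.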
void NetServer::AddConnection(std::shared_ptr<ConnectionHandler> conn){
std::lock_guard<std::mutex> lck(conn_mtx_);
    connections_[conn->fd()] = conn;
}
void NetServer::RemoveConnection(int fd){
std::lock_guard<std::mutex> lck(conn_mtx_);
connections_.erase(fd);
}
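// Write-path notifications from ConnectionHandler, forwarded verbatim to
// the embedder (wired up in HandleNewConnection).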
void NetServer::HandleSendComplete(std::shared_ptr<ConnectionHandler> conn){
if(callbacks_.send_complete_callback)
callbacks_.send_complete_callback(conn);
}
void NetServer::HandleWriteProgress(std::shared_ptr<ConnectionHandler> conn, size_t remaining){
if(callbacks_.write_progress_callback)
callbacks_.write_progress_callback(conn, remaining);
}
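// Dispatcher timeout tick, forwarded to the embedder's timer callback
// (registered via SetTimeOutTriggerCB in the ctor and in Start()).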
void NetServer::Timeout(std::shared_ptr<Dispatcher> sock_dispatcher){
if(callbacks_.timer_callback)
callbacks_.timer_callback(sock_dispatcher);
}
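// Callback registration. A null fn is ignored rather than clearing the
// existing callback, so passing an empty callable is a no-op.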
void NetServer::SetNewConnectionCb(CALLBACKS_NAMESPACE::NetSrvConnCallback fn){
if(fn)
callbacks_.new_conn_callback = std::move(fn);
}
void NetServer::SetCloseConnectionCb(CALLBACKS_NAMESPACE::NetSrvCloseConnCallback fn){
if(fn)
callbacks_.close_conn_callback = std::move(fn);
}
void NetServer::SetErrorCb(CALLBACKS_NAMESPACE::NetSrvErrorCallback fn){
if(fn)
callbacks_.error_callback = std::move(fn);
}
void NetServer::SetOnMessageCb(CALLBACKS_NAMESPACE::NetSrvOnMsgCallback fn){
if(fn)
callbacks_.on_message_callback = std::move(fn);
}
void NetServer::SetSendCompletionCb(CALLBACKS_NAMESPACE::NetSrvSendCompleteCallback fn){
if(fn)
callbacks_.send_complete_callback = std::move(fn);
}
void NetServer::SetWriteProgressCb(CALLBACKS_NAMESPACE::NetSrvWriteProgressCallback fn){
if(fn)
callbacks_.write_progress_callback = std::move(fn);
}
void NetServer::SetTimerCb(CALLBACKS_NAMESPACE::NetSrvTimerCallback fn){
if(fn)
callbacks_.timer_callback = std::move(fn);
}
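// Drain pending tasks on the calling thread's own dispatcher, if any.
// A thread can own at most one dispatcher loop, hence the early return.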
void NetServer::ProcessSelfDispatcherTasks() {
for (auto& disp : socket_dispatchers_) {
if (disp->is_on_loop_thread()) {
disp->ProcessPendingTasks();
return;
}
}
}
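// True when the calling thread runs one of the socket dispatcher loops
// (cf. the self-dispatcher special cases in Stop()).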
bool NetServer::IsOnDispatcherThread() const {
for (const auto& disp : socket_dispatchers_) {
if (disp->is_on_loop_thread()) return true;
}
return false;
}
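// Runtime-adjustable idle timeout. The atomic store covers dispatchers that
// Start() has not created yet; live dispatchers pick up the new value on
// their own loop thread via EnQueue.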
void NetServer::SetConnectionTimeout(std::chrono::seconds timeout) {
connection_timeout_sec_.store(static_cast<int>(timeout.count()),
std::memory_order_relaxed);
for (auto& disp : socket_dispatchers_) {
disp->EnQueue([d = disp, timeout]() {
d->SetTimeout(timeout);
});
}
}
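// Runtime-adjustable timer tick, applied the same way: the member is read
// by Start() when building dispatchers, and live dispatchers are updated
// on their own loop thread via EnQueue.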
void NetServer::SetTimerInterval(int seconds) {
timer_interval_ = seconds;
for (auto& disp : socket_dispatchers_) {
disp->EnQueue([d = disp, seconds]() {
d->SetTimerInterval(seconds);
});
}
}