Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ public override void Disconnect()
#if UNITY_2021_3_OR_NEWER
public override void Send(ReadOnlySpan<byte> span)
{
if (conn.hasDisposed) return;

ArrayBuffer buffer = bufferPool.Take(span.Length);
buffer.CopyFrom(span);

Expand All @@ -139,6 +141,8 @@ public override void Send(ReadOnlySpan<byte> span)
#else
public override void Send(ArraySegment<byte> segment)
{
if (conn.hasDisposed) return;

ArrayBuffer buffer = bufferPool.Take(segment.Count);
buffer.CopyFrom(segment);

Expand Down
26 changes: 17 additions & 9 deletions Assets/Mirror/Transports/SimpleWeb/SimpleWeb/Common/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,31 @@ public void Dispose()
hasDisposed = true;
Log.Verbose("[SWT-Connection]: Dispose Connection {0} hasDisposed set true", ToString());

// stop threads first so they don't try to use disposed objects
receiveThread.Interrupt();
sendThread?.Interrupt();
// stop threads first so they don't try to use disposed objects - Thread.Interrupt can trigger SecurityException
try
{
receiveThread.Interrupt();
}
catch (Exception e)
{
Log.Exception("[SWT-Connection] receiveThread.Interrupt", e);
}
Comment on lines +67 to +74
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you seen Thread.Interrupt throw? I dont think SecurityException should ever be thrown in unity/mono or with the way we use it, because we both create and interrupt the thread there should be no permission issue

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't, but i added it to be safe in case its extremely rare, or happens on windows but not linux, etc. Theoretically, security permission exceptions would have thrown earlier, when starting the thread. It's critical that Dispose continues executing, so i think its a good policy to handle every possible exception, even if it just serves as a reminder to people editing Dispose() in future. For these changes I just went to the docs for each method and added exception handling for methods that have any documented exceptions, and removed exception handling for those that don't.


try
{
// stream
stream?.Dispose();
stream = null;
client.Dispose();
client = null;
sendThread?.Interrupt();
}
catch (Exception e)
{
Log.Exception("[SWT-Connection]", e);
Log.Exception("[SWT-Connection] sendThread.Interrupt", e);
}

stream?.Dispose();
stream = null;

client?.Dispose();
client = null;

sendPending.Dispose();

// release all buffers in send queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,16 @@ public static void Loop(Config config)
Profiler.BeginThreadProfiling("SimpleWeb", $"ReceiveLoop {conn.connId}");

byte[] readBuffer = new byte[Constants.HeaderSize + (expectMask ? Constants.MaskSize : 0) + maxMessageSize];
Queue<ArrayBuffer> fragments = new();

try
{
try
{
TcpClient client = conn.client;

while (client.Connected)
ReadOneMessage(config, readBuffer);
ReadOneMessage(config, readBuffer, fragments);

Log.Verbose("[SWT-ReceiveLoop]: {0} Not Connected", conn);
}
Expand Down Expand Up @@ -100,10 +102,13 @@ public static void Loop(Config config)
finally
{
Profiler.EndThreadProfiling();

while (fragments.TryDequeue(out ArrayBuffer fragment))
fragment.Release();
}
}

static void ReadOneMessage(Config config, byte[] buffer)
static void ReadOneMessage(Config config, byte[] buffer, Queue<ArrayBuffer> fragments)
{
(Connection conn, int maxMessageSize, bool expectMask, ConcurrentQueue<Message> queue, BufferPool bufferPool) = config;
Stream stream = conn.stream;
Expand All @@ -127,8 +132,6 @@ static void ReadOneMessage(Config config, byte[] buffer)
}
else
{
// todo cache this to avoid allocations
Queue<ArrayBuffer> fragments = new Queue<ArrayBuffer>();
fragments.Enqueue(CopyMessageToBuffer(bufferPool, expectMask, buffer, msgOffset, header.payloadLength));
int totalSize = header.payloadLength;

Expand Down
13 changes: 11 additions & 2 deletions Assets/Mirror/Transports/SimpleWeb/SimpleWeb/Common/SendLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public static void Loop(Config config)
// create write buffer for this thread
byte[] writeBuffer = new byte[bufferSize];
MaskHelper maskHelper = setMask ? new MaskHelper() : null;

// Thread may block on stream.Write and throw ThreadInterruptedException. Cache msg reference so it can be released in finally block.
ArrayBuffer msg = null;

try
{
TcpClient client = conn.client;
Expand All @@ -66,13 +70,14 @@ public static void Loop(Config config)
if (SendLoopConfig.batchSend)
{
int offset = 0;
while (conn.sendQueue.TryDequeue(out ArrayBuffer msg))
while (conn.sendQueue.TryDequeue(out msg))
{
// check if connected before sending message
if (!client.Connected)
{
Log.Verbose("[SWT-SendLoop]: SendLoop {0} not connected", conn);
msg.Release();
msg = null;
return;
}

Expand All @@ -87,6 +92,7 @@ public static void Loop(Config config)

offset = SendMessage(writeBuffer, offset, msg, setMask, maskHelper);
msg.Release();
msg = null;
}

// after no message in queue, send remaining messages
Expand All @@ -96,19 +102,21 @@ public static void Loop(Config config)
}
else
{
while (conn.sendQueue.TryDequeue(out ArrayBuffer msg))
while (conn.sendQueue.TryDequeue(out msg))
{
// check if connected before sending message
if (!client.Connected)
{
Log.Verbose("[SWT-SendLoop]: SendLoop {0} not connected", conn);
msg.Release();
msg = null;
return;
}

int length = SendMessage(writeBuffer, 0, msg, setMask, maskHelper);
stream.Write(writeBuffer, 0, length);
msg.Release();
msg = null;
}
}
}
Expand All @@ -122,6 +130,7 @@ public static void Loop(Config config)
{
Profiler.EndThreadProfiling();
maskHelper?.Dispose();
msg?.Release();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,23 @@ public void SendAll(List<int> connectionIds, ArraySegment<byte> source)

// make copy of array before for each, data sent to each client is the same
foreach (int id in connectionIds)
server.Send(id, buffer);
{
if (!server.Send(id, buffer))
{
buffer.Release();
}
}
}

public void SendOne(int connectionId, ArraySegment<byte> source)
{
ArrayBuffer buffer = bufferPool.Take(source.Count);
buffer.CopyFrom(source);
server.Send(connectionId, buffer);

if (!server.Send(connectionId, buffer))
{
buffer.Release();
}
}

public void KickClient(int connectionId) => server.CloseConnection(connectionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,15 +183,19 @@ void AfterConnectionDisposed(Connection conn)
conn.onDispose = null;
}

public void Send(int id, ArrayBuffer buffer)
public bool Send(int id, ArrayBuffer buffer)
{
if (connections.TryGetValue(id, out Connection conn))
{
if (conn.hasDisposed) return false;

conn.sendQueue.Enqueue(buffer);
conn.sendPending.Set();
return true;
}
else
Log.Warn("[SWT-WebSocketServer]: Send: cannot send message to {0} because it was not found in dictionary. Maybe it disconnected.", id);

Log.Warn("[SWT-WebSocketServer]: Send: cannot send message to {0} because it was not found in dictionary. Maybe it disconnected.", id);
return false;
}

public void CloseConnection(int id)
Expand Down