Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions uvloop/handles/pipe.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ cdef class ReadUnixTransport(UVStream):
cdef ReadUnixTransport new(Loop loop, object protocol, Server server,
object waiter)

cpdef write(self, data)


cdef class WriteUnixTransport(UVStream):

Expand Down
2 changes: 1 addition & 1 deletion uvloop/handles/pipe.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ cdef class ReadUnixTransport(UVStream):
def get_write_buffer_size(self):
raise NotImplementedError

def write(self, data):
cpdef write(self, data):
raise NotImplementedError

def writelines(self, list_of_data):
Expand Down
2 changes: 2 additions & 0 deletions uvloop/includes/consts.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ cdef enum:


LOG_THRESHOLD_FOR_CONNLOST_WRITES = 5

SSL_READ_DEFAULT_SIZE = 64 * 1024
SSL_READ_MAX_SIZE = 256 * 1024


Expand Down
25 changes: 16 additions & 9 deletions uvloop/loop.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,23 @@ from libc.stdint cimport uint64_t
from libc.string cimport memset, strerror, memcpy
from libc cimport errno

from cpython cimport PyObject
from cpython cimport PyErr_CheckSignals, PyErr_Occurred
from cpython cimport PyThread_get_thread_ident
from cpython cimport Py_INCREF, Py_DECREF, Py_XDECREF, Py_XINCREF
from cpython cimport (
PyObject_GetBuffer, PyBuffer_Release, PyBUF_SIMPLE,
Py_buffer, PyBytes_AsString, PyBytes_CheckExact,
PyBytes_AsStringAndSize,
Py_SIZE, PyBytes_AS_STRING, PyBUF_WRITABLE
from cpython.pythread cimport PyThread_get_thread_ident
from cpython.object cimport PyObject, Py_SIZE
from cpython.exc cimport PyErr_CheckSignals, PyErr_Occurred
from cpython.buffer cimport (
Py_buffer, PyObject_GetBuffer, PyBuffer_Release,
PyBUF_SIMPLE, PyBUF_WRITABLE
)
from cpython.ref cimport Py_INCREF, Py_DECREF, Py_XDECREF, Py_XINCREF
from cpython.bytes cimport (
PyBytes_CheckExact, PyBytes_AS_STRING, PyBytes_AsString,
PyBytes_AsStringAndSize, PyBytes_FromStringAndSize
)
from cpython.bytearray cimport (
PyByteArray_AS_STRING, PyByteArray_GET_SIZE, PyByteArray_Resize,
PyByteArray_FromStringAndSize
)

from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer

from . import _noop
Expand Down
12 changes: 10 additions & 2 deletions uvloop/sslproto.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,15 @@ cdef class SSLProtocol:
object _incoming_write
object _outgoing
object _outgoing_read
char* _ssl_buffer
size_t _ssl_buffer_len

# Buffer for the underlying UVStream buffered reads
bytearray _plain_read_buffer
# Buffer for SSLObject.read calls
# Only allocated when user pass non-buffered Protocol instance
bytearray _ssl_read_buffer
# Cached long object for SSLObject.read calls
object _ssl_read_max_size_obj

SSLProtocolState _state
size_t _conn_lost
AppProtocolState _app_state
Expand All @@ -79,6 +86,7 @@ cdef class SSLProtocol:
bint _app_protocol_is_buffer
object _app_protocol_get_buffer
object _app_protocol_buffer_updated
object _app_protocol_data_received

object _handshake_start_time
object _handshake_timeout_handle
Expand Down
119 changes: 69 additions & 50 deletions uvloop/sslproto.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -199,17 +199,6 @@ cdef class SSLProtocol:
buffers which are ssl.MemoryBIO objects.
"""

def __cinit__(self, *args, **kwargs):
self._ssl_buffer_len = SSL_READ_MAX_SIZE
self._ssl_buffer = <char*>PyMem_RawMalloc(self._ssl_buffer_len)
if not self._ssl_buffer:
raise MemoryError()

def __dealloc__(self):
PyMem_RawFree(self._ssl_buffer)
self._ssl_buffer = NULL
self._ssl_buffer_len = 0

def __init__(self, loop, app_protocol, sslcontext, waiter,
server_side=False, server_hostname=None,
call_connection_made=True,
Expand Down Expand Up @@ -261,6 +250,11 @@ cdef class SSLProtocol:
self._incoming_write = self._incoming.write
self._outgoing = ssl_MemoryBIO()
self._outgoing_read = self._outgoing.read

self._plain_read_buffer = PyByteArray_FromStringAndSize(
NULL, SSL_READ_DEFAULT_SIZE)
self._ssl_read_max_size_obj = SSL_READ_MAX_SIZE

self._state = UNWRAPPED
self._conn_lost = 0 # Set when connection_lost called
if call_connection_made:
Expand Down Expand Up @@ -291,8 +285,13 @@ cdef class SSLProtocol:
self._app_protocol_get_buffer = app_protocol.get_buffer
self._app_protocol_buffer_updated = app_protocol.buffer_updated
self._app_protocol_is_buffer = True
self._ssl_read_buffer = None
else:
self._app_protocol_is_buffer = False
self._app_protocol_data_received = app_protocol.data_received
if self._ssl_read_buffer is None:
self._ssl_read_buffer = PyByteArray_FromStringAndSize(
NULL, SSL_READ_MAX_SIZE)

cdef _wakeup_waiter(self, exc=None):
if self._waiter is None:
Expand Down Expand Up @@ -344,8 +343,17 @@ cdef class SSLProtocol:
self._loop.call_soon(self._app_protocol.connection_lost, exc)
self._set_state(UNWRAPPED)
self._transport = None

# Decrease ref counters to user instances to avoid cyclic references
# between user protocol, SSLProtocol and SSLTransport.
# This helps to deallocate useless objects asap.
# If not done then some tests like test_create_connection_memory_leak
# will fail.
self._app_transport = None
self._app_protocol = None
self._app_protocol_get_buffer = None
self._app_protocol_data_received = None
self._app_protocol_buffer_updated = None
self._wakeup_waiter(exc)

if self._shutdown_timeout_handle:
Expand All @@ -356,21 +364,24 @@ cdef class SSLProtocol:
self._handshake_timeout_handle = None

cdef get_buffer_impl(self, size_t n, char** buf, size_t* buf_size):
cdef size_t want = n
if want > SSL_READ_MAX_SIZE:
want = SSL_READ_MAX_SIZE
if self._ssl_buffer_len < want:
self._ssl_buffer = <char*>PyMem_RawRealloc(self._ssl_buffer, want)
if not self._ssl_buffer:
raise MemoryError()
self._ssl_buffer_len = want

buf[0] = self._ssl_buffer
buf_size[0] = self._ssl_buffer_len
cdef Py_ssize_t want = min(<Py_ssize_t>n, SSL_READ_MAX_SIZE)

if PyByteArray_GET_SIZE(self._plain_read_buffer) < want:
PyByteArray_Resize(self._plain_read_buffer, want)
if self._ssl_read_buffer is not None:
PyByteArray_Resize(self._ssl_read_buffer, want)

buf[0] = PyByteArray_AS_STRING(self._plain_read_buffer)
buf_size[0] = PyByteArray_GET_SIZE(self._plain_read_buffer)

cdef buffer_updated_impl(self, size_t nbytes):
self._incoming_write(PyMemoryView_FromMemory(
self._ssl_buffer, nbytes, PyBUF_WRITE))
mv = PyMemoryView_FromMemory(
PyByteArray_AS_STRING(self._plain_read_buffer),
nbytes,
PyBUF_WRITE
)

self._incoming_write(mv)

if self._state == DO_HANDSHAKE:
self._do_handshake()
Expand Down Expand Up @@ -594,17 +605,18 @@ cdef class SSLProtocol:
If close_notify is received for the first time, call eof_received.
"""
cdef:
bint close_notify = False
bytearray buffer = PyByteArray_FromStringAndSize(
NULL, SSL_READ_DEFAULT_SIZE)
Py_ssize_t bytes_read = -1
try:
while True:
if not self._sslobj_read(SSL_READ_MAX_SIZE):
close_notify = True
break
while bytes_read != 0:
bytes_read = self._sslobj_read(
self._ssl_read_max_size_obj, buffer)
except ssl_SSLAgainErrors as exc:
pass
except ssl_SSLZeroReturnError:
close_notify = True
if close_notify:
bytes_read = 0
if bytes_read == 0:
self._call_eof_received(context)

cdef _do_flush(self, object context=None):
Expand Down Expand Up @@ -787,7 +799,7 @@ cdef class SSLProtocol:
PyBUF_WRITE)

last_bytes_read = <Py_ssize_t>self._sslobj_read(
app_buffer_size - total_bytes_read, app_buffer)
self._ssl_read_max_size_obj, app_buffer)
total_bytes_read += last_bytes_read

if last_bytes_read == 0:
Expand Down Expand Up @@ -823,32 +835,39 @@ cdef class SSLProtocol:

cdef _do_read__copied(self):
cdef:
list data
bytes first, chunk = b'1'
bint zero = True, one = False
Py_ssize_t bytes_read = -1
list data = None
bytes first_chunk = None, curr_chunk

try:
while (<Py_ssize_t>self._incoming.pending > 0 or
<Py_ssize_t>self._sslobj_pending() > 0):
chunk = self._sslobj_read(SSL_READ_MAX_SIZE)
if not chunk:
bytes_read = self._sslobj_read(
self._ssl_read_max_size_obj,
self._ssl_read_buffer)
if bytes_read == 0:
break
if zero:
zero = False
one = True
first = chunk
elif one:
one = False
data = [first, chunk]

curr_chunk = <bytes> PyBytes_FromStringAndSize(
PyByteArray_AS_STRING(self._ssl_read_buffer), bytes_read)

if first_chunk is None:
first_chunk = curr_chunk
elif data is None:
data = [first_chunk, curr_chunk]
else:
data.append(chunk)
data.append(curr_chunk)
except ssl_SSLAgainErrors as exc:
pass
if one:
self._app_protocol.data_received(first)
elif not zero:
self._app_protocol.data_received(b''.join(data))
if not chunk:

if data is not None:
self._app_protocol_data_received(b''.join(data))
elif first_chunk is not None:
self._app_protocol_data_received(first_chunk)

# SSLObject.read() may return 0 instead of throwing SSLWantReadError
# This indicates that we reached EOF
if bytes_read == 0:
# close_notify
self._call_eof_received()
self._start_shutdown()
Expand Down
Loading