diff --git a/uvloop/handles/pipe.pxd b/uvloop/handles/pipe.pxd index 56fc2658..94c35bbb 100644 --- a/uvloop/handles/pipe.pxd +++ b/uvloop/handles/pipe.pxd @@ -25,6 +25,8 @@ cdef class ReadUnixTransport(UVStream): cdef ReadUnixTransport new(Loop loop, object protocol, Server server, object waiter) + cpdef write(self, data) + cdef class WriteUnixTransport(UVStream): diff --git a/uvloop/handles/pipe.pyx b/uvloop/handles/pipe.pyx index 4b95ed6e..305eab40 100644 --- a/uvloop/handles/pipe.pyx +++ b/uvloop/handles/pipe.pyx @@ -156,7 +156,7 @@ cdef class ReadUnixTransport(UVStream): def get_write_buffer_size(self): raise NotImplementedError - def write(self, data): + cpdef write(self, data): raise NotImplementedError def writelines(self, list_of_data): diff --git a/uvloop/includes/consts.pxi b/uvloop/includes/consts.pxi index 82f3c327..9b1082e0 100644 --- a/uvloop/includes/consts.pxi +++ b/uvloop/includes/consts.pxi @@ -15,6 +15,8 @@ cdef enum: LOG_THRESHOLD_FOR_CONNLOST_WRITES = 5 + + SSL_READ_DEFAULT_SIZE = 64 * 1024 SSL_READ_MAX_SIZE = 256 * 1024 diff --git a/uvloop/loop.pyx b/uvloop/loop.pyx index 577d45a4..54f3f11b 100644 --- a/uvloop/loop.pyx +++ b/uvloop/loop.pyx @@ -28,16 +28,23 @@ from libc.stdint cimport uint64_t from libc.string cimport memset, strerror, memcpy from libc cimport errno -from cpython cimport PyObject -from cpython cimport PyErr_CheckSignals, PyErr_Occurred -from cpython cimport PyThread_get_thread_ident -from cpython cimport Py_INCREF, Py_DECREF, Py_XDECREF, Py_XINCREF -from cpython cimport ( - PyObject_GetBuffer, PyBuffer_Release, PyBUF_SIMPLE, - Py_buffer, PyBytes_AsString, PyBytes_CheckExact, - PyBytes_AsStringAndSize, - Py_SIZE, PyBytes_AS_STRING, PyBUF_WRITABLE +from cpython.pythread cimport PyThread_get_thread_ident +from cpython.object cimport PyObject, Py_SIZE +from cpython.exc cimport PyErr_CheckSignals, PyErr_Occurred +from cpython.buffer cimport ( + Py_buffer, PyObject_GetBuffer, PyBuffer_Release, + PyBUF_SIMPLE, PyBUF_WRITABLE ) +from cpython.ref cimport Py_INCREF, Py_DECREF, Py_XDECREF, Py_XINCREF +from cpython.bytes cimport ( + PyBytes_CheckExact, PyBytes_AS_STRING, PyBytes_AsString, + PyBytes_AsStringAndSize, PyBytes_FromStringAndSize +) +from cpython.bytearray cimport ( + PyByteArray_AS_STRING, PyByteArray_GET_SIZE, PyByteArray_Resize, + PyByteArray_FromStringAndSize +) + from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer from . import _noop diff --git a/uvloop/sslproto.pxd b/uvloop/sslproto.pxd index edc0f502..e7cf315c 100644 --- a/uvloop/sslproto.pxd +++ b/uvloop/sslproto.pxd @@ -58,8 +58,15 @@ cdef class SSLProtocol: object _incoming_write object _outgoing object _outgoing_read - char* _ssl_buffer - size_t _ssl_buffer_len + + # Buffer for the underlying UVStream buffered reads + bytearray _plain_read_buffer + # Buffer for SSLObject.read calls + # Only allocated when user pass non-buffered Protocol instance + bytearray _ssl_read_buffer + # Cached long object for SSLObject.read calls + object _ssl_read_max_size_obj + SSLProtocolState _state size_t _conn_lost AppProtocolState _app_state @@ -79,6 +86,7 @@ cdef class SSLProtocol: bint _app_protocol_is_buffer object _app_protocol_get_buffer object _app_protocol_buffer_updated + object _app_protocol_data_received object _handshake_start_time object _handshake_timeout_handle diff --git a/uvloop/sslproto.pyx b/uvloop/sslproto.pyx index f76474e6..b40667bf 100644 --- a/uvloop/sslproto.pyx +++ b/uvloop/sslproto.pyx @@ -199,17 +199,6 @@ cdef class SSLProtocol: buffers which are ssl.MemoryBIO objects. """ - def __cinit__(self, *args, **kwargs): - self._ssl_buffer_len = SSL_READ_MAX_SIZE - self._ssl_buffer = PyMem_RawMalloc(self._ssl_buffer_len) - if not self._ssl_buffer: - raise MemoryError() - - def __dealloc__(self): - PyMem_RawFree(self._ssl_buffer) - self._ssl_buffer = NULL - self._ssl_buffer_len = 0 - def __init__(self, loop, app_protocol, sslcontext, waiter, server_side=False, server_hostname=None, call_connection_made=True, @@ -261,6 +250,11 @@ cdef class SSLProtocol: self._incoming_write = self._incoming.write self._outgoing = ssl_MemoryBIO() self._outgoing_read = self._outgoing.read + + self._plain_read_buffer = PyByteArray_FromStringAndSize( + NULL, SSL_READ_DEFAULT_SIZE) + self._ssl_read_max_size_obj = SSL_READ_MAX_SIZE + self._state = UNWRAPPED self._conn_lost = 0 # Set when connection_lost called if call_connection_made: @@ -291,8 +285,13 @@ cdef class SSLProtocol: self._app_protocol_get_buffer = app_protocol.get_buffer self._app_protocol_buffer_updated = app_protocol.buffer_updated self._app_protocol_is_buffer = True + self._ssl_read_buffer = None else: self._app_protocol_is_buffer = False + self._app_protocol_data_received = app_protocol.data_received + if self._ssl_read_buffer is None: + self._ssl_read_buffer = PyByteArray_FromStringAndSize( + NULL, SSL_READ_MAX_SIZE) cdef _wakeup_waiter(self, exc=None): if self._waiter is None: @@ -344,8 +343,17 @@ cdef class SSLProtocol: self._loop.call_soon(self._app_protocol.connection_lost, exc) self._set_state(UNWRAPPED) self._transport = None + + # Decrease ref counters to user instances to avoid cyclic references + # between user protocol, SSLProtocol and SSLTransport. + # This helps to deallocate useless objects asap. + # If not done then some tests like test_create_connection_memory_leak + # will fail. self._app_transport = None self._app_protocol = None + self._app_protocol_get_buffer = None + self._app_protocol_data_received = None + self._app_protocol_buffer_updated = None self._wakeup_waiter(exc) if self._shutdown_timeout_handle: @@ -356,21 +364,24 @@ cdef class SSLProtocol: self._handshake_timeout_handle = None cdef get_buffer_impl(self, size_t n, char** buf, size_t* buf_size): - cdef size_t want = n - if want > SSL_READ_MAX_SIZE: - want = SSL_READ_MAX_SIZE - if self._ssl_buffer_len < want: - self._ssl_buffer = PyMem_RawRealloc(self._ssl_buffer, want) - if not self._ssl_buffer: - raise MemoryError() - self._ssl_buffer_len = want - - buf[0] = self._ssl_buffer - buf_size[0] = self._ssl_buffer_len + cdef Py_ssize_t want = min(n, SSL_READ_MAX_SIZE) + + if PyByteArray_GET_SIZE(self._plain_read_buffer) < want: + PyByteArray_Resize(self._plain_read_buffer, want) + if self._ssl_read_buffer is not None: + PyByteArray_Resize(self._ssl_read_buffer, want) + + buf[0] = PyByteArray_AS_STRING(self._plain_read_buffer) + buf_size[0] = PyByteArray_GET_SIZE(self._plain_read_buffer) cdef buffer_updated_impl(self, size_t nbytes): - self._incoming_write(PyMemoryView_FromMemory( - self._ssl_buffer, nbytes, PyBUF_WRITE)) + mv = PyMemoryView_FromMemory( + PyByteArray_AS_STRING(self._plain_read_buffer), + nbytes, + PyBUF_WRITE + ) + + self._incoming_write(mv) if self._state == DO_HANDSHAKE: self._do_handshake() @@ -594,17 +605,18 @@ cdef class SSLProtocol: If close_notify is received for the first time, call eof_received. """ cdef: - bint close_notify = False + bytearray buffer = PyByteArray_FromStringAndSize( + NULL, SSL_READ_DEFAULT_SIZE) + Py_ssize_t bytes_read = -1 try: - while True: - if not self._sslobj_read(SSL_READ_MAX_SIZE): - close_notify = True - break + while bytes_read != 0: + bytes_read = self._sslobj_read( + self._ssl_read_max_size_obj, buffer) except ssl_SSLAgainErrors as exc: pass except ssl_SSLZeroReturnError: - close_notify = True - if close_notify: + bytes_read = 0 + if bytes_read == 0: self._call_eof_received(context) cdef _do_flush(self, object context=None): @@ -787,7 +799,7 @@ cdef class SSLProtocol: PyBUF_WRITE) last_bytes_read = self._sslobj_read( - app_buffer_size - total_bytes_read, app_buffer) + self._ssl_read_max_size_obj, app_buffer) total_bytes_read += last_bytes_read if last_bytes_read == 0: @@ -823,32 +835,39 @@ cdef class SSLProtocol: cdef _do_read__copied(self): cdef: - list data - bytes first, chunk = b'1' - bint zero = True, one = False + Py_ssize_t bytes_read = -1 + list data = None + bytes first_chunk = None, curr_chunk try: while (self._incoming.pending > 0 or self._sslobj_pending() > 0): - chunk = self._sslobj_read(SSL_READ_MAX_SIZE) - if not chunk: + bytes_read = self._sslobj_read( + self._ssl_read_max_size_obj, + self._ssl_read_buffer) + if bytes_read == 0: break - if zero: - zero = False - one = True - first = chunk - elif one: - one = False - data = [first, chunk] + + curr_chunk = PyBytes_FromStringAndSize( + PyByteArray_AS_STRING(self._ssl_read_buffer), bytes_read) + + if first_chunk is None: + first_chunk = curr_chunk + elif data is None: + data = [first_chunk, curr_chunk] else: - data.append(chunk) + data.append(curr_chunk) except ssl_SSLAgainErrors as exc: pass - if one: - self._app_protocol.data_received(first) - elif not zero: - self._app_protocol.data_received(b''.join(data)) - if not chunk: + + if data is not None: + self._app_protocol_data_received(b''.join(data)) + elif first_chunk is not None: + self._app_protocol_data_received(first_chunk) + + # SSLObject.read() may return 0 instead of throwing SSLWantReadError + # This indicates that we reached EOF + if bytes_read == 0: # close_notify self._call_eof_received() self._start_shutdown()