diff --git a/c_src/py_callback.c b/c_src/py_callback.c index 6ec5686..7825feb 100644 --- a/c_src/py_callback.c +++ b/c_src/py_callback.c @@ -2856,6 +2856,38 @@ static PyObject *erlang_channel_is_closed_impl(PyObject *self, PyObject *args) { } } +/** + * @brief Close a channel from Python + * + * Usage: erlang._channel_close(channel_ref) + * Returns: True on success + * Raises: TypeError if invalid reference + */ +static PyObject *erlang_channel_close_impl(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + + if (!PyArg_ParseTuple(args, "O", &capsule)) { + return NULL; + } + + if (!PyCapsule_CheckExact(capsule)) { + PyErr_SetString(PyExc_TypeError, "expected channel reference"); + return NULL; + } + + py_channel_t *channel = (py_channel_t *)PyCapsule_GetPointer(capsule, CHANNEL_CAPSULE_NAME); + if (channel == NULL) { + PyErr_SetString(PyExc_ValueError, "invalid channel reference"); + return NULL; + } + + /* Close the channel - this wakes any waiting receivers */ + channel_close(channel); + + Py_RETURN_TRUE; +} + /* ============================================================================ * ByteChannel Methods (raw bytes, no term conversion) * ============================================================================ */ @@ -3328,6 +3360,10 @@ static PyMethodDef ErlangModuleMethods[] = { "Check if channel is closed.\n" "Usage: erlang._channel_is_closed(channel_ref)\n" "Returns: True if closed, False otherwise."}, + {"_channel_close", erlang_channel_close_impl, METH_VARARGS, + "Close a channel.\n" + "Usage: erlang._channel_close(channel_ref)\n" + "Returns: True. Wakes any waiting receivers."}, /* ByteChannel methods (raw bytes, no term conversion) */ {"_byte_channel_try_receive_bytes", erlang_byte_channel_try_receive_bytes_impl, METH_VARARGS, "ByteChannel receive (non-blocking, raw bytes).\n" diff --git a/docs/channel.md b/docs/channel.md index bd7c16a..0a2acd7 100644 --- a/docs/channel.md +++ b/docs/channel.md @@ -172,6 +172,27 @@ msg = await ch.async_receive() **Raises:** `ChannelClosed` when the channel is closed. +#### `close()` + +Close the channel from Python. Wakes any waiting receivers. + +```python +ch.close() # Signal no more data will be sent +``` + +Safe to call multiple times. + +#### Context Manager + +Channels support the `with` statement for automatic cleanup: + +```python +with Channel(channel_ref) as ch: + for msg in ch: + process(msg) +# channel automatically closed on exit +``` + #### Iteration ```python @@ -461,6 +482,15 @@ def process_bytes(channel_ref): # Send bytes back ch.send_bytes(b"response data") + + # Close when done + ch.close() + +# Or use context manager for automatic cleanup +with ByteChannel(channel_ref) as ch: + for chunk in ch: + process(chunk) +# channel automatically closed ``` ### Async Python API diff --git a/priv/_erlang_impl/_byte_channel.py b/priv/_erlang_impl/_byte_channel.py index 1c5bc45..2dea439 100644 --- a/priv/_erlang_impl/_byte_channel.py +++ b/priv/_erlang_impl/_byte_channel.py @@ -307,5 +307,29 @@ def _is_closed(self) -> bool: except Exception: return True + def close(self): + """Close the byte channel. + + Closes the channel and wakes any waiting receivers with ByteChannelClosed. + Safe to call multiple times - subsequent calls have no effect. + + Example: + byte_channel.close() # Signal no more data + """ + import erlang + try: + erlang._channel_close(self._ref) + except Exception: + pass # Ignore errors on close (may already be closed) + + def __enter__(self): + """Context manager entry - returns self.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit - closes the channel.""" + self.close() + return False + def __repr__(self): return f"" diff --git a/priv/_erlang_impl/_channel.py b/priv/_erlang_impl/_channel.py index 9fe548c..f98a6d2 100644 --- a/priv/_erlang_impl/_channel.py +++ b/priv/_erlang_impl/_channel.py @@ -286,6 +286,30 @@ def _is_closed(self): except Exception: return True + def close(self): + """Close the channel. + + Closes the channel and wakes any waiting receivers with ChannelClosed. + Safe to call multiple times - subsequent calls have no effect. + + Example: + channel.close() # Signal no more data + """ + import erlang + try: + erlang._channel_close(self._ref) + except Exception: + pass # Ignore errors on close (may already be closed) + + def __enter__(self): + """Context manager entry - returns self.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit - closes the channel.""" + self.close() + return False + def __repr__(self): return f""